diff options
author | Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com> | 2018-03-14 11:27:25 +0000 |
---|---|---|
committer | Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com> | 2018-04-13 16:51:18 +0000 |
commit | aed4e1e61e4e808ec5b6ef594fb2db34dfaeddf4 (patch) | |
tree | 0f0aa0421284f6a83589931ee4538b8bcdc351b7 /yardstick/common/messaging | |
parent | 5be3ad75d9c46e7b816418d4ddf410b1f67110c6 (diff) |
Add MQ consumer, producer and payload base classes
Added MessagingProducer base class. A class implementing this base class
can send a cast message using the MQ service installed in the Yardstick
jumphost (RabbitMQ by default, other MQ could be implemented). The producer
will send messages to an specific topic.
Added MessagingConsumer base class. A class implementing this base class
will be able to receive any message sent by a MessagingPorducer class
publishing messages in the topic subscribed.
By default both Producer and Consumer "fanout" is True. That means every
Consumer will create a fanout Queue attached to the MQ Exchange topic. All
Consumers attached to this topic will receive the message sent by the
Producer [1].
Added Payload base class. To send data through the message queue service,
a Payload derived object should be created. This base class allows to
define the attributes container in the class, convert the object to a dict
and retrieve the object from a dict.
Added a new library, "oslo.messaging", to implement the RPC client and
server.
[1]http://blog.thedigitalcatonline.com/blog/2013/08/21/some-tips-about-amqp-direct-exchanges/
JIRA: YARDSTICK-1074
Change-Id: I63932b5fb3de2bdc1270fc83295630a2a349e2a6
Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Diffstat (limited to 'yardstick/common/messaging')
-rw-r--r-- | yardstick/common/messaging/__init__.py | 36 | ||||
-rw-r--r-- | yardstick/common/messaging/consumer.py | 85 | ||||
-rw-r--r-- | yardstick/common/messaging/payloads.py | 53 | ||||
-rw-r--r-- | yardstick/common/messaging/producer.py | 70 |
4 files changed, 244 insertions, 0 deletions
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py new file mode 100644 index 000000000..f0f012ec3 --- /dev/null +++ b/yardstick/common/messaging/__init__.py @@ -0,0 +1,36 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# MQ is statically configured: +# - MQ service: RabbitMQ +# - user/password: yardstick/yardstick +# - host:port: localhost:5672 +MQ_USER = 'yardstick' +MQ_PASS = 'yardstick' +MQ_SERVICE = 'rabbit' +SERVER = 'localhost' +PORT = 5672 +TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER + + ':' + str(PORT) + '/') + +# RPC server. +RPC_SERVER_EXECUTOR = 'threading' + +# Topics. +RUNNER = 'runner' + +# Methods. +# RUNNER methods: +RUNNER_INFO = 'runner_info' +RUNNER_LOOP = 'runner_loop' diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py new file mode 100644 index 000000000..a0feeb300 --- /dev/null +++ b/yardstick/common/messaging/consumer.py @@ -0,0 +1,85 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging + +from oslo_config import cfg +import oslo_messaging +import six + +from yardstick.common import messaging + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class NotificationHandler(object): + """Abstract class to define a endpoint object for a MessagingConsumer""" + + def __init__(self, id, ctx_pid, queue): + self._id = id + self._ctx_pid = ctx_pid + self._queue = queue + + +@six.add_metaclass(abc.ABCMeta) +class MessagingConsumer(object): + """Abstract class to implement a MQ consumer + + This abstract class allows a class implementing this interface to receive + the messages published by a `MessagingNotifier`. + """ + + def __init__(self, topic, pid, endpoints, fanout=True): + """Init function. + + :param topic: (string) MQ exchange topic + :param pid: (int) PID of the process implementing the MQ Notifier which + will be in the message context + :param endpoints: (list of class) list of classes implementing the + methods (see `MessagingNotifier.send_message) used by + the Notifier + :param fanout: (bool) MQ clients may request that a copy of the message + be delivered to all servers listening on a topic by + setting fanout to ``True``, rather than just one of them + :returns: `MessagingConsumer` class object + """ + + self._pid = pid + self._endpoints = endpoints + self._transport = oslo_messaging.get_rpc_transport( + cfg.CONF, url=messaging.TRANSPORT_URL) + self._target = oslo_messaging.Target(topic=topic, fanout=fanout, + server=messaging.SERVER) + self._server = oslo_messaging.get_rpc_server( + self._transport, self._target, self._endpoints, + executor=messaging.RPC_SERVER_EXECUTOR, + access_policy=oslo_messaging.DefaultRPCAccessPolicy) + + def start_rpc_server(self): + """Start the RPC server.""" + if self._server: + self._server.start() + + def stop_rpc_server(self): + """Stop the RPC server.""" + if self._server: + self._server.stop() + + def wait(self): + """Wait for message processing to complete (blocking).""" + if self._server: + self._server.wait() diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py new file mode 100644 index 000000000..d29d79808 --- /dev/null +++ b/yardstick/common/messaging/payloads.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import six + +from yardstick.common import exceptions + + +@six.add_metaclass(abc.ABCMeta) +class Payload(object): + """Base Payload class to transfer data through the MQ service""" + + REQUIRED_FIELDS = {'version'} + + def __init__(self, **kwargs): + """Init method + + :param kwargs: (dictionary) attributes and values of the object + :returns: Payload object + """ + + if not all(req_field in kwargs for req_field in self.REQUIRED_FIELDS): + _attrs = set(kwargs) - self.REQUIRED_FIELDS + missing_attributes = ', '.join(str(_attr) for _attr in _attrs) + raise exceptions.PayloadMissingAttributes( + missing_attributes=missing_attributes) + + for name, value in kwargs.items(): + setattr(self, name, value) + + self._fields = set(kwargs.keys()) + + def obj_to_dict(self): + """Returns a dictionary with the attributes of the object""" + return {field: getattr(self, field) for field in self._fields} + + @classmethod + def dict_to_obj(cls, _dict): + """Returns a Payload object built from the dictionary elements""" + return cls(**_dict) diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py new file mode 100644 index 000000000..b6adc0c17 --- /dev/null +++ b/yardstick/common/messaging/producer.py @@ -0,0 +1,70 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import os + +from oslo_config import cfg +import oslo_messaging +import six + +from yardstick.common import messaging + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class MessagingProducer(object): + """Abstract class to implement a MQ producer + + This abstract class allows a class implementing this interface to publish + messages in a message queue. + """ + + def __init__(self, topic, pid=os.getpid(), fanout=True): + """Init function. + + :param topic: (string) MQ exchange topic + :param pid: (int) PID of the process implementing this MQ Notifier + :param fanout: (bool) MQ clients may request that a copy of the message + be delivered to all servers listening on a topic by + setting fanout to ``True``, rather than just one of them + :returns: `MessagingNotifier` class object + """ + self._topic = topic + self._pid = pid + self._fanout = fanout + self._transport = oslo_messaging.get_rpc_transport( + cfg.CONF, url=messaging.TRANSPORT_URL) + self._target = oslo_messaging.Target(topic=topic, fanout=fanout, + server=messaging.SERVER) + self._notifier = oslo_messaging.RPCClient(self._transport, + self._target) + + def send_message(self, method, payload): + """Send a cast message, that will invoke a method without blocking. + + The cast() method is used to invoke an RPC method that does not return + a value. cast() RPC requests may be broadcast to all Servers listening + on a given topic by setting the fanout Target property to ``True``. + + :param methos: (string) method name, that must be implemented in the + consumer endpoints + :param payload: (subclass `Payload`) payload content + """ + self._notifier.cast({'pid': self._pid}, + method, + **payload.obj_to_dict()) |