From aed4e1e61e4e808ec5b6ef594fb2db34dfaeddf4 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Wed, 14 Mar 2018 11:27:25 +0000 Subject: Add MQ consumer, producer and payload base classes Added MessagingProducer base class. A class implementing this base class can send a cast message using the MQ service installed in the Yardstick jumphost (RabbitMQ by default, other MQ could be implemented). The producer will send messages to an specific topic. Added MessagingConsumer base class. A class implementing this base class will be able to receive any message sent by a MessagingPorducer class publishing messages in the topic subscribed. By default both Producer and Consumer "fanout" is True. That means every Consumer will create a fanout Queue attached to the MQ Exchange topic. All Consumers attached to this topic will receive the message sent by the Producer [1]. Added Payload base class. To send data through the message queue service, a Payload derived object should be created. This base class allows to define the attributes container in the class, convert the object to a dict and retrieve the object from a dict. Added a new library, "oslo.messaging", to implement the RPC client and server. [1]http://blog.thedigitalcatonline.com/blog/2013/08/21/some-tips-about-amqp-direct-exchanges/ JIRA: YARDSTICK-1074 Change-Id: I63932b5fb3de2bdc1270fc83295630a2a349e2a6 Signed-off-by: Rodolfo Alonso Hernandez --- requirements.txt | 1 + yardstick/common/exceptions.py | 5 ++ yardstick/common/messaging/__init__.py | 36 +++++++++ yardstick/common/messaging/consumer.py | 85 +++++++++++++++++++ yardstick/common/messaging/payloads.py | 53 ++++++++++++ yardstick/common/messaging/producer.py | 70 ++++++++++++++++ .../tests/functional/common/messaging/__init__.py | 0 .../functional/common/messaging/test_messaging.py | 94 ++++++++++++++++++++++ yardstick/tests/unit/common/messaging/__init__.py | 0 .../tests/unit/common/messaging/test_consumer.py | 54 +++++++++++++ .../tests/unit/common/messaging/test_payloads.py | 46 +++++++++++ .../tests/unit/common/messaging/test_producer.py | 46 +++++++++++ 12 files changed, 490 insertions(+) create mode 100644 yardstick/common/messaging/__init__.py create mode 100644 yardstick/common/messaging/consumer.py create mode 100644 yardstick/common/messaging/payloads.py create mode 100644 yardstick/common/messaging/producer.py create mode 100644 yardstick/tests/functional/common/messaging/__init__.py create mode 100644 yardstick/tests/functional/common/messaging/test_messaging.py create mode 100644 yardstick/tests/unit/common/messaging/__init__.py create mode 100644 yardstick/tests/unit/common/messaging/test_consumer.py create mode 100644 yardstick/tests/unit/common/messaging/test_payloads.py create mode 100644 yardstick/tests/unit/common/messaging/test_producer.py diff --git a/requirements.txt b/requirements.txt index 02545de1d..43f0e796c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -37,6 +37,7 @@ os-client-config==1.28.0 # OSI Approved Apache Software License osc-lib==1.7.0 # OSI Approved Apache Software License oslo.config==4.11.1 # OSI Approved Apache Software License oslo.i18n==3.17.0 # OSI Approved Apache Software License +oslo.messaging===5.30.2 # OSI Approved Apache Software License oslo.privsep===1.22.1 # OSI Approved Apache Software License oslo.serialization==2.20.1 # OSI Approved Apache Software License oslo.utils==3.28.0 # OSI Approved Apache Software License diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py index 439b9cb1b..65e444071 100644 --- a/yardstick/common/exceptions.py +++ b/yardstick/common/exceptions.py @@ -64,6 +64,11 @@ class YardstickBannedModuleImported(YardstickException): message = 'Module "%(module)s" cannnot be imported. Reason: "%(reason)s"' +class PayloadMissingAttributes(YardstickException): + message = ('Error instantiating a Payload class, missing attributes: ' + '%(missing_attributes)s') + + class HeatTemplateError(YardstickException): """Error in Heat during the stack deployment""" message = ('Error in Heat during the creation of the OpenStack stack ' diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py new file mode 100644 index 000000000..f0f012ec3 --- /dev/null +++ b/yardstick/common/messaging/__init__.py @@ -0,0 +1,36 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# MQ is statically configured: +# - MQ service: RabbitMQ +# - user/password: yardstick/yardstick +# - host:port: localhost:5672 +MQ_USER = 'yardstick' +MQ_PASS = 'yardstick' +MQ_SERVICE = 'rabbit' +SERVER = 'localhost' +PORT = 5672 +TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER + + ':' + str(PORT) + '/') + +# RPC server. +RPC_SERVER_EXECUTOR = 'threading' + +# Topics. +RUNNER = 'runner' + +# Methods. +# RUNNER methods: +RUNNER_INFO = 'runner_info' +RUNNER_LOOP = 'runner_loop' diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py new file mode 100644 index 000000000..a0feeb300 --- /dev/null +++ b/yardstick/common/messaging/consumer.py @@ -0,0 +1,85 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging + +from oslo_config import cfg +import oslo_messaging +import six + +from yardstick.common import messaging + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class NotificationHandler(object): + """Abstract class to define a endpoint object for a MessagingConsumer""" + + def __init__(self, id, ctx_pid, queue): + self._id = id + self._ctx_pid = ctx_pid + self._queue = queue + + +@six.add_metaclass(abc.ABCMeta) +class MessagingConsumer(object): + """Abstract class to implement a MQ consumer + + This abstract class allows a class implementing this interface to receive + the messages published by a `MessagingNotifier`. + """ + + def __init__(self, topic, pid, endpoints, fanout=True): + """Init function. + + :param topic: (string) MQ exchange topic + :param pid: (int) PID of the process implementing the MQ Notifier which + will be in the message context + :param endpoints: (list of class) list of classes implementing the + methods (see `MessagingNotifier.send_message) used by + the Notifier + :param fanout: (bool) MQ clients may request that a copy of the message + be delivered to all servers listening on a topic by + setting fanout to ``True``, rather than just one of them + :returns: `MessagingConsumer` class object + """ + + self._pid = pid + self._endpoints = endpoints + self._transport = oslo_messaging.get_rpc_transport( + cfg.CONF, url=messaging.TRANSPORT_URL) + self._target = oslo_messaging.Target(topic=topic, fanout=fanout, + server=messaging.SERVER) + self._server = oslo_messaging.get_rpc_server( + self._transport, self._target, self._endpoints, + executor=messaging.RPC_SERVER_EXECUTOR, + access_policy=oslo_messaging.DefaultRPCAccessPolicy) + + def start_rpc_server(self): + """Start the RPC server.""" + if self._server: + self._server.start() + + def stop_rpc_server(self): + """Stop the RPC server.""" + if self._server: + self._server.stop() + + def wait(self): + """Wait for message processing to complete (blocking).""" + if self._server: + self._server.wait() diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py new file mode 100644 index 000000000..d29d79808 --- /dev/null +++ b/yardstick/common/messaging/payloads.py @@ -0,0 +1,53 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc + +import six + +from yardstick.common import exceptions + + +@six.add_metaclass(abc.ABCMeta) +class Payload(object): + """Base Payload class to transfer data through the MQ service""" + + REQUIRED_FIELDS = {'version'} + + def __init__(self, **kwargs): + """Init method + + :param kwargs: (dictionary) attributes and values of the object + :returns: Payload object + """ + + if not all(req_field in kwargs for req_field in self.REQUIRED_FIELDS): + _attrs = set(kwargs) - self.REQUIRED_FIELDS + missing_attributes = ', '.join(str(_attr) for _attr in _attrs) + raise exceptions.PayloadMissingAttributes( + missing_attributes=missing_attributes) + + for name, value in kwargs.items(): + setattr(self, name, value) + + self._fields = set(kwargs.keys()) + + def obj_to_dict(self): + """Returns a dictionary with the attributes of the object""" + return {field: getattr(self, field) for field in self._fields} + + @classmethod + def dict_to_obj(cls, _dict): + """Returns a Payload object built from the dictionary elements""" + return cls(**_dict) diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py new file mode 100644 index 000000000..b6adc0c17 --- /dev/null +++ b/yardstick/common/messaging/producer.py @@ -0,0 +1,70 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import os + +from oslo_config import cfg +import oslo_messaging +import six + +from yardstick.common import messaging + + +LOG = logging.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class MessagingProducer(object): + """Abstract class to implement a MQ producer + + This abstract class allows a class implementing this interface to publish + messages in a message queue. + """ + + def __init__(self, topic, pid=os.getpid(), fanout=True): + """Init function. + + :param topic: (string) MQ exchange topic + :param pid: (int) PID of the process implementing this MQ Notifier + :param fanout: (bool) MQ clients may request that a copy of the message + be delivered to all servers listening on a topic by + setting fanout to ``True``, rather than just one of them + :returns: `MessagingNotifier` class object + """ + self._topic = topic + self._pid = pid + self._fanout = fanout + self._transport = oslo_messaging.get_rpc_transport( + cfg.CONF, url=messaging.TRANSPORT_URL) + self._target = oslo_messaging.Target(topic=topic, fanout=fanout, + server=messaging.SERVER) + self._notifier = oslo_messaging.RPCClient(self._transport, + self._target) + + def send_message(self, method, payload): + """Send a cast message, that will invoke a method without blocking. + + The cast() method is used to invoke an RPC method that does not return + a value. cast() RPC requests may be broadcast to all Servers listening + on a given topic by setting the fanout Target property to ``True``. + + :param methos: (string) method name, that must be implemented in the + consumer endpoints + :param payload: (subclass `Payload`) payload content + """ + self._notifier.cast({'pid': self._pid}, + method, + **payload.obj_to_dict()) diff --git a/yardstick/tests/functional/common/messaging/__init__.py b/yardstick/tests/functional/common/messaging/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py new file mode 100644 index 000000000..96deeb35b --- /dev/null +++ b/yardstick/tests/functional/common/messaging/test_messaging.py @@ -0,0 +1,94 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import os +import time + +from yardstick.common.messaging import consumer +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer +from yardstick.tests.functional import base + + +TOPIC = 'topic_MQ' +METHOD_INFO = 'info' + + +class DummyPayload(payloads.Payload): + REQUIRED_FIELDS = {'version', 'data'} + + +class DummyEndpoint(consumer.NotificationHandler): + + def info(self, ctxt, **kwargs): + if ctxt['pid'] == self._ctx_pid: + self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data'])) + + +class DummyConsumer(consumer.MessagingConsumer): + + def __init__(self, id, ctx_pid, queue): + self._id = id + endpoints = [DummyEndpoint(id, ctx_pid, queue)] + super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints) + + +class DummyProducer(producer.MessagingProducer): + pass + + +def _run_consumer(id, ctx_pid, queue): + _consumer = DummyConsumer(id, ctx_pid, queue) + _consumer.start_rpc_server() + _consumer.wait() + + +class MessagingTestCase(base.BaseFunctionalTestCase): + + @staticmethod + def _terminate_consumers(num_consumers, processes): + for i in range(num_consumers): + processes[i].terminate() + + def test_run_five_consumers(self): + output_queue = multiprocessing.Queue() + num_consumers = 10 + ctx_id = os.getpid() + producer = DummyProducer(TOPIC, pid=ctx_id) + + processes = [] + for i in range(num_consumers): + processes.append(multiprocessing.Process( + name='consumer_{}'.format(i), + target=_run_consumer, + args=(i, ctx_id, output_queue))) + processes[i].start() + self.addCleanup(self._terminate_consumers, num_consumers, processes) + + time.sleep(2) # Let consumers to create the listeners + producer.send_message(METHOD_INFO, DummyPayload(version=1, + data='message 0')) + producer.send_message(METHOD_INFO, DummyPayload(version=1, + data='message 1')) + time.sleep(2) # Let consumers attend the calls + + output = [] + while not output_queue.empty(): + output.append(output_queue.get(True, 1)) + + self.assertEqual(num_consumers * 2, len(output)) + for i in range(num_consumers): + self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output) + self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output) diff --git a/yardstick/tests/unit/common/messaging/__init__.py b/yardstick/tests/unit/common/messaging/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/yardstick/tests/unit/common/messaging/test_consumer.py b/yardstick/tests/unit/common/messaging/test_consumer.py new file mode 100644 index 000000000..612dcaecd --- /dev/null +++ b/yardstick/tests/unit/common/messaging/test_consumer.py @@ -0,0 +1,54 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo_config import cfg +import oslo_messaging + +from yardstick.common import messaging +from yardstick.common.messaging import consumer +from yardstick.tests.unit import base as ut_base + + +class TestEndPoint(object): + def action_1(self): + pass + + +class _MessagingConsumer(consumer.MessagingConsumer): + pass + + +class MessagingConsumerTestCase(ut_base.BaseUnitTestCase): + + def test__init(self): + with mock.patch.object(oslo_messaging, 'get_rpc_server') as \ + mock_get_rpc_server, \ + mock.patch.object(oslo_messaging, 'get_rpc_transport') as \ + mock_get_rpc_transport, \ + mock.patch.object(oslo_messaging, 'Target') as \ + mock_Target: + mock_get_rpc_transport.return_value = 'test_rpc_transport' + mock_Target.return_value = 'test_Target' + + _MessagingConsumer('test_topic', 'test_pid', [TestEndPoint], + fanout=True) + mock_get_rpc_transport.assert_called_once_with( + cfg.CONF, url=messaging.TRANSPORT_URL) + mock_Target.assert_called_once_with( + topic='test_topic', fanout=True, server=messaging.SERVER) + mock_get_rpc_server.assert_called_once_with( + 'test_rpc_transport', 'test_Target', [TestEndPoint], + executor=messaging.RPC_SERVER_EXECUTOR, + access_policy=oslo_messaging.DefaultRPCAccessPolicy) diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py new file mode 100644 index 000000000..00ec220c9 --- /dev/null +++ b/yardstick/tests/unit/common/messaging/test_payloads.py @@ -0,0 +1,46 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from yardstick.common import exceptions +from yardstick.common.messaging import payloads +from yardstick.tests.unit import base as ut_base + + +class _DummyPayload(payloads.Payload): + REQUIRED_FIELDS = {'version', 'key1', 'key2'} + + +class PayloadTestCase(ut_base.BaseUnitTestCase): + + def test__init(self): + payload = _DummyPayload(version=1, key1='value1', key2='value2') + self.assertEqual(1, payload.version) + self.assertEqual('value1', payload.key1) + self.assertEqual('value2', payload.key2) + self.assertEqual(3, len(payload._fields)) + + def test__init_missing_required_fields(self): + with self.assertRaises(exceptions.PayloadMissingAttributes): + _DummyPayload(key1='value1', key2='value2') + + def test_obj_to_dict(self): + payload = _DummyPayload(version=1, key1='value1', key2='value2') + payload_dict = payload.obj_to_dict() + self.assertEqual({'version': 1, 'key1': 'value1', 'key2': 'value2'}, + payload_dict) + + def test_dict_to_obj(self): + _dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'} + payload = _DummyPayload.dict_to_obj(_dict) + self.assertEqual(set(_dict.keys()), payload._fields) diff --git a/yardstick/tests/unit/common/messaging/test_producer.py b/yardstick/tests/unit/common/messaging/test_producer.py new file mode 100644 index 000000000..0289689dc --- /dev/null +++ b/yardstick/tests/unit/common/messaging/test_producer.py @@ -0,0 +1,46 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo_config import cfg +import oslo_messaging + +from yardstick.common import messaging +from yardstick.common.messaging import producer +from yardstick.tests.unit import base as ut_base + + +class _MessagingProducer(producer.MessagingProducer): + pass + + +class MessagingProducerTestCase(ut_base.BaseUnitTestCase): + + def test__init(self): + with mock.patch.object(oslo_messaging, 'RPCClient') as \ + mock_RPCClient, \ + mock.patch.object(oslo_messaging, 'get_rpc_transport') as \ + mock_get_rpc_transport, \ + mock.patch.object(oslo_messaging, 'Target') as \ + mock_Target: + mock_get_rpc_transport.return_value = 'test_rpc_transport' + mock_Target.return_value = 'test_Target' + + _MessagingProducer('test_topic', 'test_pid', fanout=True) + mock_get_rpc_transport.assert_called_once_with( + cfg.CONF, url=messaging.TRANSPORT_URL) + mock_Target.assert_called_once_with( + topic='test_topic', fanout=True, server=messaging.SERVER) + mock_RPCClient.assert_called_once_with('test_rpc_transport', + 'test_Target') -- cgit 1.2.3-korg