From 8253157792c9785df5dda1372adc6c71a65680c4 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Fri, 13 Apr 2018 18:40:38 +0100 Subject: MessagingConsumer accepts messages from multiple producers The messaging consumer now can store a list of PID of several producers. The notification handler can compare the procedence of a message from a list of PID. JIRA: YARDSTICK-1074 Change-Id: I193f83c2b471e5bf1298ac728be52533aded0c1a Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/common/messaging/consumer.py | 14 +++---- .../functional/common/messaging/test_messaging.py | 45 ++++++++++++---------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py index a0feeb300..24ec6f184 100644 --- a/yardstick/common/messaging/consumer.py +++ b/yardstick/common/messaging/consumer.py @@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__) class NotificationHandler(object): """Abstract class to define a endpoint object for a MessagingConsumer""" - def __init__(self, id, ctx_pid, queue): - self._id = id - self._ctx_pid = ctx_pid + def __init__(self, _id, ctx_pids, queue): + self._id = _id + self._ctx_pids = ctx_pids self._queue = queue @@ -43,12 +43,12 @@ class MessagingConsumer(object): the messages published by a `MessagingNotifier`. """ - def __init__(self, topic, pid, endpoints, fanout=True): + def __init__(self, topic, pids, endpoints, fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pid: (int) PID of the process implementing the MQ Notifier which - will be in the message context + :param pids: (list of int) list of PIDs of the processes implementing + the MQ Notifier which will be in the message context :param endpoints: (list of class) list of classes implementing the methods (see `MessagingNotifier.send_message) used by the Notifier @@ -58,7 +58,7 @@ class MessagingConsumer(object): :returns: `MessagingConsumer` class object """ - self._pid = pid + self._pids = pids self._endpoints = endpoints self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py index 96deeb35b..99874343b 100644 --- a/yardstick/tests/functional/common/messaging/test_messaging.py +++ b/yardstick/tests/functional/common/messaging/test_messaging.py @@ -13,7 +13,6 @@ # limitations under the License. import multiprocessing -import os import time from yardstick.common.messaging import consumer @@ -33,24 +32,25 @@ class DummyPayload(payloads.Payload): class DummyEndpoint(consumer.NotificationHandler): def info(self, ctxt, **kwargs): - if ctxt['pid'] == self._ctx_pid: - self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data'])) + if ctxt['pid'] in self._ctx_pids: + self._queue.put('ID {}, data: {}, pid: {}'.format( + self._id, kwargs['data'], ctxt['pid'])) class DummyConsumer(consumer.MessagingConsumer): - def __init__(self, id, ctx_pid, queue): - self._id = id - endpoints = [DummyEndpoint(id, ctx_pid, queue)] - super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints) + def __init__(self, _id, ctx_pids, queue): + self._id = _id + endpoints = [DummyEndpoint(_id, ctx_pids, queue)] + super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints) class DummyProducer(producer.MessagingProducer): pass -def _run_consumer(id, ctx_pid, queue): - _consumer = DummyConsumer(id, ctx_pid, queue) +def _run_consumer(_id, ctx_pids, queue): + _consumer = DummyConsumer(_id, ctx_pids, queue) _consumer.start_rpc_server() _consumer.wait() @@ -65,30 +65,35 @@ class MessagingTestCase(base.BaseFunctionalTestCase): def test_run_five_consumers(self): output_queue = multiprocessing.Queue() num_consumers = 10 - ctx_id = os.getpid() - producer = DummyProducer(TOPIC, pid=ctx_id) + ctx_1 = 100001 + ctx_2 = 100002 + producers = [DummyProducer(TOPIC, pid=ctx_1), + DummyProducer(TOPIC, pid=ctx_2)] processes = [] for i in range(num_consumers): processes.append(multiprocessing.Process( name='consumer_{}'.format(i), target=_run_consumer, - args=(i, ctx_id, output_queue))) + args=(i, [ctx_1, ctx_2], output_queue))) processes[i].start() self.addCleanup(self._terminate_consumers, num_consumers, processes) time.sleep(2) # Let consumers to create the listeners - producer.send_message(METHOD_INFO, DummyPayload(version=1, - data='message 0')) - producer.send_message(METHOD_INFO, DummyPayload(version=1, - data='message 1')) - time.sleep(2) # Let consumers attend the calls + for producer in producers: + for message in ['message 0', 'message 1']: + producer.send_message(METHOD_INFO, + DummyPayload(version=1, data=message)) + time.sleep(2) # Let consumers attend the calls output = [] while not output_queue.empty(): output.append(output_queue.get(True, 1)) - self.assertEqual(num_consumers * 2, len(output)) + self.assertEqual(num_consumers * 4, len(output)) + msg_template = 'ID {}, data: {}, pid: {}' for i in range(num_consumers): - self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output) - self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output) + for ctx in [ctx_1, ctx_2]: + for message in ['message 0', 'message 1']: + msg = msg_template.format(i, message, ctx) + self.assertIn(msg, output) -- cgit 1.2.3-korg