summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-04-22 19:05:00 +0000
committerGerrit Code Review <gerrit@opnfv.org>2018-04-22 19:05:00 +0000
commit0aa73e85efc60a9f5f0dc1f063aad1d342a123ca (patch)
treef338781299c0787f0f9b5baca3ccbec2b5ee098b
parent004d162adb417c0f89a79db0b24e83fbe1b83356 (diff)
parent8253157792c9785df5dda1372adc6c71a65680c4 (diff)
Merge "MessagingConsumer accepts messages from multiple producers"
-rw-r--r--yardstick/common/messaging/consumer.py14
-rw-r--r--yardstick/tests/functional/common/messaging/test_messaging.py45
2 files changed, 32 insertions, 27 deletions
diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py
index a0feeb300..24ec6f184 100644
--- a/yardstick/common/messaging/consumer.py
+++ b/yardstick/common/messaging/consumer.py
@@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__)
class NotificationHandler(object):
"""Abstract class to define a endpoint object for a MessagingConsumer"""
- def __init__(self, id, ctx_pid, queue):
- self._id = id
- self._ctx_pid = ctx_pid
+ def __init__(self, _id, ctx_pids, queue):
+ self._id = _id
+ self._ctx_pids = ctx_pids
self._queue = queue
@@ -43,12 +43,12 @@ class MessagingConsumer(object):
the messages published by a `MessagingNotifier`.
"""
- def __init__(self, topic, pid, endpoints, fanout=True):
+ def __init__(self, topic, pids, endpoints, fanout=True):
"""Init function.
:param topic: (string) MQ exchange topic
- :param pid: (int) PID of the process implementing the MQ Notifier which
- will be in the message context
+ :param pids: (list of int) list of PIDs of the processes implementing
+ the MQ Notifier which will be in the message context
:param endpoints: (list of class) list of classes implementing the
methods (see `MessagingNotifier.send_message) used by
the Notifier
@@ -58,7 +58,7 @@ class MessagingConsumer(object):
:returns: `MessagingConsumer` class object
"""
- self._pid = pid
+ self._pids = pids
self._endpoints = endpoints
self._transport = oslo_messaging.get_rpc_transport(
cfg.CONF, url=messaging.TRANSPORT_URL)
diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py
index 96deeb35b..99874343b 100644
--- a/yardstick/tests/functional/common/messaging/test_messaging.py
+++ b/yardstick/tests/functional/common/messaging/test_messaging.py
@@ -13,7 +13,6 @@
# limitations under the License.
import multiprocessing
-import os
import time
from yardstick.common.messaging import consumer
@@ -33,24 +32,25 @@ class DummyPayload(payloads.Payload):
class DummyEndpoint(consumer.NotificationHandler):
def info(self, ctxt, **kwargs):
- if ctxt['pid'] == self._ctx_pid:
- self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data']))
+ if ctxt['pid'] in self._ctx_pids:
+ self._queue.put('ID {}, data: {}, pid: {}'.format(
+ self._id, kwargs['data'], ctxt['pid']))
class DummyConsumer(consumer.MessagingConsumer):
- def __init__(self, id, ctx_pid, queue):
- self._id = id
- endpoints = [DummyEndpoint(id, ctx_pid, queue)]
- super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints)
+ def __init__(self, _id, ctx_pids, queue):
+ self._id = _id
+ endpoints = [DummyEndpoint(_id, ctx_pids, queue)]
+ super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints)
class DummyProducer(producer.MessagingProducer):
pass
-def _run_consumer(id, ctx_pid, queue):
- _consumer = DummyConsumer(id, ctx_pid, queue)
+def _run_consumer(_id, ctx_pids, queue):
+ _consumer = DummyConsumer(_id, ctx_pids, queue)
_consumer.start_rpc_server()
_consumer.wait()
@@ -65,30 +65,35 @@ class MessagingTestCase(base.BaseFunctionalTestCase):
def test_run_five_consumers(self):
output_queue = multiprocessing.Queue()
num_consumers = 10
- ctx_id = os.getpid()
- producer = DummyProducer(TOPIC, pid=ctx_id)
+ ctx_1 = 100001
+ ctx_2 = 100002
+ producers = [DummyProducer(TOPIC, pid=ctx_1),
+ DummyProducer(TOPIC, pid=ctx_2)]
processes = []
for i in range(num_consumers):
processes.append(multiprocessing.Process(
name='consumer_{}'.format(i),
target=_run_consumer,
- args=(i, ctx_id, output_queue)))
+ args=(i, [ctx_1, ctx_2], output_queue)))
processes[i].start()
self.addCleanup(self._terminate_consumers, num_consumers, processes)
time.sleep(2) # Let consumers to create the listeners
- producer.send_message(METHOD_INFO, DummyPayload(version=1,
- data='message 0'))
- producer.send_message(METHOD_INFO, DummyPayload(version=1,
- data='message 1'))
- time.sleep(2) # Let consumers attend the calls
+ for producer in producers:
+ for message in ['message 0', 'message 1']:
+ producer.send_message(METHOD_INFO,
+ DummyPayload(version=1, data=message))
+ time.sleep(2) # Let consumers attend the calls
output = []
while not output_queue.empty():
output.append(output_queue.get(True, 1))
- self.assertEqual(num_consumers * 2, len(output))
+ self.assertEqual(num_consumers * 4, len(output))
+ msg_template = 'ID {}, data: {}, pid: {}'
for i in range(num_consumers):
- self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output)
- self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output)
+ for ctx in [ctx_1, ctx_2]:
+ for message in ['message 0', 'message 1']:
+ msg = msg_template.format(i, message, ctx)
+ self.assertIn(msg, output)