aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/tests/functional
diff options
context:
space:
mode:
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-03-14 11:27:25 +0000
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-04-13 16:51:18 +0000
commitaed4e1e61e4e808ec5b6ef594fb2db34dfaeddf4 (patch)
tree0f0aa0421284f6a83589931ee4538b8bcdc351b7 /yardstick/tests/functional
parent5be3ad75d9c46e7b816418d4ddf410b1f67110c6 (diff)
Add MQ consumer, producer and payload base classes
Added MessagingProducer base class. A class implementing this base class can send a cast message using the MQ service installed in the Yardstick jumphost (RabbitMQ by default, other MQ could be implemented). The producer will send messages to an specific topic. Added MessagingConsumer base class. A class implementing this base class will be able to receive any message sent by a MessagingPorducer class publishing messages in the topic subscribed. By default both Producer and Consumer "fanout" is True. That means every Consumer will create a fanout Queue attached to the MQ Exchange topic. All Consumers attached to this topic will receive the message sent by the Producer [1]. Added Payload base class. To send data through the message queue service, a Payload derived object should be created. This base class allows to define the attributes container in the class, convert the object to a dict and retrieve the object from a dict. Added a new library, "oslo.messaging", to implement the RPC client and server. [1]http://blog.thedigitalcatonline.com/blog/2013/08/21/some-tips-about-amqp-direct-exchanges/ JIRA: YARDSTICK-1074 Change-Id: I63932b5fb3de2bdc1270fc83295630a2a349e2a6 Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Diffstat (limited to 'yardstick/tests/functional')
-rw-r--r--yardstick/tests/functional/common/messaging/__init__.py0
-rw-r--r--yardstick/tests/functional/common/messaging/test_messaging.py94
2 files changed, 94 insertions, 0 deletions
diff --git a/yardstick/tests/functional/common/messaging/__init__.py b/yardstick/tests/functional/common/messaging/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/yardstick/tests/functional/common/messaging/__init__.py
diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py
new file mode 100644
index 000000000..96deeb35b
--- /dev/null
+++ b/yardstick/tests/functional/common/messaging/test_messaging.py
@@ -0,0 +1,94 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import os
+import time
+
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.tests.functional import base
+
+
+TOPIC = 'topic_MQ'
+METHOD_INFO = 'info'
+
+
+class DummyPayload(payloads.Payload):
+ REQUIRED_FIELDS = {'version', 'data'}
+
+
+class DummyEndpoint(consumer.NotificationHandler):
+
+ def info(self, ctxt, **kwargs):
+ if ctxt['pid'] == self._ctx_pid:
+ self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data']))
+
+
+class DummyConsumer(consumer.MessagingConsumer):
+
+ def __init__(self, id, ctx_pid, queue):
+ self._id = id
+ endpoints = [DummyEndpoint(id, ctx_pid, queue)]
+ super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints)
+
+
+class DummyProducer(producer.MessagingProducer):
+ pass
+
+
+def _run_consumer(id, ctx_pid, queue):
+ _consumer = DummyConsumer(id, ctx_pid, queue)
+ _consumer.start_rpc_server()
+ _consumer.wait()
+
+
+class MessagingTestCase(base.BaseFunctionalTestCase):
+
+ @staticmethod
+ def _terminate_consumers(num_consumers, processes):
+ for i in range(num_consumers):
+ processes[i].terminate()
+
+ def test_run_five_consumers(self):
+ output_queue = multiprocessing.Queue()
+ num_consumers = 10
+ ctx_id = os.getpid()
+ producer = DummyProducer(TOPIC, pid=ctx_id)
+
+ processes = []
+ for i in range(num_consumers):
+ processes.append(multiprocessing.Process(
+ name='consumer_{}'.format(i),
+ target=_run_consumer,
+ args=(i, ctx_id, output_queue)))
+ processes[i].start()
+ self.addCleanup(self._terminate_consumers, num_consumers, processes)
+
+ time.sleep(2) # Let consumers to create the listeners
+ producer.send_message(METHOD_INFO, DummyPayload(version=1,
+ data='message 0'))
+ producer.send_message(METHOD_INFO, DummyPayload(version=1,
+ data='message 1'))
+ time.sleep(2) # Let consumers attend the calls
+
+ output = []
+ while not output_queue.empty():
+ output.append(output_queue.get(True, 1))
+
+ self.assertEqual(num_consumers * 2, len(output))
+ for i in range(num_consumers):
+ self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output)
+ self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output)