diff options
Diffstat (limited to 'yardstick/tests/functional/common/messaging/test_messaging.py')
-rw-r--r-- | yardstick/tests/functional/common/messaging/test_messaging.py | 94 |
1 files changed, 94 insertions, 0 deletions
diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py new file mode 100644 index 000000000..96deeb35b --- /dev/null +++ b/yardstick/tests/functional/common/messaging/test_messaging.py @@ -0,0 +1,94 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import os +import time + +from yardstick.common.messaging import consumer +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer +from yardstick.tests.functional import base + + +TOPIC = 'topic_MQ' +METHOD_INFO = 'info' + + +class DummyPayload(payloads.Payload): + REQUIRED_FIELDS = {'version', 'data'} + + +class DummyEndpoint(consumer.NotificationHandler): + + def info(self, ctxt, **kwargs): + if ctxt['pid'] == self._ctx_pid: + self._queue.put('ID {}, data: {}'.format(self._id, kwargs['data'])) + + +class DummyConsumer(consumer.MessagingConsumer): + + def __init__(self, id, ctx_pid, queue): + self._id = id + endpoints = [DummyEndpoint(id, ctx_pid, queue)] + super(DummyConsumer, self).__init__(TOPIC, ctx_pid, endpoints) + + +class DummyProducer(producer.MessagingProducer): + pass + + +def _run_consumer(id, ctx_pid, queue): + _consumer = DummyConsumer(id, ctx_pid, queue) + _consumer.start_rpc_server() + _consumer.wait() + + +class MessagingTestCase(base.BaseFunctionalTestCase): + + @staticmethod + def _terminate_consumers(num_consumers, processes): + for i in range(num_consumers): + processes[i].terminate() + + def test_run_five_consumers(self): + output_queue = multiprocessing.Queue() + num_consumers = 10 + ctx_id = os.getpid() + producer = DummyProducer(TOPIC, pid=ctx_id) + + processes = [] + for i in range(num_consumers): + processes.append(multiprocessing.Process( + name='consumer_{}'.format(i), + target=_run_consumer, + args=(i, ctx_id, output_queue))) + processes[i].start() + self.addCleanup(self._terminate_consumers, num_consumers, processes) + + time.sleep(2) # Let consumers to create the listeners + producer.send_message(METHOD_INFO, DummyPayload(version=1, + data='message 0')) + producer.send_message(METHOD_INFO, DummyPayload(version=1, + data='message 1')) + time.sleep(2) # Let consumers attend the calls + + output = [] + while not output_queue.empty(): + output.append(output_queue.get(True, 1)) + + self.assertEqual(num_consumers * 2, len(output)) + for i in range(num_consumers): + self.assertIn('ID {}, data: {}'.format(1, 'message 0'), output) + self.assertIn('ID {}, data: {}'.format(1, 'message 1'), output) |