From 3aed8d1ad85f8a128a22a0a0f2436eb94562799f Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Fri, 20 Apr 2018 11:34:41 +0100 Subject: Add "IterationIPC" runner "IterationIPC" is a runner that runs a configurable number of times before it returns. Each iteration has a configurable timeout. The loop control depends on the feedback received from the running VNFs. The context PIDs from the VNFs to listen the messages from are given in the scenario "setup" method. The MQ consumer, "RunnerIterationIPCConsumer", is subscribed to messages sent to topic "messaging.TOPIC_TG". The endpoints are defined in the class "RunnerIterationIPCEndpoint". Three are implemented: "messaging.TG_METHOD_STARTED", "messaging.TG_METHOD_FINISHED" and "messaging.TG_METHOD_ITERATION". JIRA: YARDSTICK-1127 Change-Id: I4b2f11a15ef41e6f3c70bd64188d5c7fbcdb7eed Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/benchmark/runners/iteration_ipc.py | 200 +++++++++++++++++++++ yardstick/common/exceptions.py | 9 + yardstick/common/messaging/__init__.py | 10 +- yardstick/common/messaging/payloads.py | 12 ++ .../unit/benchmark/runner/test_iteration_ipc.py | 136 ++++++++++++++ .../tests/unit/common/messaging/test_payloads.py | 19 ++ 6 files changed, 382 insertions(+), 4 deletions(-) create mode 100644 yardstick/benchmark/runners/iteration_ipc.py create mode 100644 yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py new file mode 100644 index 000000000..690f2660b --- /dev/null +++ b/yardstick/benchmark/runners/iteration_ipc.py @@ -0,0 +1,200 @@ +# Copyright 2018: Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""A runner that runs a configurable number of times before it returns. Each + iteration has a configurable timeout. The loop control depends on the + feedback received from the running VNFs. The context PIDs from the VNFs + to listen the messages from are given in the scenario "setup" method. +""" + +import logging +import multiprocessing +import time +import traceback + +import os + +from yardstick.benchmark.runners import base +from yardstick.common import exceptions +from yardstick.common import messaging +from yardstick.common import utils +from yardstick.common.messaging import consumer +from yardstick.common.messaging import payloads + + +LOG = logging.getLogger(__name__) + +QUEUE_PUT_TIMEOUT = 10 +ITERATION_TIMEOUT = 180 + + +class RunnerIterationIPCEndpoint(consumer.NotificationHandler): + """Endpoint class for ``RunnerIterationIPCConsumer``""" + + def tg_method_started(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_pids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_STARTED, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}, + QUEUE_PUT_TIMEOUT) + + def tg_method_finished(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_pids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_FINISHED, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}) + + def tg_method_iteration(self, ctxt, **kwargs): + if ctxt['id'] in self._ctx_pids: + self._queue.put( + {'id': ctxt['id'], + 'action': messaging.TG_METHOD_ITERATION, + 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( + kwargs)}) + + +class RunnerIterationIPCConsumer(consumer.MessagingConsumer): + """MQ consumer for "IterationIPC" runner""" + + def __init__(self, _id, ctx_ids): + self._id = _id + self._queue = multiprocessing.Queue() + endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)] + super(RunnerIterationIPCConsumer, self).__init__( + messaging.TOPIC_TG, ctx_ids, endpoints) + self._kpi_per_id = {ctx: [] for ctx in ctx_ids} + self.iteration_index = None + + def is_all_kpis_received_in_iteration(self): + """Check if all producers registered have sent the ITERATION msg + + During the present iteration, all producers (traffic generators) must + start and finish the traffic injection, and at the end of the traffic + injection a TG_METHOD_ITERATION must be sent. This function will check + all KPIs in the present iteration are received. E.g.: + self.iteration_index = 2 + + self._kpi_per_id = { + 'ctx1': [kpi0, kpi1, kpi2], + 'ctx2': [kpi0, kpi1]} --> return False + + self._kpi_per_id = { + 'ctx1': [kpi0, kpi1, kpi2], + 'ctx2': [kpi0, kpi1, kpi2]} --> return True + """ + while not self._queue.empty(): + msg = self._queue.get(True, 1) + if msg['action'] == messaging.TG_METHOD_ITERATION: + id_iter_list = self._kpi_per_id[msg['id']] + id_iter_list.append(msg['payload'].kpi) + + return all(len(id_iter_list) == self.iteration_index + for id_iter_list in self._kpi_per_id.values()) + + +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): # pragma: no cover + runner_cfg = scenario_cfg['runner'] + + timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT) + iterations = runner_cfg.get('iterations', 1) + run_step = runner_cfg.get('run_step', 'setup,run,teardown') + LOG.info('Worker START. Iterations %d times, class %s', iterations, cls) + + runner_cfg['runner_id'] = os.getpid() + + benchmark = cls(scenario_cfg, context_cfg) + method = getattr(benchmark, method_name) + + if 'setup' not in run_step: + raise exceptions.RunnerIterationIPCSetupActionNeeded() + producer_ctxs = benchmark.setup() + if not producer_ctxs: + raise exceptions.RunnerIterationIPCNoCtxs() + + mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs) + mq_consumer.start_rpc_server() + + iteration_index = 1 + while 'run' in run_step: + LOG.debug('runner=%(runner)s seq=%(sequence)s START', + {'runner': runner_cfg['runner_id'], + 'sequence': iteration_index}) + data = {} + result = None + errors = '' + mq_consumer.iteration_index = iteration_index + + try: + utils.wait_until_true( + mq_consumer.is_all_kpis_received_in_iteration, + timeout=timeout, sleep=2) + result = method(data) + except Exception: # pylint: disable=broad-except + errors = traceback.format_exc() + LOG.exception(errors) + + if result: + output_queue.put(result, True, QUEUE_PUT_TIMEOUT) + benchmark_output = {'timestamp': time.time(), + 'sequence': iteration_index, + 'data': data, + 'errors': errors} + queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) + + LOG.debug('runner=%(runner)s seq=%(sequence)s END', + {'runner': runner_cfg['runner_id'], + 'sequence': iteration_index}) + + iteration_index += 1 + if iteration_index > iterations or aborted.is_set(): + LOG.info('"IterationIPC" worker END') + break + + if 'teardown' in run_step: + try: + benchmark.teardown() + except Exception: + LOG.exception('Exception during teardown process') + mq_consumer.stop_rpc_server() + raise SystemExit(1) + + LOG.debug('Data queue size = %s', queue.qsize()) + LOG.debug('Output queue size = %s', output_queue.qsize()) + mq_consumer.stop_rpc_server() + + +class IterationIPCRunner(base.Runner): + """Run a scenario for a configurable number of times. + + Each iteration has a configurable timeout. The loop control depends on the + feedback received from the running VNFs. The context PIDs from the VNFs to + listen the messages from are given in the scenario "setup" method. + """ + __execution_type__ = 'IterationIPC' + + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = '{}-{}-{}'.format( + self.__execution_type__, scenario_cfg.get('type'), os.getpid()) + self.process = multiprocessing.Process( + name=name, + target=_worker_process, + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) + self.process.start() diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py index 935c77866..8afd27085 100644 --- a/yardstick/common/exceptions.py +++ b/yardstick/common/exceptions.py @@ -191,6 +191,15 @@ class TaskRenderError(YardstickException): message = 'Failed to render template:\n%(input_task)s' +class RunnerIterationIPCSetupActionNeeded(YardstickException): + message = ('IterationIPC needs the "setup" action to retrieve the VNF ' + 'handling processes PIDs to receive the messages sent') + + +class RunnerIterationIPCNoCtxs(YardstickException): + message = 'Benchmark "setup" action did not return any VNF process PID' + + class TimerTimeout(YardstickException): message = 'Timer timeout expired, %(timeout)s seconds' diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py index f0f012ec3..dc8c51b70 100644 --- a/yardstick/common/messaging/__init__.py +++ b/yardstick/common/messaging/__init__.py @@ -28,9 +28,11 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER + RPC_SERVER_EXECUTOR = 'threading' # Topics. -RUNNER = 'runner' +TOPIC_TG = 'topic_traffic_generator' # Methods. -# RUNNER methods: -RUNNER_INFO = 'runner_info' -RUNNER_LOOP = 'runner_loop' +# Traffic generator consumers methods. Names must match the methods implemented +# in the consumer endpoint class, ``RunnerIterationIPCEndpoint``. +TG_METHOD_STARTED = 'tg_method_started' +TG_METHOD_FINISHED = 'tg_method_finished' +TG_METHOD_ITERATION = 'tg_method_iteration' diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py index d29d79808..c59c87536 100644 --- a/yardstick/common/messaging/payloads.py +++ b/yardstick/common/messaging/payloads.py @@ -51,3 +51,15 @@ class Payload(object): def dict_to_obj(cls, _dict): """Returns a Payload object built from the dictionary elements""" return cls(**_dict) + + +class TrafficGeneratorPayload(Payload): + """Base traffic generator payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'iteration', # (int) iteration index during the traffic injection, + # starting from 1. + 'kpi' # (dict) collection of KPIs collected from the traffic + # injection. The content will depend on the generator and the + # traffic type. + } diff --git a/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py new file mode 100644 index 000000000..10d14a8a0 --- /dev/null +++ b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py @@ -0,0 +1,136 @@ +# Copyright (c) 2018 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import time +import os +import uuid + +import mock + +from yardstick.benchmark.runners import iteration_ipc +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.tests.unit import base as ut_base + + +class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int] + self._queue = multiprocessing.Queue() + self.runner = iteration_ipc.RunnerIterationIPCEndpoint( + self._id, self._ctx_ids, self._queue) + self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}} + self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj( + self._kwargs).obj_to_dict() + + def test_tg_method_started(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_started(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + def test_tg_method_finished(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_finished(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + def test_tg_method_iteration(self): + self._queue.empty() + ctxt = {'id': self._ctx_ids[0]} + self.runner.tg_method_iteration(ctxt, **self._kwargs) + time.sleep(0.2) + + output = [] + while not self._queue.empty(): + output.append(self._queue.get(True, 1)) + + self.assertEqual(1, len(output)) + self.assertEqual(self._ctx_ids[0], output[0]['id']) + self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action']) + self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict()) + + +class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase): + + def setUp(self): + self._id = uuid.uuid1().int + self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int] + self.consumer = iteration_ipc.RunnerIterationIPCConsumer( + self._id, self._ctx_ids) + self.consumer._queue = mock.Mock() + + def test__init(self): + self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []}, + self.consumer._kpi_per_id) + + def test_is_all_kpis_received_in_iteration(self): + payload = payloads.TrafficGeneratorPayload( + version=1, iteration=1, kpi={}) + msg1 = {'action': messaging.TG_METHOD_ITERATION, + 'id': self._ctx_ids[0], 'payload': payload} + msg2 = {'action': messaging.TG_METHOD_ITERATION, + 'id': self._ctx_ids[1], 'payload': payload} + self.consumer.iteration_index = 1 + + self.consumer._queue.empty.side_effect = [False, True] + self.consumer._queue.get.return_value = msg1 + self.assertFalse(self.consumer.is_all_kpis_received_in_iteration()) + + self.consumer._queue.empty.side_effect = [False, True] + self.consumer._queue.get.return_value = msg2 + self.assertTrue(self.consumer.is_all_kpis_received_in_iteration()) + + +class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase): + + @mock.patch.object(iteration_ipc, '_worker_process') + @mock.patch.object(os, 'getpid', return_value=12345678) + @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock()) + def test__run_benchmark(self, mock_process, mock_getpid, mock_worker): + method = 'method' + scenario_cfg = {'type': 'scenario_type'} + context_cfg = 'context_cfg' + name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678) + runner = iteration_ipc.IterationIPCRunner(mock.ANY) + mock_getpid.reset_mock() + + runner._run_benchmark('class', method, scenario_cfg, context_cfg) + mock_process.assert_called_once_with( + name=name, + target=mock_worker, + args=(runner.result_queue, 'class', method, scenario_cfg, + context_cfg, runner.aborted, runner.output_queue)) + mock_getpid.assert_called_once() diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py index 00ec220c9..7930b8d34 100644 --- a/yardstick/tests/unit/common/messaging/test_payloads.py +++ b/yardstick/tests/unit/common/messaging/test_payloads.py @@ -44,3 +44,22 @@ class PayloadTestCase(ut_base.BaseUnitTestCase): _dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'} payload = _DummyPayload.dict_to_obj(_dict) self.assertEqual(set(_dict.keys()), payload._fields) + + +class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase): + + def test_init(self): + tg_payload = payloads.TrafficGeneratorPayload( + version=1, iteration=10, kpi={'key1': 'value1'}) + self.assertEqual(1, tg_payload.version) + self.assertEqual(10, tg_payload.iteration) + self.assertEqual({'key1': 'value1'}, tg_payload.kpi) + self.assertEqual(3, len(tg_payload._fields)) + + def test__init_missing_required_fields(self): + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(version=1, iteration=10) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(iteration=10, kpi={}) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.TrafficGeneratorPayload(iteration=10) -- cgit From 5ea92a3ba1c5bbd011c9d9ae0f2c686aa59417af Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Sun, 22 Apr 2018 19:52:49 +0100 Subject: Add "TrafficGeneratorProducer" for "GenericTrafficGen" instances Added "TrafficGeneratorProducer" class, a message producer specific for "GenericTrafficGen" derived classes. JIRA: YARDSTICK-1127 Change-Id: Icc87a6920155612e759a1d4d2f29028940c2e040 Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/network_services/vnf_generic/vnf/base.py | 21 +++++++++++++++++ .../network_services/vnf_generic/vnf/test_base.py | 27 +++++++++++++++++++++- 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 9ceac3167..804257fe9 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -18,6 +18,8 @@ import abc import logging import six +from yardstick.common import messaging +from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -138,6 +140,17 @@ class VnfdHelper(dict): yield port_name, port_num +class TrafficGeneratorProducer(producer.MessagingProducer): + """Class implementing the message producer for traffic generators + + This message producer must be instantiated in the process created + "run_traffic" process. + """ + def __init__(self, pid): + super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, + pid=pid) + + @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): """Class providing file-like API for generic VNF implementation @@ -216,6 +229,7 @@ class GenericTrafficGen(GenericVNF): super(GenericTrafficGen, self).__init__(name, vnfd) self.runs_traffic = True self.traffic_finished = False + self._mq_producer = None @abc.abstractmethod def run_traffic(self, traffic_profile): @@ -286,3 +300,10 @@ class GenericTrafficGen(GenericVNF): :return: True/False """ pass + + def _setup_mq_producer(self, pid): + """Setup the TG MQ producer to send messages between processes + + :return: (derived class from ``MessagingProducer``) MQ producer object + """ + return TrafficGeneratorProducer(pid) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py index ebedcb451..ca3831ce8 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py @@ -15,10 +15,14 @@ import multiprocessing import os +import uuid import mock +from oslo_config import cfg +import oslo_messaging import unittest +from yardstick.common import messaging from yardstick.network_services.vnf_generic.vnf import base from yardstick.ssh import SSH @@ -221,7 +225,7 @@ class TestGenericVNF(unittest.TestCase): self.assertEqual(msg, str(exc.exception)) -class TestGenericTrafficGen(unittest.TestCase): +class GenericTrafficGenTestCase(unittest.TestCase): def test_definition(self): """Make sure that the abstract class cannot be instantiated""" @@ -234,3 +238,24 @@ class TestGenericTrafficGen(unittest.TestCase): "abstract methods collect_kpi, instantiate, run_traffic, " "scale, terminate") self.assertEqual(msg, str(exc.exception)) + + +class TrafficGeneratorProducerTestCase(unittest.TestCase): + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(cfg, 'CONF') + def test__init(self, mock_config, mock_transport, mock_rpcclient, + mock_target): + pid = uuid.uuid1().int + tg_producer = base.TrafficGeneratorProducer(pid) + mock_transport.assert_called_once_with( + mock_config, url='rabbit://yardstick:yardstick@localhost:5672/') + mock_target.assert_called_once_with(topic=messaging.TOPIC_TG, + fanout=True, + server=messaging.SERVER) + mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') + self.assertEqual(pid, tg_producer._pid) + self.assertEqual(messaging.TOPIC_TG, tg_producer._topic) -- cgit From 5ca6368e609d9dc8ec19590835d91b36b2d1fa78 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Mon, 23 Apr 2018 17:06:25 +0100 Subject: Enable the MQ producer in "SampleVNFTrafficGen" class Now all traffic generators using the default implementation of "ClientResourceHelper.run_traffic" will update the status of the traffic injection using the MQ. The available methods are listed in common.messsaging (VNF_METHOD_*): - tg_method_started: VNF app started - tg_method_finished: VNF app finished - tg_method_iteration: VNF app execution loop started JIRA: YARDSTICK-1127 Change-Id: I12829e1762fc20cc95da3b50767a66f031e25ee8 Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/network_services/vnf_generic/vnf/base.py | 23 ++++++++++ .../vnf_generic/vnf/prox_helpers.py | 2 +- .../network_services/vnf_generic/vnf/sample_vnf.py | 26 +++++++---- .../network_services/vnf_generic/vnf/tg_ping.py | 2 +- .../vnf_generic/vnf/tg_rfc2544_ixia.py | 2 +- .../network_services/vnf_generic/vnf/test_base.py | 49 +++++++++++++++++++++ .../vnf_generic/vnf/test_sample_vnf.py | 51 ++++++++++++++++++++++ .../vnf_generic/vnf/test_tg_prox.py | 3 +- .../vnf_generic/vnf/test_tg_rfc2544_ixia.py | 3 +- .../vnf_generic/vnf/test_tg_trex.py | 3 +- 10 files changed, 150 insertions(+), 14 deletions(-) diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index 804257fe9..c385eb6fd 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -19,6 +19,7 @@ import logging import six from yardstick.common import messaging +from yardstick.common.messaging import payloads from yardstick.common.messaging import producer from yardstick.network_services.helpers.samplevnf_helper import PortPairs @@ -150,6 +151,28 @@ class TrafficGeneratorProducer(producer.MessagingProducer): super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, pid=pid) + def tg_method_started(self, version=1): + """Send a message to inform the traffic generation has started""" + self.send_message( + messaging.TG_METHOD_STARTED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_finished(self, version=1): + """Send a message to inform the traffic generation has finished""" + self.send_message( + messaging.TG_METHOD_FINISHED, + payloads.TrafficGeneratorPayload(version=version, iteration=0, + kpi={})) + + def tg_method_iteration(self, iteration, version=1, kpi=None): + """Send a message, with KPI, once an iteration has finished""" + kpi = {} if kpi is None else kpi + self.send_message( + messaging.TG_METHOD_ITERATION, + payloads.TrafficGeneratorPayload(version=version, + iteration=iteration, kpi=kpi)) + @six.add_metaclass(abc.ABCMeta) class GenericVNF(object): diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 6d28f4750..3241719e8 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -969,7 +969,7 @@ class ProxResourceHelper(ClientResourceHelper): self._test_type = self.setup_helper.find_in_section('global', 'name', None) return self._test_type - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 1ee71aa25..a37f4f72d 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -14,14 +14,15 @@ import logging from multiprocessing import Queue, Value, Process - import os import posixpath import re -import six +import uuid import subprocess import time +import six + from trex_stl_lib.trex_stl_client import LoggerApi from trex_stl_lib.trex_stl_client import STLClient from trex_stl_lib.trex_stl_exceptions import STLError @@ -408,12 +409,13 @@ class ClientResourceHelper(ResourceHelper): time.sleep(self.QUEUE_WAIT_TIME) self._queue.put(samples) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, mq_producer): # if we don't do this we can hang waiting for the queue to drain # have to do this in the subprocess self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path + mq_producer.tg_method_started() try: self._build_ports() self.client = self._connect() @@ -421,8 +423,11 @@ class ClientResourceHelper(ResourceHelper): self.client.remove_all_streams(self.all_ports) # remove all streams traffic_profile.register_generator(self) + iteration_index = 0 while self._terminated.value == 0: + iteration_index += 1 self._run_traffic_once(traffic_profile) + mq_producer.tg_method_iteration(iteration_index) self.client.stop(self.all_ports) self.client.disconnect() @@ -433,6 +438,8 @@ class ClientResourceHelper(ResourceHelper): return # return if trex/tg server is stopped. raise + mq_producer.tg_method_finished() + def terminate(self): self._terminated.value = 1 # stop client @@ -911,12 +918,13 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile): + def _traffic_runner(self, traffic_profile, mq_pid): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self.resource_helper.run_traffic(traffic_profile) + self._mq_producer = self._setup_mq_producer(mq_pid) + self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): """ Generate traffic on the wire according to the given params. @@ -926,10 +934,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): :param traffic_profile: :return: True/False """ - name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, + name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, + traffic_profile.__class__.__name__, os.getpid()) + mq_pid = uuid.uuid1().int self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile,)) + args=(traffic_profile, mq_pid)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -938,7 +948,7 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return self._traffic_process.is_alive() + return mq_pid def collect_kpi(self): # check if the tg processes have exited diff --git a/yardstick/network_services/vnf_generic/vnf/tg_ping.py b/yardstick/network_services/vnf_generic/vnf/tg_ping.py index a989543f5..4050dc6e2 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_ping.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_ping.py @@ -71,7 +71,7 @@ class PingResourceHelper(ClientResourceHelper): self._queue = Queue() self._parser = PingParser(self._queue) - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): # drop the connection in order to force a new one self.ssh_helper.drop_connection() diff --git a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py index a1f9fbeb4..875ae93b9 100644 --- a/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py +++ b/yardstick/network_services/vnf_generic/vnf/tg_rfc2544_ixia.py @@ -102,7 +102,7 @@ class IxiaResourceHelper(ClientResourceHelper): self.client.assign_ports() self.client.create_traffic_model() - def run_traffic(self, traffic_profile): + def run_traffic(self, traffic_profile, *args): if self._terminated.value: return diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py index ca3831ce8..11cba4d79 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py @@ -23,6 +23,7 @@ import oslo_messaging import unittest from yardstick.common import messaging +from yardstick.common.messaging import payloads from yardstick.network_services.vnf_generic.vnf import base from yardstick.ssh import SSH @@ -259,3 +260,51 @@ class TrafficGeneratorProducerTestCase(unittest.TestCase): mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') self.assertEqual(pid, tg_producer._pid) self.assertEqual(messaging.TOPIC_TG, tg_producer._topic) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_started(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_started(version=10) + + mock_message.assert_called_once_with(messaging.TG_METHOD_STARTED, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=10, iteration=0, + kpi={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_finished(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_finished(version=20) + + mock_message.assert_called_once_with(messaging.TG_METHOD_FINISHED, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=20, iteration=0, + kpi={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'TrafficGeneratorPayload', + return_value='tg_pload') + def test_tg_method_iteration(self, mock_tg_payload, *args): + tg_producer = base.TrafficGeneratorProducer(uuid.uuid1().int) + with mock.patch.object(tg_producer, 'send_message') as mock_message: + tg_producer.tg_method_iteration(100, version=30, kpi={'k': 'v'}) + + mock_message.assert_called_once_with(messaging.TG_METHOD_ITERATION, + 'tg_pload') + mock_tg_payload.assert_called_once_with(version=30, iteration=100, + kpi={'k': 'v'}) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py index 331e80d00..48ae3b505 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py @@ -1090,6 +1090,57 @@ class TestClientResourceHelper(unittest.TestCase): self.assertIs(client_resource_helper._connect(client), client) + @mock.patch.object(ClientResourceHelper, '_build_ports') + @mock.patch.object(ClientResourceHelper, '_run_traffic_once') + def test_run_traffic(self, mock_run_traffic_once, mock_build_ports): + client_resource_helper = ClientResourceHelper(mock.Mock()) + client = mock.Mock() + traffic_profile = mock.Mock() + mq_producer = mock.Mock() + with mock.patch.object(client_resource_helper, '_connect') \ + as mock_connect, \ + mock.patch.object(client_resource_helper, '_terminated') \ + as mock_terminated: + mock_connect.return_value = client + type(mock_terminated).value = mock.PropertyMock( + side_effect=[0, 1, lambda x: x]) + client_resource_helper.run_traffic(traffic_profile, mq_producer) + + mock_build_ports.assert_called_once() + traffic_profile.register_generator.assert_called_once() + mq_producer.tg_method_started.assert_called_once() + mq_producer.tg_method_finished.assert_called_once() + mq_producer.tg_method_iteration.assert_called_once_with(1) + mock_run_traffic_once.assert_called_once_with(traffic_profile) + + @mock.patch.object(ClientResourceHelper, '_build_ports') + @mock.patch.object(ClientResourceHelper, '_run_traffic_once', + side_effect=Exception) + def test_run_traffic_exception(self, mock_run_traffic_once, + mock_build_ports): + client_resource_helper = ClientResourceHelper(mock.Mock()) + client = mock.Mock() + traffic_profile = mock.Mock() + mq_producer = mock.Mock() + with mock.patch.object(client_resource_helper, '_connect') \ + as mock_connect, \ + mock.patch.object(client_resource_helper, '_terminated') \ + as mock_terminated: + mock_connect.return_value = client + type(mock_terminated).value = mock.PropertyMock(return_value=0) + mq_producer.reset_mock() + # NOTE(ralonsoh): "trex_stl_exceptions.STLError" is mocked + with self.assertRaises(Exception): + client_resource_helper.run_traffic(traffic_profile, + mq_producer) + + mock_build_ports.assert_called_once() + traffic_profile.register_generator.assert_called_once() + mock_run_traffic_once.assert_called_once_with(traffic_profile) + mq_producer.tg_method_started.assert_called_once() + mq_producer.tg_method_finished.assert_not_called() + mq_producer.tg_method_iteration.assert_not_called() + class TestRfc2544ResourceHelper(unittest.TestCase): diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py index 050aa4aa0..3e2f598d2 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py @@ -406,7 +406,8 @@ class TestProxTrafficGen(unittest.TestCase): sut.setup_helper.prox_config_dict = {} sut._connect_client = mock.Mock(autospec=STLClient) sut._connect_client.get_stats = mock.Mock(return_value="0") - sut._traffic_runner(mock_traffic_profile) + sut._setup_mq_producer = mock.Mock(return_value='mq_producer') + sut._traffic_runner(mock_traffic_profile, mock.ANY) @mock.patch('yardstick.network_services.vnf_generic.vnf.prox_helpers.socket') @mock.patch(SSH_HELPER) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py index 42ac40b50..4ade157a3 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_rfc2544_ixia.py @@ -379,7 +379,8 @@ class TestIXIATrafficGen(unittest.TestCase): mock.mock_open(), create=True) @mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception') def _traffic_runner(*args): - result = sut._traffic_runner(mock_traffic_profile) + sut._setup_mq_producer = mock.Mock(return_value='mq_producer') + result = sut._traffic_runner(mock_traffic_profile, mock.ANY) self.assertIsNone(result) _traffic_runner() diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py index 350ba8448..08ae27422 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py @@ -387,8 +387,9 @@ class TestTrexTrafficGen(unittest.TestCase): # must generate cfg before we can run traffic so Trex port mapping is # created self.sut.resource_helper.generate_cfg() + self.sut._setup_mq_producer = mock.Mock() with mock.patch.object(self.sut.resource_helper, 'run_traffic'): - self.sut._traffic_runner(mock_traffic_profile) + self.sut._traffic_runner(mock_traffic_profile, mock.ANY) def test__generate_trex_cfg(self): vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] -- cgit From 1004f2ac1ea0394afdc2d5a8ca20c8c1a6d2cd93 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 24 Apr 2018 10:35:50 +0100 Subject: Enable traffic generator PID in "NSPerf" scenario setup Now "NSPerf" scenario will inform about the traffic generator PIDs after setup process. With this information, IterationIPC runner will be able to receive the messages sent by those traffic generators and control the main iteration loop. The following example, using vFW as VNF and OpenStack as context, makes use of this new runner implementation: /samples/vnf_samples/nsut/vfw/ tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml JIRA: YARDSTICK-1127 Change-Id: I46b1368bc209680b88ff9fb5c3b9beadf6271ac9 Signed-off-by: Rodolfo Alonso Hernandez --- ...544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml | 96 ++++++++++++++++++++++ yardstick/benchmark/runners/iteration_ipc.py | 9 +- yardstick/benchmark/scenarios/base.py | 4 + .../benchmark/scenarios/networking/vnf_generic.py | 33 ++++---- yardstick/common/messaging/consumer.py | 10 +-- yardstick/common/messaging/producer.py | 13 ++- yardstick/network_services/vnf_generic/vnf/base.py | 14 +++- .../network_services/vnf_generic/vnf/sample_vnf.py | 12 ++- .../functional/common/messaging/test_messaging.py | 22 ++--- .../scenarios/networking/test_vnf_generic.py | 5 +- .../tests/unit/common/messaging/test_producer.py | 7 ++ .../network_services/vnf_generic/vnf/test_base.py | 33 +++++++- .../vnf_generic/vnf/test_tg_trex.py | 3 +- 13 files changed, 205 insertions(+), 56 deletions(-) create mode 100644 samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml diff --git a/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml new file mode 100644 index 000000000..184ed6881 --- /dev/null +++ b/samples/vnf_samples/nsut/vfw/tc_heat_rfc2544_ipv4_1rule_1flow_64B_trex_iterationipc.yaml @@ -0,0 +1,96 @@ +# Copyright (c) 2016-2017 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +{% set provider = provider or none %} +{% set physical_networks = physical_networks or ['physnet1', 'physnet2'] %} +{% set segmentation_id = segmentation_id or none %} + +schema: yardstick:task:0.1 +scenarios: +- type: NSPerf + traffic_profile: ../../traffic_profiles/ipv4_throughput.yaml + topology: vfw-tg-topology.yaml + nodes: + tg__0: trafficgen_1.yardstick + vnf__0: vnf.yardstick + options: + hugepages_gb: 8 + framesize: + uplink: {64B: 100} + downlink: {64B: 100} + flow: + src_ip: [{'tg__0': 'xe0'}] + dst_ip: [{'tg__0': 'xe1'}] + count: 1 + traffic_type: 4 + rfc2544: + allowed_drop_rate: 0.0001 - 0.0001 + vnf__0: + rules: acl_1rule.yaml + vnf_config: {lb_config: 'SW', lb_count: 1, worker_config: '1C/1T', worker_threads: 1} + runner: + type: IterationIPC + iterations: 10 + timeout: 60 +context: + # put node context first, so we don't HEAT deploy if node has errors + name: yardstick + image: yardstick-samplevnfs + flavor: + vcpus: 10 + ram: 12288 + disk: 6 + extra_specs: + hw:cpu_sockets: 1 + hw:cpu_cores: 10 + hw:cpu_threads: 1 + user: ubuntu + placement_groups: + pgrp1: + policy: "availability" + servers: + vnf: + floating_ip: true + placement: "pgrp1" + trafficgen_1: + floating_ip: true + placement: "pgrp1" + networks: + mgmt: + cidr: '10.0.1.0/24' + xe0: + cidr: '10.0.2.0/24' + gateway_ip: 'null' + {% if provider %} + provider: {{ provider }} + physical_network: {{ physical_networks[0] }} + {% if segmentation_id %} + segmentation_id: {{ segmentation_id }} + {% endif %} + {% endif %} + port_security_enabled: False + enable_dhcp: 'false' + xe1: + cidr: '10.0.3.0/24' + gateway_ip: 'null' + {% if provider %} + provider: {{ provider }} + physical_network: {{ physical_networks[1] }} + {% if segmentation_id %} + segmentation_id: {{ segmentation_id }} + {% endif %} + {% endif %} + port_security_enabled: False + enable_dhcp: 'false' diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py index 690f2660b..43aa7f489 100644 --- a/yardstick/benchmark/runners/iteration_ipc.py +++ b/yardstick/benchmark/runners/iteration_ipc.py @@ -44,7 +44,7 @@ class RunnerIterationIPCEndpoint(consumer.NotificationHandler): """Endpoint class for ``RunnerIterationIPCConsumer``""" def tg_method_started(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_pids: + if ctxt['id'] in self._ctx_ids: self._queue.put( {'id': ctxt['id'], 'action': messaging.TG_METHOD_STARTED, @@ -53,7 +53,7 @@ class RunnerIterationIPCEndpoint(consumer.NotificationHandler): QUEUE_PUT_TIMEOUT) def tg_method_finished(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_pids: + if ctxt['id'] in self._ctx_ids: self._queue.put( {'id': ctxt['id'], 'action': messaging.TG_METHOD_FINISHED, @@ -61,7 +61,7 @@ class RunnerIterationIPCEndpoint(consumer.NotificationHandler): kwargs)}) def tg_method_iteration(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_pids: + if ctxt['id'] in self._ctx_ids: self._queue.put( {'id': ctxt['id'], 'action': messaging.TG_METHOD_ITERATION, @@ -124,7 +124,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if 'setup' not in run_step: raise exceptions.RunnerIterationIPCSetupActionNeeded() - producer_ctxs = benchmark.setup() + benchmark.setup() + producer_ctxs = benchmark.get_mq_ids() if not producer_ctxs: raise exceptions.RunnerIterationIPCNoCtxs() diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py index 30ac1bea9..90a87ac29 100644 --- a/yardstick/benchmark/scenarios/base.py +++ b/yardstick/benchmark/scenarios/base.py @@ -119,3 +119,7 @@ class Scenario(object): except TypeError: dic[k] = v return dic + + def get_mq_ids(self): # pragma: no cover + """Return stored MQ producer IDs, if defined""" + pass diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index eb62d6222..3bb168c70 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -50,7 +50,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): __scenario_type__ = "NSPerf" - def __init__(self, scenario_cfg, context_cfg): # Yardstick API + def __init__(self, scenario_cfg, context_cfg): # pragma: no cover super(NetworkServiceTestCase, self).__init__() self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg @@ -61,6 +61,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): self.traffic_profile = None self.node_netdevs = {} self.bin_path = get_nsb_option('bin_path', '') + self._mq_ids = [] def _get_ip_flow_range(self, ip_start_range): @@ -168,18 +169,18 @@ class NetworkServiceTestCase(scenario_base.Scenario): topology_yaml = vnfdgen.generate_vnfd(topology, topolgy_data) self.topology = topology_yaml["nsd:nsd-catalog"]["nsd"][0] - def _find_vnf_name_from_id(self, vnf_id): + def _find_vnf_name_from_id(self, vnf_id): # pragma: no cover return next((vnfd["vnfd-id-ref"] for vnfd in self.topology["constituent-vnfd"] if vnf_id == vnfd["member-vnf-index"]), None) - def _find_vnfd_from_vnf_idx(self, vnf_id): + def _find_vnfd_from_vnf_idx(self, vnf_id): # pragma: no cover return next((vnfd for vnfd in self.topology["constituent-vnfd"] if vnf_id == vnfd["member-vnf-index"]), None) @staticmethod - def find_node_if(nodes, name, if_name, vld_id): + def find_node_if(nodes, name, if_name, vld_id): # pragma: no cover try: # check for xe0, xe1 intf = nodes[name]["interfaces"][if_name] @@ -272,14 +273,14 @@ class NetworkServiceTestCase(scenario_base.Scenario): node0_if["peer_intf"] = node1_copy node1_if["peer_intf"] = node0_copy - def _update_context_with_topology(self): + def _update_context_with_topology(self): # pragma: no cover for vnfd in self.topology["constituent-vnfd"]: vnf_idx = vnfd["member-vnf-index"] vnf_name = self._find_vnf_name_from_id(vnf_idx) vnfd = self._find_vnfd_from_vnf_idx(vnf_idx) self.context_cfg["nodes"][vnf_name].update(vnfd) - def _generate_pod_yaml(self): + def _generate_pod_yaml(self): # pragma: no cover context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id'])) # convert OrderedDict to a list # pod.yaml nodes is a list @@ -293,7 +294,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): explicit_start=True) @staticmethod - def _serialize_node(node): + def _serialize_node(node): # pragma: no cover new_node = copy.deepcopy(node) # name field is required # remove context suffix @@ -315,7 +316,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): self._update_context_with_topology() @classmethod - def get_vnf_impl(cls, vnf_model_id): + def get_vnf_impl(cls, vnf_model_id): # pragma: no cover """ Find the implementing class from vnf_model["vnf"]["name"] field :param vnf_model_id: parsed vnfd model ID field @@ -343,7 +344,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): raise exceptions.IncorrectConfig(error_msg=message) @staticmethod - def create_interfaces_from_node(vnfd, node): + def create_interfaces_from_node(vnfd, node): # pragma: no cover ext_intfs = vnfd["vdu"][0]["external-interface"] = [] # have to sort so xe0 goes first for intf_name, intf in sorted(node['interfaces'].items()): @@ -412,10 +413,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): return vnfs def setup(self): - """ Setup infrastructure, provission VNFs & start traffic - - :return: - """ + """Setup infrastructure, provission VNFs & start traffic""" # 1. Verify if infrastructure mapping can meet topology self.map_topology_to_infrastructure() # 1a. Load VNF models @@ -457,6 +455,11 @@ class NetworkServiceTestCase(scenario_base.Scenario): for traffic_gen in traffic_runners: LOG.info("Starting traffic on %s", traffic_gen.name) traffic_gen.run_traffic(self.traffic_profile) + self._mq_ids.append(traffic_gen.get_mq_producer_id()) + + def get_mq_ids(self): # pragma: no cover + """Return stored MQ producer IDs""" + return self._mq_ids def run(self, result): # yardstick API """ Yardstick calls run() at intervals defined in the yaml and @@ -495,10 +498,10 @@ class NetworkServiceTestCase(scenario_base.Scenario): LOG.exception("") raise RuntimeError("Error in teardown") - def pre_run_wait_time(self, time_seconds): + def pre_run_wait_time(self, time_seconds): # pragma: no cover """Time waited before executing the run method""" time.sleep(time_seconds) - def post_run_wait_time(self, time_seconds): + def post_run_wait_time(self, time_seconds): # pragma: no cover """Time waited after executing the run method""" pass diff --git a/yardstick/common/messaging/consumer.py b/yardstick/common/messaging/consumer.py index 24ec6f184..c99d7ed27 100644 --- a/yardstick/common/messaging/consumer.py +++ b/yardstick/common/messaging/consumer.py @@ -29,9 +29,9 @@ LOG = logging.getLogger(__name__) class NotificationHandler(object): """Abstract class to define a endpoint object for a MessagingConsumer""" - def __init__(self, _id, ctx_pids, queue): + def __init__(self, _id, ctx_ids, queue): self._id = _id - self._ctx_pids = ctx_pids + self._ctx_ids = ctx_ids self._queue = queue @@ -43,11 +43,11 @@ class MessagingConsumer(object): the messages published by a `MessagingNotifier`. """ - def __init__(self, topic, pids, endpoints, fanout=True): + def __init__(self, topic, ids, endpoints, fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pids: (list of int) list of PIDs of the processes implementing + :param ids: (list of int) list of IDs of the processes implementing the MQ Notifier which will be in the message context :param endpoints: (list of class) list of classes implementing the methods (see `MessagingNotifier.send_message) used by @@ -58,7 +58,7 @@ class MessagingConsumer(object): :returns: `MessagingConsumer` class object """ - self._pids = pids + self._ids = ids self._endpoints = endpoints self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) diff --git a/yardstick/common/messaging/producer.py b/yardstick/common/messaging/producer.py index b6adc0c17..aadab649d 100644 --- a/yardstick/common/messaging/producer.py +++ b/yardstick/common/messaging/producer.py @@ -34,18 +34,18 @@ class MessagingProducer(object): messages in a message queue. """ - def __init__(self, topic, pid=os.getpid(), fanout=True): + def __init__(self, topic, _id=os.getpid(), fanout=True): """Init function. :param topic: (string) MQ exchange topic - :param pid: (int) PID of the process implementing this MQ Notifier + :param id: (int) ID of the process implementing this MQ Notifier :param fanout: (bool) MQ clients may request that a copy of the message be delivered to all servers listening on a topic by setting fanout to ``True``, rather than just one of them :returns: `MessagingNotifier` class object """ self._topic = topic - self._pid = pid + self._id = _id self._fanout = fanout self._transport = oslo_messaging.get_rpc_transport( cfg.CONF, url=messaging.TRANSPORT_URL) @@ -65,6 +65,11 @@ class MessagingProducer(object): consumer endpoints :param payload: (subclass `Payload`) payload content """ - self._notifier.cast({'pid': self._pid}, + self._notifier.cast({'id': self._id}, method, **payload.obj_to_dict()) + + @property + def id(self): + """Return MQ producer ID""" + return self._id diff --git a/yardstick/network_services/vnf_generic/vnf/base.py b/yardstick/network_services/vnf_generic/vnf/base.py index c385eb6fd..fb41a4e4a 100644 --- a/yardstick/network_services/vnf_generic/vnf/base.py +++ b/yardstick/network_services/vnf_generic/vnf/base.py @@ -147,9 +147,9 @@ class TrafficGeneratorProducer(producer.MessagingProducer): This message producer must be instantiated in the process created "run_traffic" process. """ - def __init__(self, pid): + def __init__(self, _id): super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG, - pid=pid) + _id=_id) def tg_method_started(self, version=1): """Send a message to inform the traffic generation has started""" @@ -324,9 +324,15 @@ class GenericTrafficGen(GenericVNF): """ pass - def _setup_mq_producer(self, pid): + @staticmethod + def _setup_mq_producer(id): """Setup the TG MQ producer to send messages between processes :return: (derived class from ``MessagingProducer``) MQ producer object """ - return TrafficGeneratorProducer(pid) + return TrafficGeneratorProducer(id) + + def get_mq_producer_id(self): + """Return the MQ producer ID if initialized""" + if self._mq_producer: + return self._mq_producer.get_id() diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index a37f4f72d..bc65380d3 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -918,12 +918,12 @@ class SampleVNFTrafficGen(GenericTrafficGen): LOG.info("%s TG Server is up and running.", self.APP_NAME) return self._tg_process.exitcode - def _traffic_runner(self, traffic_profile, mq_pid): + def _traffic_runner(self, traffic_profile, mq_id): # always drop connections first thing in new processes # so we don't get paramiko errors self.ssh_helper.drop_connection() LOG.info("Starting %s client...", self.APP_NAME) - self._mq_producer = self._setup_mq_producer(mq_pid) + self._mq_producer = self._setup_mq_producer(mq_id) self.resource_helper.run_traffic(traffic_profile, self._mq_producer) def run_traffic(self, traffic_profile): @@ -937,9 +937,9 @@ class SampleVNFTrafficGen(GenericTrafficGen): name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME, traffic_profile.__class__.__name__, os.getpid()) - mq_pid = uuid.uuid1().int - self._traffic_process = Process(name=name, target=self._traffic_runner, - args=(traffic_profile, mq_pid)) + self._traffic_process = Process( + name=name, target=self._traffic_runner, + args=(traffic_profile, uuid.uuid1().int)) self._traffic_process.start() # Wait for traffic process to start while self.resource_helper.client_started.value == 0: @@ -948,8 +948,6 @@ class SampleVNFTrafficGen(GenericTrafficGen): if not self._traffic_process.is_alive(): break - return mq_pid - def collect_kpi(self): # check if the tg processes have exited physical_node = Context.get_physical_node_from_server( diff --git a/yardstick/tests/functional/common/messaging/test_messaging.py b/yardstick/tests/functional/common/messaging/test_messaging.py index 99874343b..f3e31e718 100644 --- a/yardstick/tests/functional/common/messaging/test_messaging.py +++ b/yardstick/tests/functional/common/messaging/test_messaging.py @@ -32,25 +32,25 @@ class DummyPayload(payloads.Payload): class DummyEndpoint(consumer.NotificationHandler): def info(self, ctxt, **kwargs): - if ctxt['pid'] in self._ctx_pids: - self._queue.put('ID {}, data: {}, pid: {}'.format( - self._id, kwargs['data'], ctxt['pid'])) + if ctxt['id'] in self._ctx_ids: + self._queue.put('Nr {}, data: {}, id: {}'.format( + self._id, kwargs['data'], ctxt['id'])) class DummyConsumer(consumer.MessagingConsumer): - def __init__(self, _id, ctx_pids, queue): + def __init__(self, _id, ctx_ids, queue): self._id = _id - endpoints = [DummyEndpoint(_id, ctx_pids, queue)] - super(DummyConsumer, self).__init__(TOPIC, ctx_pids, endpoints) + endpoints = [DummyEndpoint(_id, ctx_ids, queue)] + super(DummyConsumer, self).__init__(TOPIC, ctx_ids, endpoints) class DummyProducer(producer.MessagingProducer): pass -def _run_consumer(_id, ctx_pids, queue): - _consumer = DummyConsumer(_id, ctx_pids, queue) +def _run_consumer(_id, ctx_ids, queue): + _consumer = DummyConsumer(_id, ctx_ids, queue) _consumer.start_rpc_server() _consumer.wait() @@ -67,8 +67,8 @@ class MessagingTestCase(base.BaseFunctionalTestCase): num_consumers = 10 ctx_1 = 100001 ctx_2 = 100002 - producers = [DummyProducer(TOPIC, pid=ctx_1), - DummyProducer(TOPIC, pid=ctx_2)] + producers = [DummyProducer(TOPIC, _id=ctx_1), + DummyProducer(TOPIC, _id=ctx_2)] processes = [] for i in range(num_consumers): @@ -91,7 +91,7 @@ class MessagingTestCase(base.BaseFunctionalTestCase): output.append(output_queue.get(True, 1)) self.assertEqual(num_consumers * 4, len(output)) - msg_template = 'ID {}, data: {}, pid: {}' + msg_template = 'Nr {}, data: {}, id: {}' for i in range(num_consumers): for ctx in [ctx_1, ctx_2]: for message in ['message 0', 'message 1']: diff --git a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py index bb1a7aaca..77a54c0b8 100644 --- a/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py +++ b/yardstick/tests/unit/benchmark/scenarios/networking/test_vnf_generic.py @@ -553,6 +553,7 @@ class TestNetworkServiceTestCase(unittest.TestCase): tgen.verify_traffic = lambda x: verified_dict tgen.terminate = mock.Mock(return_value=True) tgen.name = "tgen__1" + tgen.run_traffic.return_value = 'tg_id' vnf = mock.Mock(autospec=GenericVNF) vnf.runs_traffic = False vnf.terminate = mock.Mock(return_value=True) @@ -565,7 +566,6 @@ class TestNetworkServiceTestCase(unittest.TestCase): self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs) self.s._fill_traffic_profile = \ mock.Mock(return_value=TRAFFIC_PROFILE) - self.assertIsNone(self.s.setup()) def test_setup_exception(self): with mock.patch("yardstick.ssh.SSH") as ssh: @@ -656,6 +656,9 @@ class TestNetworkServiceTestCase(unittest.TestCase): ) self.assertEqual(self.s.topology, 'fake_nsd') + def test_get_mq_ids(self): + self.assertEqual(self.s._mq_ids, self.s.get_mq_ids()) + def test_teardown(self): vnf = mock.Mock(autospec=GenericVNF) vnf.terminate = mock.Mock(return_value=True) diff --git a/yardstick/tests/unit/common/messaging/test_producer.py b/yardstick/tests/unit/common/messaging/test_producer.py index 0289689dc..22286e5c3 100644 --- a/yardstick/tests/unit/common/messaging/test_producer.py +++ b/yardstick/tests/unit/common/messaging/test_producer.py @@ -44,3 +44,10 @@ class MessagingProducerTestCase(ut_base.BaseUnitTestCase): topic='test_topic', fanout=True, server=messaging.SERVER) mock_RPCClient.assert_called_once_with('test_rpc_transport', 'test_Target') + + def test_id(self): + with mock.patch.object(oslo_messaging, 'RPCClient'), \ + mock.patch.object(oslo_messaging, 'get_rpc_transport'), \ + mock.patch.object(oslo_messaging, 'Target'): + msg_producer = _MessagingProducer('topic', 'id_to_check') + self.assertEqual('id_to_check', msg_producer.id) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py index 11cba4d79..43e5ac839 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_base.py @@ -145,6 +145,24 @@ VNFD = { } +class _DummyGenericTrafficGen(base.GenericTrafficGen): # pragma: no cover + + def run_traffic(self, *args): + pass + + def terminate(self): + pass + + def collect_kpi(self): + pass + + def instantiate(self, *args): + pass + + def scale(self, flavor=''): + pass + + class FileAbsPath(object): def __init__(self, module_file): super(FileAbsPath, self).__init__() @@ -240,6 +258,15 @@ class GenericTrafficGenTestCase(unittest.TestCase): "scale, terminate") self.assertEqual(msg, str(exc.exception)) + def test_get_mq_producer_id(self): + vnfd = {'benchmark': {'kpi': mock.ANY}, + 'vdu': [{'external-interface': 'ext_int'}] + } + tg = _DummyGenericTrafficGen('name', vnfd) + tg._mq_producer = mock.Mock() + tg._mq_producer.get_id.return_value = 'fake_id' + self.assertEqual('fake_id', tg.get_mq_producer_id()) + class TrafficGeneratorProducerTestCase(unittest.TestCase): @@ -250,15 +277,15 @@ class TrafficGeneratorProducerTestCase(unittest.TestCase): @mock.patch.object(cfg, 'CONF') def test__init(self, mock_config, mock_transport, mock_rpcclient, mock_target): - pid = uuid.uuid1().int - tg_producer = base.TrafficGeneratorProducer(pid) + _id = uuid.uuid1().int + tg_producer = base.TrafficGeneratorProducer(_id) mock_transport.assert_called_once_with( mock_config, url='rabbit://yardstick:yardstick@localhost:5672/') mock_target.assert_called_once_with(topic=messaging.TOPIC_TG, fanout=True, server=messaging.SERVER) mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') - self.assertEqual(pid, tg_producer._pid) + self.assertEqual(_id, tg_producer._id) self.assertEqual(messaging.TOPIC_TG, tg_producer._topic) @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py index 08ae27422..700e910f9 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_tg_trex.py @@ -454,9 +454,8 @@ class TestTrexTrafficGen(unittest.TestCase): self.sut.ssh_helper.run = mock.Mock() self.sut._traffic_runner = mock.Mock(return_value=0) self.sut.resource_helper.client_started.value = 1 - result = self.sut.run_traffic(mock_traffic_profile) + self.sut.run_traffic(mock_traffic_profile) self.sut._traffic_process.terminate() - self.assertIsNotNone(result) def test_terminate(self): vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] -- cgit From 51bc9b51362ca76011bb201353de5354907332d1 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 3 Jul 2018 09:14:35 +0100 Subject: Make "IterationIPC" MQ producer for VNF control messages "IterationIPC" runner class is a consumer for MQ aware VNFs. A MQ aware traffic generator can send "started", "finished" and "iteration" messages. This feature implements a MQ producer in the runner in order to send messages to the VNFs. The messages implemented are: - "start_iteration" - "stop_iteration" JIRA: YARDSTICK-1286 Change-Id: I706f9a9dda5e5beed52231be7d71452945a7dbed Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/benchmark/runners/base.py | 44 ++++++++++++----- yardstick/benchmark/runners/iteration_ipc.py | 8 +++- yardstick/common/messaging/__init__.py | 19 +++----- yardstick/common/messaging/payloads.py | 8 ++++ yardstick/tests/unit/benchmark/runner/test_base.py | 56 ++++++++++++++++++++++ .../tests/unit/common/messaging/test_payloads.py | 17 +++++++ 6 files changed, 125 insertions(+), 27 deletions(-) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index fbdf6c281..af2557441 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -12,27 +12,26 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# +# This is a modified copy of ``rally/rally/benchmark/runners/base.py`` -# yardstick comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import - +import importlib import logging import multiprocessing import subprocess import time import traceback -from subprocess import CalledProcessError - -import importlib -from six.moves.queue import Empty +from six import moves -import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer +from yardstick.common import utils from yardstick.dispatcher.base import Base as DispatcherBase + log = logging.getLogger(__name__) @@ -41,7 +40,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except CalledProcessError: + except subprocess.CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -245,7 +244,7 @@ class Runner(object): log.debug("output_queue size %s", self.output_queue.qsize()) try: result.update(self.output_queue.get(True, 1)) - except Empty: + except moves.queue.Empty: pass return result @@ -259,7 +258,7 @@ class Runner(object): log.debug("result_queue size %s", self.result_queue.qsize()) try: one_record = self.result_queue.get(True, 1) - except Empty: + except moves.queue.Empty: pass else: if output_in_influxdb: @@ -272,3 +271,22 @@ class Runner(object): dispatchers = DispatcherBase.get(self.config['output_config']) dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) + + +class RunnerProducer(producer.MessagingProducer): + """Class implementing the message producer for runners""" + + def __init__(self, _id): + super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id) + + def start_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_START_ITERATION, + payloads.RunnerPayload(version=version, data=data)) + + def stop_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_STOP_ITERATION, + payloads.RunnerPayload(version=version, data=data)) diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py index 43aa7f489..a0335fdc7 100644 --- a/yardstick/benchmark/runners/iteration_ipc.py +++ b/yardstick/benchmark/runners/iteration_ipc.py @@ -26,7 +26,7 @@ import traceback import os -from yardstick.benchmark.runners import base +from yardstick.benchmark.runners import base as base_runner from yardstick.common import exceptions from yardstick.common import messaging from yardstick.common import utils @@ -131,6 +131,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs) mq_consumer.start_rpc_server() + mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id']) iteration_index = 1 while 'run' in run_step: @@ -141,6 +142,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, result = None errors = '' mq_consumer.iteration_index = iteration_index + mq_producer.start_iteration() try: utils.wait_until_true( @@ -151,6 +153,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = traceback.format_exc() LOG.exception(errors) + mq_producer.stop_iteration() + if result: output_queue.put(result, True, QUEUE_PUT_TIMEOUT) benchmark_output = {'timestamp': time.time(), @@ -181,7 +185,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, mq_consumer.stop_rpc_server() -class IterationIPCRunner(base.Runner): +class IterationIPCRunner(base_runner.Runner): """Run a scenario for a configurable number of times. Each iteration has a configurable timeout. The loop control depends on the diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py index dc8c51b70..bd700d9b1 100644 --- a/yardstick/common/messaging/__init__.py +++ b/yardstick/common/messaging/__init__.py @@ -1,14 +1,3 @@ -# Copyright (c) 2018 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. @@ -29,10 +18,16 @@ RPC_SERVER_EXECUTOR = 'threading' # Topics. TOPIC_TG = 'topic_traffic_generator' +TOPIC_RUNNER = 'topic_runner' # Methods. # Traffic generator consumers methods. Names must match the methods implemented -# in the consumer endpoint class, ``RunnerIterationIPCEndpoint``. +# in the consumer endpoint class. TG_METHOD_STARTED = 'tg_method_started' TG_METHOD_FINISHED = 'tg_method_finished' TG_METHOD_ITERATION = 'tg_method_iteration' + +# Runner consumers methods. Names must match the methods implemented in the +# consumer endpoint class. +RUNNER_METHOD_START_ITERATION = "runner_method_start_iteration" +RUNNER_METHOD_STOP_ITERATION = "runner_method_stop_iteration" diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py index c59c87536..8ede1e58e 100644 --- a/yardstick/common/messaging/payloads.py +++ b/yardstick/common/messaging/payloads.py @@ -63,3 +63,11 @@ class TrafficGeneratorPayload(Payload): # injection. The content will depend on the generator and the # traffic type. } + + +class RunnerPayload(Payload): + """Base runner payload class""" + REQUIRED_FIELDS = { + 'version', # (str) version of the payload transmitted. + 'data' # (dict) generic container of data to be used if needed. + } diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py index 559c991f3..49ba1efe4 100644 --- a/yardstick/tests/unit/benchmark/runner/test_base.py +++ b/yardstick/tests/unit/benchmark/runner/test_base.py @@ -8,12 +8,17 @@ ############################################################################## import time +import uuid import mock +from oslo_config import cfg +import oslo_messaging import subprocess from yardstick.benchmark.runners import base as runner_base from yardstick.benchmark.runners import iteration +from yardstick.common import messaging +from yardstick.common.messaging import payloads from yardstick.tests.unit import base as ut_base @@ -94,3 +99,54 @@ class RunnerTestCase(ut_base.BaseUnitTestCase): with self.assertRaises(NotImplementedError): runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) + + +class RunnerProducerTestCase(ut_base.BaseUnitTestCase): + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(cfg, 'CONF') + def test__init(self, mock_config, mock_transport, mock_rpcclient, + mock_target): + _id = uuid.uuid1().int + runner_producer = runner_base.RunnerProducer(_id) + mock_transport.assert_called_once_with( + mock_config, url='rabbit://yardstick:yardstick@localhost:5672/') + mock_target.assert_called_once_with(topic=messaging.TOPIC_RUNNER, + fanout=True, + server=messaging.SERVER) + mock_rpcclient.assert_called_once_with('rpc_transport', 'rpc_target') + self.assertEqual(_id, runner_producer._id) + self.assertEqual(messaging.TOPIC_RUNNER, runner_producer._topic) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload') + def test_start_iteration(self, mock_runner_payload, *args): + runner_producer = runner_base.RunnerProducer(uuid.uuid1().int) + with mock.patch.object(runner_producer, + 'send_message') as mock_message: + runner_producer.start_iteration(version=10) + + mock_message.assert_called_once_with( + messaging.RUNNER_METHOD_START_ITERATION, 'runner_pload') + mock_runner_payload.assert_called_once_with(version=10, data={}) + + @mock.patch.object(oslo_messaging, 'Target', return_value='rpc_target') + @mock.patch.object(oslo_messaging, 'RPCClient') + @mock.patch.object(oslo_messaging, 'get_rpc_transport', + return_value='rpc_transport') + @mock.patch.object(payloads, 'RunnerPayload', return_value='runner_pload') + def test_stop_iteration(self, mock_runner_payload, *args): + runner_producer = runner_base.RunnerProducer(uuid.uuid1().int) + with mock.patch.object(runner_producer, + 'send_message') as mock_message: + runner_producer.stop_iteration(version=15) + + mock_message.assert_called_once_with( + messaging.RUNNER_METHOD_STOP_ITERATION, 'runner_pload') + mock_runner_payload.assert_called_once_with(version=15, data={}) diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py index 7930b8d34..37b1f1926 100644 --- a/yardstick/tests/unit/common/messaging/test_payloads.py +++ b/yardstick/tests/unit/common/messaging/test_payloads.py @@ -63,3 +63,20 @@ class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase): payloads.TrafficGeneratorPayload(iteration=10, kpi={}) with self.assertRaises(exceptions.PayloadMissingAttributes): payloads.TrafficGeneratorPayload(iteration=10) + + +class RunnerPayloadTestCase(ut_base.BaseUnitTestCase): + + def test_init(self): + runner_payload = payloads.RunnerPayload(version=5, + data={'key1': 'value1'}) + self.assertEqual(5, runner_payload.version) + self.assertEqual({'key1': 'value1'}, runner_payload.data) + + def test__init_missing_required_fields(self): + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload(version=1) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload(data=None) + with self.assertRaises(exceptions.PayloadMissingAttributes): + payloads.RunnerPayload() -- cgit