aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--yardstick/benchmark/runners/iteration_ipc.py200
-rw-r--r--yardstick/common/exceptions.py9
-rw-r--r--yardstick/common/messaging/__init__.py10
-rw-r--r--yardstick/common/messaging/payloads.py12
-rw-r--r--yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py136
-rw-r--r--yardstick/tests/unit/common/messaging/test_payloads.py19
6 files changed, 382 insertions, 4 deletions
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
new file mode 100644
index 000000000..690f2660b
--- /dev/null
+++ b/yardstick/benchmark/runners/iteration_ipc.py
@@ -0,0 +1,200 @@
+# Copyright 2018: Intel Corporation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""A runner that runs a configurable number of times before it returns. Each
+ iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs
+ to listen the messages from are given in the scenario "setup" method.
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from yardstick.benchmark.runners import base
+from yardstick.common import exceptions
+from yardstick.common import messaging
+from yardstick.common import utils
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_PUT_TIMEOUT = 10
+ITERATION_TIMEOUT = 180
+
+
+class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``RunnerIterationIPCConsumer``"""
+
+ def tg_method_started(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_STARTED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)},
+ QUEUE_PUT_TIMEOUT)
+
+ def tg_method_finished(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_FINISHED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+ def tg_method_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_ITERATION,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+
+class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
+ """MQ consumer for "IterationIPC" runner"""
+
+ def __init__(self, _id, ctx_ids):
+ self._id = _id
+ self._queue = multiprocessing.Queue()
+ endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
+ super(RunnerIterationIPCConsumer, self).__init__(
+ messaging.TOPIC_TG, ctx_ids, endpoints)
+ self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
+ self.iteration_index = None
+
+ def is_all_kpis_received_in_iteration(self):
+ """Check if all producers registered have sent the ITERATION msg
+
+ During the present iteration, all producers (traffic generators) must
+ start and finish the traffic injection, and at the end of the traffic
+ injection a TG_METHOD_ITERATION must be sent. This function will check
+ all KPIs in the present iteration are received. E.g.:
+ self.iteration_index = 2
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1]} --> return False
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1, kpi2]} --> return True
+ """
+ while not self._queue.empty():
+ msg = self._queue.get(True, 1)
+ if msg['action'] == messaging.TG_METHOD_ITERATION:
+ id_iter_list = self._kpi_per_id[msg['id']]
+ id_iter_list.append(msg['payload'].kpi)
+
+ return all(len(id_iter_list) == self.iteration_index
+ for id_iter_list in self._kpi_per_id.values())
+
+
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted, output_queue): # pragma: no cover
+ runner_cfg = scenario_cfg['runner']
+
+ timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
+ iterations = runner_cfg.get('iterations', 1)
+ run_step = runner_cfg.get('run_step', 'setup,run,teardown')
+ LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ benchmark = cls(scenario_cfg, context_cfg)
+ method = getattr(benchmark, method_name)
+
+ if 'setup' not in run_step:
+ raise exceptions.RunnerIterationIPCSetupActionNeeded()
+ producer_ctxs = benchmark.setup()
+ if not producer_ctxs:
+ raise exceptions.RunnerIterationIPCNoCtxs()
+
+ mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
+ mq_consumer.start_rpc_server()
+
+ iteration_index = 1
+ while 'run' in run_step:
+ LOG.debug('runner=%(runner)s seq=%(sequence)s START',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+ data = {}
+ result = None
+ errors = ''
+ mq_consumer.iteration_index = iteration_index
+
+ try:
+ utils.wait_until_true(
+ mq_consumer.is_all_kpis_received_in_iteration,
+ timeout=timeout, sleep=2)
+ result = method(data)
+ except Exception: # pylint: disable=broad-except
+ errors = traceback.format_exc()
+ LOG.exception(errors)
+
+ if result:
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+ benchmark_output = {'timestamp': time.time(),
+ 'sequence': iteration_index,
+ 'data': data,
+ 'errors': errors}
+ queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug('runner=%(runner)s seq=%(sequence)s END',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+
+ iteration_index += 1
+ if iteration_index > iterations or aborted.is_set():
+ LOG.info('"IterationIPC" worker END')
+ break
+
+ if 'teardown' in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ LOG.exception('Exception during teardown process')
+ mq_consumer.stop_rpc_server()
+ raise SystemExit(1)
+
+ LOG.debug('Data queue size = %s', queue.qsize())
+ LOG.debug('Output queue size = %s', output_queue.qsize())
+ mq_consumer.stop_rpc_server()
+
+
+class IterationIPCRunner(base.Runner):
+ """Run a scenario for a configurable number of times.
+
+ Each iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs to
+ listen the messages from are given in the scenario "setup" method.
+ """
+ __execution_type__ = 'IterationIPC'
+
+ def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = '{}-{}-{}'.format(
+ self.__execution_type__, scenario_cfg.get('type'), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()
diff --git a/yardstick/common/exceptions.py b/yardstick/common/exceptions.py
index 935c77866..8afd27085 100644
--- a/yardstick/common/exceptions.py
+++ b/yardstick/common/exceptions.py
@@ -191,6 +191,15 @@ class TaskRenderError(YardstickException):
message = 'Failed to render template:\n%(input_task)s'
+class RunnerIterationIPCSetupActionNeeded(YardstickException):
+ message = ('IterationIPC needs the "setup" action to retrieve the VNF '
+ 'handling processes PIDs to receive the messages sent')
+
+
+class RunnerIterationIPCNoCtxs(YardstickException):
+ message = 'Benchmark "setup" action did not return any VNF process PID'
+
+
class TimerTimeout(YardstickException):
message = 'Timer timeout expired, %(timeout)s seconds'
diff --git a/yardstick/common/messaging/__init__.py b/yardstick/common/messaging/__init__.py
index f0f012ec3..dc8c51b70 100644
--- a/yardstick/common/messaging/__init__.py
+++ b/yardstick/common/messaging/__init__.py
@@ -28,9 +28,11 @@ TRANSPORT_URL = (MQ_SERVICE + '://' + MQ_USER + ':' + MQ_PASS + '@' + SERVER +
RPC_SERVER_EXECUTOR = 'threading'
# Topics.
-RUNNER = 'runner'
+TOPIC_TG = 'topic_traffic_generator'
# Methods.
-# RUNNER methods:
-RUNNER_INFO = 'runner_info'
-RUNNER_LOOP = 'runner_loop'
+# Traffic generator consumers methods. Names must match the methods implemented
+# in the consumer endpoint class, ``RunnerIterationIPCEndpoint``.
+TG_METHOD_STARTED = 'tg_method_started'
+TG_METHOD_FINISHED = 'tg_method_finished'
+TG_METHOD_ITERATION = 'tg_method_iteration'
diff --git a/yardstick/common/messaging/payloads.py b/yardstick/common/messaging/payloads.py
index d29d79808..c59c87536 100644
--- a/yardstick/common/messaging/payloads.py
+++ b/yardstick/common/messaging/payloads.py
@@ -51,3 +51,15 @@ class Payload(object):
def dict_to_obj(cls, _dict):
"""Returns a Payload object built from the dictionary elements"""
return cls(**_dict)
+
+
+class TrafficGeneratorPayload(Payload):
+ """Base traffic generator payload class"""
+ REQUIRED_FIELDS = {
+ 'version', # (str) version of the payload transmitted.
+ 'iteration', # (int) iteration index during the traffic injection,
+ # starting from 1.
+ 'kpi' # (dict) collection of KPIs collected from the traffic
+ # injection. The content will depend on the generator and the
+ # traffic type.
+ }
diff --git a/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py
new file mode 100644
index 000000000..10d14a8a0
--- /dev/null
+++ b/yardstick/tests/unit/benchmark/runner/test_iteration_ipc.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2018 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+import time
+import os
+import uuid
+
+import mock
+
+from yardstick.benchmark.runners import iteration_ipc
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.tests.unit import base as ut_base
+
+
+class RunnerIterationIPCEndpointTestCase(ut_base.BaseUnitTestCase):
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+ self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+ self._queue = multiprocessing.Queue()
+ self.runner = iteration_ipc.RunnerIterationIPCEndpoint(
+ self._id, self._ctx_ids, self._queue)
+ self._kwargs = {'version': 1, 'iteration': 10, 'kpi': {}}
+ self._pload_dict = payloads.TrafficGeneratorPayload.dict_to_obj(
+ self._kwargs).obj_to_dict()
+
+ def test_tg_method_started(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_started(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_STARTED, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+ def test_tg_method_finished(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_finished(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_FINISHED, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+ def test_tg_method_iteration(self):
+ self._queue.empty()
+ ctxt = {'id': self._ctx_ids[0]}
+ self.runner.tg_method_iteration(ctxt, **self._kwargs)
+ time.sleep(0.2)
+
+ output = []
+ while not self._queue.empty():
+ output.append(self._queue.get(True, 1))
+
+ self.assertEqual(1, len(output))
+ self.assertEqual(self._ctx_ids[0], output[0]['id'])
+ self.assertEqual(messaging.TG_METHOD_ITERATION, output[0]['action'])
+ self.assertEqual(self._pload_dict, output[0]['payload'].obj_to_dict())
+
+
+class RunnerIterationIPCConsumerTestCase(ut_base.BaseUnitTestCase):
+
+ def setUp(self):
+ self._id = uuid.uuid1().int
+ self._ctx_ids = [uuid.uuid1().int, uuid.uuid1().int]
+ self.consumer = iteration_ipc.RunnerIterationIPCConsumer(
+ self._id, self._ctx_ids)
+ self.consumer._queue = mock.Mock()
+
+ def test__init(self):
+ self.assertEqual({self._ctx_ids[0]: [], self._ctx_ids[1]: []},
+ self.consumer._kpi_per_id)
+
+ def test_is_all_kpis_received_in_iteration(self):
+ payload = payloads.TrafficGeneratorPayload(
+ version=1, iteration=1, kpi={})
+ msg1 = {'action': messaging.TG_METHOD_ITERATION,
+ 'id': self._ctx_ids[0], 'payload': payload}
+ msg2 = {'action': messaging.TG_METHOD_ITERATION,
+ 'id': self._ctx_ids[1], 'payload': payload}
+ self.consumer.iteration_index = 1
+
+ self.consumer._queue.empty.side_effect = [False, True]
+ self.consumer._queue.get.return_value = msg1
+ self.assertFalse(self.consumer.is_all_kpis_received_in_iteration())
+
+ self.consumer._queue.empty.side_effect = [False, True]
+ self.consumer._queue.get.return_value = msg2
+ self.assertTrue(self.consumer.is_all_kpis_received_in_iteration())
+
+
+class IterationIPCRunnerTestCase(ut_base.BaseUnitTestCase):
+
+ @mock.patch.object(iteration_ipc, '_worker_process')
+ @mock.patch.object(os, 'getpid', return_value=12345678)
+ @mock.patch.object(multiprocessing, 'Process', return_value=mock.Mock())
+ def test__run_benchmark(self, mock_process, mock_getpid, mock_worker):
+ method = 'method'
+ scenario_cfg = {'type': 'scenario_type'}
+ context_cfg = 'context_cfg'
+ name = '%s-%s-%s' % ('IterationIPC', 'scenario_type', 12345678)
+ runner = iteration_ipc.IterationIPCRunner(mock.ANY)
+ mock_getpid.reset_mock()
+
+ runner._run_benchmark('class', method, scenario_cfg, context_cfg)
+ mock_process.assert_called_once_with(
+ name=name,
+ target=mock_worker,
+ args=(runner.result_queue, 'class', method, scenario_cfg,
+ context_cfg, runner.aborted, runner.output_queue))
+ mock_getpid.assert_called_once()
diff --git a/yardstick/tests/unit/common/messaging/test_payloads.py b/yardstick/tests/unit/common/messaging/test_payloads.py
index 00ec220c9..7930b8d34 100644
--- a/yardstick/tests/unit/common/messaging/test_payloads.py
+++ b/yardstick/tests/unit/common/messaging/test_payloads.py
@@ -44,3 +44,22 @@ class PayloadTestCase(ut_base.BaseUnitTestCase):
_dict = {'version': 2, 'key1': 'value100', 'key2': 'value200'}
payload = _DummyPayload.dict_to_obj(_dict)
self.assertEqual(set(_dict.keys()), payload._fields)
+
+
+class TrafficGeneratorPayloadTestCase(ut_base.BaseUnitTestCase):
+
+ def test_init(self):
+ tg_payload = payloads.TrafficGeneratorPayload(
+ version=1, iteration=10, kpi={'key1': 'value1'})
+ self.assertEqual(1, tg_payload.version)
+ self.assertEqual(10, tg_payload.iteration)
+ self.assertEqual({'key1': 'value1'}, tg_payload.kpi)
+ self.assertEqual(3, len(tg_payload._fields))
+
+ def test__init_missing_required_fields(self):
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(version=1, iteration=10)
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(iteration=10, kpi={})
+ with self.assertRaises(exceptions.PayloadMissingAttributes):
+ payloads.TrafficGeneratorPayload(iteration=10)