aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark
diff options
context:
space:
mode:
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-04-20 11:34:41 +0100
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-07-05 08:37:47 +0100
commit3aed8d1ad85f8a128a22a0a0f2436eb94562799f (patch)
tree1b7d26ada395175e5849c794685b7d765924e80c /yardstick/benchmark
parent4b957421d071cd5120a6e59b0e3bcba98641833f (diff)
Add "IterationIPC" runner
"IterationIPC" is a runner that runs a configurable number of times before it returns. Each iteration has a configurable timeout. The loop control depends on the feedback received from the running VNFs. The context PIDs from the VNFs to listen the messages from are given in the scenario "setup" method. The MQ consumer, "RunnerIterationIPCConsumer", is subscribed to messages sent to topic "messaging.TOPIC_TG". The endpoints are defined in the class "RunnerIterationIPCEndpoint". Three are implemented: "messaging.TG_METHOD_STARTED", "messaging.TG_METHOD_FINISHED" and "messaging.TG_METHOD_ITERATION". JIRA: YARDSTICK-1127 Change-Id: I4b2f11a15ef41e6f3c70bd64188d5c7fbcdb7eed Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Diffstat (limited to 'yardstick/benchmark')
-rw-r--r--yardstick/benchmark/runners/iteration_ipc.py200
1 files changed, 200 insertions, 0 deletions
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
new file mode 100644
index 000000000..690f2660b
--- /dev/null
+++ b/yardstick/benchmark/runners/iteration_ipc.py
@@ -0,0 +1,200 @@
+# Copyright 2018: Intel Corporation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""A runner that runs a configurable number of times before it returns. Each
+ iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs
+ to listen the messages from are given in the scenario "setup" method.
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from yardstick.benchmark.runners import base
+from yardstick.common import exceptions
+from yardstick.common import messaging
+from yardstick.common import utils
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_PUT_TIMEOUT = 10
+ITERATION_TIMEOUT = 180
+
+
+class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``RunnerIterationIPCConsumer``"""
+
+ def tg_method_started(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_STARTED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)},
+ QUEUE_PUT_TIMEOUT)
+
+ def tg_method_finished(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_FINISHED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+ def tg_method_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_pids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_ITERATION,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+
+class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
+ """MQ consumer for "IterationIPC" runner"""
+
+ def __init__(self, _id, ctx_ids):
+ self._id = _id
+ self._queue = multiprocessing.Queue()
+ endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
+ super(RunnerIterationIPCConsumer, self).__init__(
+ messaging.TOPIC_TG, ctx_ids, endpoints)
+ self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
+ self.iteration_index = None
+
+ def is_all_kpis_received_in_iteration(self):
+ """Check if all producers registered have sent the ITERATION msg
+
+ During the present iteration, all producers (traffic generators) must
+ start and finish the traffic injection, and at the end of the traffic
+ injection a TG_METHOD_ITERATION must be sent. This function will check
+ all KPIs in the present iteration are received. E.g.:
+ self.iteration_index = 2
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1]} --> return False
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1, kpi2]} --> return True
+ """
+ while not self._queue.empty():
+ msg = self._queue.get(True, 1)
+ if msg['action'] == messaging.TG_METHOD_ITERATION:
+ id_iter_list = self._kpi_per_id[msg['id']]
+ id_iter_list.append(msg['payload'].kpi)
+
+ return all(len(id_iter_list) == self.iteration_index
+ for id_iter_list in self._kpi_per_id.values())
+
+
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted, output_queue): # pragma: no cover
+ runner_cfg = scenario_cfg['runner']
+
+ timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
+ iterations = runner_cfg.get('iterations', 1)
+ run_step = runner_cfg.get('run_step', 'setup,run,teardown')
+ LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ benchmark = cls(scenario_cfg, context_cfg)
+ method = getattr(benchmark, method_name)
+
+ if 'setup' not in run_step:
+ raise exceptions.RunnerIterationIPCSetupActionNeeded()
+ producer_ctxs = benchmark.setup()
+ if not producer_ctxs:
+ raise exceptions.RunnerIterationIPCNoCtxs()
+
+ mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
+ mq_consumer.start_rpc_server()
+
+ iteration_index = 1
+ while 'run' in run_step:
+ LOG.debug('runner=%(runner)s seq=%(sequence)s START',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+ data = {}
+ result = None
+ errors = ''
+ mq_consumer.iteration_index = iteration_index
+
+ try:
+ utils.wait_until_true(
+ mq_consumer.is_all_kpis_received_in_iteration,
+ timeout=timeout, sleep=2)
+ result = method(data)
+ except Exception: # pylint: disable=broad-except
+ errors = traceback.format_exc()
+ LOG.exception(errors)
+
+ if result:
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+ benchmark_output = {'timestamp': time.time(),
+ 'sequence': iteration_index,
+ 'data': data,
+ 'errors': errors}
+ queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug('runner=%(runner)s seq=%(sequence)s END',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+
+ iteration_index += 1
+ if iteration_index > iterations or aborted.is_set():
+ LOG.info('"IterationIPC" worker END')
+ break
+
+ if 'teardown' in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ LOG.exception('Exception during teardown process')
+ mq_consumer.stop_rpc_server()
+ raise SystemExit(1)
+
+ LOG.debug('Data queue size = %s', queue.qsize())
+ LOG.debug('Output queue size = %s', output_queue.qsize())
+ mq_consumer.stop_rpc_server()
+
+
+class IterationIPCRunner(base.Runner):
+ """Run a scenario for a configurable number of times.
+
+ Each iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs to
+ listen the messages from are given in the scenario "setup" method.
+ """
+ __execution_type__ = 'IterationIPC'
+
+ def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = '{}-{}-{}'.format(
+ self.__execution_type__, scenario_cfg.get('type'), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()