aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners
diff options
context:
space:
mode:
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-07-09 07:07:57 +0000
committerGerrit Code Review <gerrit@opnfv.org>2018-07-09 07:07:57 +0000
commitef78f436ce067ee2ffbf5a0fcc00e082edf61d6b (patch)
tree2078bfcfd22542c95ee72d43e900917b40cd4cf7 /yardstick/benchmark/runners
parentab8629071966fb46a4eeac473cda3352424fa350 (diff)
parent51bc9b51362ca76011bb201353de5354907332d1 (diff)
Merge changes from topics 'YARDSTICK-1286', 'YARDSTICK-1127'
* changes: Make "IterationIPC" MQ producer for VNF control messages Enable traffic generator PID in "NSPerf" scenario setup Enable the MQ producer in "SampleVNFTrafficGen" class Add "TrafficGeneratorProducer" for "GenericTrafficGen" instances Add "IterationIPC" runner
Diffstat (limited to 'yardstick/benchmark/runners')
-rwxr-xr-xyardstick/benchmark/runners/base.py44
-rw-r--r--yardstick/benchmark/runners/iteration_ipc.py205
2 files changed, 236 insertions, 13 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index fbdf6c281..af2557441 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -12,27 +12,26 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-from subprocess import CalledProcessError
-
-import importlib
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except CalledProcessError:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
@@ -259,7 +258,7 @@ class Runner(object):
log.debug("result_queue size %s", self.result_queue.qsize())
try:
one_record = self.result_queue.get(True, 1)
- except Empty:
+ except moves.queue.Empty:
pass
else:
if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+ """Class implementing the message producer for runners"""
+
+ def __init__(self, _id):
+ super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+ def start_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_START_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
+
+ def stop_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_STOP_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
new file mode 100644
index 000000000..a0335fdc7
--- /dev/null
+++ b/yardstick/benchmark/runners/iteration_ipc.py
@@ -0,0 +1,205 @@
+# Copyright 2018: Intel Corporation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""A runner that runs a configurable number of times before it returns. Each
+ iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs
+ to listen the messages from are given in the scenario "setup" method.
+"""
+
+import logging
+import multiprocessing
+import time
+import traceback
+
+import os
+
+from yardstick.benchmark.runners import base as base_runner
+from yardstick.common import exceptions
+from yardstick.common import messaging
+from yardstick.common import utils
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+
+
+LOG = logging.getLogger(__name__)
+
+QUEUE_PUT_TIMEOUT = 10
+ITERATION_TIMEOUT = 180
+
+
+class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``RunnerIterationIPCConsumer``"""
+
+ def tg_method_started(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_STARTED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)},
+ QUEUE_PUT_TIMEOUT)
+
+ def tg_method_finished(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_FINISHED,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+ def tg_method_iteration(self, ctxt, **kwargs):
+ if ctxt['id'] in self._ctx_ids:
+ self._queue.put(
+ {'id': ctxt['id'],
+ 'action': messaging.TG_METHOD_ITERATION,
+ 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
+ kwargs)})
+
+
+class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
+ """MQ consumer for "IterationIPC" runner"""
+
+ def __init__(self, _id, ctx_ids):
+ self._id = _id
+ self._queue = multiprocessing.Queue()
+ endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
+ super(RunnerIterationIPCConsumer, self).__init__(
+ messaging.TOPIC_TG, ctx_ids, endpoints)
+ self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
+ self.iteration_index = None
+
+ def is_all_kpis_received_in_iteration(self):
+ """Check if all producers registered have sent the ITERATION msg
+
+ During the present iteration, all producers (traffic generators) must
+ start and finish the traffic injection, and at the end of the traffic
+ injection a TG_METHOD_ITERATION must be sent. This function will check
+ all KPIs in the present iteration are received. E.g.:
+ self.iteration_index = 2
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1]} --> return False
+
+ self._kpi_per_id = {
+ 'ctx1': [kpi0, kpi1, kpi2],
+ 'ctx2': [kpi0, kpi1, kpi2]} --> return True
+ """
+ while not self._queue.empty():
+ msg = self._queue.get(True, 1)
+ if msg['action'] == messaging.TG_METHOD_ITERATION:
+ id_iter_list = self._kpi_per_id[msg['id']]
+ id_iter_list.append(msg['payload'].kpi)
+
+ return all(len(id_iter_list) == self.iteration_index
+ for id_iter_list in self._kpi_per_id.values())
+
+
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted, output_queue): # pragma: no cover
+ runner_cfg = scenario_cfg['runner']
+
+ timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
+ iterations = runner_cfg.get('iterations', 1)
+ run_step = runner_cfg.get('run_step', 'setup,run,teardown')
+ LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
+
+ runner_cfg['runner_id'] = os.getpid()
+
+ benchmark = cls(scenario_cfg, context_cfg)
+ method = getattr(benchmark, method_name)
+
+ if 'setup' not in run_step:
+ raise exceptions.RunnerIterationIPCSetupActionNeeded()
+ benchmark.setup()
+ producer_ctxs = benchmark.get_mq_ids()
+ if not producer_ctxs:
+ raise exceptions.RunnerIterationIPCNoCtxs()
+
+ mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
+ mq_consumer.start_rpc_server()
+ mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
+
+ iteration_index = 1
+ while 'run' in run_step:
+ LOG.debug('runner=%(runner)s seq=%(sequence)s START',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+ data = {}
+ result = None
+ errors = ''
+ mq_consumer.iteration_index = iteration_index
+ mq_producer.start_iteration()
+
+ try:
+ utils.wait_until_true(
+ mq_consumer.is_all_kpis_received_in_iteration,
+ timeout=timeout, sleep=2)
+ result = method(data)
+ except Exception: # pylint: disable=broad-except
+ errors = traceback.format_exc()
+ LOG.exception(errors)
+
+ mq_producer.stop_iteration()
+
+ if result:
+ output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
+ benchmark_output = {'timestamp': time.time(),
+ 'sequence': iteration_index,
+ 'data': data,
+ 'errors': errors}
+ queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+
+ LOG.debug('runner=%(runner)s seq=%(sequence)s END',
+ {'runner': runner_cfg['runner_id'],
+ 'sequence': iteration_index})
+
+ iteration_index += 1
+ if iteration_index > iterations or aborted.is_set():
+ LOG.info('"IterationIPC" worker END')
+ break
+
+ if 'teardown' in run_step:
+ try:
+ benchmark.teardown()
+ except Exception:
+ LOG.exception('Exception during teardown process')
+ mq_consumer.stop_rpc_server()
+ raise SystemExit(1)
+
+ LOG.debug('Data queue size = %s', queue.qsize())
+ LOG.debug('Output queue size = %s', output_queue.qsize())
+ mq_consumer.stop_rpc_server()
+
+
+class IterationIPCRunner(base_runner.Runner):
+ """Run a scenario for a configurable number of times.
+
+ Each iteration has a configurable timeout. The loop control depends on the
+ feedback received from the running VNFs. The context PIDs from the VNFs to
+ listen the messages from are given in the scenario "setup" method.
+ """
+ __execution_type__ = 'IterationIPC'
+
+ def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+ name = '{}-{}-{}'.format(
+ self.__execution_type__, scenario_cfg.get('type'), os.getpid())
+ self.process = multiprocessing.Process(
+ name=name,
+ target=_worker_process,
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted, self.output_queue))
+ self.process.start()