Diffstat (limited to 'yardstick/benchmark/runners')
-rwxr-xr-x  yardstick/benchmark/runners/base.py            49
-rw-r--r--  yardstick/benchmark/runners/iteration.py       37
-rw-r--r--  yardstick/benchmark/runners/iteration_ipc.py  205
-rw-r--r--  yardstick/benchmark/runners/sequence.py        33
4 files changed, 55 insertions(+), 269 deletions(-)
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index af2557441..94de45d1e 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -25,9 +25,6 @@ import traceback
from six import moves
from yardstick.benchmark.scenarios import base as base_scenario
-from yardstick.common import messaging
-from yardstick.common.messaging import payloads
-from yardstick.common.messaging import producer
from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
@@ -80,6 +77,33 @@ def _periodic_action(interval, command, queue):
queue.put({'periodic-action-data': data})
+class ScenarioOutput(dict):
+
+ QUEUE_PUT_TIMEOUT = 10
+
+ def __init__(self, queue, **kwargs):
+ super(ScenarioOutput, self).__init__()
+ self._queue = queue
+ self.result_ext = dict()
+ for key, val in kwargs.items():
+ self.result_ext[key] = val
+ setattr(self, key, val)
+
+ def push(self, data=None, add_timestamp=True):
+ if data is None:
+ data = dict(self)
+
+ if add_timestamp:
+ result = {'timestamp': time.time(), 'data': data}
+ else:
+ result = data
+
+ for key in self.result_ext.keys():
+ result[key] = getattr(self, key)
+
+ self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT)
+
+
class Runner(object):
runners = []
@@ -271,22 +295,3 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
-
-
-class RunnerProducer(producer.MessagingProducer):
- """Class implementing the message producer for runners"""
-
- def __init__(self, _id):
- super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
-
- def start_iteration(self, version=1, data=None):
- data = {} if not data else data
- self.send_message(
- messaging.RUNNER_METHOD_START_ITERATION,
- payloads.RunnerPayload(version=version, data=data))
-
- def stop_iteration(self, version=1, data=None):
- data = {} if not data else data
- self.send_message(
- messaging.RUNNER_METHOD_STOP_ITERATION,
- payloads.RunnerPayload(version=version, data=data))
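
Editor's note: the core of this change is the new ScenarioOutput helper added to base.py above. Runners now hand the scenario method a dict-like object that knows how to push itself onto the result queue, instead of assembling a benchmark_output dict by hand each iteration. A minimal usage sketch, assuming a plain multiprocessing.Queue and a made-up KPI key; only the ScenarioOutput API itself comes from this patch:

    import multiprocessing

    from yardstick.benchmark.runners import base

    queue = multiprocessing.Queue()
    scenario_output = base.ScenarioOutput(queue, sequence=1, errors="")

    # The scenario's run() method fills it like a plain dict ...
    scenario_output["throughput_mbps"] = 942.0   # hypothetical KPI key

    # ... and the worker pushes one record per iteration.
    scenario_output.push()
    # The queue now holds:
    # {'timestamp': <float>, 'data': {'throughput_mbps': 942.0},
    #  'sequence': 1, 'errors': ''}

    scenario_output.sequence += 1   # the extra kwargs ride along on every push
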
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 58ab06a32..15dad2cd5 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -23,7 +23,6 @@ from __future__ import absolute_import
import logging
import multiprocessing
-import time
import traceback
import os
@@ -40,8 +39,6 @@ QUEUE_PUT_TIMEOUT = 10
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted, output_queue):
- sequence = 1
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -53,6 +50,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
runner_cfg['runner_id'] = os.getpid()
+ scenario_output = base.ScenarioOutput(queue, sequence=1, errors="")
benchmark = cls(scenario_cfg, context_cfg)
if "setup" in run_step:
benchmark.setup()
@@ -67,22 +65,21 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
{"runner": runner_cfg["runner_id"],
- "sequence": sequence})
-
- data = {}
- errors = ""
+ "sequence": scenario_output.sequence})
+ scenario_output.clear()
+ scenario_output.errors = ""
benchmark.pre_run_wait_time(interval)
try:
- result = method(data)
+ result = method(scenario_output)
except y_exc.SLAValidationError as error:
# SLA validation failed in scenario, determine what to do now
if sla_action == "assert":
raise
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", error.args)
- errors = error.args
+ scenario_output.errors = error.args
elif sla_action == "rate-control":
try:
scenario_cfg['options']['rate']
@@ -91,10 +88,10 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
scenario_cfg['options']['rate'] = 100
scenario_cfg['options']['rate'] -= delta
- sequence = 1
+ scenario_output.sequence = 1
continue
except Exception: # pylint: disable=broad-except
- errors = traceback.format_exc()
+ scenario_output.errors = traceback.format_exc()
LOG.exception("")
raise
else:
@@ -105,23 +102,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
benchmark.post_run_wait_time(interval)
- benchmark_output = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors
- }
-
- queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
+ if scenario_output:
+ scenario_output.push()
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
- "sequence": sequence})
+ "sequence": scenario_output.sequence})
- sequence += 1
+ scenario_output.sequence += 1
- if (errors and sla_action is None) or \
- (sequence > iterations or aborted.is_set()):
+ if (scenario_output.errors and sla_action is None) or \
+ (scenario_output.sequence > iterations or aborted.is_set()):
LOG.info("worker END")
break
if "teardown" in run_step:
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
deleted file mode 100644
index a0335fdc7..000000000
--- a/yardstick/benchmark/runners/iteration_ipc.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2018: Intel Corporation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""A runner that runs a configurable number of times before it returns. Each
- iteration has a configurable timeout. The loop control depends on the
- feedback received from the running VNFs. The context PIDs from the VNFs
- to listen the messages from are given in the scenario "setup" method.
-"""
-
-import logging
-import multiprocessing
-import time
-import traceback
-
-import os
-
-from yardstick.benchmark.runners import base as base_runner
-from yardstick.common import exceptions
-from yardstick.common import messaging
-from yardstick.common import utils
-from yardstick.common.messaging import consumer
-from yardstick.common.messaging import payloads
-
-
-LOG = logging.getLogger(__name__)
-
-QUEUE_PUT_TIMEOUT = 10
-ITERATION_TIMEOUT = 180
-
-
-class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
- """Endpoint class for ``RunnerIterationIPCConsumer``"""
-
- def tg_method_started(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_STARTED,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)},
- QUEUE_PUT_TIMEOUT)
-
- def tg_method_finished(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_FINISHED,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)})
-
- def tg_method_iteration(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_ITERATION,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)})
-
-
-class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
- """MQ consumer for "IterationIPC" runner"""
-
- def __init__(self, _id, ctx_ids):
- self._id = _id
- self._queue = multiprocessing.Queue()
- endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
- super(RunnerIterationIPCConsumer, self).__init__(
- messaging.TOPIC_TG, ctx_ids, endpoints)
- self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
- self.iteration_index = None
-
- def is_all_kpis_received_in_iteration(self):
- """Check if all producers registered have sent the ITERATION msg
-
- During the present iteration, all producers (traffic generators) must
- start and finish the traffic injection, and at the end of the traffic
- injection a TG_METHOD_ITERATION must be sent. This function will check
- all KPIs in the present iteration are received. E.g.:
- self.iteration_index = 2
-
- self._kpi_per_id = {
- 'ctx1': [kpi0, kpi1, kpi2],
- 'ctx2': [kpi0, kpi1]} --> return False
-
- self._kpi_per_id = {
- 'ctx1': [kpi0, kpi1, kpi2],
- 'ctx2': [kpi0, kpi1, kpi2]} --> return True
- """
- while not self._queue.empty():
- msg = self._queue.get(True, 1)
- if msg['action'] == messaging.TG_METHOD_ITERATION:
- id_iter_list = self._kpi_per_id[msg['id']]
- id_iter_list.append(msg['payload'].kpi)
-
- return all(len(id_iter_list) == self.iteration_index
- for id_iter_list in self._kpi_per_id.values())
-
-
-def _worker_process(queue, cls, method_name, scenario_cfg,
- context_cfg, aborted, output_queue): # pragma: no cover
- runner_cfg = scenario_cfg['runner']
-
- timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
- iterations = runner_cfg.get('iterations', 1)
- run_step = runner_cfg.get('run_step', 'setup,run,teardown')
- LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
-
- runner_cfg['runner_id'] = os.getpid()
-
- benchmark = cls(scenario_cfg, context_cfg)
- method = getattr(benchmark, method_name)
-
- if 'setup' not in run_step:
- raise exceptions.RunnerIterationIPCSetupActionNeeded()
- benchmark.setup()
- producer_ctxs = benchmark.get_mq_ids()
- if not producer_ctxs:
- raise exceptions.RunnerIterationIPCNoCtxs()
-
- mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
- mq_consumer.start_rpc_server()
- mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
-
- iteration_index = 1
- while 'run' in run_step:
- LOG.debug('runner=%(runner)s seq=%(sequence)s START',
- {'runner': runner_cfg['runner_id'],
- 'sequence': iteration_index})
- data = {}
- result = None
- errors = ''
- mq_consumer.iteration_index = iteration_index
- mq_producer.start_iteration()
-
- try:
- utils.wait_until_true(
- mq_consumer.is_all_kpis_received_in_iteration,
- timeout=timeout, sleep=2)
- result = method(data)
- except Exception: # pylint: disable=broad-except
- errors = traceback.format_exc()
- LOG.exception(errors)
-
- mq_producer.stop_iteration()
-
- if result:
- output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
- benchmark_output = {'timestamp': time.time(),
- 'sequence': iteration_index,
- 'data': data,
- 'errors': errors}
- queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
-
- LOG.debug('runner=%(runner)s seq=%(sequence)s END',
- {'runner': runner_cfg['runner_id'],
- 'sequence': iteration_index})
-
- iteration_index += 1
- if iteration_index > iterations or aborted.is_set():
- LOG.info('"IterationIPC" worker END')
- break
-
- if 'teardown' in run_step:
- try:
- benchmark.teardown()
- except Exception:
- LOG.exception('Exception during teardown process')
- mq_consumer.stop_rpc_server()
- raise SystemExit(1)
-
- LOG.debug('Data queue size = %s', queue.qsize())
- LOG.debug('Output queue size = %s', output_queue.qsize())
- mq_consumer.stop_rpc_server()
-
-
-class IterationIPCRunner(base_runner.Runner):
- """Run a scenario for a configurable number of times.
-
- Each iteration has a configurable timeout. The loop control depends on the
- feedback received from the running VNFs. The context PIDs from the VNFs to
- listen the messages from are given in the scenario "setup" method.
- """
- __execution_type__ = 'IterationIPC'
-
- def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
- name = '{}-{}-{}'.format(
- self.__execution_type__, scenario_cfg.get('type'), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg,
- context_cfg, self.aborted, self.output_queue))
- self.process.start()
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index 0148a45b2..58ffddd22 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -38,8 +38,6 @@ LOG = logging.getLogger(__name__)
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted, output_queue):
- sequence = 1
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -56,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.info("worker START, sequence_values(%s, %s), class %s",
arg_name, sequence_values, cls)
+ scenario_output = base.ScenarioOutput(queue, sequence=1, errors="")
benchmark = cls(scenario_cfg, context_cfg)
benchmark.setup()
method = getattr(benchmark, method_name)
@@ -68,22 +67,23 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
options[arg_name] = value
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
+ {"runner": runner_cfg["runner_id"],
+ "sequence": scenario_output.sequence})
- data = {}
- errors = ""
+ scenario_output.clear()
+ scenario_output.errors = ""
try:
- result = method(data)
+ result = method(scenario_output)
except y_exc.SLAValidationError as error:
# SLA validation failed in scenario, determine what to do now
if sla_action == "assert":
raise
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", error.args)
- errors = error.args
+ scenario_output.errors = error.args
except Exception as e: # pylint: disable=broad-except
- errors = traceback.format_exc()
+ scenario_output.errors = traceback.format_exc()
LOG.exception(e)
else:
if result:
@@ -91,21 +91,16 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
time.sleep(interval)
- benchmark_output = {
- 'timestamp': time.time(),
- 'sequence': sequence,
- 'data': data,
- 'errors': errors
- }
-
- queue.put(benchmark_output)
+ if scenario_output:
+ scenario_output.push()
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
- {"runner": runner_cfg["runner_id"], "sequence": sequence})
+ {"runner": runner_cfg["runner_id"],
+ "sequence": scenario_output.sequence})
- sequence += 1
+ scenario_output.sequence += 1
- if (errors and sla_action is None) or aborted.is_set():
+ if (scenario_output.errors and sla_action is None) or aborted.is_set():
break
try: