diff options
Diffstat (limited to 'yardstick/benchmark')
-rw-r--r-- | yardstick/benchmark/core/report.py | 31 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 49 | ||||
-rw-r--r-- | yardstick/benchmark/runners/iteration.py | 37 | ||||
-rw-r--r-- | yardstick/benchmark/runners/iteration_ipc.py | 205 | ||||
-rw-r--r-- | yardstick/benchmark/runners/sequence.py | 33 | ||||
-rw-r--r-- | yardstick/benchmark/scenarios/base.py | 4 | ||||
-rw-r--r-- | yardstick/benchmark/scenarios/networking/vnf_generic.py | 156 |
7 files changed, 195 insertions, 320 deletions
diff --git a/yardstick/benchmark/core/report.py b/yardstick/benchmark/core/report.py index 0819cd497..b7d2fd02b 100644 --- a/yardstick/benchmark/core/report.py +++ b/yardstick/benchmark/core/report.py @@ -1,6 +1,6 @@ ############################################################################## # Copyright (c) 2017 Rajesh Kudaka <4k.rajesh@gmail.com> -# Copyright (c) 2018 Intel Corporation. +# Copyright (c) 2018-2019 Intel Corporation. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 @@ -121,6 +121,25 @@ class Report(object): else: raise KeyError("Task ID or Test case not found.") + def _get_trimmed_timestamp(self, metric_time, resolution=4): + if not isinstance(metric_time, str): + metric_time = metric_time.encode('utf8') # PY2: unicode to str + metric_time = metric_time[11:] # skip date, keep time + head, _, tail = metric_time.partition('.') # split HH:MM:SS & nsZ + metric_time = head + '.' + tail[:resolution] # join HH:MM:SS & .us + return metric_time + + def _get_timestamps(self, metrics, resolution=6): + # Extract the timestamps from a list of metrics + timestamps = [] + for metric in metrics: + metric_time = self._get_trimmed_timestamp( + metric['time'], resolution) + timestamps.append(metric_time) # HH:MM:SS.micros + return timestamps + + @cliargs("task_id", type=str, help=" task id", nargs=1) + @cliargs("yaml_name", type=str, help=" Yaml file Name", nargs=1) def _generate_common(self, args): """Actions that are common to both report formats. @@ -147,15 +166,7 @@ class Report(object): for field in db_fieldkeys]] # extract timestamps - self.Timestamp = [] - for metric in db_metrics: - metric_time = metric['time'] # in RFC3339 format - if not isinstance(metric_time, str): - metric_time = metric_time.encode('utf8') # PY2: unicode to str - metric_time = metric_time[11:] # skip date, keep time - head, _, tail = metric_time.partition('.') # split HH:MM:SS and nsZ - metric_time = head + '.' + tail[:6] # join HH:MM:SS and .us - self.Timestamp.append(metric_time) # HH:MM:SS.micros + self.Timestamp = self._get_timestamps(db_metrics) # prepare return values datasets = [] diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index af2557441..94de45d1e 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -25,9 +25,6 @@ import traceback from six import moves from yardstick.benchmark.scenarios import base as base_scenario -from yardstick.common import messaging -from yardstick.common.messaging import payloads -from yardstick.common.messaging import producer from yardstick.common import utils from yardstick.dispatcher.base import Base as DispatcherBase @@ -80,6 +77,33 @@ def _periodic_action(interval, command, queue): queue.put({'periodic-action-data': data}) +class ScenarioOutput(dict): + + QUEUE_PUT_TIMEOUT = 10 + + def __init__(self, queue, **kwargs): + super(ScenarioOutput, self).__init__() + self._queue = queue + self.result_ext = dict() + for key, val in kwargs.items(): + self.result_ext[key] = val + setattr(self, key, val) + + def push(self, data=None, add_timestamp=True): + if data is None: + data = dict(self) + + if add_timestamp: + result = {'timestamp': time.time(), 'data': data} + else: + result = data + + for key in self.result_ext.keys(): + result[key] = getattr(self, key) + + self._queue.put(result, True, self.QUEUE_PUT_TIMEOUT) + + class Runner(object): runners = [] @@ -271,22 +295,3 @@ class Runner(object): dispatchers = DispatcherBase.get(self.config['output_config']) dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) - - -class RunnerProducer(producer.MessagingProducer): - """Class implementing the message producer for runners""" - - def __init__(self, _id): - super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id) - - def start_iteration(self, version=1, data=None): - data = {} if not data else data - self.send_message( - messaging.RUNNER_METHOD_START_ITERATION, - payloads.RunnerPayload(version=version, data=data)) - - def stop_iteration(self, version=1, data=None): - data = {} if not data else data - self.send_message( - messaging.RUNNER_METHOD_STOP_ITERATION, - payloads.RunnerPayload(version=version, data=data)) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 58ab06a32..15dad2cd5 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -23,7 +23,6 @@ from __future__ import absolute_import import logging import multiprocessing -import time import traceback import os @@ -40,8 +39,6 @@ QUEUE_PUT_TIMEOUT = 10 def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg, aborted, output_queue): - sequence = 1 - runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) @@ -53,6 +50,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, runner_cfg['runner_id'] = os.getpid() + scenario_output = base.ScenarioOutput(queue, sequence=1, errors="") benchmark = cls(scenario_cfg, context_cfg) if "setup" in run_step: benchmark.setup() @@ -67,22 +65,21 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.debug("runner=%(runner)s seq=%(sequence)s START", {"runner": runner_cfg["runner_id"], - "sequence": sequence}) - - data = {} - errors = "" + "sequence": scenario_output.sequence}) + scenario_output.clear() + scenario_output.errors = "" benchmark.pre_run_wait_time(interval) try: - result = method(data) + result = method(scenario_output) except y_exc.SLAValidationError as error: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": raise elif sla_action == "monitor": LOG.warning("SLA validation failed: %s", error.args) - errors = error.args + scenario_output.errors = error.args elif sla_action == "rate-control": try: scenario_cfg['options']['rate'] @@ -91,10 +88,10 @@ def _worker_process(queue, cls, method_name, scenario_cfg, scenario_cfg['options']['rate'] = 100 scenario_cfg['options']['rate'] -= delta - sequence = 1 + scenario_output.sequence = 1 continue except Exception: # pylint: disable=broad-except - errors = traceback.format_exc() + scenario_output.errors = traceback.format_exc() LOG.exception("") raise else: @@ -105,23 +102,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg, benchmark.post_run_wait_time(interval) - benchmark_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) + if scenario_output: + scenario_output.push() LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], - "sequence": sequence}) + "sequence": scenario_output.sequence}) - sequence += 1 + scenario_output.sequence += 1 - if (errors and sla_action is None) or \ - (sequence > iterations or aborted.is_set()): + if (scenario_output.errors and sla_action is None) or \ + (scenario_output.sequence > iterations or aborted.is_set()): LOG.info("worker END") break if "teardown" in run_step: diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py deleted file mode 100644 index a0335fdc7..000000000 --- a/yardstick/benchmark/runners/iteration_ipc.py +++ /dev/null @@ -1,205 +0,0 @@ -# Copyright 2018: Intel Corporation -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""A runner that runs a configurable number of times before it returns. Each - iteration has a configurable timeout. The loop control depends on the - feedback received from the running VNFs. The context PIDs from the VNFs - to listen the messages from are given in the scenario "setup" method. -""" - -import logging -import multiprocessing -import time -import traceback - -import os - -from yardstick.benchmark.runners import base as base_runner -from yardstick.common import exceptions -from yardstick.common import messaging -from yardstick.common import utils -from yardstick.common.messaging import consumer -from yardstick.common.messaging import payloads - - -LOG = logging.getLogger(__name__) - -QUEUE_PUT_TIMEOUT = 10 -ITERATION_TIMEOUT = 180 - - -class RunnerIterationIPCEndpoint(consumer.NotificationHandler): - """Endpoint class for ``RunnerIterationIPCConsumer``""" - - def tg_method_started(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_ids: - self._queue.put( - {'id': ctxt['id'], - 'action': messaging.TG_METHOD_STARTED, - 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( - kwargs)}, - QUEUE_PUT_TIMEOUT) - - def tg_method_finished(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_ids: - self._queue.put( - {'id': ctxt['id'], - 'action': messaging.TG_METHOD_FINISHED, - 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( - kwargs)}) - - def tg_method_iteration(self, ctxt, **kwargs): - if ctxt['id'] in self._ctx_ids: - self._queue.put( - {'id': ctxt['id'], - 'action': messaging.TG_METHOD_ITERATION, - 'payload': payloads.TrafficGeneratorPayload.dict_to_obj( - kwargs)}) - - -class RunnerIterationIPCConsumer(consumer.MessagingConsumer): - """MQ consumer for "IterationIPC" runner""" - - def __init__(self, _id, ctx_ids): - self._id = _id - self._queue = multiprocessing.Queue() - endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)] - super(RunnerIterationIPCConsumer, self).__init__( - messaging.TOPIC_TG, ctx_ids, endpoints) - self._kpi_per_id = {ctx: [] for ctx in ctx_ids} - self.iteration_index = None - - def is_all_kpis_received_in_iteration(self): - """Check if all producers registered have sent the ITERATION msg - - During the present iteration, all producers (traffic generators) must - start and finish the traffic injection, and at the end of the traffic - injection a TG_METHOD_ITERATION must be sent. This function will check - all KPIs in the present iteration are received. E.g.: - self.iteration_index = 2 - - self._kpi_per_id = { - 'ctx1': [kpi0, kpi1, kpi2], - 'ctx2': [kpi0, kpi1]} --> return False - - self._kpi_per_id = { - 'ctx1': [kpi0, kpi1, kpi2], - 'ctx2': [kpi0, kpi1, kpi2]} --> return True - """ - while not self._queue.empty(): - msg = self._queue.get(True, 1) - if msg['action'] == messaging.TG_METHOD_ITERATION: - id_iter_list = self._kpi_per_id[msg['id']] - id_iter_list.append(msg['payload'].kpi) - - return all(len(id_iter_list) == self.iteration_index - for id_iter_list in self._kpi_per_id.values()) - - -def _worker_process(queue, cls, method_name, scenario_cfg, - context_cfg, aborted, output_queue): # pragma: no cover - runner_cfg = scenario_cfg['runner'] - - timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT) - iterations = runner_cfg.get('iterations', 1) - run_step = runner_cfg.get('run_step', 'setup,run,teardown') - LOG.info('Worker START. Iterations %d times, class %s', iterations, cls) - - runner_cfg['runner_id'] = os.getpid() - - benchmark = cls(scenario_cfg, context_cfg) - method = getattr(benchmark, method_name) - - if 'setup' not in run_step: - raise exceptions.RunnerIterationIPCSetupActionNeeded() - benchmark.setup() - producer_ctxs = benchmark.get_mq_ids() - if not producer_ctxs: - raise exceptions.RunnerIterationIPCNoCtxs() - - mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs) - mq_consumer.start_rpc_server() - mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id']) - - iteration_index = 1 - while 'run' in run_step: - LOG.debug('runner=%(runner)s seq=%(sequence)s START', - {'runner': runner_cfg['runner_id'], - 'sequence': iteration_index}) - data = {} - result = None - errors = '' - mq_consumer.iteration_index = iteration_index - mq_producer.start_iteration() - - try: - utils.wait_until_true( - mq_consumer.is_all_kpis_received_in_iteration, - timeout=timeout, sleep=2) - result = method(data) - except Exception: # pylint: disable=broad-except - errors = traceback.format_exc() - LOG.exception(errors) - - mq_producer.stop_iteration() - - if result: - output_queue.put(result, True, QUEUE_PUT_TIMEOUT) - benchmark_output = {'timestamp': time.time(), - 'sequence': iteration_index, - 'data': data, - 'errors': errors} - queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT) - - LOG.debug('runner=%(runner)s seq=%(sequence)s END', - {'runner': runner_cfg['runner_id'], - 'sequence': iteration_index}) - - iteration_index += 1 - if iteration_index > iterations or aborted.is_set(): - LOG.info('"IterationIPC" worker END') - break - - if 'teardown' in run_step: - try: - benchmark.teardown() - except Exception: - LOG.exception('Exception during teardown process') - mq_consumer.stop_rpc_server() - raise SystemExit(1) - - LOG.debug('Data queue size = %s', queue.qsize()) - LOG.debug('Output queue size = %s', output_queue.qsize()) - mq_consumer.stop_rpc_server() - - -class IterationIPCRunner(base_runner.Runner): - """Run a scenario for a configurable number of times. - - Each iteration has a configurable timeout. The loop control depends on the - feedback received from the running VNFs. The context PIDs from the VNFs to - listen the messages from are given in the scenario "setup" method. - """ - __execution_type__ = 'IterationIPC' - - def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): - name = '{}-{}-{}'.format( - self.__execution_type__, scenario_cfg.get('type'), os.getpid()) - self.process = multiprocessing.Process( - name=name, - target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, - context_cfg, self.aborted, self.output_queue)) - self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 0148a45b2..58ffddd22 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -38,8 +38,6 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg, aborted, output_queue): - sequence = 1 - runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) @@ -56,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.info("worker START, sequence_values(%s, %s), class %s", arg_name, sequence_values, cls) + scenario_output = base.ScenarioOutput(queue, sequence=1, errors="") benchmark = cls(scenario_cfg, context_cfg) benchmark.setup() method = getattr(benchmark, method_name) @@ -68,22 +67,23 @@ def _worker_process(queue, cls, method_name, scenario_cfg, options[arg_name] = value LOG.debug("runner=%(runner)s seq=%(sequence)s START", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) + {"runner": runner_cfg["runner_id"], + "sequence": scenario_output.sequence}) - data = {} - errors = "" + scenario_output.clear() + scenario_output.errors = "" try: - result = method(data) + result = method(scenario_output) except y_exc.SLAValidationError as error: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": raise elif sla_action == "monitor": LOG.warning("SLA validation failed: %s", error.args) - errors = error.args + scenario_output.errors = error.args except Exception as e: # pylint: disable=broad-except - errors = traceback.format_exc() + scenario_output.errors = traceback.format_exc() LOG.exception(e) else: if result: @@ -91,21 +91,16 @@ def _worker_process(queue, cls, method_name, scenario_cfg, time.sleep(interval) - benchmark_output = { - 'timestamp': time.time(), - 'sequence': sequence, - 'data': data, - 'errors': errors - } - - queue.put(benchmark_output) + if scenario_output: + scenario_output.push() LOG.debug("runner=%(runner)s seq=%(sequence)s END", - {"runner": runner_cfg["runner_id"], "sequence": sequence}) + {"runner": runner_cfg["runner_id"], + "sequence": scenario_output.sequence}) - sequence += 1 + scenario_output.sequence += 1 - if (errors and sla_action is None) or aborted.is_set(): + if (scenario_output.errors and sla_action is None) or aborted.is_set(): break try: diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py index 1737bb942..ae8bfad71 100644 --- a/yardstick/benchmark/scenarios/base.py +++ b/yardstick/benchmark/scenarios/base.py @@ -122,7 +122,3 @@ class Scenario(object): except TypeError: dic[k] = v return dic - - def get_mq_ids(self): # pragma: no cover - """Return stored MQ producer IDs, if defined""" - pass diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py index 5ac51cdfc..c88ea51c3 100644 --- a/yardstick/benchmark/scenarios/networking/vnf_generic.py +++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016-2017 Intel Corporation +# Copyright (c) 2016-2019 Intel Corporation # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -44,14 +44,13 @@ traffic_profile.register_modules() LOG = logging.getLogger(__name__) -class NetworkServiceTestCase(scenario_base.Scenario): - """Class handles Generic framework to do pre-deployment VNF & - Network service testing """ +class NetworkServiceBase(scenario_base.Scenario): + """Base class for Network service testing scenarios""" - __scenario_type__ = "NSPerf" + __scenario_type__ = "" def __init__(self, scenario_cfg, context_cfg): # pragma: no cover - super(NetworkServiceTestCase, self).__init__() + super(NetworkServiceBase, self).__init__() self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg @@ -61,7 +60,32 @@ class NetworkServiceTestCase(scenario_base.Scenario): self.traffic_profile = None self.node_netdevs = {} self.bin_path = get_nsb_option('bin_path', '') - self._mq_ids = [] + + def run(self, *args): + pass + + def teardown(self): + """ Stop the collector and terminate VNF & TG instance + + :return + """ + + try: + try: + self.collector.stop() + for vnf in self.vnfs: + LOG.info("Stopping %s", vnf.name) + vnf.terminate() + LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) + finally: + terminate_children() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise RuntimeError("Error in teardown") def is_ended(self): return self.traffic_profile is not None and self.traffic_profile.is_ended() @@ -140,6 +164,13 @@ class NetworkServiceTestCase(scenario_base.Scenario): imix = {} return imix + def _get_ip_priority(self): + try: + priority = self.scenario_cfg['options']['priority'] + except KeyError: + priority = {} + return priority + def _get_traffic_profile(self): profile = self.scenario_cfg["traffic_profile"] path = self.scenario_cfg["task_path"] @@ -177,6 +208,7 @@ class NetworkServiceTestCase(scenario_base.Scenario): tprofile_data = { 'flow': self._get_traffic_flow(), 'imix': self._get_traffic_imix(), + 'priority': self._get_ip_priority(), tprofile_base.TrafficProfile.UPLINK: {}, tprofile_base.TrafficProfile.DOWNLINK: {}, 'extra_args': extra_args, @@ -446,12 +478,30 @@ class NetworkServiceTestCase(scenario_base.Scenario): pass self.create_interfaces_from_node(vnfd, node) vnf_impl = self.get_vnf_impl(vnfd['id']) - vnf_instance = vnf_impl(node_name, vnfd, scenario_cfg['task_id']) + vnf_instance = vnf_impl(node_name, vnfd) vnfs.append(vnf_instance) self.vnfs = vnfs return vnfs + def pre_run_wait_time(self, time_seconds): # pragma: no cover + """Time waited before executing the run method""" + time.sleep(time_seconds) + + def post_run_wait_time(self, time_seconds): # pragma: no cover + """Time waited after executing the run method""" + pass + + +class NetworkServiceTestCase(NetworkServiceBase): + """Class handles Generic framework to do pre-deployment VNF & + Network service testing """ + + __scenario_type__ = "NSPerf" + + def __init__(self, scenario_cfg, context_cfg): # pragma: no cover + super(NetworkServiceTestCase, self).__init__(scenario_cfg, context_cfg) + def setup(self): """Setup infrastructure, provission VNFs & start traffic""" # 1. Verify if infrastructure mapping can meet topology @@ -495,11 +545,6 @@ class NetworkServiceTestCase(scenario_base.Scenario): for traffic_gen in traffic_runners: LOG.info("Starting traffic on %s", traffic_gen.name) traffic_gen.run_traffic(self.traffic_profile) - self._mq_ids.append(traffic_gen.get_mq_producer_id()) - - def get_mq_ids(self): # pragma: no cover - """Return stored MQ producer IDs""" - return self._mq_ids def run(self, result): # yardstick API """ Yardstick calls run() at intervals defined in the yaml and @@ -515,33 +560,70 @@ class NetworkServiceTestCase(scenario_base.Scenario): result.update(self.collector.get_kpi()) - def teardown(self): - """ Stop the collector and terminate VNF & TG instance - :return - """ +class NetworkServiceRFC2544(NetworkServiceBase): + """Class handles RFC2544 Network service testing""" + __scenario_type__ = "NSPerf-RFC2544" + + def __init__(self, scenario_cfg, context_cfg): # pragma: no cover + super(NetworkServiceRFC2544, self).__init__(scenario_cfg, context_cfg) + + def setup(self): + """Setup infrastructure, provision VNFs""" + self.map_topology_to_infrastructure() + self.load_vnf_models() + + traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic] + non_traffic_runners = [vnf for vnf in self.vnfs if not vnf.runs_traffic] try: - try: - self.collector.stop() - for vnf in self.vnfs: - LOG.info("Stopping %s", vnf.name) - vnf.terminate() - LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs)) - finally: - terminate_children() - except Exception: - # catch any exception in teardown and convert to simple exception - # never pass exceptions back to multiprocessing, because some exceptions can - # be unpicklable - # https://bugs.python.org/issue9400 + for vnf in chain(traffic_runners, non_traffic_runners): + LOG.info("Instantiating %s", vnf.name) + vnf.instantiate(self.scenario_cfg, self.context_cfg) + LOG.info("Waiting for %s to instantiate", vnf.name) + vnf.wait_for_instantiate() + except: LOG.exception("") - raise RuntimeError("Error in teardown") + for vnf in self.vnfs: + vnf.terminate() + raise - def pre_run_wait_time(self, time_seconds): # pragma: no cover - """Time waited before executing the run method""" - time.sleep(time_seconds) + self._generate_pod_yaml() - def post_run_wait_time(self, time_seconds): # pragma: no cover - """Time waited after executing the run method""" - pass + def run(self, output): + """ Run experiment + + :param output: scenario output to push results + :return: None + """ + + self._fill_traffic_profile() + + traffic_runners = [vnf for vnf in self.vnfs if vnf.runs_traffic] + + for traffic_gen in traffic_runners: + traffic_gen.listen_traffic(self.traffic_profile) + + self.collector = Collector(self.vnfs, + context_base.Context.get_physical_nodes()) + self.collector.start() + + test_completed = False + while not test_completed: + for traffic_gen in traffic_runners: + LOG.info("Run traffic on %s", traffic_gen.name) + traffic_gen.run_traffic_once(self.traffic_profile) + + test_completed = True + for traffic_gen in traffic_runners: + # wait for all tg to complete running traffic + status = traffic_gen.wait_on_traffic() + LOG.info("Run traffic on %s complete status=%s", + traffic_gen.name, status) + if status == 'CONTINUE': + # continue running if at least one tg is running + test_completed = False + + output.push(self.collector.get_kpi()) + + self.collector.stop() |