aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark
diff options
context:
space:
mode:
authorMytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>2019-01-24 17:46:04 +0200
committerVolodymyr Mytnyk <volodymyrx.mytnyk@intel.com>2019-01-29 12:57:25 +0000
commit679b6d94288d801e84c0f22c6c0d712c08eb8458 (patch)
tree031cfad1c1a3d495af07a6222f73afa3d9437e3e /yardstick/benchmark
parente3871dda8a4deb1a0f7e72050cd9fbba6cc0ecb9 (diff)
NSB sync: clean-up draft IPC implementation (part 1)
The existing implementation of IPC is not finished and isn't used by any of the VNFs/TG implementation. It is used in the code but does nothing from functionality perspective. New syncronization mechanism is going to be implemented by using different approach than it was designed before. Thus, the current IPC mechanism is not going to be re-used. So, removing it. The IPC consumer/producer implementation is left as it may be required for other purposes. - Remove SampleVNF MQ consumer class - Remove IterationIPC MQ producer for VNF control messages - Remove MQ producer from SampleVNFTrafficGen class - Remove TrafficGeneratorProducer class - Remove IterationIPC runner - Remove unused task_id form VNF Generic initialization as it is not required for synchronization of VNFs/TGs anymore. - Fix UT JIRA: YARDSTICK-1592 Change-Id: I65fe51bcbd1bfeea0c43eb79ca6fb2aab5b65ae7 Signed-off-by: Mytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>
Diffstat (limited to 'yardstick/benchmark')
-rwxr-xr-xyardstick/benchmark/runners/base.py22
-rw-r--r--yardstick/benchmark/runners/iteration_ipc.py205
-rw-r--r--yardstick/benchmark/scenarios/base.py4
-rw-r--r--yardstick/benchmark/scenarios/networking/vnf_generic.py8
4 files changed, 1 insertions, 238 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index af2557441..3878f20aa 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -25,9 +25,6 @@ import traceback
from six import moves
from yardstick.benchmark.scenarios import base as base_scenario
-from yardstick.common import messaging
-from yardstick.common.messaging import payloads
-from yardstick.common.messaging import producer
from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
@@ -271,22 +268,3 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
-
-
-class RunnerProducer(producer.MessagingProducer):
- """Class implementing the message producer for runners"""
-
- def __init__(self, _id):
- super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
-
- def start_iteration(self, version=1, data=None):
- data = {} if not data else data
- self.send_message(
- messaging.RUNNER_METHOD_START_ITERATION,
- payloads.RunnerPayload(version=version, data=data))
-
- def stop_iteration(self, version=1, data=None):
- data = {} if not data else data
- self.send_message(
- messaging.RUNNER_METHOD_STOP_ITERATION,
- payloads.RunnerPayload(version=version, data=data))
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
deleted file mode 100644
index a0335fdc7..000000000
--- a/yardstick/benchmark/runners/iteration_ipc.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2018: Intel Corporation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""A runner that runs a configurable number of times before it returns. Each
- iteration has a configurable timeout. The loop control depends on the
- feedback received from the running VNFs. The context PIDs from the VNFs
- to listen the messages from are given in the scenario "setup" method.
-"""
-
-import logging
-import multiprocessing
-import time
-import traceback
-
-import os
-
-from yardstick.benchmark.runners import base as base_runner
-from yardstick.common import exceptions
-from yardstick.common import messaging
-from yardstick.common import utils
-from yardstick.common.messaging import consumer
-from yardstick.common.messaging import payloads
-
-
-LOG = logging.getLogger(__name__)
-
-QUEUE_PUT_TIMEOUT = 10
-ITERATION_TIMEOUT = 180
-
-
-class RunnerIterationIPCEndpoint(consumer.NotificationHandler):
- """Endpoint class for ``RunnerIterationIPCConsumer``"""
-
- def tg_method_started(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_STARTED,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)},
- QUEUE_PUT_TIMEOUT)
-
- def tg_method_finished(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_FINISHED,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)})
-
- def tg_method_iteration(self, ctxt, **kwargs):
- if ctxt['id'] in self._ctx_ids:
- self._queue.put(
- {'id': ctxt['id'],
- 'action': messaging.TG_METHOD_ITERATION,
- 'payload': payloads.TrafficGeneratorPayload.dict_to_obj(
- kwargs)})
-
-
-class RunnerIterationIPCConsumer(consumer.MessagingConsumer):
- """MQ consumer for "IterationIPC" runner"""
-
- def __init__(self, _id, ctx_ids):
- self._id = _id
- self._queue = multiprocessing.Queue()
- endpoints = [RunnerIterationIPCEndpoint(_id, ctx_ids, self._queue)]
- super(RunnerIterationIPCConsumer, self).__init__(
- messaging.TOPIC_TG, ctx_ids, endpoints)
- self._kpi_per_id = {ctx: [] for ctx in ctx_ids}
- self.iteration_index = None
-
- def is_all_kpis_received_in_iteration(self):
- """Check if all producers registered have sent the ITERATION msg
-
- During the present iteration, all producers (traffic generators) must
- start and finish the traffic injection, and at the end of the traffic
- injection a TG_METHOD_ITERATION must be sent. This function will check
- all KPIs in the present iteration are received. E.g.:
- self.iteration_index = 2
-
- self._kpi_per_id = {
- 'ctx1': [kpi0, kpi1, kpi2],
- 'ctx2': [kpi0, kpi1]} --> return False
-
- self._kpi_per_id = {
- 'ctx1': [kpi0, kpi1, kpi2],
- 'ctx2': [kpi0, kpi1, kpi2]} --> return True
- """
- while not self._queue.empty():
- msg = self._queue.get(True, 1)
- if msg['action'] == messaging.TG_METHOD_ITERATION:
- id_iter_list = self._kpi_per_id[msg['id']]
- id_iter_list.append(msg['payload'].kpi)
-
- return all(len(id_iter_list) == self.iteration_index
- for id_iter_list in self._kpi_per_id.values())
-
-
-def _worker_process(queue, cls, method_name, scenario_cfg,
- context_cfg, aborted, output_queue): # pragma: no cover
- runner_cfg = scenario_cfg['runner']
-
- timeout = runner_cfg.get('timeout', ITERATION_TIMEOUT)
- iterations = runner_cfg.get('iterations', 1)
- run_step = runner_cfg.get('run_step', 'setup,run,teardown')
- LOG.info('Worker START. Iterations %d times, class %s', iterations, cls)
-
- runner_cfg['runner_id'] = os.getpid()
-
- benchmark = cls(scenario_cfg, context_cfg)
- method = getattr(benchmark, method_name)
-
- if 'setup' not in run_step:
- raise exceptions.RunnerIterationIPCSetupActionNeeded()
- benchmark.setup()
- producer_ctxs = benchmark.get_mq_ids()
- if not producer_ctxs:
- raise exceptions.RunnerIterationIPCNoCtxs()
-
- mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
- mq_consumer.start_rpc_server()
- mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
-
- iteration_index = 1
- while 'run' in run_step:
- LOG.debug('runner=%(runner)s seq=%(sequence)s START',
- {'runner': runner_cfg['runner_id'],
- 'sequence': iteration_index})
- data = {}
- result = None
- errors = ''
- mq_consumer.iteration_index = iteration_index
- mq_producer.start_iteration()
-
- try:
- utils.wait_until_true(
- mq_consumer.is_all_kpis_received_in_iteration,
- timeout=timeout, sleep=2)
- result = method(data)
- except Exception: # pylint: disable=broad-except
- errors = traceback.format_exc()
- LOG.exception(errors)
-
- mq_producer.stop_iteration()
-
- if result:
- output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
- benchmark_output = {'timestamp': time.time(),
- 'sequence': iteration_index,
- 'data': data,
- 'errors': errors}
- queue.put(benchmark_output, True, QUEUE_PUT_TIMEOUT)
-
- LOG.debug('runner=%(runner)s seq=%(sequence)s END',
- {'runner': runner_cfg['runner_id'],
- 'sequence': iteration_index})
-
- iteration_index += 1
- if iteration_index > iterations or aborted.is_set():
- LOG.info('"IterationIPC" worker END')
- break
-
- if 'teardown' in run_step:
- try:
- benchmark.teardown()
- except Exception:
- LOG.exception('Exception during teardown process')
- mq_consumer.stop_rpc_server()
- raise SystemExit(1)
-
- LOG.debug('Data queue size = %s', queue.qsize())
- LOG.debug('Output queue size = %s', output_queue.qsize())
- mq_consumer.stop_rpc_server()
-
-
-class IterationIPCRunner(base_runner.Runner):
- """Run a scenario for a configurable number of times.
-
- Each iteration has a configurable timeout. The loop control depends on the
- feedback received from the running VNFs. The context PIDs from the VNFs to
- listen the messages from are given in the scenario "setup" method.
- """
- __execution_type__ = 'IterationIPC'
-
- def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
- name = '{}-{}-{}'.format(
- self.__execution_type__, scenario_cfg.get('type'), os.getpid())
- self.process = multiprocessing.Process(
- name=name,
- target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg,
- context_cfg, self.aborted, self.output_queue))
- self.process.start()
diff --git a/yardstick/benchmark/scenarios/base.py b/yardstick/benchmark/scenarios/base.py
index 1737bb942..ae8bfad71 100644
--- a/yardstick/benchmark/scenarios/base.py
+++ b/yardstick/benchmark/scenarios/base.py
@@ -122,7 +122,3 @@ class Scenario(object):
except TypeError:
dic[k] = v
return dic
-
- def get_mq_ids(self): # pragma: no cover
- """Return stored MQ producer IDs, if defined"""
- pass
diff --git a/yardstick/benchmark/scenarios/networking/vnf_generic.py b/yardstick/benchmark/scenarios/networking/vnf_generic.py
index 5ac51cdfc..daca9bafc 100644
--- a/yardstick/benchmark/scenarios/networking/vnf_generic.py
+++ b/yardstick/benchmark/scenarios/networking/vnf_generic.py
@@ -61,7 +61,6 @@ class NetworkServiceTestCase(scenario_base.Scenario):
self.traffic_profile = None
self.node_netdevs = {}
self.bin_path = get_nsb_option('bin_path', '')
- self._mq_ids = []
def is_ended(self):
return self.traffic_profile is not None and self.traffic_profile.is_ended()
@@ -446,7 +445,7 @@ class NetworkServiceTestCase(scenario_base.Scenario):
pass
self.create_interfaces_from_node(vnfd, node)
vnf_impl = self.get_vnf_impl(vnfd['id'])
- vnf_instance = vnf_impl(node_name, vnfd, scenario_cfg['task_id'])
+ vnf_instance = vnf_impl(node_name, vnfd)
vnfs.append(vnf_instance)
self.vnfs = vnfs
@@ -495,11 +494,6 @@ class NetworkServiceTestCase(scenario_base.Scenario):
for traffic_gen in traffic_runners:
LOG.info("Starting traffic on %s", traffic_gen.name)
traffic_gen.run_traffic(self.traffic_profile)
- self._mq_ids.append(traffic_gen.get_mq_producer_id())
-
- def get_mq_ids(self): # pragma: no cover
- """Return stored MQ producer IDs"""
- return self._mq_ids
def run(self, result): # yardstick API
""" Yardstick calls run() at intervals defined in the yaml and