From 51bc9b51362ca76011bb201353de5354907332d1 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Tue, 3 Jul 2018 09:14:35 +0100 Subject: Make "IterationIPC" MQ producer for VNF control messages "IterationIPC" runner class is a consumer for MQ aware VNFs. A MQ aware traffic generator can send "started", "finished" and "iteration" messages. This feature implements a MQ producer in the runner in order to send messages to the VNFs. The messages implemented are: - "start_iteration" - "stop_iteration" JIRA: YARDSTICK-1286 Change-Id: I706f9a9dda5e5beed52231be7d71452945a7dbed Signed-off-by: Rodolfo Alonso Hernandez --- yardstick/benchmark/runners/base.py | 44 ++++++++++++++++++++-------- yardstick/benchmark/runners/iteration_ipc.py | 8 +++-- 2 files changed, 37 insertions(+), 15 deletions(-) (limited to 'yardstick/benchmark/runners') diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index fbdf6c281..af2557441 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -12,27 +12,26 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +# +# This is a modified copy of ``rally/rally/benchmark/runners/base.py`` -# yardstick comment: this is a modified copy of -# rally/rally/benchmark/runners/base.py - -from __future__ import absolute_import - +import importlib import logging import multiprocessing import subprocess import time import traceback -from subprocess import CalledProcessError - -import importlib -from six.moves.queue import Empty +from six import moves -import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.common import messaging +from yardstick.common.messaging import payloads +from yardstick.common.messaging import producer +from yardstick.common import utils from yardstick.dispatcher.base import Base as DispatcherBase + log = logging.getLogger(__name__) @@ -41,7 +40,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except CalledProcessError: + except subprocess.CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -245,7 +244,7 @@ class Runner(object): log.debug("output_queue size %s", self.output_queue.qsize()) try: result.update(self.output_queue.get(True, 1)) - except Empty: + except moves.queue.Empty: pass return result @@ -259,7 +258,7 @@ class Runner(object): log.debug("result_queue size %s", self.result_queue.qsize()) try: one_record = self.result_queue.get(True, 1) - except Empty: + except moves.queue.Empty: pass else: if output_in_influxdb: @@ -272,3 +271,22 @@ class Runner(object): dispatchers = DispatcherBase.get(self.config['output_config']) dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) + + +class RunnerProducer(producer.MessagingProducer): + """Class implementing the message producer for runners""" + + def __init__(self, _id): + super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id) + + def start_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_START_ITERATION, + payloads.RunnerPayload(version=version, data=data)) + + def stop_iteration(self, version=1, data=None): + data = {} if not data else data + self.send_message( + messaging.RUNNER_METHOD_STOP_ITERATION, + payloads.RunnerPayload(version=version, data=data)) diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py index 43aa7f489..a0335fdc7 100644 --- a/yardstick/benchmark/runners/iteration_ipc.py +++ b/yardstick/benchmark/runners/iteration_ipc.py @@ -26,7 +26,7 @@ import traceback import os -from yardstick.benchmark.runners import base +from yardstick.benchmark.runners import base as base_runner from yardstick.common import exceptions from yardstick.common import messaging from yardstick.common import utils @@ -131,6 +131,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs) mq_consumer.start_rpc_server() + mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id']) iteration_index = 1 while 'run' in run_step: @@ -141,6 +142,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, result = None errors = '' mq_consumer.iteration_index = iteration_index + mq_producer.start_iteration() try: utils.wait_until_true( @@ -151,6 +153,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = traceback.format_exc() LOG.exception(errors) + mq_producer.stop_iteration() + if result: output_queue.put(result, True, QUEUE_PUT_TIMEOUT) benchmark_output = {'timestamp': time.time(), @@ -181,7 +185,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, mq_consumer.stop_rpc_server() -class IterationIPCRunner(base.Runner): +class IterationIPCRunner(base_runner.Runner): """Run a scenario for a configurable number of times. Each iteration has a configurable timeout. The loop control depends on the -- cgit 1.2.3-korg