aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark
diff options
context:
space:
mode:
authorRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-07-03 09:14:35 +0100
committerRodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>2018-07-05 07:59:45 +0000
commit51bc9b51362ca76011bb201353de5354907332d1 (patch)
tree0b44dad2deb61fc40d97e2800e35c2d17011295e /yardstick/benchmark
parent1004f2ac1ea0394afdc2d5a8ca20c8c1a6d2cd93 (diff)
Make "IterationIPC" MQ producer for VNF control messages
"IterationIPC" runner class is a consumer for MQ aware VNFs. A MQ aware traffic generator can send "started", "finished" and "iteration" messages. This feature implements a MQ producer in the runner in order to send messages to the VNFs. The messages implemented are: - "start_iteration" - "stop_iteration" JIRA: YARDSTICK-1286 Change-Id: I706f9a9dda5e5beed52231be7d71452945a7dbed Signed-off-by: Rodolfo Alonso Hernandez <rodolfo.alonso.hernandez@intel.com>
Diffstat (limited to 'yardstick/benchmark')
-rwxr-xr-xyardstick/benchmark/runners/base.py44
-rw-r--r--yardstick/benchmark/runners/iteration_ipc.py8
2 files changed, 37 insertions, 15 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index fbdf6c281..af2557441 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -12,27 +12,26 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
+# This is a modified copy of ``rally/rally/benchmark/runners/base.py``
-# yardstick comment: this is a modified copy of
-# rally/rally/benchmark/runners/base.py
-
-from __future__ import absolute_import
-
+import importlib
import logging
import multiprocessing
import subprocess
import time
import traceback
-from subprocess import CalledProcessError
-
-import importlib
-from six.moves.queue import Empty
+from six import moves
-import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.common import messaging
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
+from yardstick.common import utils
from yardstick.dispatcher.base import Base as DispatcherBase
+
log = logging.getLogger(__name__)
@@ -41,7 +40,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except CalledProcessError:
+ except subprocess.CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -245,7 +244,7 @@ class Runner(object):
log.debug("output_queue size %s", self.output_queue.qsize())
try:
result.update(self.output_queue.get(True, 1))
- except Empty:
+ except moves.queue.Empty:
pass
return result
@@ -259,7 +258,7 @@ class Runner(object):
log.debug("result_queue size %s", self.result_queue.qsize())
try:
one_record = self.result_queue.get(True, 1)
- except Empty:
+ except moves.queue.Empty:
pass
else:
if output_in_influxdb:
@@ -272,3 +271,22 @@ class Runner(object):
dispatchers = DispatcherBase.get(self.config['output_config'])
dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
+
+
+class RunnerProducer(producer.MessagingProducer):
+ """Class implementing the message producer for runners"""
+
+ def __init__(self, _id):
+ super(RunnerProducer, self).__init__(messaging.TOPIC_RUNNER, _id=_id)
+
+ def start_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_START_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
+
+ def stop_iteration(self, version=1, data=None):
+ data = {} if not data else data
+ self.send_message(
+ messaging.RUNNER_METHOD_STOP_ITERATION,
+ payloads.RunnerPayload(version=version, data=data))
diff --git a/yardstick/benchmark/runners/iteration_ipc.py b/yardstick/benchmark/runners/iteration_ipc.py
index 43aa7f489..a0335fdc7 100644
--- a/yardstick/benchmark/runners/iteration_ipc.py
+++ b/yardstick/benchmark/runners/iteration_ipc.py
@@ -26,7 +26,7 @@ import traceback
import os
-from yardstick.benchmark.runners import base
+from yardstick.benchmark.runners import base as base_runner
from yardstick.common import exceptions
from yardstick.common import messaging
from yardstick.common import utils
@@ -131,6 +131,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
mq_consumer = RunnerIterationIPCConsumer(os.getpid(), producer_ctxs)
mq_consumer.start_rpc_server()
+ mq_producer = base_runner.RunnerProducer(scenario_cfg['task_id'])
iteration_index = 1
while 'run' in run_step:
@@ -141,6 +142,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
result = None
errors = ''
mq_consumer.iteration_index = iteration_index
+ mq_producer.start_iteration()
try:
utils.wait_until_true(
@@ -151,6 +153,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
errors = traceback.format_exc()
LOG.exception(errors)
+ mq_producer.stop_iteration()
+
if result:
output_queue.put(result, True, QUEUE_PUT_TIMEOUT)
benchmark_output = {'timestamp': time.time(),
@@ -181,7 +185,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
mq_consumer.stop_rpc_server()
-class IterationIPCRunner(base.Runner):
+class IterationIPCRunner(base_runner.Runner):
"""Run a scenario for a configurable number of times.
Each iteration has a configurable timeout. The loop control depends on the