author    chenjiankun <chenjiankun1@huawei.com>    2018-02-05 08:13:07 +0000
committer chenjiankun <chenjiankun1@huawei.com>    2018-02-23 06:52:21 +0000
commit    ed30870a1a5823423a44f8fa2ef559526e2849f8 (patch)
tree      4c70cd7ecd5bb583c48dc42fe7cdab17d552bbb8 /yardstick/benchmark
parent    a134474819400f2f7075d62990899943a06a5086 (diff)
Yardstick real-time influxdb KPI reporting regressions
JIRA: YARDSTICK-989

We used to report test KPIs to InfluxDB in real time, so the user could
monitor a running test in Grafana and watch the output live. A change to
the record format meant KPIs are now reported only once, at the end of
the test. For long-running test cases this forces the user to wait until
the end of the execution to see any results in InfluxDB, and if the test
fails part-way through or does not exit cleanly, no results are stored at
all, leaving the user with no information. This is a regression from the
previous behaviour.

Change-Id: I0f476dff9162a359f0286fb421f2e9c4befaa5cc
Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
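With this patch in place each KPI record is written to InfluxDB as it is
produced, so a user can poll the database (or a Grafana dashboard on top
of it) while the test is still running. A minimal polling sketch, assuming
the influxdb Python client, Yardstick's default 'yardstick' database, and
an illustrative measurement name:

# Monitoring sketch: host, credentials and the measurement name (the
# test case name) are illustrative assumptions, not part of this patch.
import time
from influxdb import InfluxDBClient

client = InfluxDBClient(host='localhost', port=8086, database='yardstick')

# With per-record reporting restored, new points appear while the test
# is still running, so polling the measurement shows live KPIs.
while True:
    rs = client.query('SELECT * FROM "opnfv_yardstick_tc002" '
                      'WHERE time > now() - 1m')
    for point in rs.get_points():
        print(point)
    time.sleep(10)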
Diffstat (limited to 'yardstick/benchmark')
-rw-r--r--  yardstick/benchmark/core/task.py     19
-rwxr-xr-x  yardstick/benchmark/runners/base.py  24
2 files changed, 32 insertions(+), 11 deletions(-)
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index 5fcc9182c..f5d2b18ac 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -120,7 +120,7 @@ class Task(object): # pragma: no cover
case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
try:
- data = self._run(scenarios, run_in_parallel, args.output_file)
+ data = self._run(scenarios, run_in_parallel, output_config)
except KeyboardInterrupt:
raise
except Exception: # pylint: disable=broad-except
@@ -232,11 +232,12 @@ class Task(object): # pragma: no cover
def _do_output(self, output_config, result):
dispatchers = DispatcherBase.get(output_config)
+ dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb')
for dispatcher in dispatchers:
dispatcher.flush_result_data(result)
- def _run(self, scenarios, run_in_parallel, output_file):
+ def _run(self, scenarios, run_in_parallel, output_config):
"""Deploys context and calls runners"""
for context in self.contexts:
context.deploy()
@@ -247,14 +248,14 @@ class Task(object): # pragma: no cover
# Start all background scenarios
for scenario in filter(_is_background_scenario, scenarios):
scenario["runner"] = dict(type="Duration", duration=1000000000)
- runner = self.run_one_scenario(scenario, output_file)
+ runner = self.run_one_scenario(scenario, output_config)
background_runners.append(runner)
runners = []
if run_in_parallel:
for scenario in scenarios:
if not _is_background_scenario(scenario):
- runner = self.run_one_scenario(scenario, output_file)
+ runner = self.run_one_scenario(scenario, output_config)
runners.append(runner)
# Wait for runners to finish
@@ -263,12 +264,12 @@ class Task(object): # pragma: no cover
if status != 0:
raise RuntimeError(
"{0} runner status {1}".format(runner.__execution_type__, status))
- LOG.info("Runner ended, output in %s", output_file)
+ LOG.info("Runner ended")
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
- runner = self.run_one_scenario(scenario, output_file)
+ runner = self.run_one_scenario(scenario, output_config)
status = runner_join(runner, background_runners, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
@@ -276,7 +277,7 @@ class Task(object): # pragma: no cover
scenario.get('type'))
raise RuntimeError(
"{0} runner status {1}".format(runner.__execution_type__, status))
- LOG.info("Runner ended, output in %s", output_file)
+ LOG.info("Runner ended")
# Abort background runners
for runner in background_runners:
@@ -313,10 +314,10 @@ class Task(object): # pragma: no cover
else:
return op
- def run_one_scenario(self, scenario_cfg, output_file):
+ def run_one_scenario(self, scenario_cfg, output_config):
"""run one scenario using context"""
runner_cfg = scenario_cfg["runner"]
- runner_cfg['output_filename'] = output_file
+ runner_cfg['output_config'] = output_config
options = scenario_cfg.get('options', {})
scenario_cfg['options'] = self._parse_options(options)
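The runner side below depends on the structure of output_config rather
than a flat output filename. A sketch of the shape the new code assumes;
only the keys actually referenced in this diff are certain, the influxdb
section is an illustrative assumption based on Yardstick's configuration
style:

# Shape of output_config as consumed by this patch.
output_config = {
    'DEFAULT': {
        # comma-separated dispatcher list; Runner.get_result() checks
        # it with "'influxdb' in dispatcher"
        'dispatcher': 'influxdb,file',
    },
    'dispatcher_influxdb': {            # hypothetical section name
        'target': 'http://127.0.0.1:8086',
        'db_name': 'yardstick',
    },
}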
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a887fa5b3..99386a440 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -23,6 +23,7 @@ import multiprocessing
import subprocess
import time
import traceback
+from subprocess import CalledProcessError
import importlib
@@ -30,6 +31,7 @@ from six.moves.queue import Empty
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
+from yardstick.dispatcher.base import Base as DispatcherBase
log = logging.getLogger(__name__)
@@ -39,7 +41,7 @@ def _execute_shell_command(command):
exitcode = 0
try:
output = subprocess.check_output(command, shell=True)
- except Exception:
+ except CalledProcessError:
exitcode = -1
output = traceback.format_exc()
log.error("exec command '%s' error:\n ", command)
@@ -137,6 +139,8 @@ class Runner(object):
Runner.release(runner)
def __init__(self, config):
+ self.task_id = None
+ self.case_name = None
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
@@ -170,6 +174,8 @@ class Runner(object):
cls = getattr(module, path_split[-1])
self.config['object'] = class_name
+ self.case_name = scenario_cfg['tc']
+ self.task_id = scenario_cfg['task_id']
self.aborted.clear()
# run a potentially configured pre-start action
@@ -245,10 +251,24 @@ class Runner(object):
def get_result(self):
result = []
+
+ dispatcher = self.config['output_config']['DEFAULT']['dispatcher']
+ output_in_influxdb = 'influxdb' in dispatcher
+
while not self.result_queue.empty():
log.debug("result_queue size %s", self.result_queue.qsize())
try:
- result.append(self.result_queue.get(True, 1))
+ one_record = self.result_queue.get(True, 1)
except Empty:
pass
+ else:
+ if output_in_influxdb:
+ self._output_to_influxdb(one_record)
+
+ result.append(one_record)
return result
+
+ def _output_to_influxdb(self, record):
+ dispatchers = DispatcherBase.get(self.config['output_config'])
+ dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb'))
+ dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id)
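Taken together, the two halves of this patch split the reporting work:
Runner.get_result() uploads each record to InfluxDB as it is drained from
the result queue, while Task._do_output() flushes the aggregate result to
every other dispatcher at the end of the run, filtering InfluxDB out to
avoid double-reporting. A standalone sketch of that flow with stub
dispatchers (the classes below are simplified stand-ins, not Yardstick's
real dispatcher implementations):

# Stub dispatchers illustrating the reporting split in this patch.
class InfluxdbStub:
    __dispatcher_type__ = 'Influxdb'
    def upload_one_record(self, record, case_name, unused, task_id=None):
        print('real-time upload:', record)

class FileStub:
    __dispatcher_type__ = 'File'
    def flush_result_data(self, result):
        print('end-of-test flush:', result)

dispatchers = [InfluxdbStub(), FileStub()]

# Runner.get_result() path: every record goes to InfluxDB immediately.
influx = next(d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')
for record in ({'kpi': 1}, {'kpi': 2}):
    influx.upload_one_record(record, 'tc002', '', task_id='task-1')

# Task._do_output() path: InfluxDB is filtered out so records are not
# reported twice; the remaining dispatchers get the aggregate result.
for d in (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb'):
    d.flush_result_data([{'kpi': 1}, {'kpi': 2}])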