aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/core/task.py
diff options
context:
space:
mode:
authorchenjiankun <chenjiankun1@huawei.com>2017-05-24 07:22:51 +0000
committerchenjiankun <chenjiankun1@huawei.com>2017-06-22 11:06:17 +0000
commitacc757fc7cf9db54d97d4563cd294efafc3f7747 (patch)
tree20a55964d84fa84c06a91b2e9b9233902f42c2c7 /yardstick/benchmark/core/task.py
parent5c33b82efbc0f7e58bdcfc4288ce08b7b3c999f2 (diff)
Yardstick output format unified
JIRA: YARDSTICK-658 Currently the yardstick have three dispatcher: file, influxdb, mongodb. (influxdb using API to get result and mongodb using testAPI to get result) But their output format is different. It is hard to use. In this patch, make all dispatchers using the same data source. And make the output format of file and influxdb unified. As for mongodb, since it is related to testAPI, so I make it push data every test case. The unified output format is: http://paste.openstack.org/show/610125/ Change-Id: I854ac4f03e6f904469b07b0c924c7d850545ae5b Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
Diffstat (limited to 'yardstick/benchmark/core/task.py')
-rw-r--r--yardstick/benchmark/core/task.py112
1 files changed, 102 insertions, 10 deletions
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index c44081b73..091aa99d3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -24,6 +24,7 @@ from six.moves import filter
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
+from yardstick.dispatcher.base import Base as DispatcherBase
from yardstick.common.task_template import TaskTemplate
from yardstick.common.utils import source_env
from yardstick.common import utils
@@ -42,7 +43,6 @@ class Task(object): # pragma: no cover
"""
def __init__(self):
- self.config = {}
self.contexts = []
self.outputs = {}
@@ -55,7 +55,14 @@ class Task(object): # pragma: no cover
check_environment()
- self.config['yardstick'] = utils.parse_ini_file(config_file)
+ output_config = utils.parse_ini_file(config_file)
+ self._init_output_config(output_config)
+ self._set_output_config(output_config, args.output_file)
+ LOG.debug('Output configuration is: %s', output_config)
+
+ if output_config['DEFAULT'].get('dispatcher') == 'file':
+ result = {'status': 0, 'result': {}}
+ utils.write_json_to_file(args.output_file, result)
total_start_time = time.time()
parser = TaskParser(args.inputfile[0])
@@ -75,6 +82,7 @@ class Task(object): # pragma: no cover
if args.parse_only:
sys.exit(0)
+ testcases = {}
# parse task_files
for i in range(0, len(task_files)):
one_task_start_time = time.time()
@@ -90,7 +98,15 @@ class Task(object): # pragma: no cover
meet_precondition)
continue
- self._run(scenarios, run_in_parallel, args.output_file)
+ case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
+ try:
+ data = self._run(scenarios, run_in_parallel, args.output_file)
+ except KeyboardInterrupt:
+ raise
+ except Exception:
+ testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
+ else:
+ testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
if args.keep_deploy:
# keep deployment, forget about stack
@@ -104,6 +120,10 @@ class Task(object): # pragma: no cover
LOG.info("task %s finished in %d secs", task_files[i],
one_task_end_time - one_task_start_time)
+ result = self._get_format_result(testcases)
+
+ self._do_output(output_config, result)
+
total_end_time = time.time()
LOG.info("total finished in %d secs",
total_end_time - total_start_time)
@@ -114,6 +134,65 @@ class Task(object): # pragma: no cover
print("Done, exiting")
+ def _init_output_config(self, output_config):
+ output_config.setdefault('DEFAULT', {})
+ output_config.setdefault('dispatcher_http', {})
+ output_config.setdefault('dispatcher_file', {})
+ output_config.setdefault('dispatcher_influxdb', {})
+ output_config.setdefault('nsb', {})
+
+ def _set_output_config(self, output_config, file_path):
+ try:
+ out_type = os.environ['DISPATCHER']
+ except KeyError:
+ output_config['DEFAULT'].setdefault('dispatcher', 'file')
+ else:
+ output_config['DEFAULT']['dispatcher'] = out_type
+
+ output_config['dispatcher_file']['file_path'] = file_path
+
+ try:
+ target = os.environ['TARGET']
+ except KeyError:
+ pass
+ else:
+ k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
+ output_config[k]['target'] = target
+
+ def _get_format_result(self, testcases):
+ criteria = self._get_task_criteria(testcases)
+
+ info = {
+ 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
+ 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
+ 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
+ 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
+ }
+
+ result = {
+ 'status': 1,
+ 'result': {
+ 'criteria': criteria,
+ 'task_id': self.task_id,
+ 'info': info,
+ 'testcases': testcases
+ }
+ }
+
+ return result
+
+ def _get_task_criteria(self, testcases):
+ criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
+ if criteria:
+ return 'FAIL'
+ else:
+ return 'PASS'
+
+ def _do_output(self, output_config, result):
+
+ dispatcher = DispatcherBase.get(output_config)
+ dispatcher.flush_result_data(result)
+
def _run(self, scenarios, run_in_parallel, output_file):
"""Deploys context and calls runners"""
for context in self.contexts:
@@ -121,6 +200,7 @@ class Task(object): # pragma: no cover
background_runners = []
+ result = []
# Start all background scenarios
for scenario in filter(_is_background_scenario, scenarios):
scenario["runner"] = dict(type="Duration", duration=1000000000)
@@ -136,16 +216,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ LOG.error('Scenario: %s ERROR', scenario.get('type'))
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
# Abort background runners
@@ -154,15 +241,21 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- if runner.join(timeout=60) is None:
+ status = runner.join(timeout=60)
+ if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner_join(runner)
+ status = runner_join(runner)
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
else:
base_runner.Runner.release(runner)
+ if status != 0:
+ raise RuntimeError
print("Background task ended")
+ return result
+
def atexit_handler(self):
"""handler for process termination"""
base_runner.Runner.terminate_all()
@@ -227,7 +320,7 @@ class Task(object): # pragma: no cover
if "nodes" in scenario_cfg:
context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
- runner = base_runner.Runner.get(runner_cfg, self.config)
+ runner = base_runner.Runner.get(runner_cfg)
print("Starting runner of type '%s'" % runner_cfg["type"])
runner.run(scenario_cfg, context_cfg)
@@ -489,8 +582,7 @@ def runner_join(runner):
"""join (wait for) a runner, exit process at runner failure"""
status = runner.join()
base_runner.Runner.release(runner)
- if status != 0:
- sys.exit("Runner failed")
+ return status
def print_invalid_header(source_name, args):