author      Jing Lu <lvjing5@huawei.com>            2017-06-24 09:17:51 +0000
committer   Gerrit Code Review <gerrit@opnfv.org>   2017-06-24 09:17:51 +0000
commit      3ca356db275f543bcbbb39701207a0faa2e29e02 (patch)
tree        c6c81ac80b1930eb5fabc2496a020ea9250be481 /yardstick/benchmark/core/task.py
parent      c04372b8630169a3fcc4b52e8d2b935f110519a6 (diff)
parent      acc757fc7cf9db54d97d4563cd294efafc3f7747 (diff)
Merge "Yardstick output format unified"
Diffstat (limited to 'yardstick/benchmark/core/task.py')
-rw-r--r--   yardstick/benchmark/core/task.py   112
1 file changed, 102 insertions(+), 10 deletions(-)
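
Note for reviewers (not part of the patch): the hunks below add a _get_format_result() helper that builds the unified output structure this merge is named after. A minimal sketch of that structure, with field names taken from the diff and purely illustrative sample values, looks like this:

    # Sketch of the unified result produced by _get_format_result();
    # keys mirror the hunks below, values here are illustrative only.
    unified_result = {
        'status': 1,                               # 1 once the task run has completed
        'result': {
            'criteria': 'PASS',                    # 'FAIL' if any test case failed
            'task_id': 'example-task-id',          # illustrative placeholder
            'info': {
                'deploy_scenario': 'unknown',      # from DEPLOY_SCENARIO
                'installer': 'unknown',            # from INSTALLER_TYPE
                'pod_name': 'unknown',             # from NODE_NAME
                'version': 'unknown',              # from YARDSTICK_BRANCH
            },
            'testcases': {
                'ping': {'criteria': 'PASS', 'tc_data': []},   # illustrative case name
            },
        },
    }
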
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index 5a006f2b2..478a51f9d 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -24,6 +24,7 @@ from six.moves import filter
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
+from yardstick.dispatcher.base import Base as DispatcherBase
from yardstick.common.task_template import TaskTemplate
from yardstick.common.utils import source_env
from yardstick.common import utils
@@ -42,7 +43,6 @@ class Task(object): # pragma: no cover
"""
def __init__(self):
- self.config = {}
self.contexts = []
self.outputs = {}
@@ -55,7 +55,14 @@ class Task(object): # pragma: no cover
check_environment()
- self.config['yardstick'] = utils.parse_ini_file(config_file)
+ output_config = utils.parse_ini_file(config_file)
+ self._init_output_config(output_config)
+ self._set_output_config(output_config, args.output_file)
+ LOG.debug('Output configuration is: %s', output_config)
+
+ if output_config['DEFAULT'].get('dispatcher') == 'file':
+ result = {'status': 0, 'result': {}}
+ utils.write_json_to_file(args.output_file, result)
total_start_time = time.time()
parser = TaskParser(args.inputfile[0])
@@ -75,6 +82,7 @@ class Task(object): # pragma: no cover
if args.parse_only:
sys.exit(0)
+ testcases = {}
# parse task_files
for i in range(0, len(task_files)):
one_task_start_time = time.time()
@@ -90,7 +98,15 @@ class Task(object): # pragma: no cover
meet_precondition)
continue
- self._run(scenarios, run_in_parallel, args.output_file)
+ case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
+ try:
+ data = self._run(scenarios, run_in_parallel, args.output_file)
+ except KeyboardInterrupt:
+ raise
+ except Exception:
+ testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
+ else:
+ testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
if args.keep_deploy:
# keep deployment, forget about stack
@@ -104,6 +120,10 @@ class Task(object): # pragma: no cover
LOG.info("task %s finished in %d secs", task_files[i],
one_task_end_time - one_task_start_time)
+ result = self._get_format_result(testcases)
+
+ self._do_output(output_config, result)
+
total_end_time = time.time()
LOG.info("total finished in %d secs",
total_end_time - total_start_time)
@@ -114,6 +134,65 @@ class Task(object): # pragma: no cover
print("Done, exiting")
+ def _init_output_config(self, output_config):
+ output_config.setdefault('DEFAULT', {})
+ output_config.setdefault('dispatcher_http', {})
+ output_config.setdefault('dispatcher_file', {})
+ output_config.setdefault('dispatcher_influxdb', {})
+ output_config.setdefault('nsb', {})
+
+ def _set_output_config(self, output_config, file_path):
+ try:
+ out_type = os.environ['DISPATCHER']
+ except KeyError:
+ output_config['DEFAULT'].setdefault('dispatcher', 'file')
+ else:
+ output_config['DEFAULT']['dispatcher'] = out_type
+
+ output_config['dispatcher_file']['file_path'] = file_path
+
+ try:
+ target = os.environ['TARGET']
+ except KeyError:
+ pass
+ else:
+ k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
+ output_config[k]['target'] = target
+
+ def _get_format_result(self, testcases):
+ criteria = self._get_task_criteria(testcases)
+
+ info = {
+ 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
+ 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
+ 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
+ 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
+ }
+
+ result = {
+ 'status': 1,
+ 'result': {
+ 'criteria': criteria,
+ 'task_id': self.task_id,
+ 'info': info,
+ 'testcases': testcases
+ }
+ }
+
+ return result
+
+ def _get_task_criteria(self, testcases):
+ criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
+ if criteria:
+ return 'FAIL'
+ else:
+ return 'PASS'
+
+ def _do_output(self, output_config, result):
+
+ dispatcher = DispatcherBase.get(output_config)
+ dispatcher.flush_result_data(result)
+
def _run(self, scenarios, run_in_parallel, output_file):
"""Deploys context and calls runners"""
for context in self.contexts:
@@ -121,6 +200,7 @@ class Task(object): # pragma: no cover
background_runners = []
+ result = []
# Start all background scenarios
for scenario in filter(_is_background_scenario, scenarios):
scenario["runner"] = dict(type="Duration", duration=1000000000)
@@ -136,16 +216,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ LOG.error('Scenario: %s ERROR', scenario.get('type'))
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
# Abort background runners
@@ -154,15 +241,21 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- if runner.join(timeout=60) is None:
+ status = runner.join(timeout=60)
+ if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner_join(runner)
+ status = runner_join(runner)
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
else:
base_runner.Runner.release(runner)
+ if status != 0:
+ raise RuntimeError
print("Background task ended")
+ return result
+
def atexit_handler(self):
"""handler for process termination"""
base_runner.Runner.terminate_all()
@@ -227,7 +320,7 @@ class Task(object): # pragma: no cover
if "nodes" in scenario_cfg:
context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
- runner = base_runner.Runner.get(runner_cfg, self.config)
+ runner = base_runner.Runner.get(runner_cfg)
print("Starting runner of type '%s'" % runner_cfg["type"])
runner.run(scenario_cfg, context_cfg)
@@ -492,8 +585,7 @@ def runner_join(runner):
"""join (wait for) a runner, exit process at runner failure"""
status = runner.join()
base_runner.Runner.release(runner)
- if status != 0:
- sys.exit("Runner failed")
+ return status
def print_invalid_header(source_name, args):
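
Trailing note (not part of the patch): the new _set_output_config() logic above selects the dispatcher from the DISPATCHER environment variable (defaulting to 'file') and, when TARGET is set, records it as the target of the selected dispatcher section. The following sketch replays that logic outside the Task class; the helper and key names come from the hunks above, while the concrete URL and output path are illustrative assumptions, not defaults confirmed by this change:

    import os

    # Sections mirror _init_output_config() in the patch above.
    output_config = {'DEFAULT': {}, 'dispatcher_http': {},
                     'dispatcher_file': {}, 'dispatcher_influxdb': {}, 'nsb': {}}

    os.environ.setdefault('DISPATCHER', 'http')                   # e.g. 'file', 'http', 'influxdb'
    os.environ.setdefault('TARGET', 'http://127.0.0.1:8000/api')  # illustrative URL

    dispatcher = os.environ.get('DISPATCHER', 'file')
    output_config['DEFAULT']['dispatcher'] = dispatcher
    output_config['dispatcher_file']['file_path'] = '/tmp/yardstick.out'  # illustrative path
    output_config['dispatcher_{}'.format(dispatcher)]['target'] = os.environ['TARGET']

    # output_config['DEFAULT']['dispatcher']       -> 'http'
    # output_config['dispatcher_http']['target']   -> 'http://127.0.0.1:8000/api'
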