From acc757fc7cf9db54d97d4563cd294efafc3f7747 Mon Sep 17 00:00:00 2001 From: chenjiankun Date: Wed, 24 May 2017 07:22:51 +0000 Subject: Yardstick output format unified JIRA: YARDSTICK-658 Currently the yardstick have three dispatcher: file, influxdb, mongodb. (influxdb using API to get result and mongodb using testAPI to get result) But their output format is different. It is hard to use. In this patch, make all dispatchers using the same data source. And make the output format of file and influxdb unified. As for mongodb, since it is related to testAPI, so I make it push data every test case. The unified output format is: http://paste.openstack.org/show/610125/ Change-Id: I854ac4f03e6f904469b07b0c924c7d850545ae5b Signed-off-by: chenjiankun --- api/resources/results.py | 63 +++++++++++++- tests/unit/benchmark/core/test_task.py | 1 + tests/unit/benchmark/runner/test_base.py | 4 +- tests/unit/dispatcher/test_influxdb.py | 36 ++++---- yardstick/benchmark/core/task.py | 112 +++++++++++++++++++++--- yardstick/benchmark/runners/arithmetic.py | 9 +- yardstick/benchmark/runners/base.py | 74 +++------------- yardstick/benchmark/runners/duration.py | 9 +- yardstick/benchmark/runners/iteration.py | 9 +- yardstick/benchmark/runners/sequence.py | 9 +- yardstick/cmd/commands/task.py | 14 +-- yardstick/dispatcher/base.py | 10 +-- yardstick/dispatcher/file.py | 18 ++-- yardstick/dispatcher/http.py | 91 +++++++++----------- yardstick/dispatcher/influxdb.py | 138 ++++++++++++------------------ 15 files changed, 299 insertions(+), 298 deletions(-) diff --git a/api/resources/results.py b/api/resources/results.py index 86fc25193..a0527ed8c 100644 --- a/api/resources/results.py +++ b/api/resources/results.py @@ -28,12 +28,12 @@ def getResult(args): uuid.UUID(task_id) except KeyError: message = 'task_id must be provided' - return common_utils.error_handler(message) + return common_utils.result_handler(2, message) task = TasksHandler().get_task_by_taskid(task_id) def _unfinished(): - return common_utils.result_handler(0, []) + return common_utils.result_handler(0, {}) def _finished(): testcases = task.details.split(',') @@ -44,7 +44,7 @@ def getResult(args): data = common_utils.translate_to_str(influx_utils.query(query_sql)) return data - result = {k: get_data(k) for k in testcases} + result = _format_data({k: get_data(k) for k in testcases}) return common_utils.result_handler(1, result) @@ -61,4 +61,59 @@ def getResult(args): } return switcher.get(status, lambda: 'nothing')() except IndexError: - return common_utils.error_handler('no such task') + return common_utils.result_handler(2, 'no such task') + + +def _format_data(data): + try: + first_value = data.values()[0][0] + except IndexError: + return {'criteria': 'FAIL', 'testcases': {}} + else: + info = { + 'deploy_scenario': first_value.get('deploy_scenario'), + 'installer': first_value.get('installer'), + 'pod_name': first_value.get('pod_name'), + 'version': first_value.get('version') + } + task_id = first_value.get('task_id') + criteria = first_value.get('criteria') + testcases = {k: _get_case_data(v) for k, v in data.items()} + + result = { + 'criteria': criteria, + 'info': info, + 'task_id': task_id, + 'testcases': testcases + } + return result + + +def _get_case_data(data): + try: + scenario = data[0] + except IndexError: + return {'tc_data': [], 'criteria': 'FAIL'} + else: + tc_data = [_get_scenario_data(s) for s in data] + criteria = scenario.get('criteria') + return {'tc_data': tc_data, 'criteria': criteria} + + +def _get_scenario_data(data): + result = { + 'data': {}, + 'timestamp': '' + } + + blacklist = {'criteria', 'deploy_scenario', 'host', 'installer', + 'pod_name', 'runner_id', 'scenarios', 'target', + 'task_id', 'time', 'version'} + + keys = set(data.keys()) - set(blacklist) + for k in keys: + result['data'][k] = data[k] + + result['timestamp'] = data.get('time') + + return result diff --git a/tests/unit/benchmark/core/test_task.py b/tests/unit/benchmark/core/test_task.py index 8034392f4..b64bb8eed 100644 --- a/tests/unit/benchmark/core/test_task.py +++ b/tests/unit/benchmark/core/test_task.py @@ -65,6 +65,7 @@ class TaskTestCase(unittest.TestCase): runner = mock.Mock() runner.join.return_value = 0 runner.get_output.return_value = {} + runner.get_result.return_value = [] mock_base_runner.Runner.get.return_value = runner t._run([scenario], False, "yardstick.out") self.assertTrue(runner.run.called) diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py index 7880fe5a5..6e72fa548 100644 --- a/tests/unit/benchmark/runner/test_base.py +++ b/tests/unit/benchmark/runner/test_base.py @@ -13,7 +13,6 @@ from __future__ import print_function from __future__ import absolute_import import unittest -import multiprocessing import time from yardstick.benchmark.runners.iteration import IterationRunner @@ -22,8 +21,7 @@ from yardstick.benchmark.runners.iteration import IterationRunner class RunnerTestCase(unittest.TestCase): def test_get_output(self): - queue = multiprocessing.Queue() - runner = IterationRunner({}, queue) + runner = IterationRunner({}) runner.output_queue.put({'case': 'opnfv_yardstick_tc002'}) runner.output_queue.put({'criteria': 'PASS'}) diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py index dca3c4189..a5d9b0754 100644 --- a/tests/unit/dispatcher/test_influxdb.py +++ b/tests/unit/dispatcher/test_influxdb.py @@ -94,31 +94,31 @@ class InfluxdbDispatcherTestCase(unittest.TestCase): } } - self.yardstick_conf = {'yardstick': {}} - - def test_record_result_data_no_target(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - influxdb.target = '' - self.assertEqual(influxdb.record_result_data(self.data1), -1) - - def test_record_result_data_no_case_name(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - self.assertEqual(influxdb.record_result_data(self.data2), -1) + self.yardstick_conf = {'dispatcher_influxdb': {}} @mock.patch('yardstick.dispatcher.influxdb.requests') def test_record_result_data(self, mock_requests): type(mock_requests.post.return_value).status_code = 204 - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - self.assertEqual(influxdb.record_result_data(self.data1), 0) - self.assertEqual(influxdb.record_result_data(self.data2), 0) - self.assertEqual(influxdb.flush_result_data(), 0) + influxdb = InfluxdbDispatcher(self.yardstick_conf) + data = { + 'status': 1, + 'result': { + 'criteria': 'PASS', + 'info': { + }, + 'task_id': 'b9e2bbc2-dfd8-410d-8c24-07771e9f7979', + 'testcases': { + } + } + } + self.assertEqual(influxdb.flush_result_data(data), 0) def test__dict_key_flatten(self): line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,' \ 'mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00' # need to sort for assert to work line = ",".join(sorted(line.split(','))) - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) + influxdb = InfluxdbDispatcher(self.yardstick_conf) flattened_data = influxdb._dict_key_flatten( self.data3['benchmark']['data']) result = ",".join( @@ -126,15 +126,15 @@ class InfluxdbDispatcherTestCase(unittest.TestCase): self.assertEqual(result, line) def test__get_nano_timestamp(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - results = {'benchmark': {'timestamp': '1451461248.925574'}} + influxdb = InfluxdbDispatcher(self.yardstick_conf) + results = {'timestamp': '1451461248.925574'} self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') @mock.patch('yardstick.dispatcher.influxdb.time') def test__get_nano_timestamp_except(self, mock_time): results = {} - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) + influxdb = InfluxdbDispatcher(self.yardstick_conf) mock_time.time.return_value = 1451461248.925574 self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c44081b73..091aa99d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -24,6 +24,7 @@ from six.moves import filter from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner +from yardstick.dispatcher.base import Base as DispatcherBase from yardstick.common.task_template import TaskTemplate from yardstick.common.utils import source_env from yardstick.common import utils @@ -42,7 +43,6 @@ class Task(object): # pragma: no cover """ def __init__(self): - self.config = {} self.contexts = [] self.outputs = {} @@ -55,7 +55,14 @@ class Task(object): # pragma: no cover check_environment() - self.config['yardstick'] = utils.parse_ini_file(config_file) + output_config = utils.parse_ini_file(config_file) + self._init_output_config(output_config) + self._set_output_config(output_config, args.output_file) + LOG.debug('Output configuration is: %s', output_config) + + if output_config['DEFAULT'].get('dispatcher') == 'file': + result = {'status': 0, 'result': {}} + utils.write_json_to_file(args.output_file, result) total_start_time = time.time() parser = TaskParser(args.inputfile[0]) @@ -75,6 +82,7 @@ class Task(object): # pragma: no cover if args.parse_only: sys.exit(0) + testcases = {} # parse task_files for i in range(0, len(task_files)): one_task_start_time = time.time() @@ -90,7 +98,15 @@ class Task(object): # pragma: no cover meet_precondition) continue - self._run(scenarios, run_in_parallel, args.output_file) + case_name = os.path.splitext(os.path.basename(task_files[i]))[0] + try: + data = self._run(scenarios, run_in_parallel, args.output_file) + except KeyboardInterrupt: + raise + except Exception: + testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} + else: + testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} if args.keep_deploy: # keep deployment, forget about stack @@ -104,6 +120,10 @@ class Task(object): # pragma: no cover LOG.info("task %s finished in %d secs", task_files[i], one_task_end_time - one_task_start_time) + result = self._get_format_result(testcases) + + self._do_output(output_config, result) + total_end_time = time.time() LOG.info("total finished in %d secs", total_end_time - total_start_time) @@ -114,6 +134,65 @@ class Task(object): # pragma: no cover print("Done, exiting") + def _init_output_config(self, output_config): + output_config.setdefault('DEFAULT', {}) + output_config.setdefault('dispatcher_http', {}) + output_config.setdefault('dispatcher_file', {}) + output_config.setdefault('dispatcher_influxdb', {}) + output_config.setdefault('nsb', {}) + + def _set_output_config(self, output_config, file_path): + try: + out_type = os.environ['DISPATCHER'] + except KeyError: + output_config['DEFAULT'].setdefault('dispatcher', 'file') + else: + output_config['DEFAULT']['dispatcher'] = out_type + + output_config['dispatcher_file']['file_path'] = file_path + + try: + target = os.environ['TARGET'] + except KeyError: + pass + else: + k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher']) + output_config[k]['target'] = target + + def _get_format_result(self, testcases): + criteria = self._get_task_criteria(testcases) + + info = { + 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'), + 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'), + 'pod_name': os.environ.get('NODE_NAME', 'unknown'), + 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown') + } + + result = { + 'status': 1, + 'result': { + 'criteria': criteria, + 'task_id': self.task_id, + 'info': info, + 'testcases': testcases + } + } + + return result + + def _get_task_criteria(self, testcases): + criteria = any(t.get('criteria') != 'PASS' for t in testcases.values()) + if criteria: + return 'FAIL' + else: + return 'PASS' + + def _do_output(self, output_config, result): + + dispatcher = DispatcherBase.get(output_config) + dispatcher.flush_result_data(result) + def _run(self, scenarios, run_in_parallel, output_file): """Deploys context and calls runners""" for context in self.contexts: @@ -121,6 +200,7 @@ class Task(object): # pragma: no cover background_runners = [] + result = [] # Start all background scenarios for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) @@ -136,16 +216,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - runner_join(runner) + status = runner_join(runner) + if status != 0: + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - runner_join(runner) + status = runner_join(runner) + if status != 0: + LOG.error('Scenario: %s ERROR', scenario.get('type')) + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) # Abort background runners @@ -154,15 +241,21 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - if runner.join(timeout=60) is None: + status = runner.join(timeout=60) + if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner_join(runner) + status = runner_join(runner) self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) else: base_runner.Runner.release(runner) + if status != 0: + raise RuntimeError print("Background task ended") + return result + def atexit_handler(self): """handler for process termination""" base_runner.Runner.terminate_all() @@ -227,7 +320,7 @@ class Task(object): # pragma: no cover if "nodes" in scenario_cfg: context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) - runner = base_runner.Runner.get(runner_cfg, self.config) + runner = base_runner.Runner.get(runner_cfg) print("Starting runner of type '%s'" % runner_cfg["type"]) runner.run(scenario_cfg, context_cfg) @@ -489,8 +582,7 @@ def runner_join(runner): """join (wait for) a runner, exit process at runner failure""" status = runner.join() base_runner.Runner.release(runner) - if status != 0: - sys.exit("Runner failed") + return status def print_invalid_header(source_name, args): diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 7ec593396..7898ae2bc 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -63,10 +63,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, benchmark.setup() method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -132,10 +128,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index ebb9a91b5..f6816c7ed 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -22,46 +22,13 @@ import logging import multiprocessing import subprocess import time -import os import traceback -from oslo_config import cfg - import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario -from yardstick.dispatcher.base import Base as DispatcherBase log = logging.getLogger(__name__) -CONF = cfg.CONF - - -def _output_serializer_main(filename, queue, config): - """entrypoint for the singleton subprocess writing to outfile - Use of this process enables multiple instances of a scenario without - messing up the output file. - """ - try: - out_type = config['yardstick'].get('DEFAULT', {})['dispatcher'] - except KeyError: - out_type = os.environ.get('DISPATCHER', 'file') - - conf = { - 'type': out_type.capitalize(), - 'file_path': filename - } - - dispatcher = DispatcherBase.get(conf, config) - - while True: - # blocks until data becomes available - record = queue.get() - if record == '_TERMINATE_': - dispatcher.flush_result_data() - break - else: - dispatcher.record_result_data(record) - def _execute_shell_command(command): """execute shell script with error handling""" @@ -110,8 +77,6 @@ def _periodic_action(interval, command, queue): class Runner(object): - queue = None - dump_process = None runners = [] @staticmethod @@ -131,30 +96,10 @@ class Runner(object): return types @staticmethod - def get(runner_cfg, config): + def get(runner_cfg): """Returns instance of a scenario runner for execution type. """ - # if there is no runner, start the output serializer subprocess - if not Runner.runners: - log.debug("Starting dump process file '%s'", - runner_cfg["output_filename"]) - Runner.queue = multiprocessing.Queue() - Runner.dump_process = multiprocessing.Process( - target=_output_serializer_main, - name="Dumper", - args=(runner_cfg["output_filename"], Runner.queue, config)) - Runner.dump_process.start() - - return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue) - - @staticmethod - def release_dump_process(): - """Release the dumper process""" - log.debug("Stopping dump process") - if Runner.dump_process: - Runner.queue.put('_TERMINATE_') - Runner.dump_process.join() - Runner.dump_process = None + return Runner.get_cls(runner_cfg["type"])(runner_cfg) @staticmethod def release(runner): @@ -162,10 +107,6 @@ class Runner(object): if runner in Runner.runners: Runner.runners.remove(runner) - # if this was the last runner, stop the output serializer subprocess - if not Runner.runners: - Runner.release_dump_process() - @staticmethod def terminate(runner): """Terminate the runner""" @@ -179,7 +120,6 @@ class Runner(object): # release dumper process as some errors before any runner is created if not Runner.runners: - Runner.release_dump_process() return for runner in Runner.runners: @@ -193,11 +133,11 @@ class Runner(object): runner.periodic_action_process = None Runner.release(runner) - def __init__(self, config, queue): + def __init__(self, config): self.config = config self.periodic_action_process = None - self.result_queue = queue self.output_queue = multiprocessing.Queue() + self.result_queue = multiprocessing.Queue() self.process = None self.aborted = multiprocessing.Event() Runner.runners.append(self) @@ -276,3 +216,9 @@ class Runner(object): while not self.output_queue.empty(): result.update(self.output_queue.get()) return result + + def get_result(self): + result = [] + while not self.result_queue.empty(): + result.append(self.result_queue.get()) + return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 2bf2cd2fe..69d744562 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -52,10 +52,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - start = time.time() while True: @@ -90,10 +86,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 973bb9ac4..50fe106bd 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -53,10 +53,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -105,10 +101,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 74ff82204..68e272c57 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -57,10 +57,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, benchmark.setup() method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -99,10 +95,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py index 16a4db291..6384e6eb1 100644 --- a/yardstick/cmd/commands/task.py +++ b/yardstick/cmd/commands/task.py @@ -14,7 +14,6 @@ from __future__ import absolute_import from yardstick.benchmark.core.task import Task from yardstick.common.utils import cliargs from yardstick.common.utils import write_json_to_file -from yardstick.common.utils import read_json_from_file from yardstick.cmd.commands import change_osloobj_to_paras output_file_default = "/tmp/yardstick.out" @@ -46,22 +45,11 @@ class TaskCommands(object): param = change_osloobj_to_paras(args) self.output_file = param.output_file - self._init_result_file() - try: Task().start(param, **kwargs) - self._finish() except Exception as e: self._write_error_data(e) - - def _init_result_file(self): - data = {'status': 0, 'result': []} - write_json_to_file(self.output_file, data) - - def _finish(self): - result = read_json_from_file(self.output_file).get('result') - data = {'status': 1, 'result': result} - write_json_to_file(self.output_file, data) + raise def _write_error_data(self, error): data = {'status': 2, 'result': str(error)} diff --git a/yardstick/dispatcher/base.py b/yardstick/dispatcher/base.py index a1c858297..e77249c54 100644 --- a/yardstick/dispatcher/base.py +++ b/yardstick/dispatcher/base.py @@ -38,15 +38,13 @@ class Base(object): raise RuntimeError("No such dispatcher_type %s" % dispatcher_type) @staticmethod - def get(conf, config): + def get(config): """Returns instance of a dispatcher for dispatcher type. """ - return Base.get_cls(conf["type"])(conf, config) + out_type = config['DEFAULT']['dispatcher'] - @abc.abstractmethod - def record_result_data(self, data): - """Recording result data interface.""" + return Base.get_cls(out_type.capitalize())(config) @abc.abstractmethod - def flush_result_data(self): + def flush_result_data(self, data): """Flush result data into permanent storage media interface.""" diff --git a/yardstick/dispatcher/file.py b/yardstick/dispatcher/file.py index 8acd5dfbb..24fc22dd4 100644 --- a/yardstick/dispatcher/file.py +++ b/yardstick/dispatcher/file.py @@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase): __dispatcher_type__ = "File" - def __init__(self, conf, config): + def __init__(self, conf): super(FileDispatcher, self).__init__(conf) - self.result = [] + self.target = conf['dispatcher_file'].get('file_path', + consts.DEFAULT_OUTPUT_FILE) - def record_result_data(self, data): - self.result.append(data) - - def flush_result_data(self): - file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE) - - res = utils.read_json_from_file(file_path).get('result') - res.extend(self.result) - - data = {'status': 0, 'result': res} - utils.write_json_to_file(file_path, data) + def flush_result_data(self, data): + utils.write_json_to_file(self.target, data) diff --git a/yardstick/dispatcher/http.py b/yardstick/dispatcher/http.py index 0d8d2a346..9bf9af33b 100644 --- a/yardstick/dispatcher/http.py +++ b/yardstick/dispatcher/http.py @@ -20,30 +20,15 @@ from __future__ import absolute_import import logging import os +from datetime import datetime from oslo_serialization import jsonutils import requests -from oslo_config import cfg from yardstick.dispatcher.base import Base as DispatchBase LOG = logging.getLogger(__name__) -CONF = cfg.CONF -http_dispatcher_opts = [ - cfg.StrOpt('target', - default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'), - help='The target where the http request will be sent. ' - 'If this is not set, no data will be posted. For ' - 'example: target = http://hostname:1234/path'), - cfg.IntOpt('timeout', - default=5, - help='The max time in seconds to wait for a request to ' - 'timeout.'), -] - -CONF.register_opts(http_dispatcher_opts, group="dispatcher_http") - class HttpDispatcher(DispatchBase): """Dispatcher class for posting data into a http target. @@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase): __dispatcher_type__ = "Http" - def __init__(self, conf, config): + def __init__(self, conf): super(HttpDispatcher, self).__init__(conf) + http_conf = conf['dispatcher_http'] self.headers = {'Content-type': 'application/json'} - self.timeout = CONF.dispatcher_http.timeout - self.target = CONF.dispatcher_http.target - self.raw_result = [] - self.result = { - "project_name": "yardstick", - "description": "yardstick test cases result", - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "version": os.environ.get('YARDSTICK_VERSION', 'unknown'), - "build_tag": os.environ.get('BUILD_TAG') - } - - def record_result_data(self, data): - self.raw_result.append(data) + self.timeout = int(http_conf.get('timeout', 5)) + self.target = http_conf.get('target', 'http://127.0.0.1:8000/results') - def flush_result_data(self): + def flush_result_data(self, data): if self.target == '': # if the target was not set, do not do anything LOG.error('Dispatcher target was not set, no data will' 'be posted.') return - self.result["details"] = {'results': self.raw_result} - - case_name = "" - for v in self.raw_result: - if isinstance(v, dict) and "scenario_cfg" in v: - case_name = v["scenario_cfg"]["tc"] - break - if case_name == "": - LOG.error('Test result : %s', - jsonutils.dump_as_bytes(self.result)) - LOG.error('The case_name cannot be found, no data will be posted.') - return + result = data['result'] + self.info = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + self._upload_case_result(case, data) - self.result["case_name"] = case_name + def _upload_case_result(self, case, data): + try: + scenario_data = data.get('tc_data', [])[0] + except IndexError: + current_time = datetime.now() + else: + timestamp = float(scenario_data.get('timestamp', 0.0)) + current_time = datetime.fromtimestamp(timestamp) + + result = { + "project_name": "yardstick", + "case_name": case, + "description": "yardstick ci scenario status", + "scenario": self.info.get('deploy_scenario'), + "version": self.info.get('version'), + "pod_name": self.info.get('pod_name'), + "installer": self.info.get('installer'), + "build_tag": os.environ.get('BUILD_TAG'), + "criteria": data.get('criteria'), + "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "trust_indicator": "", + "details": "" + } try: - LOG.debug('Test result : %s', - jsonutils.dump_as_bytes(self.result)) + LOG.debug('Test result : %s', result) res = requests.post(self.target, - data=jsonutils.dump_as_bytes(self.result), + data=jsonutils.dump_as_bytes(result), headers=self.headers, timeout=self.timeout) LOG.debug('Test result posting finished with status code' ' %d.' % res.status_code) except Exception as err: - LOG.exception('Failed to record result data: %s', - err) + LOG.exception('Failed to record result data: %s', err) diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py index 53af79c71..373aae13a 100644 --- a/yardstick/dispatcher/influxdb.py +++ b/yardstick/dispatcher/influxdb.py @@ -10,13 +10,11 @@ from __future__ import absolute_import import logging -import os import time import collections import requests import six -from oslo_serialization import jsonutils from third_party.influxdb.influxdb_line_protocol import make_lines from yardstick.dispatcher.base import Base as DispatchBase @@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase): __dispatcher_type__ = "Influxdb" - def __init__(self, conf, config): + def __init__(self, conf): super(InfluxdbDispatcher, self).__init__(conf) - db_conf = config['yardstick'].get('dispatcher_influxdb', {}) + db_conf = conf['dispatcher_influxdb'] self.timeout = int(db_conf.get('timeout', 5)) self.target = db_conf.get('target', 'http://127.0.0.1:8086') self.db_name = db_conf.get('db_name', 'yardstick') self.username = db_conf.get('username', 'root') self.password = db_conf.get('password', 'root') + self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name) - self.raw_result = [] - self.case_name = "" - self.tc = "" + self.task_id = -1 - self.runners_info = {} - self.static_tags = { - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'), - "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH', - 'unknown')) + def flush_result_data(self, data): + LOG.debug('Test result all : %s', data) + if self.target == '': + # if the target was not set, do not do anything + LOG.error('Dispatcher target was not set, no data will be posted.') + + result = data['result'] + self.tags = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + tc_criteria = data['criteria'] + for record in data['tc_data']: + self._upload_one_record(record, case, tc_criteria) + + return 0 + + def _upload_one_record(self, data, case, tc_criteria): + try: + line = self._data_to_line_protocol(data, case, tc_criteria) + LOG.debug('Test result line format : %s', line) + res = requests.post(self.influxdb_url, + data=line, + auth=(self.username, self.password), + timeout=self.timeout) + if res.status_code != 204: + LOG.error('Test result posting finished with status code' + ' %d.', res.status_code) + LOG.error(res.text) + + except Exception as err: + LOG.exception('Failed to record result data: %s', err) + + def _data_to_line_protocol(self, data, case, criteria): + msg = {} + point = { + "measurement": case, + "fields": self._dict_key_flatten(data["data"]), + "time": self._get_nano_timestamp(data), + "tags": self._get_extended_tags(criteria), } + msg["points"] = [point] + msg["tags"] = self.tags + + return make_lines(msg).encode('utf-8') def _dict_key_flatten(self, data): next_data = {} @@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase): def _get_nano_timestamp(self, results): try: - timestamp = results["benchmark"]["timestamp"] + timestamp = results["timestamp"] except Exception: timestamp = time.time() return str(int(float(timestamp) * 1000000000)) - def _get_extended_tags(self, data): - runner_info = self.runners_info[data["runner_id"]] + def _get_extended_tags(self, criteria): tags = { - "runner_id": data["runner_id"], "task_id": self.task_id, - "scenarios": runner_info["scenarios"] + "criteria": criteria } - if "host" in runner_info: - tags["host"] = runner_info["host"] - if "target" in runner_info: - tags["target"] = runner_info["target"] return tags - - def _data_to_line_protocol(self, data): - msg = {} - point = { - "measurement": self.tc, - "fields": self._dict_key_flatten(data["benchmark"]["data"]), - "time": self._get_nano_timestamp(data), - "tags": self._get_extended_tags(data), - } - msg["points"] = [point] - msg["tags"] = self.static_tags - - return make_lines(msg).encode('utf-8') - - def record_result_data(self, data): - LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data)) - self.raw_result.append(data) - if self.target == '': - # if the target was not set, do not do anything - LOG.error('Dispatcher target was not set, no data will' - 'be posted.') - return -1 - - if isinstance(data, dict) and "scenario_cfg" in data: - self.tc = data["scenario_cfg"]["tc"] - self.task_id = data["scenario_cfg"]["task_id"] - scenario_cfg = data["scenario_cfg"] - runner_id = data["runner_id"] - self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]} - if "host" in scenario_cfg: - self.runners_info[runner_id]["host"] = scenario_cfg["host"] - if "target" in scenario_cfg: - self.runners_info[runner_id]["target"] = scenario_cfg["target"] - return 0 - - if self.tc == "": - LOG.error('Test result : %s', jsonutils.dump_as_bytes(data)) - LOG.error('The case_name cannot be found, no data will be posted.') - return -1 - - try: - line = self._data_to_line_protocol(data) - LOG.debug('Test result line format : %s', line) - res = requests.post(self.influxdb_url, - data=line, - auth=(self.username, self.password), - timeout=self.timeout) - if res.status_code != 204: - LOG.error('Test result posting finished with status code' - ' %d.', res.status_code) - LOG.error(res.text) - - except Exception as err: - LOG.exception('Failed to record result data: %s', - err) - return -1 - return 0 - - def flush_result_data(self): - LOG.debug('Test result all : %s', - jsonutils.dump_as_bytes(self.raw_result)) - return 0 -- cgit 1.2.3-korg