diff options
-rw-r--r-- | api/resources/results.py | 63 | ||||
-rw-r--r-- | tests/unit/benchmark/core/test_task.py | 1 | ||||
-rw-r--r-- | tests/unit/benchmark/runner/test_base.py | 4 | ||||
-rw-r--r-- | tests/unit/dispatcher/test_influxdb.py | 36 | ||||
-rw-r--r-- | yardstick/benchmark/core/task.py | 112 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/arithmetic.py | 9 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 74 | ||||
-rw-r--r-- | yardstick/benchmark/runners/duration.py | 9 | ||||
-rw-r--r-- | yardstick/benchmark/runners/iteration.py | 9 | ||||
-rw-r--r-- | yardstick/benchmark/runners/sequence.py | 9 | ||||
-rw-r--r-- | yardstick/cmd/commands/task.py | 14 | ||||
-rw-r--r-- | yardstick/dispatcher/base.py | 10 | ||||
-rw-r--r-- | yardstick/dispatcher/file.py | 18 | ||||
-rw-r--r-- | yardstick/dispatcher/http.py | 91 | ||||
-rw-r--r-- | yardstick/dispatcher/influxdb.py | 138 |
15 files changed, 299 insertions, 298 deletions
diff --git a/api/resources/results.py b/api/resources/results.py index 86fc25193..a0527ed8c 100644 --- a/api/resources/results.py +++ b/api/resources/results.py @@ -28,12 +28,12 @@ def getResult(args): uuid.UUID(task_id) except KeyError: message = 'task_id must be provided' - return common_utils.error_handler(message) + return common_utils.result_handler(2, message) task = TasksHandler().get_task_by_taskid(task_id) def _unfinished(): - return common_utils.result_handler(0, []) + return common_utils.result_handler(0, {}) def _finished(): testcases = task.details.split(',') @@ -44,7 +44,7 @@ def getResult(args): data = common_utils.translate_to_str(influx_utils.query(query_sql)) return data - result = {k: get_data(k) for k in testcases} + result = _format_data({k: get_data(k) for k in testcases}) return common_utils.result_handler(1, result) @@ -61,4 +61,59 @@ def getResult(args): } return switcher.get(status, lambda: 'nothing')() except IndexError: - return common_utils.error_handler('no such task') + return common_utils.result_handler(2, 'no such task') + + +def _format_data(data): + try: + first_value = data.values()[0][0] + except IndexError: + return {'criteria': 'FAIL', 'testcases': {}} + else: + info = { + 'deploy_scenario': first_value.get('deploy_scenario'), + 'installer': first_value.get('installer'), + 'pod_name': first_value.get('pod_name'), + 'version': first_value.get('version') + } + task_id = first_value.get('task_id') + criteria = first_value.get('criteria') + testcases = {k: _get_case_data(v) for k, v in data.items()} + + result = { + 'criteria': criteria, + 'info': info, + 'task_id': task_id, + 'testcases': testcases + } + return result + + +def _get_case_data(data): + try: + scenario = data[0] + except IndexError: + return {'tc_data': [], 'criteria': 'FAIL'} + else: + tc_data = [_get_scenario_data(s) for s in data] + criteria = scenario.get('criteria') + return {'tc_data': tc_data, 'criteria': criteria} + + +def _get_scenario_data(data): + result = { + 'data': {}, + 'timestamp': '' + } + + blacklist = {'criteria', 'deploy_scenario', 'host', 'installer', + 'pod_name', 'runner_id', 'scenarios', 'target', + 'task_id', 'time', 'version'} + + keys = set(data.keys()) - set(blacklist) + for k in keys: + result['data'][k] = data[k] + + result['timestamp'] = data.get('time') + + return result diff --git a/tests/unit/benchmark/core/test_task.py b/tests/unit/benchmark/core/test_task.py index 8034392f4..b64bb8eed 100644 --- a/tests/unit/benchmark/core/test_task.py +++ b/tests/unit/benchmark/core/test_task.py @@ -65,6 +65,7 @@ class TaskTestCase(unittest.TestCase): runner = mock.Mock() runner.join.return_value = 0 runner.get_output.return_value = {} + runner.get_result.return_value = [] mock_base_runner.Runner.get.return_value = runner t._run([scenario], False, "yardstick.out") self.assertTrue(runner.run.called) diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py index 7880fe5a5..6e72fa548 100644 --- a/tests/unit/benchmark/runner/test_base.py +++ b/tests/unit/benchmark/runner/test_base.py @@ -13,7 +13,6 @@ from __future__ import print_function from __future__ import absolute_import import unittest -import multiprocessing import time from yardstick.benchmark.runners.iteration import IterationRunner @@ -22,8 +21,7 @@ from yardstick.benchmark.runners.iteration import IterationRunner class RunnerTestCase(unittest.TestCase): def test_get_output(self): - queue = multiprocessing.Queue() - runner = IterationRunner({}, queue) + runner = IterationRunner({}) runner.output_queue.put({'case': 'opnfv_yardstick_tc002'}) runner.output_queue.put({'criteria': 'PASS'}) diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py index dca3c4189..a5d9b0754 100644 --- a/tests/unit/dispatcher/test_influxdb.py +++ b/tests/unit/dispatcher/test_influxdb.py @@ -94,31 +94,31 @@ class InfluxdbDispatcherTestCase(unittest.TestCase): } } - self.yardstick_conf = {'yardstick': {}} - - def test_record_result_data_no_target(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - influxdb.target = '' - self.assertEqual(influxdb.record_result_data(self.data1), -1) - - def test_record_result_data_no_case_name(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - self.assertEqual(influxdb.record_result_data(self.data2), -1) + self.yardstick_conf = {'dispatcher_influxdb': {}} @mock.patch('yardstick.dispatcher.influxdb.requests') def test_record_result_data(self, mock_requests): type(mock_requests.post.return_value).status_code = 204 - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - self.assertEqual(influxdb.record_result_data(self.data1), 0) - self.assertEqual(influxdb.record_result_data(self.data2), 0) - self.assertEqual(influxdb.flush_result_data(), 0) + influxdb = InfluxdbDispatcher(self.yardstick_conf) + data = { + 'status': 1, + 'result': { + 'criteria': 'PASS', + 'info': { + }, + 'task_id': 'b9e2bbc2-dfd8-410d-8c24-07771e9f7979', + 'testcases': { + } + } + } + self.assertEqual(influxdb.flush_result_data(data), 0) def test__dict_key_flatten(self): line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,' \ 'mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00' # need to sort for assert to work line = ",".join(sorted(line.split(','))) - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) + influxdb = InfluxdbDispatcher(self.yardstick_conf) flattened_data = influxdb._dict_key_flatten( self.data3['benchmark']['data']) result = ",".join( @@ -126,15 +126,15 @@ class InfluxdbDispatcherTestCase(unittest.TestCase): self.assertEqual(result, line) def test__get_nano_timestamp(self): - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) - results = {'benchmark': {'timestamp': '1451461248.925574'}} + influxdb = InfluxdbDispatcher(self.yardstick_conf) + results = {'timestamp': '1451461248.925574'} self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') @mock.patch('yardstick.dispatcher.influxdb.time') def test__get_nano_timestamp_except(self, mock_time): results = {} - influxdb = InfluxdbDispatcher(None, self.yardstick_conf) + influxdb = InfluxdbDispatcher(self.yardstick_conf) mock_time.time.return_value = 1451461248.925574 self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c44081b73..091aa99d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -24,6 +24,7 @@ from six.moves import filter from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner +from yardstick.dispatcher.base import Base as DispatcherBase from yardstick.common.task_template import TaskTemplate from yardstick.common.utils import source_env from yardstick.common import utils @@ -42,7 +43,6 @@ class Task(object): # pragma: no cover """ def __init__(self): - self.config = {} self.contexts = [] self.outputs = {} @@ -55,7 +55,14 @@ class Task(object): # pragma: no cover check_environment() - self.config['yardstick'] = utils.parse_ini_file(config_file) + output_config = utils.parse_ini_file(config_file) + self._init_output_config(output_config) + self._set_output_config(output_config, args.output_file) + LOG.debug('Output configuration is: %s', output_config) + + if output_config['DEFAULT'].get('dispatcher') == 'file': + result = {'status': 0, 'result': {}} + utils.write_json_to_file(args.output_file, result) total_start_time = time.time() parser = TaskParser(args.inputfile[0]) @@ -75,6 +82,7 @@ class Task(object): # pragma: no cover if args.parse_only: sys.exit(0) + testcases = {} # parse task_files for i in range(0, len(task_files)): one_task_start_time = time.time() @@ -90,7 +98,15 @@ class Task(object): # pragma: no cover meet_precondition) continue - self._run(scenarios, run_in_parallel, args.output_file) + case_name = os.path.splitext(os.path.basename(task_files[i]))[0] + try: + data = self._run(scenarios, run_in_parallel, args.output_file) + except KeyboardInterrupt: + raise + except Exception: + testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []} + else: + testcases[case_name] = {'criteria': 'PASS', 'tc_data': data} if args.keep_deploy: # keep deployment, forget about stack @@ -104,6 +120,10 @@ class Task(object): # pragma: no cover LOG.info("task %s finished in %d secs", task_files[i], one_task_end_time - one_task_start_time) + result = self._get_format_result(testcases) + + self._do_output(output_config, result) + total_end_time = time.time() LOG.info("total finished in %d secs", total_end_time - total_start_time) @@ -114,6 +134,65 @@ class Task(object): # pragma: no cover print("Done, exiting") + def _init_output_config(self, output_config): + output_config.setdefault('DEFAULT', {}) + output_config.setdefault('dispatcher_http', {}) + output_config.setdefault('dispatcher_file', {}) + output_config.setdefault('dispatcher_influxdb', {}) + output_config.setdefault('nsb', {}) + + def _set_output_config(self, output_config, file_path): + try: + out_type = os.environ['DISPATCHER'] + except KeyError: + output_config['DEFAULT'].setdefault('dispatcher', 'file') + else: + output_config['DEFAULT']['dispatcher'] = out_type + + output_config['dispatcher_file']['file_path'] = file_path + + try: + target = os.environ['TARGET'] + except KeyError: + pass + else: + k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher']) + output_config[k]['target'] = target + + def _get_format_result(self, testcases): + criteria = self._get_task_criteria(testcases) + + info = { + 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'), + 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'), + 'pod_name': os.environ.get('NODE_NAME', 'unknown'), + 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown') + } + + result = { + 'status': 1, + 'result': { + 'criteria': criteria, + 'task_id': self.task_id, + 'info': info, + 'testcases': testcases + } + } + + return result + + def _get_task_criteria(self, testcases): + criteria = any(t.get('criteria') != 'PASS' for t in testcases.values()) + if criteria: + return 'FAIL' + else: + return 'PASS' + + def _do_output(self, output_config, result): + + dispatcher = DispatcherBase.get(output_config) + dispatcher.flush_result_data(result) + def _run(self, scenarios, run_in_parallel, output_file): """Deploys context and calls runners""" for context in self.contexts: @@ -121,6 +200,7 @@ class Task(object): # pragma: no cover background_runners = [] + result = [] # Start all background scenarios for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) @@ -136,16 +216,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - runner_join(runner) + status = runner_join(runner) + if status != 0: + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - runner_join(runner) + status = runner_join(runner) + if status != 0: + LOG.error('Scenario: %s ERROR', scenario.get('type')) + raise RuntimeError self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) print("Runner ended, output in", output_file) # Abort background runners @@ -154,15 +241,21 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - if runner.join(timeout=60) is None: + status = runner.join(timeout=60) + if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner_join(runner) + status = runner_join(runner) self.outputs.update(runner.get_output()) + result.extend(runner.get_result()) else: base_runner.Runner.release(runner) + if status != 0: + raise RuntimeError print("Background task ended") + return result + def atexit_handler(self): """handler for process termination""" base_runner.Runner.terminate_all() @@ -227,7 +320,7 @@ class Task(object): # pragma: no cover if "nodes" in scenario_cfg: context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg) - runner = base_runner.Runner.get(runner_cfg, self.config) + runner = base_runner.Runner.get(runner_cfg) print("Starting runner of type '%s'" % runner_cfg["type"]) runner.run(scenario_cfg, context_cfg) @@ -489,8 +582,7 @@ def runner_join(runner): """join (wait for) a runner, exit process at runner failure""" status = runner.join() base_runner.Runner.release(runner) - if status != 0: - sys.exit("Runner failed") + return status def print_invalid_header(source_name, args): diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 7ec593396..7898ae2bc 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -63,10 +63,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, benchmark.setup() method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -132,10 +128,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index ebb9a91b5..f6816c7ed 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -22,46 +22,13 @@ import logging import multiprocessing import subprocess import time -import os import traceback -from oslo_config import cfg - import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario -from yardstick.dispatcher.base import Base as DispatcherBase log = logging.getLogger(__name__) -CONF = cfg.CONF - - -def _output_serializer_main(filename, queue, config): - """entrypoint for the singleton subprocess writing to outfile - Use of this process enables multiple instances of a scenario without - messing up the output file. - """ - try: - out_type = config['yardstick'].get('DEFAULT', {})['dispatcher'] - except KeyError: - out_type = os.environ.get('DISPATCHER', 'file') - - conf = { - 'type': out_type.capitalize(), - 'file_path': filename - } - - dispatcher = DispatcherBase.get(conf, config) - - while True: - # blocks until data becomes available - record = queue.get() - if record == '_TERMINATE_': - dispatcher.flush_result_data() - break - else: - dispatcher.record_result_data(record) - def _execute_shell_command(command): """execute shell script with error handling""" @@ -110,8 +77,6 @@ def _periodic_action(interval, command, queue): class Runner(object): - queue = None - dump_process = None runners = [] @staticmethod @@ -131,30 +96,10 @@ class Runner(object): return types @staticmethod - def get(runner_cfg, config): + def get(runner_cfg): """Returns instance of a scenario runner for execution type. """ - # if there is no runner, start the output serializer subprocess - if not Runner.runners: - log.debug("Starting dump process file '%s'", - runner_cfg["output_filename"]) - Runner.queue = multiprocessing.Queue() - Runner.dump_process = multiprocessing.Process( - target=_output_serializer_main, - name="Dumper", - args=(runner_cfg["output_filename"], Runner.queue, config)) - Runner.dump_process.start() - - return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue) - - @staticmethod - def release_dump_process(): - """Release the dumper process""" - log.debug("Stopping dump process") - if Runner.dump_process: - Runner.queue.put('_TERMINATE_') - Runner.dump_process.join() - Runner.dump_process = None + return Runner.get_cls(runner_cfg["type"])(runner_cfg) @staticmethod def release(runner): @@ -162,10 +107,6 @@ class Runner(object): if runner in Runner.runners: Runner.runners.remove(runner) - # if this was the last runner, stop the output serializer subprocess - if not Runner.runners: - Runner.release_dump_process() - @staticmethod def terminate(runner): """Terminate the runner""" @@ -179,7 +120,6 @@ class Runner(object): # release dumper process as some errors before any runner is created if not Runner.runners: - Runner.release_dump_process() return for runner in Runner.runners: @@ -193,11 +133,11 @@ class Runner(object): runner.periodic_action_process = None Runner.release(runner) - def __init__(self, config, queue): + def __init__(self, config): self.config = config self.periodic_action_process = None - self.result_queue = queue self.output_queue = multiprocessing.Queue() + self.result_queue = multiprocessing.Queue() self.process = None self.aborted = multiprocessing.Event() Runner.runners.append(self) @@ -276,3 +216,9 @@ class Runner(object): while not self.output_queue.empty(): result.update(self.output_queue.get()) return result + + def get_result(self): + result = [] + while not self.result_queue.empty(): + result.append(self.result_queue.get()) + return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 2bf2cd2fe..69d744562 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -52,10 +52,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - start = time.time() while True: @@ -90,10 +86,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 973bb9ac4..50fe106bd 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -53,10 +53,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -105,10 +101,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 74ff82204..68e272c57 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -57,10 +57,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, benchmark.setup() method = getattr(benchmark, method_name) - queue.put({'runner_id': runner_cfg['runner_id'], - 'scenario_cfg': scenario_cfg, - 'context_cfg': context_cfg}) - sla_action = None if "sla" in scenario_cfg: sla_action = scenario_cfg["sla"].get("action", "assert") @@ -99,10 +95,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - record = {'runner_id': runner_cfg['runner_id'], - 'benchmark': benchmark_output} - - queue.put(record) + queue.put(benchmark_output) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], "sequence": sequence}) diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py index 16a4db291..6384e6eb1 100644 --- a/yardstick/cmd/commands/task.py +++ b/yardstick/cmd/commands/task.py @@ -14,7 +14,6 @@ from __future__ import absolute_import from yardstick.benchmark.core.task import Task from yardstick.common.utils import cliargs from yardstick.common.utils import write_json_to_file -from yardstick.common.utils import read_json_from_file from yardstick.cmd.commands import change_osloobj_to_paras output_file_default = "/tmp/yardstick.out" @@ -46,22 +45,11 @@ class TaskCommands(object): param = change_osloobj_to_paras(args) self.output_file = param.output_file - self._init_result_file() - try: Task().start(param, **kwargs) - self._finish() except Exception as e: self._write_error_data(e) - - def _init_result_file(self): - data = {'status': 0, 'result': []} - write_json_to_file(self.output_file, data) - - def _finish(self): - result = read_json_from_file(self.output_file).get('result') - data = {'status': 1, 'result': result} - write_json_to_file(self.output_file, data) + raise def _write_error_data(self, error): data = {'status': 2, 'result': str(error)} diff --git a/yardstick/dispatcher/base.py b/yardstick/dispatcher/base.py index a1c858297..e77249c54 100644 --- a/yardstick/dispatcher/base.py +++ b/yardstick/dispatcher/base.py @@ -38,15 +38,13 @@ class Base(object): raise RuntimeError("No such dispatcher_type %s" % dispatcher_type) @staticmethod - def get(conf, config): + def get(config): """Returns instance of a dispatcher for dispatcher type. """ - return Base.get_cls(conf["type"])(conf, config) + out_type = config['DEFAULT']['dispatcher'] - @abc.abstractmethod - def record_result_data(self, data): - """Recording result data interface.""" + return Base.get_cls(out_type.capitalize())(config) @abc.abstractmethod - def flush_result_data(self): + def flush_result_data(self, data): """Flush result data into permanent storage media interface.""" diff --git a/yardstick/dispatcher/file.py b/yardstick/dispatcher/file.py index 8acd5dfbb..24fc22dd4 100644 --- a/yardstick/dispatcher/file.py +++ b/yardstick/dispatcher/file.py @@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase): __dispatcher_type__ = "File" - def __init__(self, conf, config): + def __init__(self, conf): super(FileDispatcher, self).__init__(conf) - self.result = [] + self.target = conf['dispatcher_file'].get('file_path', + consts.DEFAULT_OUTPUT_FILE) - def record_result_data(self, data): - self.result.append(data) - - def flush_result_data(self): - file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE) - - res = utils.read_json_from_file(file_path).get('result') - res.extend(self.result) - - data = {'status': 0, 'result': res} - utils.write_json_to_file(file_path, data) + def flush_result_data(self, data): + utils.write_json_to_file(self.target, data) diff --git a/yardstick/dispatcher/http.py b/yardstick/dispatcher/http.py index 0d8d2a346..9bf9af33b 100644 --- a/yardstick/dispatcher/http.py +++ b/yardstick/dispatcher/http.py @@ -20,30 +20,15 @@ from __future__ import absolute_import import logging import os +from datetime import datetime from oslo_serialization import jsonutils import requests -from oslo_config import cfg from yardstick.dispatcher.base import Base as DispatchBase LOG = logging.getLogger(__name__) -CONF = cfg.CONF -http_dispatcher_opts = [ - cfg.StrOpt('target', - default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'), - help='The target where the http request will be sent. ' - 'If this is not set, no data will be posted. For ' - 'example: target = http://hostname:1234/path'), - cfg.IntOpt('timeout', - default=5, - help='The max time in seconds to wait for a request to ' - 'timeout.'), -] - -CONF.register_opts(http_dispatcher_opts, group="dispatcher_http") - class HttpDispatcher(DispatchBase): """Dispatcher class for posting data into a http target. @@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase): __dispatcher_type__ = "Http" - def __init__(self, conf, config): + def __init__(self, conf): super(HttpDispatcher, self).__init__(conf) + http_conf = conf['dispatcher_http'] self.headers = {'Content-type': 'application/json'} - self.timeout = CONF.dispatcher_http.timeout - self.target = CONF.dispatcher_http.target - self.raw_result = [] - self.result = { - "project_name": "yardstick", - "description": "yardstick test cases result", - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "version": os.environ.get('YARDSTICK_VERSION', 'unknown'), - "build_tag": os.environ.get('BUILD_TAG') - } - - def record_result_data(self, data): - self.raw_result.append(data) + self.timeout = int(http_conf.get('timeout', 5)) + self.target = http_conf.get('target', 'http://127.0.0.1:8000/results') - def flush_result_data(self): + def flush_result_data(self, data): if self.target == '': # if the target was not set, do not do anything LOG.error('Dispatcher target was not set, no data will' 'be posted.') return - self.result["details"] = {'results': self.raw_result} - - case_name = "" - for v in self.raw_result: - if isinstance(v, dict) and "scenario_cfg" in v: - case_name = v["scenario_cfg"]["tc"] - break - if case_name == "": - LOG.error('Test result : %s', - jsonutils.dump_as_bytes(self.result)) - LOG.error('The case_name cannot be found, no data will be posted.') - return + result = data['result'] + self.info = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + self._upload_case_result(case, data) - self.result["case_name"] = case_name + def _upload_case_result(self, case, data): + try: + scenario_data = data.get('tc_data', [])[0] + except IndexError: + current_time = datetime.now() + else: + timestamp = float(scenario_data.get('timestamp', 0.0)) + current_time = datetime.fromtimestamp(timestamp) + + result = { + "project_name": "yardstick", + "case_name": case, + "description": "yardstick ci scenario status", + "scenario": self.info.get('deploy_scenario'), + "version": self.info.get('version'), + "pod_name": self.info.get('pod_name'), + "installer": self.info.get('installer'), + "build_tag": os.environ.get('BUILD_TAG'), + "criteria": data.get('criteria'), + "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "trust_indicator": "", + "details": "" + } try: - LOG.debug('Test result : %s', - jsonutils.dump_as_bytes(self.result)) + LOG.debug('Test result : %s', result) res = requests.post(self.target, - data=jsonutils.dump_as_bytes(self.result), + data=jsonutils.dump_as_bytes(result), headers=self.headers, timeout=self.timeout) LOG.debug('Test result posting finished with status code' ' %d.' % res.status_code) except Exception as err: - LOG.exception('Failed to record result data: %s', - err) + LOG.exception('Failed to record result data: %s', err) diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py index 53af79c71..373aae13a 100644 --- a/yardstick/dispatcher/influxdb.py +++ b/yardstick/dispatcher/influxdb.py @@ -10,13 +10,11 @@ from __future__ import absolute_import import logging -import os import time import collections import requests import six -from oslo_serialization import jsonutils from third_party.influxdb.influxdb_line_protocol import make_lines from yardstick.dispatcher.base import Base as DispatchBase @@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase): __dispatcher_type__ = "Influxdb" - def __init__(self, conf, config): + def __init__(self, conf): super(InfluxdbDispatcher, self).__init__(conf) - db_conf = config['yardstick'].get('dispatcher_influxdb', {}) + db_conf = conf['dispatcher_influxdb'] self.timeout = int(db_conf.get('timeout', 5)) self.target = db_conf.get('target', 'http://127.0.0.1:8086') self.db_name = db_conf.get('db_name', 'yardstick') self.username = db_conf.get('username', 'root') self.password = db_conf.get('password', 'root') + self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name) - self.raw_result = [] - self.case_name = "" - self.tc = "" + self.task_id = -1 - self.runners_info = {} - self.static_tags = { - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'), - "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH', - 'unknown')) + def flush_result_data(self, data): + LOG.debug('Test result all : %s', data) + if self.target == '': + # if the target was not set, do not do anything + LOG.error('Dispatcher target was not set, no data will be posted.') + + result = data['result'] + self.tags = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + tc_criteria = data['criteria'] + for record in data['tc_data']: + self._upload_one_record(record, case, tc_criteria) + + return 0 + + def _upload_one_record(self, data, case, tc_criteria): + try: + line = self._data_to_line_protocol(data, case, tc_criteria) + LOG.debug('Test result line format : %s', line) + res = requests.post(self.influxdb_url, + data=line, + auth=(self.username, self.password), + timeout=self.timeout) + if res.status_code != 204: + LOG.error('Test result posting finished with status code' + ' %d.', res.status_code) + LOG.error(res.text) + + except Exception as err: + LOG.exception('Failed to record result data: %s', err) + + def _data_to_line_protocol(self, data, case, criteria): + msg = {} + point = { + "measurement": case, + "fields": self._dict_key_flatten(data["data"]), + "time": self._get_nano_timestamp(data), + "tags": self._get_extended_tags(criteria), } + msg["points"] = [point] + msg["tags"] = self.tags + + return make_lines(msg).encode('utf-8') def _dict_key_flatten(self, data): next_data = {} @@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase): def _get_nano_timestamp(self, results): try: - timestamp = results["benchmark"]["timestamp"] + timestamp = results["timestamp"] except Exception: timestamp = time.time() return str(int(float(timestamp) * 1000000000)) - def _get_extended_tags(self, data): - runner_info = self.runners_info[data["runner_id"]] + def _get_extended_tags(self, criteria): tags = { - "runner_id": data["runner_id"], "task_id": self.task_id, - "scenarios": runner_info["scenarios"] + "criteria": criteria } - if "host" in runner_info: - tags["host"] = runner_info["host"] - if "target" in runner_info: - tags["target"] = runner_info["target"] return tags - - def _data_to_line_protocol(self, data): - msg = {} - point = { - "measurement": self.tc, - "fields": self._dict_key_flatten(data["benchmark"]["data"]), - "time": self._get_nano_timestamp(data), - "tags": self._get_extended_tags(data), - } - msg["points"] = [point] - msg["tags"] = self.static_tags - - return make_lines(msg).encode('utf-8') - - def record_result_data(self, data): - LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data)) - self.raw_result.append(data) - if self.target == '': - # if the target was not set, do not do anything - LOG.error('Dispatcher target was not set, no data will' - 'be posted.') - return -1 - - if isinstance(data, dict) and "scenario_cfg" in data: - self.tc = data["scenario_cfg"]["tc"] - self.task_id = data["scenario_cfg"]["task_id"] - scenario_cfg = data["scenario_cfg"] - runner_id = data["runner_id"] - self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]} - if "host" in scenario_cfg: - self.runners_info[runner_id]["host"] = scenario_cfg["host"] - if "target" in scenario_cfg: - self.runners_info[runner_id]["target"] = scenario_cfg["target"] - return 0 - - if self.tc == "": - LOG.error('Test result : %s', jsonutils.dump_as_bytes(data)) - LOG.error('The case_name cannot be found, no data will be posted.') - return -1 - - try: - line = self._data_to_line_protocol(data) - LOG.debug('Test result line format : %s', line) - res = requests.post(self.influxdb_url, - data=line, - auth=(self.username, self.password), - timeout=self.timeout) - if res.status_code != 204: - LOG.error('Test result posting finished with status code' - ' %d.', res.status_code) - LOG.error(res.text) - - except Exception as err: - LOG.exception('Failed to record result data: %s', - err) - return -1 - return 0 - - def flush_result_data(self): - LOG.debug('Test result all : %s', - jsonutils.dump_as_bytes(self.raw_result)) - return 0 |