diff options
-rw-r--r-- | yardstick/benchmark/core/task.py | 19 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 24 | ||||
-rw-r--r-- | yardstick/dispatcher/influxdb.py | 34 | ||||
-rw-r--r-- | yardstick/tests/unit/benchmark/core/test_task.py | 11 | ||||
-rw-r--r-- | yardstick/tests/unit/benchmark/runner/test_base.py | 17 |
5 files changed, 79 insertions, 26 deletions
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index 5fcc9182c..f5d2b18ac 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -120,7 +120,7 @@ class Task(object): # pragma: no cover case_name = os.path.splitext(os.path.basename(task_files[i]))[0] try: - data = self._run(scenarios, run_in_parallel, args.output_file) + data = self._run(scenarios, run_in_parallel, output_config) except KeyboardInterrupt: raise except Exception: # pylint: disable=broad-except @@ -232,11 +232,12 @@ class Task(object): # pragma: no cover def _do_output(self, output_config, result): dispatchers = DispatcherBase.get(output_config) + dispatchers = (d for d in dispatchers if d.__dispatcher_type__ != 'Influxdb') for dispatcher in dispatchers: dispatcher.flush_result_data(result) - def _run(self, scenarios, run_in_parallel, output_file): + def _run(self, scenarios, run_in_parallel, output_config): """Deploys context and calls runners""" for context in self.contexts: context.deploy() @@ -247,14 +248,14 @@ class Task(object): # pragma: no cover # Start all background scenarios for scenario in filter(_is_background_scenario, scenarios): scenario["runner"] = dict(type="Duration", duration=1000000000) - runner = self.run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_config) background_runners.append(runner) runners = [] if run_in_parallel: for scenario in scenarios: if not _is_background_scenario(scenario): - runner = self.run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_config) runners.append(runner) # Wait for runners to finish @@ -263,12 +264,12 @@ class Task(object): # pragma: no cover if status != 0: raise RuntimeError( "{0} runner status {1}".format(runner.__execution_type__, status)) - LOG.info("Runner ended, output in %s", output_file) + LOG.info("Runner ended") else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): - runner = self.run_one_scenario(scenario, output_file) + runner = self.run_one_scenario(scenario, output_config) status = runner_join(runner, background_runners, self.outputs, result) if status != 0: LOG.error('Scenario NO.%s: "%s" ERROR!', @@ -276,7 +277,7 @@ class Task(object): # pragma: no cover scenario.get('type')) raise RuntimeError( "{0} runner status {1}".format(runner.__execution_type__, status)) - LOG.info("Runner ended, output in %s", output_file) + LOG.info("Runner ended") # Abort background runners for runner in background_runners: @@ -313,10 +314,10 @@ class Task(object): # pragma: no cover else: return op - def run_one_scenario(self, scenario_cfg, output_file): + def run_one_scenario(self, scenario_cfg, output_config): """run one scenario using context""" runner_cfg = scenario_cfg["runner"] - runner_cfg['output_filename'] = output_file + runner_cfg['output_config'] = output_config options = scenario_cfg.get('options', {}) scenario_cfg['options'] = self._parse_options(options) diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a887fa5b3..99386a440 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -23,6 +23,7 @@ import multiprocessing import subprocess import time import traceback +from subprocess import CalledProcessError import importlib @@ -30,6 +31,7 @@ from six.moves.queue import Empty import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario +from yardstick.dispatcher.base import Base as DispatcherBase log = logging.getLogger(__name__) @@ -39,7 +41,7 @@ def _execute_shell_command(command): exitcode = 0 try: output = subprocess.check_output(command, shell=True) - except Exception: + except CalledProcessError: exitcode = -1 output = traceback.format_exc() log.error("exec command '%s' error:\n ", command) @@ -137,6 +139,8 @@ class Runner(object): Runner.release(runner) def __init__(self, config): + self.task_id = None + self.case_name = None self.config = config self.periodic_action_process = None self.output_queue = multiprocessing.Queue() @@ -170,6 +174,8 @@ class Runner(object): cls = getattr(module, path_split[-1]) self.config['object'] = class_name + self.case_name = scenario_cfg['tc'] + self.task_id = scenario_cfg['task_id'] self.aborted.clear() # run a potentially configured pre-start action @@ -245,10 +251,24 @@ class Runner(object): def get_result(self): result = [] + + dispatcher = self.config['output_config']['DEFAULT']['dispatcher'] + output_in_influxdb = 'influxdb' in dispatcher + while not self.result_queue.empty(): log.debug("result_queue size %s", self.result_queue.qsize()) try: - result.append(self.result_queue.get(True, 1)) + one_record = self.result_queue.get(True, 1) except Empty: pass + else: + if output_in_influxdb: + self._output_to_influxdb(one_record) + + result.append(one_record) return result + + def _output_to_influxdb(self, record): + dispatchers = DispatcherBase.get(self.config['output_config']) + dispatcher = next((d for d in dispatchers if d.__dispatcher_type__ == 'Influxdb')) + dispatcher.upload_one_record(record, self.case_name, '', task_id=self.task_id) diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py index 632b433b5..e8c7cf57b 100644 --- a/yardstick/dispatcher/influxdb.py +++ b/yardstick/dispatcher/influxdb.py @@ -11,8 +11,10 @@ from __future__ import absolute_import import logging import time +import os import requests +from requests import ConnectionError from yardstick.common import utils from third_party.influxdb.influxdb_line_protocol import make_lines @@ -38,7 +40,8 @@ class InfluxdbDispatcher(DispatchBase): self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name) - self.task_id = -1 + self.task_id = None + self.tags = None def flush_result_data(self, data): LOG.debug('Test result all : %s', data) @@ -57,28 +60,41 @@ class InfluxdbDispatcher(DispatchBase): for record in data['tc_data']: # skip results with no data because we influxdb encode empty dicts if record.get("data"): - self._upload_one_record(record, case, tc_criteria) + self.upload_one_record(record, case, tc_criteria) return 0 - def _upload_one_record(self, data, case, tc_criteria): + def upload_one_record(self, data, case, tc_criteria, task_id=None): + if task_id: + self.task_id = task_id + + line = self._data_to_line_protocol(data, case, tc_criteria) + LOG.debug('Test result line format : %s', line) + try: - line = self._data_to_line_protocol(data, case, tc_criteria) - LOG.debug('Test result line format : %s', line) res = requests.post(self.influxdb_url, data=line, auth=(self.username, self.password), timeout=self.timeout) + except ConnectionError as err: + LOG.exception('Failed to record result data: %s', err) + else: if res.status_code != 204: LOG.error('Test result posting finished with status code' ' %d.', res.status_code) LOG.error(res.text) - except Exception as err: - LOG.exception('Failed to record result data: %s', err) - def _data_to_line_protocol(self, data, case, criteria): msg = {} + + if not self.tags: + self.tags = { + 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'), + 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'), + 'pod_name': os.environ.get('NODE_NAME', 'unknown'), + 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown') + } + point = { "measurement": case, "fields": utils.flatten_dict_key(data["data"]), @@ -93,7 +109,7 @@ class InfluxdbDispatcher(DispatchBase): def _get_nano_timestamp(self, results): try: timestamp = results["timestamp"] - except Exception: + except KeyError: timestamp = time.time() return str(int(float(timestamp) * 1000000000)) diff --git a/yardstick/tests/unit/benchmark/core/test_task.py b/yardstick/tests/unit/benchmark/core/test_task.py index bac035fb9..ee00d8826 100644 --- a/yardstick/tests/unit/benchmark/core/test_task.py +++ b/yardstick/tests/unit/benchmark/core/test_task.py @@ -48,8 +48,15 @@ class TaskTestCase(unittest.TestCase): def test__do_output(self, mock_dispatcher): t = task.Task() output_config = {"DEFAULT": {"dispatcher": "file, http"}} - mock_dispatcher.get = mock.MagicMock(return_value=[mock.MagicMock(), - mock.MagicMock()]) + + dispatcher1 = mock.MagicMock() + dispatcher1.__dispatcher_type__ = 'file' + + dispatcher2 = mock.MagicMock() + dispatcher2.__dispatcher_type__ = 'http' + + mock_dispatcher.get = mock.MagicMock(return_value=[dispatcher1, + dispatcher2]) self.assertEqual(None, t._do_output(output_config, {})) @mock.patch.object(task, 'Context') diff --git a/yardstick/tests/unit/benchmark/runner/test_base.py b/yardstick/tests/unit/benchmark/runner/test_base.py index 0fdc42347..59739c54f 100644 --- a/yardstick/tests/unit/benchmark/runner/test_base.py +++ b/yardstick/tests/unit/benchmark/runner/test_base.py @@ -11,6 +11,8 @@ import time import mock import unittest +from subprocess import CalledProcessError + from yardstick.benchmark.runners import base from yardstick.benchmark.runners import iteration @@ -20,19 +22,19 @@ class ActionTestCase(unittest.TestCase): @mock.patch("yardstick.benchmark.runners.base.subprocess") def test__execute_shell_command(self, mock_subprocess): - mock_subprocess.check_output.side_effect = Exception() + mock_subprocess.check_output.side_effect = CalledProcessError(-1, '') self.assertEqual(base._execute_shell_command("")[0], -1) @mock.patch("yardstick.benchmark.runners.base.subprocess") def test__single_action(self, mock_subprocess): - mock_subprocess.check_output.side_effect = Exception() + mock_subprocess.check_output.side_effect = CalledProcessError(-1, '') base._single_action(0, "echo", mock.MagicMock()) @mock.patch("yardstick.benchmark.runners.base.subprocess") def test__periodic_action(self, mock_subprocess): - mock_subprocess.check_output.side_effect = Exception() + mock_subprocess.check_output.side_effect = CalledProcessError(-1, '') base._periodic_action(0, "echo", mock.MagicMock()) @@ -40,7 +42,14 @@ class ActionTestCase(unittest.TestCase): class RunnerTestCase(unittest.TestCase): def setUp(self): - self.runner = iteration.IterationRunner({}) + config = { + 'output_config': { + 'DEFAULT': { + 'dispatcher': 'file' + } + } + } + self.runner = iteration.IterationRunner(config) @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") def test_get_output(self, *args): |