aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorchenjiankun <chenjiankun1@huawei.com>2017-05-24 07:22:51 +0000
committerchenjiankun <chenjiankun1@huawei.com>2017-06-22 11:06:17 +0000
commitacc757fc7cf9db54d97d4563cd294efafc3f7747 (patch)
tree20a55964d84fa84c06a91b2e9b9233902f42c2c7
parent5c33b82efbc0f7e58bdcfc4288ce08b7b3c999f2 (diff)
Yardstick output format unified
JIRA: YARDSTICK-658 Currently the yardstick have three dispatcher: file, influxdb, mongodb. (influxdb using API to get result and mongodb using testAPI to get result) But their output format is different. It is hard to use. In this patch, make all dispatchers using the same data source. And make the output format of file and influxdb unified. As for mongodb, since it is related to testAPI, so I make it push data every test case. The unified output format is: http://paste.openstack.org/show/610125/ Change-Id: I854ac4f03e6f904469b07b0c924c7d850545ae5b Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
-rw-r--r--api/resources/results.py63
-rw-r--r--tests/unit/benchmark/core/test_task.py1
-rw-r--r--tests/unit/benchmark/runner/test_base.py4
-rw-r--r--tests/unit/dispatcher/test_influxdb.py36
-rw-r--r--yardstick/benchmark/core/task.py112
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py9
-rwxr-xr-xyardstick/benchmark/runners/base.py74
-rw-r--r--yardstick/benchmark/runners/duration.py9
-rw-r--r--yardstick/benchmark/runners/iteration.py9
-rw-r--r--yardstick/benchmark/runners/sequence.py9
-rw-r--r--yardstick/cmd/commands/task.py14
-rw-r--r--yardstick/dispatcher/base.py10
-rw-r--r--yardstick/dispatcher/file.py18
-rw-r--r--yardstick/dispatcher/http.py91
-rw-r--r--yardstick/dispatcher/influxdb.py138
15 files changed, 299 insertions, 298 deletions
diff --git a/api/resources/results.py b/api/resources/results.py
index 86fc25193..a0527ed8c 100644
--- a/api/resources/results.py
+++ b/api/resources/results.py
@@ -28,12 +28,12 @@ def getResult(args):
uuid.UUID(task_id)
except KeyError:
message = 'task_id must be provided'
- return common_utils.error_handler(message)
+ return common_utils.result_handler(2, message)
task = TasksHandler().get_task_by_taskid(task_id)
def _unfinished():
- return common_utils.result_handler(0, [])
+ return common_utils.result_handler(0, {})
def _finished():
testcases = task.details.split(',')
@@ -44,7 +44,7 @@ def getResult(args):
data = common_utils.translate_to_str(influx_utils.query(query_sql))
return data
- result = {k: get_data(k) for k in testcases}
+ result = _format_data({k: get_data(k) for k in testcases})
return common_utils.result_handler(1, result)
@@ -61,4 +61,59 @@ def getResult(args):
}
return switcher.get(status, lambda: 'nothing')()
except IndexError:
- return common_utils.error_handler('no such task')
+ return common_utils.result_handler(2, 'no such task')
+
+
+def _format_data(data):
+ try:
+ first_value = data.values()[0][0]
+ except IndexError:
+ return {'criteria': 'FAIL', 'testcases': {}}
+ else:
+ info = {
+ 'deploy_scenario': first_value.get('deploy_scenario'),
+ 'installer': first_value.get('installer'),
+ 'pod_name': first_value.get('pod_name'),
+ 'version': first_value.get('version')
+ }
+ task_id = first_value.get('task_id')
+ criteria = first_value.get('criteria')
+ testcases = {k: _get_case_data(v) for k, v in data.items()}
+
+ result = {
+ 'criteria': criteria,
+ 'info': info,
+ 'task_id': task_id,
+ 'testcases': testcases
+ }
+ return result
+
+
+def _get_case_data(data):
+ try:
+ scenario = data[0]
+ except IndexError:
+ return {'tc_data': [], 'criteria': 'FAIL'}
+ else:
+ tc_data = [_get_scenario_data(s) for s in data]
+ criteria = scenario.get('criteria')
+ return {'tc_data': tc_data, 'criteria': criteria}
+
+
+def _get_scenario_data(data):
+ result = {
+ 'data': {},
+ 'timestamp': ''
+ }
+
+ blacklist = {'criteria', 'deploy_scenario', 'host', 'installer',
+ 'pod_name', 'runner_id', 'scenarios', 'target',
+ 'task_id', 'time', 'version'}
+
+ keys = set(data.keys()) - set(blacklist)
+ for k in keys:
+ result['data'][k] = data[k]
+
+ result['timestamp'] = data.get('time')
+
+ return result
diff --git a/tests/unit/benchmark/core/test_task.py b/tests/unit/benchmark/core/test_task.py
index 8034392f4..b64bb8eed 100644
--- a/tests/unit/benchmark/core/test_task.py
+++ b/tests/unit/benchmark/core/test_task.py
@@ -65,6 +65,7 @@ class TaskTestCase(unittest.TestCase):
runner = mock.Mock()
runner.join.return_value = 0
runner.get_output.return_value = {}
+ runner.get_result.return_value = []
mock_base_runner.Runner.get.return_value = runner
t._run([scenario], False, "yardstick.out")
self.assertTrue(runner.run.called)
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py
index 7880fe5a5..6e72fa548 100644
--- a/tests/unit/benchmark/runner/test_base.py
+++ b/tests/unit/benchmark/runner/test_base.py
@@ -13,7 +13,6 @@ from __future__ import print_function
from __future__ import absolute_import
import unittest
-import multiprocessing
import time
from yardstick.benchmark.runners.iteration import IterationRunner
@@ -22,8 +21,7 @@ from yardstick.benchmark.runners.iteration import IterationRunner
class RunnerTestCase(unittest.TestCase):
def test_get_output(self):
- queue = multiprocessing.Queue()
- runner = IterationRunner({}, queue)
+ runner = IterationRunner({})
runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
runner.output_queue.put({'criteria': 'PASS'})
diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py
index dca3c4189..a5d9b0754 100644
--- a/tests/unit/dispatcher/test_influxdb.py
+++ b/tests/unit/dispatcher/test_influxdb.py
@@ -94,31 +94,31 @@ class InfluxdbDispatcherTestCase(unittest.TestCase):
}
}
- self.yardstick_conf = {'yardstick': {}}
-
- def test_record_result_data_no_target(self):
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
- influxdb.target = ''
- self.assertEqual(influxdb.record_result_data(self.data1), -1)
-
- def test_record_result_data_no_case_name(self):
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
- self.assertEqual(influxdb.record_result_data(self.data2), -1)
+ self.yardstick_conf = {'dispatcher_influxdb': {}}
@mock.patch('yardstick.dispatcher.influxdb.requests')
def test_record_result_data(self, mock_requests):
type(mock_requests.post.return_value).status_code = 204
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
- self.assertEqual(influxdb.record_result_data(self.data1), 0)
- self.assertEqual(influxdb.record_result_data(self.data2), 0)
- self.assertEqual(influxdb.flush_result_data(), 0)
+ influxdb = InfluxdbDispatcher(self.yardstick_conf)
+ data = {
+ 'status': 1,
+ 'result': {
+ 'criteria': 'PASS',
+ 'info': {
+ },
+ 'task_id': 'b9e2bbc2-dfd8-410d-8c24-07771e9f7979',
+ 'testcases': {
+ }
+ }
+ }
+ self.assertEqual(influxdb.flush_result_data(data), 0)
def test__dict_key_flatten(self):
line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,' \
'mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00'
# need to sort for assert to work
line = ",".join(sorted(line.split(',')))
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
+ influxdb = InfluxdbDispatcher(self.yardstick_conf)
flattened_data = influxdb._dict_key_flatten(
self.data3['benchmark']['data'])
result = ",".join(
@@ -126,15 +126,15 @@ class InfluxdbDispatcherTestCase(unittest.TestCase):
self.assertEqual(result, line)
def test__get_nano_timestamp(self):
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
- results = {'benchmark': {'timestamp': '1451461248.925574'}}
+ influxdb = InfluxdbDispatcher(self.yardstick_conf)
+ results = {'timestamp': '1451461248.925574'}
self.assertEqual(influxdb._get_nano_timestamp(results),
'1451461248925574144')
@mock.patch('yardstick.dispatcher.influxdb.time')
def test__get_nano_timestamp_except(self, mock_time):
results = {}
- influxdb = InfluxdbDispatcher(None, self.yardstick_conf)
+ influxdb = InfluxdbDispatcher(self.yardstick_conf)
mock_time.time.return_value = 1451461248.925574
self.assertEqual(influxdb._get_nano_timestamp(results),
'1451461248925574144')
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index c44081b73..091aa99d3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -24,6 +24,7 @@ from six.moves import filter
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
+from yardstick.dispatcher.base import Base as DispatcherBase
from yardstick.common.task_template import TaskTemplate
from yardstick.common.utils import source_env
from yardstick.common import utils
@@ -42,7 +43,6 @@ class Task(object): # pragma: no cover
"""
def __init__(self):
- self.config = {}
self.contexts = []
self.outputs = {}
@@ -55,7 +55,14 @@ class Task(object): # pragma: no cover
check_environment()
- self.config['yardstick'] = utils.parse_ini_file(config_file)
+ output_config = utils.parse_ini_file(config_file)
+ self._init_output_config(output_config)
+ self._set_output_config(output_config, args.output_file)
+ LOG.debug('Output configuration is: %s', output_config)
+
+ if output_config['DEFAULT'].get('dispatcher') == 'file':
+ result = {'status': 0, 'result': {}}
+ utils.write_json_to_file(args.output_file, result)
total_start_time = time.time()
parser = TaskParser(args.inputfile[0])
@@ -75,6 +82,7 @@ class Task(object): # pragma: no cover
if args.parse_only:
sys.exit(0)
+ testcases = {}
# parse task_files
for i in range(0, len(task_files)):
one_task_start_time = time.time()
@@ -90,7 +98,15 @@ class Task(object): # pragma: no cover
meet_precondition)
continue
- self._run(scenarios, run_in_parallel, args.output_file)
+ case_name = os.path.splitext(os.path.basename(task_files[i]))[0]
+ try:
+ data = self._run(scenarios, run_in_parallel, args.output_file)
+ except KeyboardInterrupt:
+ raise
+ except Exception:
+ testcases[case_name] = {'criteria': 'FAIL', 'tc_data': []}
+ else:
+ testcases[case_name] = {'criteria': 'PASS', 'tc_data': data}
if args.keep_deploy:
# keep deployment, forget about stack
@@ -104,6 +120,10 @@ class Task(object): # pragma: no cover
LOG.info("task %s finished in %d secs", task_files[i],
one_task_end_time - one_task_start_time)
+ result = self._get_format_result(testcases)
+
+ self._do_output(output_config, result)
+
total_end_time = time.time()
LOG.info("total finished in %d secs",
total_end_time - total_start_time)
@@ -114,6 +134,65 @@ class Task(object): # pragma: no cover
print("Done, exiting")
+ def _init_output_config(self, output_config):
+ output_config.setdefault('DEFAULT', {})
+ output_config.setdefault('dispatcher_http', {})
+ output_config.setdefault('dispatcher_file', {})
+ output_config.setdefault('dispatcher_influxdb', {})
+ output_config.setdefault('nsb', {})
+
+ def _set_output_config(self, output_config, file_path):
+ try:
+ out_type = os.environ['DISPATCHER']
+ except KeyError:
+ output_config['DEFAULT'].setdefault('dispatcher', 'file')
+ else:
+ output_config['DEFAULT']['dispatcher'] = out_type
+
+ output_config['dispatcher_file']['file_path'] = file_path
+
+ try:
+ target = os.environ['TARGET']
+ except KeyError:
+ pass
+ else:
+ k = 'dispatcher_{}'.format(output_config['DEFAULT']['dispatcher'])
+ output_config[k]['target'] = target
+
+ def _get_format_result(self, testcases):
+ criteria = self._get_task_criteria(testcases)
+
+ info = {
+ 'deploy_scenario': os.environ.get('DEPLOY_SCENARIO', 'unknown'),
+ 'installer': os.environ.get('INSTALLER_TYPE', 'unknown'),
+ 'pod_name': os.environ.get('NODE_NAME', 'unknown'),
+ 'version': os.environ.get('YARDSTICK_BRANCH', 'unknown')
+ }
+
+ result = {
+ 'status': 1,
+ 'result': {
+ 'criteria': criteria,
+ 'task_id': self.task_id,
+ 'info': info,
+ 'testcases': testcases
+ }
+ }
+
+ return result
+
+ def _get_task_criteria(self, testcases):
+ criteria = any(t.get('criteria') != 'PASS' for t in testcases.values())
+ if criteria:
+ return 'FAIL'
+ else:
+ return 'PASS'
+
+ def _do_output(self, output_config, result):
+
+ dispatcher = DispatcherBase.get(output_config)
+ dispatcher.flush_result_data(result)
+
def _run(self, scenarios, run_in_parallel, output_file):
"""Deploys context and calls runners"""
for context in self.contexts:
@@ -121,6 +200,7 @@ class Task(object): # pragma: no cover
background_runners = []
+ result = []
# Start all background scenarios
for scenario in filter(_is_background_scenario, scenarios):
scenario["runner"] = dict(type="Duration", duration=1000000000)
@@ -136,16 +216,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- runner_join(runner)
+ status = runner_join(runner)
+ if status != 0:
+ LOG.error('Scenario: %s ERROR', scenario.get('type'))
+ raise RuntimeError
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
print("Runner ended, output in", output_file)
# Abort background runners
@@ -154,15 +241,21 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- if runner.join(timeout=60) is None:
+ status = runner.join(timeout=60)
+ if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner_join(runner)
+ status = runner_join(runner)
self.outputs.update(runner.get_output())
+ result.extend(runner.get_result())
else:
base_runner.Runner.release(runner)
+ if status != 0:
+ raise RuntimeError
print("Background task ended")
+ return result
+
def atexit_handler(self):
"""handler for process termination"""
base_runner.Runner.terminate_all()
@@ -227,7 +320,7 @@ class Task(object): # pragma: no cover
if "nodes" in scenario_cfg:
context_cfg["nodes"] = parse_nodes_with_context(scenario_cfg)
- runner = base_runner.Runner.get(runner_cfg, self.config)
+ runner = base_runner.Runner.get(runner_cfg)
print("Starting runner of type '%s'" % runner_cfg["type"])
runner.run(scenario_cfg, context_cfg)
@@ -489,8 +582,7 @@ def runner_join(runner):
"""join (wait for) a runner, exit process at runner failure"""
status = runner.join()
base_runner.Runner.release(runner)
- if status != 0:
- sys.exit("Runner failed")
+ return status
def print_invalid_header(source_name, args):
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 7ec593396..7898ae2bc 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -63,10 +63,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
benchmark.setup()
method = getattr(benchmark, method_name)
- queue.put({'runner_id': runner_cfg['runner_id'],
- 'scenario_cfg': scenario_cfg,
- 'context_cfg': context_cfg})
-
sla_action = None
if "sla" in scenario_cfg:
sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -132,10 +128,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- record = {'runner_id': runner_cfg['runner_id'],
- 'benchmark': benchmark_output}
-
- queue.put(record)
+ queue.put(benchmark_output)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"], "sequence": sequence})
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index ebb9a91b5..f6816c7ed 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -22,46 +22,13 @@ import logging
import multiprocessing
import subprocess
import time
-import os
import traceback
-from oslo_config import cfg
-
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
-from yardstick.dispatcher.base import Base as DispatcherBase
log = logging.getLogger(__name__)
-CONF = cfg.CONF
-
-
-def _output_serializer_main(filename, queue, config):
- """entrypoint for the singleton subprocess writing to outfile
- Use of this process enables multiple instances of a scenario without
- messing up the output file.
- """
- try:
- out_type = config['yardstick'].get('DEFAULT', {})['dispatcher']
- except KeyError:
- out_type = os.environ.get('DISPATCHER', 'file')
-
- conf = {
- 'type': out_type.capitalize(),
- 'file_path': filename
- }
-
- dispatcher = DispatcherBase.get(conf, config)
-
- while True:
- # blocks until data becomes available
- record = queue.get()
- if record == '_TERMINATE_':
- dispatcher.flush_result_data()
- break
- else:
- dispatcher.record_result_data(record)
-
def _execute_shell_command(command):
"""execute shell script with error handling"""
@@ -110,8 +77,6 @@ def _periodic_action(interval, command, queue):
class Runner(object):
- queue = None
- dump_process = None
runners = []
@staticmethod
@@ -131,30 +96,10 @@ class Runner(object):
return types
@staticmethod
- def get(runner_cfg, config):
+ def get(runner_cfg):
"""Returns instance of a scenario runner for execution type.
"""
- # if there is no runner, start the output serializer subprocess
- if not Runner.runners:
- log.debug("Starting dump process file '%s'",
- runner_cfg["output_filename"])
- Runner.queue = multiprocessing.Queue()
- Runner.dump_process = multiprocessing.Process(
- target=_output_serializer_main,
- name="Dumper",
- args=(runner_cfg["output_filename"], Runner.queue, config))
- Runner.dump_process.start()
-
- return Runner.get_cls(runner_cfg["type"])(runner_cfg, Runner.queue)
-
- @staticmethod
- def release_dump_process():
- """Release the dumper process"""
- log.debug("Stopping dump process")
- if Runner.dump_process:
- Runner.queue.put('_TERMINATE_')
- Runner.dump_process.join()
- Runner.dump_process = None
+ return Runner.get_cls(runner_cfg["type"])(runner_cfg)
@staticmethod
def release(runner):
@@ -162,10 +107,6 @@ class Runner(object):
if runner in Runner.runners:
Runner.runners.remove(runner)
- # if this was the last runner, stop the output serializer subprocess
- if not Runner.runners:
- Runner.release_dump_process()
-
@staticmethod
def terminate(runner):
"""Terminate the runner"""
@@ -179,7 +120,6 @@ class Runner(object):
# release dumper process as some errors before any runner is created
if not Runner.runners:
- Runner.release_dump_process()
return
for runner in Runner.runners:
@@ -193,11 +133,11 @@ class Runner(object):
runner.periodic_action_process = None
Runner.release(runner)
- def __init__(self, config, queue):
+ def __init__(self, config):
self.config = config
self.periodic_action_process = None
- self.result_queue = queue
self.output_queue = multiprocessing.Queue()
+ self.result_queue = multiprocessing.Queue()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
@@ -276,3 +216,9 @@ class Runner(object):
while not self.output_queue.empty():
result.update(self.output_queue.get())
return result
+
+ def get_result(self):
+ result = []
+ while not self.result_queue.empty():
+ result.append(self.result_queue.get())
+ return result
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index 2bf2cd2fe..69d744562 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -52,10 +52,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if "sla" in scenario_cfg:
sla_action = scenario_cfg["sla"].get("action", "assert")
- queue.put({'runner_id': runner_cfg['runner_id'],
- 'scenario_cfg': scenario_cfg,
- 'context_cfg': context_cfg})
-
start = time.time()
while True:
@@ -90,10 +86,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- record = {'runner_id': runner_cfg['runner_id'],
- 'benchmark': benchmark_output}
-
- queue.put(record)
+ queue.put(benchmark_output)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"], "sequence": sequence})
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 973bb9ac4..50fe106bd 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -53,10 +53,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
method = getattr(benchmark, method_name)
- queue.put({'runner_id': runner_cfg['runner_id'],
- 'scenario_cfg': scenario_cfg,
- 'context_cfg': context_cfg})
-
sla_action = None
if "sla" in scenario_cfg:
sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -105,10 +101,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- record = {'runner_id': runner_cfg['runner_id'],
- 'benchmark': benchmark_output}
-
- queue.put(record)
+ queue.put(benchmark_output)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index 74ff82204..68e272c57 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -57,10 +57,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
benchmark.setup()
method = getattr(benchmark, method_name)
- queue.put({'runner_id': runner_cfg['runner_id'],
- 'scenario_cfg': scenario_cfg,
- 'context_cfg': context_cfg})
-
sla_action = None
if "sla" in scenario_cfg:
sla_action = scenario_cfg["sla"].get("action", "assert")
@@ -99,10 +95,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- record = {'runner_id': runner_cfg['runner_id'],
- 'benchmark': benchmark_output}
-
- queue.put(record)
+ queue.put(benchmark_output)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"], "sequence": sequence})
diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py
index 16a4db291..6384e6eb1 100644
--- a/yardstick/cmd/commands/task.py
+++ b/yardstick/cmd/commands/task.py
@@ -14,7 +14,6 @@ from __future__ import absolute_import
from yardstick.benchmark.core.task import Task
from yardstick.common.utils import cliargs
from yardstick.common.utils import write_json_to_file
-from yardstick.common.utils import read_json_from_file
from yardstick.cmd.commands import change_osloobj_to_paras
output_file_default = "/tmp/yardstick.out"
@@ -46,22 +45,11 @@ class TaskCommands(object):
param = change_osloobj_to_paras(args)
self.output_file = param.output_file
- self._init_result_file()
-
try:
Task().start(param, **kwargs)
- self._finish()
except Exception as e:
self._write_error_data(e)
-
- def _init_result_file(self):
- data = {'status': 0, 'result': []}
- write_json_to_file(self.output_file, data)
-
- def _finish(self):
- result = read_json_from_file(self.output_file).get('result')
- data = {'status': 1, 'result': result}
- write_json_to_file(self.output_file, data)
+ raise
def _write_error_data(self, error):
data = {'status': 2, 'result': str(error)}
diff --git a/yardstick/dispatcher/base.py b/yardstick/dispatcher/base.py
index a1c858297..e77249c54 100644
--- a/yardstick/dispatcher/base.py
+++ b/yardstick/dispatcher/base.py
@@ -38,15 +38,13 @@ class Base(object):
raise RuntimeError("No such dispatcher_type %s" % dispatcher_type)
@staticmethod
- def get(conf, config):
+ def get(config):
"""Returns instance of a dispatcher for dispatcher type.
"""
- return Base.get_cls(conf["type"])(conf, config)
+ out_type = config['DEFAULT']['dispatcher']
- @abc.abstractmethod
- def record_result_data(self, data):
- """Recording result data interface."""
+ return Base.get_cls(out_type.capitalize())(config)
@abc.abstractmethod
- def flush_result_data(self):
+ def flush_result_data(self, data):
"""Flush result data into permanent storage media interface."""
diff --git a/yardstick/dispatcher/file.py b/yardstick/dispatcher/file.py
index 8acd5dfbb..24fc22dd4 100644
--- a/yardstick/dispatcher/file.py
+++ b/yardstick/dispatcher/file.py
@@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase):
__dispatcher_type__ = "File"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(FileDispatcher, self).__init__(conf)
- self.result = []
+ self.target = conf['dispatcher_file'].get('file_path',
+ consts.DEFAULT_OUTPUT_FILE)
- def record_result_data(self, data):
- self.result.append(data)
-
- def flush_result_data(self):
- file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE)
-
- res = utils.read_json_from_file(file_path).get('result')
- res.extend(self.result)
-
- data = {'status': 0, 'result': res}
- utils.write_json_to_file(file_path, data)
+ def flush_result_data(self, data):
+ utils.write_json_to_file(self.target, data)
diff --git a/yardstick/dispatcher/http.py b/yardstick/dispatcher/http.py
index 0d8d2a346..9bf9af33b 100644
--- a/yardstick/dispatcher/http.py
+++ b/yardstick/dispatcher/http.py
@@ -20,30 +20,15 @@ from __future__ import absolute_import
import logging
import os
+from datetime import datetime
from oslo_serialization import jsonutils
import requests
-from oslo_config import cfg
from yardstick.dispatcher.base import Base as DispatchBase
LOG = logging.getLogger(__name__)
-CONF = cfg.CONF
-http_dispatcher_opts = [
- cfg.StrOpt('target',
- default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'),
- help='The target where the http request will be sent. '
- 'If this is not set, no data will be posted. For '
- 'example: target = http://hostname:1234/path'),
- cfg.IntOpt('timeout',
- default=5,
- help='The max time in seconds to wait for a request to '
- 'timeout.'),
-]
-
-CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
-
class HttpDispatcher(DispatchBase):
"""Dispatcher class for posting data into a http target.
@@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase):
__dispatcher_type__ = "Http"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(HttpDispatcher, self).__init__(conf)
+ http_conf = conf['dispatcher_http']
self.headers = {'Content-type': 'application/json'}
- self.timeout = CONF.dispatcher_http.timeout
- self.target = CONF.dispatcher_http.target
- self.raw_result = []
- self.result = {
- "project_name": "yardstick",
- "description": "yardstick test cases result",
- "pod_name": os.environ.get('NODE_NAME', 'unknown'),
- "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
- "version": os.environ.get('YARDSTICK_VERSION', 'unknown'),
- "build_tag": os.environ.get('BUILD_TAG')
- }
-
- def record_result_data(self, data):
- self.raw_result.append(data)
+ self.timeout = int(http_conf.get('timeout', 5))
+ self.target = http_conf.get('target', 'http://127.0.0.1:8000/results')
- def flush_result_data(self):
+ def flush_result_data(self, data):
if self.target == '':
# if the target was not set, do not do anything
LOG.error('Dispatcher target was not set, no data will'
'be posted.')
return
- self.result["details"] = {'results': self.raw_result}
-
- case_name = ""
- for v in self.raw_result:
- if isinstance(v, dict) and "scenario_cfg" in v:
- case_name = v["scenario_cfg"]["tc"]
- break
- if case_name == "":
- LOG.error('Test result : %s',
- jsonutils.dump_as_bytes(self.result))
- LOG.error('The case_name cannot be found, no data will be posted.')
- return
+ result = data['result']
+ self.info = result['info']
+ self.task_id = result['task_id']
+ self.criteria = result['criteria']
+ testcases = result['testcases']
+
+ for case, data in testcases.items():
+ self._upload_case_result(case, data)
- self.result["case_name"] = case_name
+ def _upload_case_result(self, case, data):
+ try:
+ scenario_data = data.get('tc_data', [])[0]
+ except IndexError:
+ current_time = datetime.now()
+ else:
+ timestamp = float(scenario_data.get('timestamp', 0.0))
+ current_time = datetime.fromtimestamp(timestamp)
+
+ result = {
+ "project_name": "yardstick",
+ "case_name": case,
+ "description": "yardstick ci scenario status",
+ "scenario": self.info.get('deploy_scenario'),
+ "version": self.info.get('version'),
+ "pod_name": self.info.get('pod_name'),
+ "installer": self.info.get('installer'),
+ "build_tag": os.environ.get('BUILD_TAG'),
+ "criteria": data.get('criteria'),
+ "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+ "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+ "trust_indicator": "",
+ "details": ""
+ }
try:
- LOG.debug('Test result : %s',
- jsonutils.dump_as_bytes(self.result))
+ LOG.debug('Test result : %s', result)
res = requests.post(self.target,
- data=jsonutils.dump_as_bytes(self.result),
+ data=jsonutils.dump_as_bytes(result),
headers=self.headers,
timeout=self.timeout)
LOG.debug('Test result posting finished with status code'
' %d.' % res.status_code)
except Exception as err:
- LOG.exception('Failed to record result data: %s',
- err)
+ LOG.exception('Failed to record result data: %s', err)
diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py
index 53af79c71..373aae13a 100644
--- a/yardstick/dispatcher/influxdb.py
+++ b/yardstick/dispatcher/influxdb.py
@@ -10,13 +10,11 @@
from __future__ import absolute_import
import logging
-import os
import time
import collections
import requests
import six
-from oslo_serialization import jsonutils
from third_party.influxdb.influxdb_line_protocol import make_lines
from yardstick.dispatcher.base import Base as DispatchBase
@@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase):
__dispatcher_type__ = "Influxdb"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(InfluxdbDispatcher, self).__init__(conf)
- db_conf = config['yardstick'].get('dispatcher_influxdb', {})
+ db_conf = conf['dispatcher_influxdb']
self.timeout = int(db_conf.get('timeout', 5))
self.target = db_conf.get('target', 'http://127.0.0.1:8086')
self.db_name = db_conf.get('db_name', 'yardstick')
self.username = db_conf.get('username', 'root')
self.password = db_conf.get('password', 'root')
+
self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
- self.raw_result = []
- self.case_name = ""
- self.tc = ""
+
self.task_id = -1
- self.runners_info = {}
- self.static_tags = {
- "pod_name": os.environ.get('NODE_NAME', 'unknown'),
- "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
- "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'),
- "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH',
- 'unknown'))
+ def flush_result_data(self, data):
+ LOG.debug('Test result all : %s', data)
+ if self.target == '':
+ # if the target was not set, do not do anything
+ LOG.error('Dispatcher target was not set, no data will be posted.')
+
+ result = data['result']
+ self.tags = result['info']
+ self.task_id = result['task_id']
+ self.criteria = result['criteria']
+ testcases = result['testcases']
+
+ for case, data in testcases.items():
+ tc_criteria = data['criteria']
+ for record in data['tc_data']:
+ self._upload_one_record(record, case, tc_criteria)
+
+ return 0
+
+ def _upload_one_record(self, data, case, tc_criteria):
+ try:
+ line = self._data_to_line_protocol(data, case, tc_criteria)
+ LOG.debug('Test result line format : %s', line)
+ res = requests.post(self.influxdb_url,
+ data=line,
+ auth=(self.username, self.password),
+ timeout=self.timeout)
+ if res.status_code != 204:
+ LOG.error('Test result posting finished with status code'
+ ' %d.', res.status_code)
+ LOG.error(res.text)
+
+ except Exception as err:
+ LOG.exception('Failed to record result data: %s', err)
+
+ def _data_to_line_protocol(self, data, case, criteria):
+ msg = {}
+ point = {
+ "measurement": case,
+ "fields": self._dict_key_flatten(data["data"]),
+ "time": self._get_nano_timestamp(data),
+ "tags": self._get_extended_tags(criteria),
}
+ msg["points"] = [point]
+ msg["tags"] = self.tags
+
+ return make_lines(msg).encode('utf-8')
def _dict_key_flatten(self, data):
next_data = {}
@@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase):
def _get_nano_timestamp(self, results):
try:
- timestamp = results["benchmark"]["timestamp"]
+ timestamp = results["timestamp"]
except Exception:
timestamp = time.time()
return str(int(float(timestamp) * 1000000000))
- def _get_extended_tags(self, data):
- runner_info = self.runners_info[data["runner_id"]]
+ def _get_extended_tags(self, criteria):
tags = {
- "runner_id": data["runner_id"],
"task_id": self.task_id,
- "scenarios": runner_info["scenarios"]
+ "criteria": criteria
}
- if "host" in runner_info:
- tags["host"] = runner_info["host"]
- if "target" in runner_info:
- tags["target"] = runner_info["target"]
return tags
-
- def _data_to_line_protocol(self, data):
- msg = {}
- point = {
- "measurement": self.tc,
- "fields": self._dict_key_flatten(data["benchmark"]["data"]),
- "time": self._get_nano_timestamp(data),
- "tags": self._get_extended_tags(data),
- }
- msg["points"] = [point]
- msg["tags"] = self.static_tags
-
- return make_lines(msg).encode('utf-8')
-
- def record_result_data(self, data):
- LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data))
- self.raw_result.append(data)
- if self.target == '':
- # if the target was not set, do not do anything
- LOG.error('Dispatcher target was not set, no data will'
- 'be posted.')
- return -1
-
- if isinstance(data, dict) and "scenario_cfg" in data:
- self.tc = data["scenario_cfg"]["tc"]
- self.task_id = data["scenario_cfg"]["task_id"]
- scenario_cfg = data["scenario_cfg"]
- runner_id = data["runner_id"]
- self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]}
- if "host" in scenario_cfg:
- self.runners_info[runner_id]["host"] = scenario_cfg["host"]
- if "target" in scenario_cfg:
- self.runners_info[runner_id]["target"] = scenario_cfg["target"]
- return 0
-
- if self.tc == "":
- LOG.error('Test result : %s', jsonutils.dump_as_bytes(data))
- LOG.error('The case_name cannot be found, no data will be posted.')
- return -1
-
- try:
- line = self._data_to_line_protocol(data)
- LOG.debug('Test result line format : %s', line)
- res = requests.post(self.influxdb_url,
- data=line,
- auth=(self.username, self.password),
- timeout=self.timeout)
- if res.status_code != 204:
- LOG.error('Test result posting finished with status code'
- ' %d.', res.status_code)
- LOG.error(res.text)
-
- except Exception as err:
- LOG.exception('Failed to record result data: %s',
- err)
- return -1
- return 0
-
- def flush_result_data(self):
- LOG.debug('Test result all : %s',
- jsonutils.dump_as_bytes(self.raw_result))
- return 0