aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/dispatcher
diff options
context:
space:
mode:
authorchenjiankun <chenjiankun1@huawei.com>2017-05-24 07:22:51 +0000
committerchenjiankun <chenjiankun1@huawei.com>2017-06-22 11:06:17 +0000
commitacc757fc7cf9db54d97d4563cd294efafc3f7747 (patch)
tree20a55964d84fa84c06a91b2e9b9233902f42c2c7 /yardstick/dispatcher
parent5c33b82efbc0f7e58bdcfc4288ce08b7b3c999f2 (diff)
Yardstick output format unified
JIRA: YARDSTICK-658 Currently the yardstick have three dispatcher: file, influxdb, mongodb. (influxdb using API to get result and mongodb using testAPI to get result) But their output format is different. It is hard to use. In this patch, make all dispatchers using the same data source. And make the output format of file and influxdb unified. As for mongodb, since it is related to testAPI, so I make it push data every test case. The unified output format is: http://paste.openstack.org/show/610125/ Change-Id: I854ac4f03e6f904469b07b0c924c7d850545ae5b Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
Diffstat (limited to 'yardstick/dispatcher')
-rw-r--r--yardstick/dispatcher/base.py10
-rw-r--r--yardstick/dispatcher/file.py18
-rw-r--r--yardstick/dispatcher/http.py91
-rw-r--r--yardstick/dispatcher/influxdb.py138
4 files changed, 103 insertions, 154 deletions
diff --git a/yardstick/dispatcher/base.py b/yardstick/dispatcher/base.py
index a1c858297..e77249c54 100644
--- a/yardstick/dispatcher/base.py
+++ b/yardstick/dispatcher/base.py
@@ -38,15 +38,13 @@ class Base(object):
raise RuntimeError("No such dispatcher_type %s" % dispatcher_type)
@staticmethod
- def get(conf, config):
+ def get(config):
"""Returns instance of a dispatcher for dispatcher type.
"""
- return Base.get_cls(conf["type"])(conf, config)
+ out_type = config['DEFAULT']['dispatcher']
- @abc.abstractmethod
- def record_result_data(self, data):
- """Recording result data interface."""
+ return Base.get_cls(out_type.capitalize())(config)
@abc.abstractmethod
- def flush_result_data(self):
+ def flush_result_data(self, data):
"""Flush result data into permanent storage media interface."""
diff --git a/yardstick/dispatcher/file.py b/yardstick/dispatcher/file.py
index 8acd5dfbb..24fc22dd4 100644
--- a/yardstick/dispatcher/file.py
+++ b/yardstick/dispatcher/file.py
@@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase):
__dispatcher_type__ = "File"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(FileDispatcher, self).__init__(conf)
- self.result = []
+ self.target = conf['dispatcher_file'].get('file_path',
+ consts.DEFAULT_OUTPUT_FILE)
- def record_result_data(self, data):
- self.result.append(data)
-
- def flush_result_data(self):
- file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE)
-
- res = utils.read_json_from_file(file_path).get('result')
- res.extend(self.result)
-
- data = {'status': 0, 'result': res}
- utils.write_json_to_file(file_path, data)
+ def flush_result_data(self, data):
+ utils.write_json_to_file(self.target, data)
diff --git a/yardstick/dispatcher/http.py b/yardstick/dispatcher/http.py
index 0d8d2a346..9bf9af33b 100644
--- a/yardstick/dispatcher/http.py
+++ b/yardstick/dispatcher/http.py
@@ -20,30 +20,15 @@ from __future__ import absolute_import
import logging
import os
+from datetime import datetime
from oslo_serialization import jsonutils
import requests
-from oslo_config import cfg
from yardstick.dispatcher.base import Base as DispatchBase
LOG = logging.getLogger(__name__)
-CONF = cfg.CONF
-http_dispatcher_opts = [
- cfg.StrOpt('target',
- default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'),
- help='The target where the http request will be sent. '
- 'If this is not set, no data will be posted. For '
- 'example: target = http://hostname:1234/path'),
- cfg.IntOpt('timeout',
- default=5,
- help='The max time in seconds to wait for a request to '
- 'timeout.'),
-]
-
-CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
-
class HttpDispatcher(DispatchBase):
"""Dispatcher class for posting data into a http target.
@@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase):
__dispatcher_type__ = "Http"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(HttpDispatcher, self).__init__(conf)
+ http_conf = conf['dispatcher_http']
self.headers = {'Content-type': 'application/json'}
- self.timeout = CONF.dispatcher_http.timeout
- self.target = CONF.dispatcher_http.target
- self.raw_result = []
- self.result = {
- "project_name": "yardstick",
- "description": "yardstick test cases result",
- "pod_name": os.environ.get('NODE_NAME', 'unknown'),
- "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
- "version": os.environ.get('YARDSTICK_VERSION', 'unknown'),
- "build_tag": os.environ.get('BUILD_TAG')
- }
-
- def record_result_data(self, data):
- self.raw_result.append(data)
+ self.timeout = int(http_conf.get('timeout', 5))
+ self.target = http_conf.get('target', 'http://127.0.0.1:8000/results')
- def flush_result_data(self):
+ def flush_result_data(self, data):
if self.target == '':
# if the target was not set, do not do anything
LOG.error('Dispatcher target was not set, no data will'
'be posted.')
return
- self.result["details"] = {'results': self.raw_result}
-
- case_name = ""
- for v in self.raw_result:
- if isinstance(v, dict) and "scenario_cfg" in v:
- case_name = v["scenario_cfg"]["tc"]
- break
- if case_name == "":
- LOG.error('Test result : %s',
- jsonutils.dump_as_bytes(self.result))
- LOG.error('The case_name cannot be found, no data will be posted.')
- return
+ result = data['result']
+ self.info = result['info']
+ self.task_id = result['task_id']
+ self.criteria = result['criteria']
+ testcases = result['testcases']
+
+ for case, data in testcases.items():
+ self._upload_case_result(case, data)
- self.result["case_name"] = case_name
+ def _upload_case_result(self, case, data):
+ try:
+ scenario_data = data.get('tc_data', [])[0]
+ except IndexError:
+ current_time = datetime.now()
+ else:
+ timestamp = float(scenario_data.get('timestamp', 0.0))
+ current_time = datetime.fromtimestamp(timestamp)
+
+ result = {
+ "project_name": "yardstick",
+ "case_name": case,
+ "description": "yardstick ci scenario status",
+ "scenario": self.info.get('deploy_scenario'),
+ "version": self.info.get('version'),
+ "pod_name": self.info.get('pod_name'),
+ "installer": self.info.get('installer'),
+ "build_tag": os.environ.get('BUILD_TAG'),
+ "criteria": data.get('criteria'),
+ "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+ "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'),
+ "trust_indicator": "",
+ "details": ""
+ }
try:
- LOG.debug('Test result : %s',
- jsonutils.dump_as_bytes(self.result))
+ LOG.debug('Test result : %s', result)
res = requests.post(self.target,
- data=jsonutils.dump_as_bytes(self.result),
+ data=jsonutils.dump_as_bytes(result),
headers=self.headers,
timeout=self.timeout)
LOG.debug('Test result posting finished with status code'
' %d.' % res.status_code)
except Exception as err:
- LOG.exception('Failed to record result data: %s',
- err)
+ LOG.exception('Failed to record result data: %s', err)
diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py
index 53af79c71..373aae13a 100644
--- a/yardstick/dispatcher/influxdb.py
+++ b/yardstick/dispatcher/influxdb.py
@@ -10,13 +10,11 @@
from __future__ import absolute_import
import logging
-import os
import time
import collections
import requests
import six
-from oslo_serialization import jsonutils
from third_party.influxdb.influxdb_line_protocol import make_lines
from yardstick.dispatcher.base import Base as DispatchBase
@@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase):
__dispatcher_type__ = "Influxdb"
- def __init__(self, conf, config):
+ def __init__(self, conf):
super(InfluxdbDispatcher, self).__init__(conf)
- db_conf = config['yardstick'].get('dispatcher_influxdb', {})
+ db_conf = conf['dispatcher_influxdb']
self.timeout = int(db_conf.get('timeout', 5))
self.target = db_conf.get('target', 'http://127.0.0.1:8086')
self.db_name = db_conf.get('db_name', 'yardstick')
self.username = db_conf.get('username', 'root')
self.password = db_conf.get('password', 'root')
+
self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
- self.raw_result = []
- self.case_name = ""
- self.tc = ""
+
self.task_id = -1
- self.runners_info = {}
- self.static_tags = {
- "pod_name": os.environ.get('NODE_NAME', 'unknown'),
- "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
- "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'),
- "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH',
- 'unknown'))
+ def flush_result_data(self, data):
+ LOG.debug('Test result all : %s', data)
+ if self.target == '':
+ # if the target was not set, do not do anything
+ LOG.error('Dispatcher target was not set, no data will be posted.')
+
+ result = data['result']
+ self.tags = result['info']
+ self.task_id = result['task_id']
+ self.criteria = result['criteria']
+ testcases = result['testcases']
+
+ for case, data in testcases.items():
+ tc_criteria = data['criteria']
+ for record in data['tc_data']:
+ self._upload_one_record(record, case, tc_criteria)
+
+ return 0
+
+ def _upload_one_record(self, data, case, tc_criteria):
+ try:
+ line = self._data_to_line_protocol(data, case, tc_criteria)
+ LOG.debug('Test result line format : %s', line)
+ res = requests.post(self.influxdb_url,
+ data=line,
+ auth=(self.username, self.password),
+ timeout=self.timeout)
+ if res.status_code != 204:
+ LOG.error('Test result posting finished with status code'
+ ' %d.', res.status_code)
+ LOG.error(res.text)
+
+ except Exception as err:
+ LOG.exception('Failed to record result data: %s', err)
+
+ def _data_to_line_protocol(self, data, case, criteria):
+ msg = {}
+ point = {
+ "measurement": case,
+ "fields": self._dict_key_flatten(data["data"]),
+ "time": self._get_nano_timestamp(data),
+ "tags": self._get_extended_tags(criteria),
}
+ msg["points"] = [point]
+ msg["tags"] = self.tags
+
+ return make_lines(msg).encode('utf-8')
def _dict_key_flatten(self, data):
next_data = {}
@@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase):
def _get_nano_timestamp(self, results):
try:
- timestamp = results["benchmark"]["timestamp"]
+ timestamp = results["timestamp"]
except Exception:
timestamp = time.time()
return str(int(float(timestamp) * 1000000000))
- def _get_extended_tags(self, data):
- runner_info = self.runners_info[data["runner_id"]]
+ def _get_extended_tags(self, criteria):
tags = {
- "runner_id": data["runner_id"],
"task_id": self.task_id,
- "scenarios": runner_info["scenarios"]
+ "criteria": criteria
}
- if "host" in runner_info:
- tags["host"] = runner_info["host"]
- if "target" in runner_info:
- tags["target"] = runner_info["target"]
return tags
-
- def _data_to_line_protocol(self, data):
- msg = {}
- point = {
- "measurement": self.tc,
- "fields": self._dict_key_flatten(data["benchmark"]["data"]),
- "time": self._get_nano_timestamp(data),
- "tags": self._get_extended_tags(data),
- }
- msg["points"] = [point]
- msg["tags"] = self.static_tags
-
- return make_lines(msg).encode('utf-8')
-
- def record_result_data(self, data):
- LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data))
- self.raw_result.append(data)
- if self.target == '':
- # if the target was not set, do not do anything
- LOG.error('Dispatcher target was not set, no data will'
- 'be posted.')
- return -1
-
- if isinstance(data, dict) and "scenario_cfg" in data:
- self.tc = data["scenario_cfg"]["tc"]
- self.task_id = data["scenario_cfg"]["task_id"]
- scenario_cfg = data["scenario_cfg"]
- runner_id = data["runner_id"]
- self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]}
- if "host" in scenario_cfg:
- self.runners_info[runner_id]["host"] = scenario_cfg["host"]
- if "target" in scenario_cfg:
- self.runners_info[runner_id]["target"] = scenario_cfg["target"]
- return 0
-
- if self.tc == "":
- LOG.error('Test result : %s', jsonutils.dump_as_bytes(data))
- LOG.error('The case_name cannot be found, no data will be posted.')
- return -1
-
- try:
- line = self._data_to_line_protocol(data)
- LOG.debug('Test result line format : %s', line)
- res = requests.post(self.influxdb_url,
- data=line,
- auth=(self.username, self.password),
- timeout=self.timeout)
- if res.status_code != 204:
- LOG.error('Test result posting finished with status code'
- ' %d.', res.status_code)
- LOG.error(res.text)
-
- except Exception as err:
- LOG.exception('Failed to record result data: %s',
- err)
- return -1
- return 0
-
- def flush_result_data(self):
- LOG.debug('Test result all : %s',
- jsonutils.dump_as_bytes(self.raw_result))
- return 0