diff options
Diffstat (limited to 'yardstick/dispatcher')
-rw-r--r-- | yardstick/dispatcher/base.py | 10 | ||||
-rw-r--r-- | yardstick/dispatcher/file.py | 18 | ||||
-rw-r--r-- | yardstick/dispatcher/http.py | 91 | ||||
-rw-r--r-- | yardstick/dispatcher/influxdb.py | 138 |
4 files changed, 103 insertions, 154 deletions
diff --git a/yardstick/dispatcher/base.py b/yardstick/dispatcher/base.py index a1c858297..e77249c54 100644 --- a/yardstick/dispatcher/base.py +++ b/yardstick/dispatcher/base.py @@ -38,15 +38,13 @@ class Base(object): raise RuntimeError("No such dispatcher_type %s" % dispatcher_type) @staticmethod - def get(conf, config): + def get(config): """Returns instance of a dispatcher for dispatcher type. """ - return Base.get_cls(conf["type"])(conf, config) + out_type = config['DEFAULT']['dispatcher'] - @abc.abstractmethod - def record_result_data(self, data): - """Recording result data interface.""" + return Base.get_cls(out_type.capitalize())(config) @abc.abstractmethod - def flush_result_data(self): + def flush_result_data(self, data): """Flush result data into permanent storage media interface.""" diff --git a/yardstick/dispatcher/file.py b/yardstick/dispatcher/file.py index 8acd5dfbb..24fc22dd4 100644 --- a/yardstick/dispatcher/file.py +++ b/yardstick/dispatcher/file.py @@ -29,18 +29,10 @@ class FileDispatcher(DispatchBase): __dispatcher_type__ = "File" - def __init__(self, conf, config): + def __init__(self, conf): super(FileDispatcher, self).__init__(conf) - self.result = [] + self.target = conf['dispatcher_file'].get('file_path', + consts.DEFAULT_OUTPUT_FILE) - def record_result_data(self, data): - self.result.append(data) - - def flush_result_data(self): - file_path = self.conf.get('file_path', consts.DEFAULT_OUTPUT_FILE) - - res = utils.read_json_from_file(file_path).get('result') - res.extend(self.result) - - data = {'status': 0, 'result': res} - utils.write_json_to_file(file_path, data) + def flush_result_data(self, data): + utils.write_json_to_file(self.target, data) diff --git a/yardstick/dispatcher/http.py b/yardstick/dispatcher/http.py index 0d8d2a346..9bf9af33b 100644 --- a/yardstick/dispatcher/http.py +++ b/yardstick/dispatcher/http.py @@ -20,30 +20,15 @@ from __future__ import absolute_import import logging import os +from datetime import datetime from oslo_serialization import jsonutils import requests -from oslo_config import cfg from yardstick.dispatcher.base import Base as DispatchBase LOG = logging.getLogger(__name__) -CONF = cfg.CONF -http_dispatcher_opts = [ - cfg.StrOpt('target', - default=os.getenv('TARGET', 'http://127.0.0.1:8000/results'), - help='The target where the http request will be sent. ' - 'If this is not set, no data will be posted. For ' - 'example: target = http://hostname:1234/path'), - cfg.IntOpt('timeout', - default=5, - help='The max time in seconds to wait for a request to ' - 'timeout.'), -] - -CONF.register_opts(http_dispatcher_opts, group="dispatcher_http") - class HttpDispatcher(DispatchBase): """Dispatcher class for posting data into a http target. @@ -51,55 +36,61 @@ class HttpDispatcher(DispatchBase): __dispatcher_type__ = "Http" - def __init__(self, conf, config): + def __init__(self, conf): super(HttpDispatcher, self).__init__(conf) + http_conf = conf['dispatcher_http'] self.headers = {'Content-type': 'application/json'} - self.timeout = CONF.dispatcher_http.timeout - self.target = CONF.dispatcher_http.target - self.raw_result = [] - self.result = { - "project_name": "yardstick", - "description": "yardstick test cases result", - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "version": os.environ.get('YARDSTICK_VERSION', 'unknown'), - "build_tag": os.environ.get('BUILD_TAG') - } - - def record_result_data(self, data): - self.raw_result.append(data) + self.timeout = int(http_conf.get('timeout', 5)) + self.target = http_conf.get('target', 'http://127.0.0.1:8000/results') - def flush_result_data(self): + def flush_result_data(self, data): if self.target == '': # if the target was not set, do not do anything LOG.error('Dispatcher target was not set, no data will' 'be posted.') return - self.result["details"] = {'results': self.raw_result} - - case_name = "" - for v in self.raw_result: - if isinstance(v, dict) and "scenario_cfg" in v: - case_name = v["scenario_cfg"]["tc"] - break - if case_name == "": - LOG.error('Test result : %s', - jsonutils.dump_as_bytes(self.result)) - LOG.error('The case_name cannot be found, no data will be posted.') - return + result = data['result'] + self.info = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + self._upload_case_result(case, data) - self.result["case_name"] = case_name + def _upload_case_result(self, case, data): + try: + scenario_data = data.get('tc_data', [])[0] + except IndexError: + current_time = datetime.now() + else: + timestamp = float(scenario_data.get('timestamp', 0.0)) + current_time = datetime.fromtimestamp(timestamp) + + result = { + "project_name": "yardstick", + "case_name": case, + "description": "yardstick ci scenario status", + "scenario": self.info.get('deploy_scenario'), + "version": self.info.get('version'), + "pod_name": self.info.get('pod_name'), + "installer": self.info.get('installer'), + "build_tag": os.environ.get('BUILD_TAG'), + "criteria": data.get('criteria'), + "start_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "stop_date": current_time.strftime('%Y-%m-%d %H:%M:%S'), + "trust_indicator": "", + "details": "" + } try: - LOG.debug('Test result : %s', - jsonutils.dump_as_bytes(self.result)) + LOG.debug('Test result : %s', result) res = requests.post(self.target, - data=jsonutils.dump_as_bytes(self.result), + data=jsonutils.dump_as_bytes(result), headers=self.headers, timeout=self.timeout) LOG.debug('Test result posting finished with status code' ' %d.' % res.status_code) except Exception as err: - LOG.exception('Failed to record result data: %s', - err) + LOG.exception('Failed to record result data: %s', err) diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py index 53af79c71..373aae13a 100644 --- a/yardstick/dispatcher/influxdb.py +++ b/yardstick/dispatcher/influxdb.py @@ -10,13 +10,11 @@ from __future__ import absolute_import import logging -import os import time import collections import requests import six -from oslo_serialization import jsonutils from third_party.influxdb.influxdb_line_protocol import make_lines from yardstick.dispatcher.base import Base as DispatchBase @@ -30,28 +28,66 @@ class InfluxdbDispatcher(DispatchBase): __dispatcher_type__ = "Influxdb" - def __init__(self, conf, config): + def __init__(self, conf): super(InfluxdbDispatcher, self).__init__(conf) - db_conf = config['yardstick'].get('dispatcher_influxdb', {}) + db_conf = conf['dispatcher_influxdb'] self.timeout = int(db_conf.get('timeout', 5)) self.target = db_conf.get('target', 'http://127.0.0.1:8086') self.db_name = db_conf.get('db_name', 'yardstick') self.username = db_conf.get('username', 'root') self.password = db_conf.get('password', 'root') + self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name) - self.raw_result = [] - self.case_name = "" - self.tc = "" + self.task_id = -1 - self.runners_info = {} - self.static_tags = { - "pod_name": os.environ.get('NODE_NAME', 'unknown'), - "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), - "deploy_scenario": os.environ.get('DEPLOY_SCENARIO', 'unknown'), - "version": os.path.basename(os.environ.get('YARDSTICK_BRANCH', - 'unknown')) + def flush_result_data(self, data): + LOG.debug('Test result all : %s', data) + if self.target == '': + # if the target was not set, do not do anything + LOG.error('Dispatcher target was not set, no data will be posted.') + + result = data['result'] + self.tags = result['info'] + self.task_id = result['task_id'] + self.criteria = result['criteria'] + testcases = result['testcases'] + + for case, data in testcases.items(): + tc_criteria = data['criteria'] + for record in data['tc_data']: + self._upload_one_record(record, case, tc_criteria) + + return 0 + + def _upload_one_record(self, data, case, tc_criteria): + try: + line = self._data_to_line_protocol(data, case, tc_criteria) + LOG.debug('Test result line format : %s', line) + res = requests.post(self.influxdb_url, + data=line, + auth=(self.username, self.password), + timeout=self.timeout) + if res.status_code != 204: + LOG.error('Test result posting finished with status code' + ' %d.', res.status_code) + LOG.error(res.text) + + except Exception as err: + LOG.exception('Failed to record result data: %s', err) + + def _data_to_line_protocol(self, data, case, criteria): + msg = {} + point = { + "measurement": case, + "fields": self._dict_key_flatten(data["data"]), + "time": self._get_nano_timestamp(data), + "tags": self._get_extended_tags(criteria), } + msg["points"] = [point] + msg["tags"] = self.tags + + return make_lines(msg).encode('utf-8') def _dict_key_flatten(self, data): next_data = {} @@ -76,84 +112,16 @@ class InfluxdbDispatcher(DispatchBase): def _get_nano_timestamp(self, results): try: - timestamp = results["benchmark"]["timestamp"] + timestamp = results["timestamp"] except Exception: timestamp = time.time() return str(int(float(timestamp) * 1000000000)) - def _get_extended_tags(self, data): - runner_info = self.runners_info[data["runner_id"]] + def _get_extended_tags(self, criteria): tags = { - "runner_id": data["runner_id"], "task_id": self.task_id, - "scenarios": runner_info["scenarios"] + "criteria": criteria } - if "host" in runner_info: - tags["host"] = runner_info["host"] - if "target" in runner_info: - tags["target"] = runner_info["target"] return tags - - def _data_to_line_protocol(self, data): - msg = {} - point = { - "measurement": self.tc, - "fields": self._dict_key_flatten(data["benchmark"]["data"]), - "time": self._get_nano_timestamp(data), - "tags": self._get_extended_tags(data), - } - msg["points"] = [point] - msg["tags"] = self.static_tags - - return make_lines(msg).encode('utf-8') - - def record_result_data(self, data): - LOG.debug('Test result : %s', jsonutils.dump_as_bytes(data)) - self.raw_result.append(data) - if self.target == '': - # if the target was not set, do not do anything - LOG.error('Dispatcher target was not set, no data will' - 'be posted.') - return -1 - - if isinstance(data, dict) and "scenario_cfg" in data: - self.tc = data["scenario_cfg"]["tc"] - self.task_id = data["scenario_cfg"]["task_id"] - scenario_cfg = data["scenario_cfg"] - runner_id = data["runner_id"] - self.runners_info[runner_id] = {"scenarios": scenario_cfg["type"]} - if "host" in scenario_cfg: - self.runners_info[runner_id]["host"] = scenario_cfg["host"] - if "target" in scenario_cfg: - self.runners_info[runner_id]["target"] = scenario_cfg["target"] - return 0 - - if self.tc == "": - LOG.error('Test result : %s', jsonutils.dump_as_bytes(data)) - LOG.error('The case_name cannot be found, no data will be posted.') - return -1 - - try: - line = self._data_to_line_protocol(data) - LOG.debug('Test result line format : %s', line) - res = requests.post(self.influxdb_url, - data=line, - auth=(self.username, self.password), - timeout=self.timeout) - if res.status_code != 204: - LOG.error('Test result posting finished with status code' - ' %d.', res.status_code) - LOG.error(res.text) - - except Exception as err: - LOG.exception('Failed to record result data: %s', - err) - return -1 - return 0 - - def flush_result_data(self): - LOG.debug('Test result all : %s', - jsonutils.dump_as_bytes(self.raw_result)) - return 0 |