aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/dispatcher
diff options
context:
space:
mode:
authorQiLiang <liangqi1@huawei.com>2015-12-23 22:13:15 +0800
committerqi liang <liangqi1@huawei.com>2016-01-07 03:53:37 +0000
commit99ba990d4a01c0f3f4837f11a24b695f4a2393d2 (patch)
tree68bfa1b7c80080a294f61a3b2bddbbbdfe96957f /yardstick/dispatcher
parentffdd523055d2395d1216c5fa0007ed6af0b6146e (diff)
Initial InfluxDB dispatcher
Supports: - Basic influxDB write with timestamp - Add general result format func - Add UT TODO: - refine database schema (e.g. add more tags) plan in another patch JIRA: YARDSTICK-212 Change-Id: I1526568bbd850f1343135420ec59ed1b833bb99f Signed-off-by: QiLiang <liangqi1@huawei.com>
Diffstat (limited to 'yardstick/dispatcher')
-rw-r--r--yardstick/dispatcher/influxdb.py135
-rw-r--r--yardstick/dispatcher/influxdb_line_protocol.py114
2 files changed, 249 insertions, 0 deletions
diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py
new file mode 100644
index 000000000..c58054167
--- /dev/null
+++ b/yardstick/dispatcher/influxdb.py
@@ -0,0 +1,135 @@
+##############################################################################
+# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import json
+import logging
+import requests
+import time
+
+from oslo_config import cfg
+
+from yardstick.dispatcher.base import Base as DispatchBase
+from yardstick.dispatcher.influxdb_line_protocol import make_lines
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+influx_dispatcher_opts = [
+ cfg.StrOpt('target',
+ default='http://127.0.0.1:8086',
+ help='The target where the http request will be sent. '
+ 'If this is not set, no data will be posted. For '
+ 'example: target = http://hostname:1234/path'),
+ cfg.StrOpt('db_name',
+ default='yardstick',
+ help='The database name to store test results.'),
+ cfg.IntOpt('timeout',
+ default=5,
+ help='The max time in seconds to wait for a request to '
+ 'timeout.'),
+]
+
+CONF.register_opts(influx_dispatcher_opts, group="dispatcher_influxdb")
+
+
+class InfluxdbDispatcher(DispatchBase):
+ """Dispatcher class for posting data into an influxdb target.
+ """
+
+ __dispatcher_type__ = "Influxdb"
+
+ def __init__(self, conf):
+ super(InfluxdbDispatcher, self).__init__(conf)
+ self.timeout = CONF.dispatcher_influxdb.timeout
+ self.target = CONF.dispatcher_influxdb.target
+ self.db_name = CONF.dispatcher_influxdb.db_name
+ self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
+ self.raw_result = []
+ self.case_name = ""
+ self.static_tags = {
+ "pod_name": os.environ.get('POD_NAME', 'unknown'),
+ "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
+ "version": os.environ.get('YARDSTICK_VERSION', 'unknown')
+ }
+
+ def _dict_key_flatten(self, data):
+ next_data = {}
+
+ if not [v for v in data.values()
+ if type(v) == dict or type(v) == list]:
+ return data
+
+ for k, v in data.iteritems():
+ if type(v) == dict:
+ for n_k, n_v in v.iteritems():
+ next_data["%s.%s" % (k, n_k)] = n_v
+ elif type(v) == list:
+ for index, item in enumerate(v):
+ next_data["%s%d" % (k, index)] = item
+ else:
+ next_data[k] = v
+
+ return self._dict_key_flatten(next_data)
+
+ def _get_nano_timestamp(self, results):
+ try:
+ timestamp = results["benchmark"]["timestamp"]
+ except Exception:
+ timestamp = time.time()
+
+ return str(int(float(timestamp) * 1000000000))
+
+ def _data_to_line_protocol(self, data):
+ msg = {}
+ point = {}
+ point["measurement"] = self.case_name
+ point["fields"] = self._dict_key_flatten(data["benchmark"]["data"])
+ point["time"] = self._get_nano_timestamp(data)
+ msg["points"] = [point]
+ msg["tags"] = self.static_tags
+
+ return make_lines(msg).encode('utf-8')
+
+ def record_result_data(self, data):
+ LOG.debug('Test result : %s' % json.dumps(data))
+ self.raw_result.append(data)
+ if self.target == '':
+ # if the target was not set, do not do anything
+ LOG.error('Dispatcher target was not set, no data will'
+ 'be posted.')
+ return -1
+
+ if isinstance(data, dict) and "scenario_cfg" in data:
+ self.case_name = data["scenario_cfg"]["type"]
+ return 0
+
+ if self.case_name == "":
+ LOG.error('Test result : %s' % json.dumps(data))
+ LOG.error('The case_name cannot be found, no data will be posted.')
+ return -1
+
+ try:
+ line = self._data_to_line_protocol(data)
+ LOG.debug('Test result line format : %s' % line)
+ res = requests.post(self.influxdb_url,
+ data=line,
+ timeout=self.timeout)
+ if res.status_code != 204:
+ LOG.error('Test result posting finished with status code'
+ ' %d.' % res.status_code)
+ except Exception as err:
+ LOG.exception('Failed to record result data: %s',
+ err)
+ return -1
+ return 0
+
+ def flush_result_data(self):
+ LOG.debug('Test result all : %s' % json.dumps(self.raw_result))
+ return 0
diff --git a/yardstick/dispatcher/influxdb_line_protocol.py b/yardstick/dispatcher/influxdb_line_protocol.py
new file mode 100644
index 000000000..3e830ed5e
--- /dev/null
+++ b/yardstick/dispatcher/influxdb_line_protocol.py
@@ -0,0 +1,114 @@
+# yardstick comment: this file is a modified copy of
+# influxdb-python/influxdb/line_protocol.py
+
+from __future__ import unicode_literals
+from copy import copy
+
+from six import binary_type, text_type, integer_types
+
+
+def _escape_tag(tag):
+ tag = _get_unicode(tag, force=True)
+ return tag.replace(
+ "\\", "\\\\"
+ ).replace(
+ " ", "\\ "
+ ).replace(
+ ",", "\\,"
+ ).replace(
+ "=", "\\="
+ )
+
+
+def _escape_value(value):
+ value = _get_unicode(value)
+ if isinstance(value, text_type) and value != '':
+ return "\"{}\"".format(
+ value.replace(
+ "\"", "\\\""
+ ).replace(
+ "\n", "\\n"
+ )
+ )
+ elif isinstance(value, integer_types) and not isinstance(value, bool):
+ return str(value) + 'i'
+ else:
+ return str(value)
+
+
+def _get_unicode(data, force=False):
+ """
+ Try to return a text aka unicode object from the given data.
+ """
+ if isinstance(data, binary_type):
+ return data.decode('utf-8')
+ elif data is None:
+ return ''
+ elif force:
+ return str(data)
+ else:
+ return data
+
+
+def make_lines(data):
+ """
+ Extracts the points from the given dict and returns a Unicode string
+ matching the line protocol introduced in InfluxDB 0.9.0.
+
+ line protocol format:
+ <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>\
+ [,<field2-key>=<field2-value>...] [unix-nano-timestamp]
+
+ Ref:
+ https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html
+ https://influxdb.com/docs/v0.9/write_protocols/line.html
+ """
+ lines = []
+ static_tags = data.get('tags', None)
+ for point in data['points']:
+ elements = []
+
+ # add measurement name
+ measurement = _escape_tag(_get_unicode(
+ point.get('measurement', data.get('measurement'))
+ ))
+ key_values = [measurement]
+
+ # add tags
+ if static_tags is None:
+ tags = point.get('tags', {})
+ else:
+ tags = copy(static_tags)
+ tags.update(point.get('tags', {}))
+
+ # tags should be sorted client-side to take load off server
+ for tag_key in sorted(tags.keys()):
+ key = _escape_tag(tag_key)
+ value = _escape_tag(tags[tag_key])
+
+ if key != '' and value != '':
+ key_values.append("{key}={value}".format(key=key, value=value))
+ key_values = ','.join(key_values)
+ elements.append(key_values)
+
+ # add fields
+ field_values = []
+ for field_key in sorted(point['fields'].keys()):
+ key = _escape_tag(field_key)
+ value = _escape_value(point['fields'][field_key])
+ if key != '' and value != '':
+ field_values.append("{key}={value}".format(
+ key=key,
+ value=value
+ ))
+ field_values = ','.join(field_values)
+ elements.append(field_values)
+
+ # add timestamp
+ if 'time' in point:
+ elements.append(point['time'])
+
+ line = ' '.join(elements)
+ lines.append(line)
+ lines = '\n'.join(lines)
+ return lines + '\n'