aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorQiLiang <liangqi1@huawei.com>2015-12-23 22:13:15 +0800
committerAna Cunha <ana.cunha@ericsson.com>2016-01-08 11:07:11 +0000
commit262f007f06da281e1f92018cb87e9abe2432e644 (patch)
tree8819353c38ac5cabbd4e1686cbd0ac5fc70d399a
parent93fb39a9436064c0b0fd85e3d69b23ceed4c1f1a (diff)
Initial InfluxDB dispatcher
Supports: - Basic influxDB write with timestamp - Add general result format func - Add UT TODO: - refine database schema (e.g. add more tags) plan in another patch JIRA: YARDSTICK-212 Change-Id: I1526568bbd850f1343135420ec59ed1b833bb99f Signed-off-by: QiLiang <liangqi1@huawei.com> (cherry picked from commit 99ba990d4a01c0f3f4837f11a24b695f4a2393d2)
-rw-r--r--etc/yardstick/yardstick.conf.sample5
-rw-r--r--tests/unit/dispatcher/__init__.py0
-rw-r--r--tests/unit/dispatcher/test_influxdb.py122
-rw-r--r--tests/unit/dispatcher/test_influxdb_line_protocol.py55
-rw-r--r--yardstick/dispatcher/influxdb.py135
-rw-r--r--yardstick/dispatcher/influxdb_line_protocol.py114
6 files changed, 431 insertions, 0 deletions
diff --git a/etc/yardstick/yardstick.conf.sample b/etc/yardstick/yardstick.conf.sample
index 82326dd1b..63462c573 100644
--- a/etc/yardstick/yardstick.conf.sample
+++ b/etc/yardstick/yardstick.conf.sample
@@ -11,3 +11,8 @@
# file_path = /tmp/yardstick.out
# max_bytes = 0
# backup_count = 0
+
+[dispatcher_influxdb]
+# timeout = 5
+# target = http://127.0.0.1:8086
+# db_name = yardstick
diff --git a/tests/unit/dispatcher/__init__.py b/tests/unit/dispatcher/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/tests/unit/dispatcher/__init__.py
diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py
new file mode 100644
index 000000000..3989f5889
--- /dev/null
+++ b/tests/unit/dispatcher/test_influxdb.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+
+##############################################################################
+# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# Unittest for yardstick.dispatcher.influxdb
+
+import mock
+import unittest
+
+from yardstick.dispatcher.influxdb import InfluxdbDispatcher
+
+class InfluxdbDispatcherTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.data1 = {
+ "runner_id": 8921,
+ "context_cfg": {
+ "host": {
+ "ip": "10.229.43.154",
+ "key_filename": "/root/yardstick/yardstick/resources/files/yardstick_key",
+ "name": "kvm.LF",
+ "user": "root"
+ },
+ "target": {
+ "ipaddr": "10.229.44.134"
+ }
+ },
+ "scenario_cfg": {
+ "runner": {
+ "interval": 1,
+ "object": "yardstick.benchmark.scenarios.networking.ping.Ping",
+ "output_filename": "/tmp/yardstick.out",
+ "runner_id": 8921,
+ "duration": 10,
+ "type": "Duration"
+ },
+ "host": "kvm.LF",
+ "type": "Ping",
+ "target": "10.229.44.134",
+ "sla": {
+ "action": "monitor",
+ "max_rtt": 10
+ }
+ }
+ }
+ self.data2 = {
+ "benchmark": {
+ "timestamp": "1451478117.883505",
+ "errors": "",
+ "data": {
+ "rtt": 0.613
+ },
+ "sequence": 1
+ },
+ "runner_id": 8921
+ }
+ self.data3 ={
+ "benchmark": {
+ "data": {
+ "mpstat": {
+ "cpu0": {
+ "%sys": "0.00",
+ "%idle": "99.00"
+ },
+ "loadavg": [
+ "1.09",
+ "0.29"
+ ]
+ },
+ "rtt": "1.03"
+ }
+ }
+ }
+
+ def test_record_result_data_no_target(self):
+ influxdb = InfluxdbDispatcher(None)
+ influxdb.target = ''
+ self.assertEqual(influxdb.record_result_data(self.data1), -1)
+
+ def test_record_result_data_no_case_name(self):
+ influxdb = InfluxdbDispatcher(None)
+ self.assertEqual(influxdb.record_result_data(self.data2), -1)
+
+ @mock.patch('yardstick.dispatcher.influxdb.requests')
+ def test_record_result_data(self, mock_requests):
+ type(mock_requests.post.return_value).status_code = 204
+ influxdb = InfluxdbDispatcher(None)
+ self.assertEqual(influxdb.record_result_data(self.data1), 0)
+ self.assertEqual(influxdb.record_result_data(self.data2), 0)
+ self.assertEqual(influxdb.flush_result_data(), 0)
+
+ def test__dict_key_flatten(self):
+ line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00'
+ influxdb = InfluxdbDispatcher(None)
+ flattened_data = influxdb._dict_key_flatten(self.data3['benchmark']['data'])
+ result = ",".join([k+"="+v for k, v in flattened_data.items()])
+ self.assertEqual(result, line)
+
+ def test__get_nano_timestamp(self):
+ influxdb = InfluxdbDispatcher(None)
+ results = {'benchmark': {'timestamp': '1451461248.925574'}}
+ self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144')
+
+ @mock.patch('yardstick.dispatcher.influxdb.time')
+ def test__get_nano_timestamp_except(self, mock_time):
+ results = {}
+ influxdb = InfluxdbDispatcher(None)
+ mock_time.time.return_value = 1451461248.925574
+ self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144')
+
+def main():
+ unittest.main()
+
+if __name__ == '__main__':
+ main()
diff --git a/tests/unit/dispatcher/test_influxdb_line_protocol.py b/tests/unit/dispatcher/test_influxdb_line_protocol.py
new file mode 100644
index 000000000..cb05bf4d2
--- /dev/null
+++ b/tests/unit/dispatcher/test_influxdb_line_protocol.py
@@ -0,0 +1,55 @@
+# Unittest for yardstick.dispatcher.influxdb_line_protocol
+
+# yardstick comment: this file is a modified copy of
+# influxdb-python/influxdb/tests/test_line_protocol.py
+
+import unittest
+from yardstick.dispatcher.influxdb_line_protocol import make_lines
+
+
+class TestLineProtocol(unittest.TestCase):
+
+ def test_make_lines(self):
+ data = {
+ "tags": {
+ "empty_tag": "",
+ "none_tag": None,
+ "integer_tag": 2,
+ "string_tag": "hello"
+ },
+ "points": [
+ {
+ "measurement": "test",
+ "fields": {
+ "string_val": "hello!",
+ "int_val": 1,
+ "float_val": 1.1,
+ "none_field": None,
+ "bool_val": True,
+ }
+ }
+ ]
+ }
+
+ self.assertEqual(
+ make_lines(data),
+ 'test,integer_tag=2,string_tag=hello '
+ 'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n'
+ )
+
+ def test_string_val_newline(self):
+ data = {
+ "points": [
+ {
+ "measurement": "m1",
+ "fields": {
+ "multi_line": "line1\nline1\nline3"
+ }
+ }
+ ]
+ }
+
+ self.assertEqual(
+ make_lines(data),
+ 'm1 multi_line="line1\\nline1\\nline3"\n'
+ )
diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py
new file mode 100644
index 000000000..c58054167
--- /dev/null
+++ b/yardstick/dispatcher/influxdb.py
@@ -0,0 +1,135 @@
+##############################################################################
+# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import json
+import logging
+import requests
+import time
+
+from oslo_config import cfg
+
+from yardstick.dispatcher.base import Base as DispatchBase
+from yardstick.dispatcher.influxdb_line_protocol import make_lines
+
+LOG = logging.getLogger(__name__)
+
+CONF = cfg.CONF
+influx_dispatcher_opts = [
+ cfg.StrOpt('target',
+ default='http://127.0.0.1:8086',
+ help='The target where the http request will be sent. '
+ 'If this is not set, no data will be posted. For '
+ 'example: target = http://hostname:1234/path'),
+ cfg.StrOpt('db_name',
+ default='yardstick',
+ help='The database name to store test results.'),
+ cfg.IntOpt('timeout',
+ default=5,
+ help='The max time in seconds to wait for a request to '
+ 'timeout.'),
+]
+
+CONF.register_opts(influx_dispatcher_opts, group="dispatcher_influxdb")
+
+
+class InfluxdbDispatcher(DispatchBase):
+ """Dispatcher class for posting data into an influxdb target.
+ """
+
+ __dispatcher_type__ = "Influxdb"
+
+ def __init__(self, conf):
+ super(InfluxdbDispatcher, self).__init__(conf)
+ self.timeout = CONF.dispatcher_influxdb.timeout
+ self.target = CONF.dispatcher_influxdb.target
+ self.db_name = CONF.dispatcher_influxdb.db_name
+ self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name)
+ self.raw_result = []
+ self.case_name = ""
+ self.static_tags = {
+ "pod_name": os.environ.get('POD_NAME', 'unknown'),
+ "installer": os.environ.get('INSTALLER_TYPE', 'unknown'),
+ "version": os.environ.get('YARDSTICK_VERSION', 'unknown')
+ }
+
+ def _dict_key_flatten(self, data):
+ next_data = {}
+
+ if not [v for v in data.values()
+ if type(v) == dict or type(v) == list]:
+ return data
+
+ for k, v in data.iteritems():
+ if type(v) == dict:
+ for n_k, n_v in v.iteritems():
+ next_data["%s.%s" % (k, n_k)] = n_v
+ elif type(v) == list:
+ for index, item in enumerate(v):
+ next_data["%s%d" % (k, index)] = item
+ else:
+ next_data[k] = v
+
+ return self._dict_key_flatten(next_data)
+
+ def _get_nano_timestamp(self, results):
+ try:
+ timestamp = results["benchmark"]["timestamp"]
+ except Exception:
+ timestamp = time.time()
+
+ return str(int(float(timestamp) * 1000000000))
+
+ def _data_to_line_protocol(self, data):
+ msg = {}
+ point = {}
+ point["measurement"] = self.case_name
+ point["fields"] = self._dict_key_flatten(data["benchmark"]["data"])
+ point["time"] = self._get_nano_timestamp(data)
+ msg["points"] = [point]
+ msg["tags"] = self.static_tags
+
+ return make_lines(msg).encode('utf-8')
+
+ def record_result_data(self, data):
+ LOG.debug('Test result : %s' % json.dumps(data))
+ self.raw_result.append(data)
+ if self.target == '':
+ # if the target was not set, do not do anything
+ LOG.error('Dispatcher target was not set, no data will'
+ 'be posted.')
+ return -1
+
+ if isinstance(data, dict) and "scenario_cfg" in data:
+ self.case_name = data["scenario_cfg"]["type"]
+ return 0
+
+ if self.case_name == "":
+ LOG.error('Test result : %s' % json.dumps(data))
+ LOG.error('The case_name cannot be found, no data will be posted.')
+ return -1
+
+ try:
+ line = self._data_to_line_protocol(data)
+ LOG.debug('Test result line format : %s' % line)
+ res = requests.post(self.influxdb_url,
+ data=line,
+ timeout=self.timeout)
+ if res.status_code != 204:
+ LOG.error('Test result posting finished with status code'
+ ' %d.' % res.status_code)
+ except Exception as err:
+ LOG.exception('Failed to record result data: %s',
+ err)
+ return -1
+ return 0
+
+ def flush_result_data(self):
+ LOG.debug('Test result all : %s' % json.dumps(self.raw_result))
+ return 0
diff --git a/yardstick/dispatcher/influxdb_line_protocol.py b/yardstick/dispatcher/influxdb_line_protocol.py
new file mode 100644
index 000000000..3e830ed5e
--- /dev/null
+++ b/yardstick/dispatcher/influxdb_line_protocol.py
@@ -0,0 +1,114 @@
+# yardstick comment: this file is a modified copy of
+# influxdb-python/influxdb/line_protocol.py
+
+from __future__ import unicode_literals
+from copy import copy
+
+from six import binary_type, text_type, integer_types
+
+
+def _escape_tag(tag):
+ tag = _get_unicode(tag, force=True)
+ return tag.replace(
+ "\\", "\\\\"
+ ).replace(
+ " ", "\\ "
+ ).replace(
+ ",", "\\,"
+ ).replace(
+ "=", "\\="
+ )
+
+
+def _escape_value(value):
+ value = _get_unicode(value)
+ if isinstance(value, text_type) and value != '':
+ return "\"{}\"".format(
+ value.replace(
+ "\"", "\\\""
+ ).replace(
+ "\n", "\\n"
+ )
+ )
+ elif isinstance(value, integer_types) and not isinstance(value, bool):
+ return str(value) + 'i'
+ else:
+ return str(value)
+
+
+def _get_unicode(data, force=False):
+ """
+ Try to return a text aka unicode object from the given data.
+ """
+ if isinstance(data, binary_type):
+ return data.decode('utf-8')
+ elif data is None:
+ return ''
+ elif force:
+ return str(data)
+ else:
+ return data
+
+
+def make_lines(data):
+ """
+ Extracts the points from the given dict and returns a Unicode string
+ matching the line protocol introduced in InfluxDB 0.9.0.
+
+ line protocol format:
+ <measurement>[,<tag-key>=<tag-value>...] <field-key>=<field-value>\
+ [,<field2-key>=<field2-value>...] [unix-nano-timestamp]
+
+ Ref:
+ https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html
+ https://influxdb.com/docs/v0.9/write_protocols/line.html
+ """
+ lines = []
+ static_tags = data.get('tags', None)
+ for point in data['points']:
+ elements = []
+
+ # add measurement name
+ measurement = _escape_tag(_get_unicode(
+ point.get('measurement', data.get('measurement'))
+ ))
+ key_values = [measurement]
+
+ # add tags
+ if static_tags is None:
+ tags = point.get('tags', {})
+ else:
+ tags = copy(static_tags)
+ tags.update(point.get('tags', {}))
+
+ # tags should be sorted client-side to take load off server
+ for tag_key in sorted(tags.keys()):
+ key = _escape_tag(tag_key)
+ value = _escape_tag(tags[tag_key])
+
+ if key != '' and value != '':
+ key_values.append("{key}={value}".format(key=key, value=value))
+ key_values = ','.join(key_values)
+ elements.append(key_values)
+
+ # add fields
+ field_values = []
+ for field_key in sorted(point['fields'].keys()):
+ key = _escape_tag(field_key)
+ value = _escape_value(point['fields'][field_key])
+ if key != '' and value != '':
+ field_values.append("{key}={value}".format(
+ key=key,
+ value=value
+ ))
+ field_values = ','.join(field_values)
+ elements.append(field_values)
+
+ # add timestamp
+ if 'time' in point:
+ elements.append(point['time'])
+
+ line = ' '.join(elements)
+ lines.append(line)
+ lines = '\n'.join(lines)
+ return lines + '\n'