From 99ba990d4a01c0f3f4837f11a24b695f4a2393d2 Mon Sep 17 00:00:00 2001 From: QiLiang Date: Wed, 23 Dec 2015 22:13:15 +0800 Subject: Initial InfluxDB dispatcher Supports: - Basic influxDB write with timestamp - Add general result format func - Add UT TODO: - refine database schema (e.g. add more tags) plan in another patch JIRA: YARDSTICK-212 Change-Id: I1526568bbd850f1343135420ec59ed1b833bb99f Signed-off-by: QiLiang --- etc/yardstick/yardstick.conf.sample | 5 + tests/unit/dispatcher/__init__.py | 0 tests/unit/dispatcher/test_influxdb.py | 122 +++++++++++++++++++ .../unit/dispatcher/test_influxdb_line_protocol.py | 55 +++++++++ yardstick/dispatcher/influxdb.py | 135 +++++++++++++++++++++ yardstick/dispatcher/influxdb_line_protocol.py | 114 +++++++++++++++++ 6 files changed, 431 insertions(+) create mode 100644 tests/unit/dispatcher/__init__.py create mode 100644 tests/unit/dispatcher/test_influxdb.py create mode 100644 tests/unit/dispatcher/test_influxdb_line_protocol.py create mode 100644 yardstick/dispatcher/influxdb.py create mode 100644 yardstick/dispatcher/influxdb_line_protocol.py diff --git a/etc/yardstick/yardstick.conf.sample b/etc/yardstick/yardstick.conf.sample index 82326dd1b..63462c573 100644 --- a/etc/yardstick/yardstick.conf.sample +++ b/etc/yardstick/yardstick.conf.sample @@ -11,3 +11,8 @@ # file_path = /tmp/yardstick.out # max_bytes = 0 # backup_count = 0 + +[dispatcher_influxdb] +# timeout = 5 +# target = http://127.0.0.1:8086 +# db_name = yardstick diff --git a/tests/unit/dispatcher/__init__.py b/tests/unit/dispatcher/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/unit/dispatcher/test_influxdb.py b/tests/unit/dispatcher/test_influxdb.py new file mode 100644 index 000000000..3989f5889 --- /dev/null +++ b/tests/unit/dispatcher/test_influxdb.py @@ -0,0 +1,122 @@ +#!/usr/bin/env python + +############################################################################## +# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +# Unittest for yardstick.dispatcher.influxdb + +import mock +import unittest + +from yardstick.dispatcher.influxdb import InfluxdbDispatcher + +class InfluxdbDispatcherTestCase(unittest.TestCase): + + def setUp(self): + self.data1 = { + "runner_id": 8921, + "context_cfg": { + "host": { + "ip": "10.229.43.154", + "key_filename": "/root/yardstick/yardstick/resources/files/yardstick_key", + "name": "kvm.LF", + "user": "root" + }, + "target": { + "ipaddr": "10.229.44.134" + } + }, + "scenario_cfg": { + "runner": { + "interval": 1, + "object": "yardstick.benchmark.scenarios.networking.ping.Ping", + "output_filename": "/tmp/yardstick.out", + "runner_id": 8921, + "duration": 10, + "type": "Duration" + }, + "host": "kvm.LF", + "type": "Ping", + "target": "10.229.44.134", + "sla": { + "action": "monitor", + "max_rtt": 10 + } + } + } + self.data2 = { + "benchmark": { + "timestamp": "1451478117.883505", + "errors": "", + "data": { + "rtt": 0.613 + }, + "sequence": 1 + }, + "runner_id": 8921 + } + self.data3 ={ + "benchmark": { + "data": { + "mpstat": { + "cpu0": { + "%sys": "0.00", + "%idle": "99.00" + }, + "loadavg": [ + "1.09", + "0.29" + ] + }, + "rtt": "1.03" + } + } + } + + def test_record_result_data_no_target(self): + influxdb = InfluxdbDispatcher(None) + influxdb.target = '' + self.assertEqual(influxdb.record_result_data(self.data1), -1) + + def test_record_result_data_no_case_name(self): + influxdb = InfluxdbDispatcher(None) + self.assertEqual(influxdb.record_result_data(self.data2), -1) + + @mock.patch('yardstick.dispatcher.influxdb.requests') + def test_record_result_data(self, mock_requests): + type(mock_requests.post.return_value).status_code = 204 + influxdb = InfluxdbDispatcher(None) + self.assertEqual(influxdb.record_result_data(self.data1), 0) + self.assertEqual(influxdb.record_result_data(self.data2), 0) + self.assertEqual(influxdb.flush_result_data(), 0) + + def test__dict_key_flatten(self): + line = 'mpstat.loadavg1=0.29,rtt=1.03,mpstat.loadavg0=1.09,mpstat.cpu0.%idle=99.00,mpstat.cpu0.%sys=0.00' + influxdb = InfluxdbDispatcher(None) + flattened_data = influxdb._dict_key_flatten(self.data3['benchmark']['data']) + result = ",".join([k+"="+v for k, v in flattened_data.items()]) + self.assertEqual(result, line) + + def test__get_nano_timestamp(self): + influxdb = InfluxdbDispatcher(None) + results = {'benchmark': {'timestamp': '1451461248.925574'}} + self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') + + @mock.patch('yardstick.dispatcher.influxdb.time') + def test__get_nano_timestamp_except(self, mock_time): + results = {} + influxdb = InfluxdbDispatcher(None) + mock_time.time.return_value = 1451461248.925574 + self.assertEqual(influxdb._get_nano_timestamp(results), '1451461248925574144') + +def main(): + unittest.main() + +if __name__ == '__main__': + main() diff --git a/tests/unit/dispatcher/test_influxdb_line_protocol.py b/tests/unit/dispatcher/test_influxdb_line_protocol.py new file mode 100644 index 000000000..cb05bf4d2 --- /dev/null +++ b/tests/unit/dispatcher/test_influxdb_line_protocol.py @@ -0,0 +1,55 @@ +# Unittest for yardstick.dispatcher.influxdb_line_protocol + +# yardstick comment: this file is a modified copy of +# influxdb-python/influxdb/tests/test_line_protocol.py + +import unittest +from yardstick.dispatcher.influxdb_line_protocol import make_lines + + +class TestLineProtocol(unittest.TestCase): + + def test_make_lines(self): + data = { + "tags": { + "empty_tag": "", + "none_tag": None, + "integer_tag": 2, + "string_tag": "hello" + }, + "points": [ + { + "measurement": "test", + "fields": { + "string_val": "hello!", + "int_val": 1, + "float_val": 1.1, + "none_field": None, + "bool_val": True, + } + } + ] + } + + self.assertEqual( + make_lines(data), + 'test,integer_tag=2,string_tag=hello ' + 'bool_val=True,float_val=1.1,int_val=1i,string_val="hello!"\n' + ) + + def test_string_val_newline(self): + data = { + "points": [ + { + "measurement": "m1", + "fields": { + "multi_line": "line1\nline1\nline3" + } + } + ] + } + + self.assertEqual( + make_lines(data), + 'm1 multi_line="line1\\nline1\\nline3"\n' + ) diff --git a/yardstick/dispatcher/influxdb.py b/yardstick/dispatcher/influxdb.py new file mode 100644 index 000000000..c58054167 --- /dev/null +++ b/yardstick/dispatcher/influxdb.py @@ -0,0 +1,135 @@ +############################################################################## +# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import os +import json +import logging +import requests +import time + +from oslo_config import cfg + +from yardstick.dispatcher.base import Base as DispatchBase +from yardstick.dispatcher.influxdb_line_protocol import make_lines + +LOG = logging.getLogger(__name__) + +CONF = cfg.CONF +influx_dispatcher_opts = [ + cfg.StrOpt('target', + default='http://127.0.0.1:8086', + help='The target where the http request will be sent. ' + 'If this is not set, no data will be posted. For ' + 'example: target = http://hostname:1234/path'), + cfg.StrOpt('db_name', + default='yardstick', + help='The database name to store test results.'), + cfg.IntOpt('timeout', + default=5, + help='The max time in seconds to wait for a request to ' + 'timeout.'), +] + +CONF.register_opts(influx_dispatcher_opts, group="dispatcher_influxdb") + + +class InfluxdbDispatcher(DispatchBase): + """Dispatcher class for posting data into an influxdb target. + """ + + __dispatcher_type__ = "Influxdb" + + def __init__(self, conf): + super(InfluxdbDispatcher, self).__init__(conf) + self.timeout = CONF.dispatcher_influxdb.timeout + self.target = CONF.dispatcher_influxdb.target + self.db_name = CONF.dispatcher_influxdb.db_name + self.influxdb_url = "%s/write?db=%s" % (self.target, self.db_name) + self.raw_result = [] + self.case_name = "" + self.static_tags = { + "pod_name": os.environ.get('POD_NAME', 'unknown'), + "installer": os.environ.get('INSTALLER_TYPE', 'unknown'), + "version": os.environ.get('YARDSTICK_VERSION', 'unknown') + } + + def _dict_key_flatten(self, data): + next_data = {} + + if not [v for v in data.values() + if type(v) == dict or type(v) == list]: + return data + + for k, v in data.iteritems(): + if type(v) == dict: + for n_k, n_v in v.iteritems(): + next_data["%s.%s" % (k, n_k)] = n_v + elif type(v) == list: + for index, item in enumerate(v): + next_data["%s%d" % (k, index)] = item + else: + next_data[k] = v + + return self._dict_key_flatten(next_data) + + def _get_nano_timestamp(self, results): + try: + timestamp = results["benchmark"]["timestamp"] + except Exception: + timestamp = time.time() + + return str(int(float(timestamp) * 1000000000)) + + def _data_to_line_protocol(self, data): + msg = {} + point = {} + point["measurement"] = self.case_name + point["fields"] = self._dict_key_flatten(data["benchmark"]["data"]) + point["time"] = self._get_nano_timestamp(data) + msg["points"] = [point] + msg["tags"] = self.static_tags + + return make_lines(msg).encode('utf-8') + + def record_result_data(self, data): + LOG.debug('Test result : %s' % json.dumps(data)) + self.raw_result.append(data) + if self.target == '': + # if the target was not set, do not do anything + LOG.error('Dispatcher target was not set, no data will' + 'be posted.') + return -1 + + if isinstance(data, dict) and "scenario_cfg" in data: + self.case_name = data["scenario_cfg"]["type"] + return 0 + + if self.case_name == "": + LOG.error('Test result : %s' % json.dumps(data)) + LOG.error('The case_name cannot be found, no data will be posted.') + return -1 + + try: + line = self._data_to_line_protocol(data) + LOG.debug('Test result line format : %s' % line) + res = requests.post(self.influxdb_url, + data=line, + timeout=self.timeout) + if res.status_code != 204: + LOG.error('Test result posting finished with status code' + ' %d.' % res.status_code) + except Exception as err: + LOG.exception('Failed to record result data: %s', + err) + return -1 + return 0 + + def flush_result_data(self): + LOG.debug('Test result all : %s' % json.dumps(self.raw_result)) + return 0 diff --git a/yardstick/dispatcher/influxdb_line_protocol.py b/yardstick/dispatcher/influxdb_line_protocol.py new file mode 100644 index 000000000..3e830ed5e --- /dev/null +++ b/yardstick/dispatcher/influxdb_line_protocol.py @@ -0,0 +1,114 @@ +# yardstick comment: this file is a modified copy of +# influxdb-python/influxdb/line_protocol.py + +from __future__ import unicode_literals +from copy import copy + +from six import binary_type, text_type, integer_types + + +def _escape_tag(tag): + tag = _get_unicode(tag, force=True) + return tag.replace( + "\\", "\\\\" + ).replace( + " ", "\\ " + ).replace( + ",", "\\," + ).replace( + "=", "\\=" + ) + + +def _escape_value(value): + value = _get_unicode(value) + if isinstance(value, text_type) and value != '': + return "\"{}\"".format( + value.replace( + "\"", "\\\"" + ).replace( + "\n", "\\n" + ) + ) + elif isinstance(value, integer_types) and not isinstance(value, bool): + return str(value) + 'i' + else: + return str(value) + + +def _get_unicode(data, force=False): + """ + Try to return a text aka unicode object from the given data. + """ + if isinstance(data, binary_type): + return data.decode('utf-8') + elif data is None: + return '' + elif force: + return str(data) + else: + return data + + +def make_lines(data): + """ + Extracts the points from the given dict and returns a Unicode string + matching the line protocol introduced in InfluxDB 0.9.0. + + line protocol format: + [,=...] =\ + [,=...] [unix-nano-timestamp] + + Ref: + https://influxdb.com/docs/v0.9/write_protocols/write_syntax.html + https://influxdb.com/docs/v0.9/write_protocols/line.html + """ + lines = [] + static_tags = data.get('tags', None) + for point in data['points']: + elements = [] + + # add measurement name + measurement = _escape_tag(_get_unicode( + point.get('measurement', data.get('measurement')) + )) + key_values = [measurement] + + # add tags + if static_tags is None: + tags = point.get('tags', {}) + else: + tags = copy(static_tags) + tags.update(point.get('tags', {})) + + # tags should be sorted client-side to take load off server + for tag_key in sorted(tags.keys()): + key = _escape_tag(tag_key) + value = _escape_tag(tags[tag_key]) + + if key != '' and value != '': + key_values.append("{key}={value}".format(key=key, value=value)) + key_values = ','.join(key_values) + elements.append(key_values) + + # add fields + field_values = [] + for field_key in sorted(point['fields'].keys()): + key = _escape_tag(field_key) + value = _escape_value(point['fields'][field_key]) + if key != '' and value != '': + field_values.append("{key}={value}".format( + key=key, + value=value + )) + field_values = ','.join(field_values) + elements.append(field_values) + + # add timestamp + if 'time' in point: + elements.append(point['time']) + + line = ' '.join(elements) + lines.append(line) + lines = '\n'.join(lines) + return lines + '\n' -- cgit 1.2.3-korg