diff options
Diffstat (limited to 'api/utils')
-rw-r--r-- | api/utils/common.py | 22 | ||||
-rw-r--r-- | api/utils/daemonthread.py | 23 | ||||
-rw-r--r-- | api/utils/influx.py | 46 |
3 files changed, 40 insertions, 51 deletions
diff --git a/api/utils/common.py b/api/utils/common.py index e3e64a72b..1c800ce49 100644 --- a/api/utils/common.py +++ b/api/utils/common.py @@ -6,6 +6,7 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from __future__ import absolute_import import collections import logging @@ -13,18 +14,19 @@ from flask import jsonify from api.utils.daemonthread import DaemonThread from yardstick.cmd.cli import YardstickCLI +import six logger = logging.getLogger(__name__) -def translate_to_str(object): - if isinstance(object, collections.Mapping): - return {str(k): translate_to_str(v) for k, v in object.items()} - elif isinstance(object, list): - return [translate_to_str(ele) for ele in object] - elif isinstance(object, unicode): - return str(object) - return object +def translate_to_str(obj): + if isinstance(obj, collections.Mapping): + return {str(k): translate_to_str(v) for k, v in obj.items()} + elif isinstance(obj, list): + return [translate_to_str(ele) for ele in obj] + elif isinstance(obj, six.text_type): + return str(obj) + return obj def get_command_list(command_list, opts, args): @@ -40,8 +42,8 @@ def get_command_list(command_list, opts, args): return command_list -def exec_command_task(command_list, task_id): # pragma: no cover - daemonthread = DaemonThread(YardstickCLI().api, (command_list, task_id)) +def exec_command_task(command_list, task_dict): # pragma: no cover + daemonthread = DaemonThread(YardstickCLI().api, (command_list, task_dict)) daemonthread.start() diff --git a/api/utils/daemonthread.py b/api/utils/daemonthread.py index 47c0b9108..0049834eb 100644 --- a/api/utils/daemonthread.py +++ b/api/utils/daemonthread.py @@ -6,13 +6,13 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from __future__ import absolute_import import threading import os -import datetime import errno from api import conf -from api.utils.influx import write_data_tasklist +from api.database.handlers import TasksHandler class DaemonThread(threading.Thread): @@ -21,19 +21,24 @@ class DaemonThread(threading.Thread): super(DaemonThread, self).__init__(target=method, args=args) self.method = method self.command_list = args[0] - self.task_id = args[1] + self.task_dict = args[1] def run(self): - timestamp = datetime.datetime.now() + self.task_dict['status'] = 0 + task_id = self.task_dict['task_id'] try: - write_data_tasklist(self.task_id, timestamp, 0) - self.method(self.command_list, self.task_id) - write_data_tasklist(self.task_id, timestamp, 1) + task_handler = TasksHandler() + task = task_handler.insert(self.task_dict) + + self.method(self.command_list, task_id) + + task_handler.update_status(task, 1) except Exception as e: - write_data_tasklist(self.task_id, timestamp, 2, error=str(e)) + task_handler.update_status(task, 2) + task_handler.update_error(task, str(e)) finally: - _handle_testsuite_file(self.task_id) + _handle_testsuite_file(task_id) def _handle_testsuite_file(task_id): diff --git a/api/utils/influx.py b/api/utils/influx.py index 9366ed3e9..275c63a24 100644 --- a/api/utils/influx.py +++ b/api/utils/influx.py @@ -6,11 +6,13 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from __future__ import absolute_import + import logging -from urlparse import urlsplit +import six.moves.configparser as ConfigParser +from six.moves.urllib.parse import urlsplit from influxdb import InfluxDBClient -import ConfigParser from api import conf @@ -21,46 +23,26 @@ def get_data_db_client(): parser = ConfigParser.ConfigParser() try: parser.read(conf.OUTPUT_CONFIG_FILE_PATH) - dispatcher = parser.get('DEFAULT', 'dispatcher') - if 'influxdb' != dispatcher: + if 'influxdb' != parser.get('DEFAULT', 'dispatcher'): raise RuntimeError - ip = _get_ip(parser.get('dispatcher_influxdb', 'target')) - username = parser.get('dispatcher_influxdb', 'username') - password = parser.get('dispatcher_influxdb', 'password') - db_name = parser.get('dispatcher_influxdb', 'db_name') - return InfluxDBClient(ip, conf.PORT, username, password, db_name) + return _get_client(parser) except ConfigParser.NoOptionError: logger.error('can not find the key') raise -def _get_ip(url): - return urlsplit(url).hostname - - -def _write_data(measurement, field, timestamp, tags): - point = { - 'measurement': measurement, - 'fields': field, - 'time': timestamp, - 'tags': tags - } - - try: - client = get_data_db_client() - - logger.debug('Start to write data: %s', point) - client.write_points([point]) - except RuntimeError: - logger.debug('dispatcher is not influxdb') +def _get_client(parser): + ip = _get_ip(parser.get('dispatcher_influxdb', 'target')) + username = parser.get('dispatcher_influxdb', 'username') + password = parser.get('dispatcher_influxdb', 'password') + db_name = parser.get('dispatcher_influxdb', 'db_name') + return InfluxDBClient(ip, conf.PORT, username, password, db_name) -def write_data_tasklist(task_id, timestamp, status, error=''): - field = {'status': status, 'error': error} - tags = {'task_id': task_id} - _write_data('tasklist', field, timestamp, tags) +def _get_ip(url): + return urlsplit(url).hostname def query(query_sql): |