aboutsummaryrefslogtreecommitdiffstats
path: root/api/utils
diff options
context:
space:
mode:
Diffstat (limited to 'api/utils')
-rw-r--r--api/utils/common.py22
-rw-r--r--api/utils/daemonthread.py23
-rw-r--r--api/utils/influx.py46
3 files changed, 40 insertions, 51 deletions
diff --git a/api/utils/common.py b/api/utils/common.py
index e3e64a72b..1c800ce49 100644
--- a/api/utils/common.py
+++ b/api/utils/common.py
@@ -6,6 +6,7 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from __future__ import absolute_import
import collections
import logging
@@ -13,18 +14,19 @@ from flask import jsonify
from api.utils.daemonthread import DaemonThread
from yardstick.cmd.cli import YardstickCLI
+import six
logger = logging.getLogger(__name__)
-def translate_to_str(object):
- if isinstance(object, collections.Mapping):
- return {str(k): translate_to_str(v) for k, v in object.items()}
- elif isinstance(object, list):
- return [translate_to_str(ele) for ele in object]
- elif isinstance(object, unicode):
- return str(object)
- return object
+def translate_to_str(obj):
+ if isinstance(obj, collections.Mapping):
+ return {str(k): translate_to_str(v) for k, v in obj.items()}
+ elif isinstance(obj, list):
+ return [translate_to_str(ele) for ele in obj]
+ elif isinstance(obj, six.text_type):
+ return str(obj)
+ return obj
def get_command_list(command_list, opts, args):
@@ -40,8 +42,8 @@ def get_command_list(command_list, opts, args):
return command_list
-def exec_command_task(command_list, task_id): # pragma: no cover
- daemonthread = DaemonThread(YardstickCLI().api, (command_list, task_id))
+def exec_command_task(command_list, task_dict): # pragma: no cover
+ daemonthread = DaemonThread(YardstickCLI().api, (command_list, task_dict))
daemonthread.start()
diff --git a/api/utils/daemonthread.py b/api/utils/daemonthread.py
index 47c0b9108..0049834eb 100644
--- a/api/utils/daemonthread.py
+++ b/api/utils/daemonthread.py
@@ -6,13 +6,13 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from __future__ import absolute_import
import threading
import os
-import datetime
import errno
from api import conf
-from api.utils.influx import write_data_tasklist
+from api.database.handlers import TasksHandler
class DaemonThread(threading.Thread):
@@ -21,19 +21,24 @@ class DaemonThread(threading.Thread):
super(DaemonThread, self).__init__(target=method, args=args)
self.method = method
self.command_list = args[0]
- self.task_id = args[1]
+ self.task_dict = args[1]
def run(self):
- timestamp = datetime.datetime.now()
+ self.task_dict['status'] = 0
+ task_id = self.task_dict['task_id']
try:
- write_data_tasklist(self.task_id, timestamp, 0)
- self.method(self.command_list, self.task_id)
- write_data_tasklist(self.task_id, timestamp, 1)
+ task_handler = TasksHandler()
+ task = task_handler.insert(self.task_dict)
+
+ self.method(self.command_list, task_id)
+
+ task_handler.update_status(task, 1)
except Exception as e:
- write_data_tasklist(self.task_id, timestamp, 2, error=str(e))
+ task_handler.update_status(task, 2)
+ task_handler.update_error(task, str(e))
finally:
- _handle_testsuite_file(self.task_id)
+ _handle_testsuite_file(task_id)
def _handle_testsuite_file(task_id):
diff --git a/api/utils/influx.py b/api/utils/influx.py
index 9366ed3e9..275c63a24 100644
--- a/api/utils/influx.py
+++ b/api/utils/influx.py
@@ -6,11 +6,13 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from __future__ import absolute_import
+
import logging
-from urlparse import urlsplit
+import six.moves.configparser as ConfigParser
+from six.moves.urllib.parse import urlsplit
from influxdb import InfluxDBClient
-import ConfigParser
from api import conf
@@ -21,46 +23,26 @@ def get_data_db_client():
parser = ConfigParser.ConfigParser()
try:
parser.read(conf.OUTPUT_CONFIG_FILE_PATH)
- dispatcher = parser.get('DEFAULT', 'dispatcher')
- if 'influxdb' != dispatcher:
+ if 'influxdb' != parser.get('DEFAULT', 'dispatcher'):
raise RuntimeError
- ip = _get_ip(parser.get('dispatcher_influxdb', 'target'))
- username = parser.get('dispatcher_influxdb', 'username')
- password = parser.get('dispatcher_influxdb', 'password')
- db_name = parser.get('dispatcher_influxdb', 'db_name')
- return InfluxDBClient(ip, conf.PORT, username, password, db_name)
+ return _get_client(parser)
except ConfigParser.NoOptionError:
logger.error('can not find the key')
raise
-def _get_ip(url):
- return urlsplit(url).hostname
-
-
-def _write_data(measurement, field, timestamp, tags):
- point = {
- 'measurement': measurement,
- 'fields': field,
- 'time': timestamp,
- 'tags': tags
- }
-
- try:
- client = get_data_db_client()
-
- logger.debug('Start to write data: %s', point)
- client.write_points([point])
- except RuntimeError:
- logger.debug('dispatcher is not influxdb')
+def _get_client(parser):
+ ip = _get_ip(parser.get('dispatcher_influxdb', 'target'))
+ username = parser.get('dispatcher_influxdb', 'username')
+ password = parser.get('dispatcher_influxdb', 'password')
+ db_name = parser.get('dispatcher_influxdb', 'db_name')
+ return InfluxDBClient(ip, conf.PORT, username, password, db_name)
-def write_data_tasklist(task_id, timestamp, status, error=''):
- field = {'status': status, 'error': error}
- tags = {'task_id': task_id}
- _write_data('tasklist', field, timestamp, tags)
+def _get_ip(url):
+ return urlsplit(url).hostname
def query(query_sql):