summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xci/start_job.sh2
-rw-r--r--rest_server.py10
-rw-r--r--storperf/db/graphite_db.py111
-rw-r--r--storperf/db/job_db.py5
-rw-r--r--storperf/storperf_master.py12
-rw-r--r--storperf/test_executor.py5
-rw-r--r--storperf/utilities/data_handler.py17
-rw-r--r--tests/utilities_tests/data_handler_test.py5
8 files changed, 40 insertions, 127 deletions
diff --git a/ci/start_job.sh b/ci/start_job.sh
index 1a71735..b40abc9 100755
--- a/ci/start_job.sh
+++ b/ci/start_job.sh
@@ -13,7 +13,7 @@ cat << EOF > body.json
"block_sizes": "${BLOCK_SIZE}",
"nowarm": "string",
"nossd": "string",
- "deadline": 1200,
+ "deadline": 20,
"queue_depths": "${QUEUE_DEPTH}",
"workload": "${WORKLOAD}",
"metadata": {
diff --git a/rest_server.py b/rest_server.py
index 5c84fdb..20c1783 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -7,20 +7,19 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import io
import json
-import logging
import logging.config
import os
-from storperf.db.job_db import JobDB
-from storperf.plot.barchart import Barchart
-from storperf.storperf_master import StorPerfMaster
import sys
from flask import abort, Flask, request, jsonify, send_from_directory
from flask_restful import Resource, Api, fields
from flask_restful_swagger import swagger
+from storperf.db.job_db import JobDB
+from storperf.plot.barchart import Barchart
+from storperf.storperf_master import StorPerfMaster
+
app = Flask(__name__, static_url_path="")
api = swagger.docs(Api(app), apiVersion='1.0')
@@ -37,7 +36,6 @@ def send_swagger(path):
def results_page(job_id):
job_db = JobDB()
- params = {}
params = job_db.fetch_workload_params(job_id)
diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py
index 8be91c8..c8a2d35 100644
--- a/storperf/db/graphite_db.py
+++ b/storperf/db/graphite_db.py
@@ -1,7 +1,14 @@
-import calendar
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
import json
import logging
-import time
import requests
@@ -37,106 +44,6 @@ class GraphiteDB(object):
return series
- def fetch_averages(self, workload):
- workload_executions = self._job_db.fetch_workloads(workload)
-
- # Create a map of job runs
- workload_names = {}
- for workload_execution in workload_executions:
- name = '.'.join(workload_execution[0].split('.')[0:6])
- if name in workload_names:
- workload_record = workload_names[name]
- start = workload_record[0]
- end = workload_record[1]
- else:
- start = None
- end = None
-
- if start is None or workload_execution[1] < start:
- start = workload_execution[1]
-
- if end is None or workload_execution[2] > end:
- end = workload_execution[2]
-
- workload_names[name] = [start, end]
-
- averages = {}
-
- for io_type in ['read', 'write']:
- for workload_name, times in workload_names.iteritems():
- workload_pattern = self.make_fullname_pattern(workload_name)
- short_name = '.'.join(workload_name.split('.')[1:6])
- start = times[0]
- end = times[1]
-
- if end is None:
- end = str(calendar.timegm(time.gmtime()))
- averages[short_name + ".duration"] = \
- (int(end) - int(start))
-
- key = short_name + "." + io_type
-
- request = ("http://127.0.0.1:8000/render/?target="
- "averageSeries(%s.jobs.1.%s.lat.mean)"
- "&format=json"
- "&from=%s"
- "&until=%s" %
- (workload_pattern, io_type, start, end))
- self.logger.debug("Calling %s" % (request))
-
- response = requests.get(request)
- if (response.status_code == 200):
- averages[key + ".latency"] = \
- self._average_results(json.loads(response.content))
-
- request = ("http://127.0.0.1:8000/render/?target="
- "averageSeries(%s.jobs.1.%s.bw)"
- "&format=json"
- "&from=%s"
- "&until=%s" %
- (workload_pattern, io_type, start, end))
- self.logger.debug("Calling %s" % (request))
-
- response = requests.get(request)
- if (response.status_code == 200):
- averages[key + ".throughput"] = \
- self._average_results(json.loads(response.content))
-
- request = ("http://127.0.0.1:8000/render/?target="
- "averageSeries(%s.jobs.1.%s.iops)"
- "&format=json"
- "&from=%s"
- "&until=%s" %
- (workload_pattern, io_type, start, end))
- self.logger.debug("Calling %s" % (request))
-
- response = requests.get(request)
- if (response.status_code == 200):
- averages[key + ".iops"] = \
- self._average_results(json.loads(response.content))
-
- return averages
-
- def _average_results(self, results):
-
- for item in results:
- datapoints = item['datapoints']
-
- total = 0
- count = 0
-
- for datapoint in datapoints:
- if datapoint[0] is not None:
- total += datapoint[0]
- count += 1
-
- if count > 0:
- average = total / count
- else:
- average = total
-
- return average
-
def _series_results(self, results):
series = []
diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py
index 05160ec..eabcb54 100644
--- a/storperf/db/job_db.py
+++ b/storperf/db/job_db.py
@@ -58,9 +58,9 @@ class JobDB(object):
cursor.execute('''CREATE TABLE job_summary
(job_id text,
summary text)''')
- self.logger.debug("Created job table")
+ self.logger.debug("Created job summary table")
except OperationalError:
- self.logger.debug("Job table exists")
+ self.logger.debug("Job summary table exists")
cursor.execute('SELECT * FROM jobs')
cursor.execute('SELECT * FROM job_params')
@@ -250,6 +250,5 @@ class JobDB(object):
if (row is None):
break
params[row[0]] = row[1]
-
db.close()
return params
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 7bbadae..6d9625c 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -8,6 +8,7 @@
##############################################################################
from datetime import datetime
+import json
import logging
import os
import socket
@@ -22,7 +23,6 @@ from scp import SCPClient
import heatclient.client as heatclient
from storperf.db.configuration_db import ConfigurationDB
-from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.test_executor import TestExecutor
@@ -323,8 +323,14 @@ class StorPerfMaster(object):
return self._test_executor.terminate()
def fetch_results(self, job_id):
- graphite_db = GraphiteDB()
- return graphite_db.fetch_averages(job_id)
+ if self._test_executor.job_db.job_id == job_id:
+ return self._test_executor.metadata['metrics']
+
+ workload_params = self.job_db.fetch_workload_params(job_id)
+ if 'report' in workload_params:
+ report = json.loads(workload_params['report'])
+ return report['metrics']
+ return {}
def fetch_metadata(self, job_id):
return self.job_db.fetch_workload_params(job_id)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index d46e0c7..6b6316c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -9,6 +9,7 @@
import copy
import imp
+import json
import logging
from os import listdir
import os
@@ -170,6 +171,7 @@ class TestExecutor(object):
self.job_db.create_job_id()
self.job_db.record_workload_params(metadata)
self.metadata = metadata
+ self.metadata['metrics'] = {}
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
@@ -310,8 +312,11 @@ class TestExecutor(object):
self.end_time = time.time()
self._terminated = True
+ report = {'report': json.dumps(self.metadata)}
+ self.job_db.record_workload_params(report)
self.broadcast_event()
self.unregister(data_handler.data_event)
+ self.job_db.job_id = None
def execute_on_node(self, workload):
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index a4c9ae4..d95d6fa 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -37,6 +37,10 @@ class DataHandler(object):
if executor.terminated:
self._push_to_db(executor)
else:
+ workload = '.'.join(executor.current_workload.split('.')[1:6])
+ if 'metrics' not in executor.metadata:
+ executor.metadata['metrics'] = {}
+
steady_state = True
metrics = {}
for metric in ('lat.mean', 'iops', 'bw'):
@@ -60,14 +64,15 @@ class DataHandler(object):
math.slope(treated_data['slope_data'])
metrics[metric][io_type]['range'] = \
math.range_value(treated_data['range_data'])
- metrics[metric][io_type]['average'] = \
- math.average(treated_data['average_data'])
+ average = math.average(treated_data['average_data'])
+ metrics[metric][io_type]['average'] = average
+
+ metrics_key = '%s.%s.%s' % (workload, io_type, metric)
+ executor.metadata['metrics'][metrics_key] = average
if not steady:
steady_state = False
- workload = '.'.join(executor.current_workload.split('.')[1:6])
-
if 'report_data' not in executor.metadata:
executor.metadata['report_data'] = {}
@@ -168,9 +173,7 @@ class DataHandler(object):
payload['timestart'] = executor.start_time
payload['duration'] = duration
- graphite_db = GraphiteDB()
- payload['metrics'] = graphite_db.fetch_averages(
- executor.job_db.job_id)
+
if steady_state:
criteria = 'PASS'
else:
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index 333bed0..4630d54 100644
--- a/tests/utilities_tests/data_handler_test.py
+++ b/tests/utilities_tests/data_handler_test.py
@@ -18,13 +18,8 @@ from storperf.utilities.data_handler import DataHandler
class MockGraphiteDB(object):
def __init__(self):
- self.called = False
self.series = []
- def fetch_averages(self, job_id):
- self.called = True
- return None
-
def fetch_series(self, job_id, timeframe):
return self.series