diff options
-rwxr-xr-x | ci/start_job.sh | 2 | ||||
-rw-r--r-- | rest_server.py | 10 | ||||
-rw-r--r-- | storperf/db/graphite_db.py | 111 | ||||
-rw-r--r-- | storperf/db/job_db.py | 5 | ||||
-rw-r--r-- | storperf/storperf_master.py | 12 | ||||
-rw-r--r-- | storperf/test_executor.py | 5 | ||||
-rw-r--r-- | storperf/utilities/data_handler.py | 17 | ||||
-rw-r--r-- | tests/utilities_tests/data_handler_test.py | 5 |
8 files changed, 40 insertions, 127 deletions
diff --git a/ci/start_job.sh b/ci/start_job.sh index 1a71735..b40abc9 100755 --- a/ci/start_job.sh +++ b/ci/start_job.sh @@ -13,7 +13,7 @@ cat << EOF > body.json "block_sizes": "${BLOCK_SIZE}", "nowarm": "string", "nossd": "string", - "deadline": 1200, + "deadline": 20, "queue_depths": "${QUEUE_DEPTH}", "workload": "${WORKLOAD}", "metadata": { diff --git a/rest_server.py b/rest_server.py index 5c84fdb..20c1783 100644 --- a/rest_server.py +++ b/rest_server.py @@ -7,20 +7,19 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -import io import json -import logging import logging.config import os -from storperf.db.job_db import JobDB -from storperf.plot.barchart import Barchart -from storperf.storperf_master import StorPerfMaster import sys from flask import abort, Flask, request, jsonify, send_from_directory from flask_restful import Resource, Api, fields from flask_restful_swagger import swagger +from storperf.db.job_db import JobDB +from storperf.plot.barchart import Barchart +from storperf.storperf_master import StorPerfMaster + app = Flask(__name__, static_url_path="") api = swagger.docs(Api(app), apiVersion='1.0') @@ -37,7 +36,6 @@ def send_swagger(path): def results_page(job_id): job_db = JobDB() - params = {} params = job_db.fetch_workload_params(job_id) diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py index 8be91c8..c8a2d35 100644 --- a/storperf/db/graphite_db.py +++ b/storperf/db/graphite_db.py @@ -1,7 +1,14 @@ -import calendar +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + import json import logging -import time import requests @@ -37,106 +44,6 @@ class GraphiteDB(object): return series - def fetch_averages(self, workload): - workload_executions = self._job_db.fetch_workloads(workload) - - # Create a map of job runs - workload_names = {} - for workload_execution in workload_executions: - name = '.'.join(workload_execution[0].split('.')[0:6]) - if name in workload_names: - workload_record = workload_names[name] - start = workload_record[0] - end = workload_record[1] - else: - start = None - end = None - - if start is None or workload_execution[1] < start: - start = workload_execution[1] - - if end is None or workload_execution[2] > end: - end = workload_execution[2] - - workload_names[name] = [start, end] - - averages = {} - - for io_type in ['read', 'write']: - for workload_name, times in workload_names.iteritems(): - workload_pattern = self.make_fullname_pattern(workload_name) - short_name = '.'.join(workload_name.split('.')[1:6]) - start = times[0] - end = times[1] - - if end is None: - end = str(calendar.timegm(time.gmtime())) - averages[short_name + ".duration"] = \ - (int(end) - int(start)) - - key = short_name + "." + io_type - - request = ("http://127.0.0.1:8000/render/?target=" - "averageSeries(%s.jobs.1.%s.lat.mean)" - "&format=json" - "&from=%s" - "&until=%s" % - (workload_pattern, io_type, start, end)) - self.logger.debug("Calling %s" % (request)) - - response = requests.get(request) - if (response.status_code == 200): - averages[key + ".latency"] = \ - self._average_results(json.loads(response.content)) - - request = ("http://127.0.0.1:8000/render/?target=" - "averageSeries(%s.jobs.1.%s.bw)" - "&format=json" - "&from=%s" - "&until=%s" % - (workload_pattern, io_type, start, end)) - self.logger.debug("Calling %s" % (request)) - - response = requests.get(request) - if (response.status_code == 200): - averages[key + ".throughput"] = \ - self._average_results(json.loads(response.content)) - - request = ("http://127.0.0.1:8000/render/?target=" - "averageSeries(%s.jobs.1.%s.iops)" - "&format=json" - "&from=%s" - "&until=%s" % - (workload_pattern, io_type, start, end)) - self.logger.debug("Calling %s" % (request)) - - response = requests.get(request) - if (response.status_code == 200): - averages[key + ".iops"] = \ - self._average_results(json.loads(response.content)) - - return averages - - def _average_results(self, results): - - for item in results: - datapoints = item['datapoints'] - - total = 0 - count = 0 - - for datapoint in datapoints: - if datapoint[0] is not None: - total += datapoint[0] - count += 1 - - if count > 0: - average = total / count - else: - average = total - - return average - def _series_results(self, results): series = [] diff --git a/storperf/db/job_db.py b/storperf/db/job_db.py index 05160ec..eabcb54 100644 --- a/storperf/db/job_db.py +++ b/storperf/db/job_db.py @@ -58,9 +58,9 @@ class JobDB(object): cursor.execute('''CREATE TABLE job_summary (job_id text, summary text)''') - self.logger.debug("Created job table") + self.logger.debug("Created job summary table") except OperationalError: - self.logger.debug("Job table exists") + self.logger.debug("Job summary table exists") cursor.execute('SELECT * FROM jobs') cursor.execute('SELECT * FROM job_params') @@ -250,6 +250,5 @@ class JobDB(object): if (row is None): break params[row[0]] = row[1] - db.close() return params diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index 7bbadae..6d9625c 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -8,6 +8,7 @@ ############################################################################## from datetime import datetime +import json import logging import os import socket @@ -22,7 +23,6 @@ from scp import SCPClient import heatclient.client as heatclient from storperf.db.configuration_db import ConfigurationDB -from storperf.db.graphite_db import GraphiteDB from storperf.db.job_db import JobDB from storperf.test_executor import TestExecutor @@ -323,8 +323,14 @@ class StorPerfMaster(object): return self._test_executor.terminate() def fetch_results(self, job_id): - graphite_db = GraphiteDB() - return graphite_db.fetch_averages(job_id) + if self._test_executor.job_db.job_id == job_id: + return self._test_executor.metadata['metrics'] + + workload_params = self.job_db.fetch_workload_params(job_id) + if 'report' in workload_params: + report = json.loads(workload_params['report']) + return report['metrics'] + return {} def fetch_metadata(self, job_id): return self.job_db.fetch_workload_params(job_id) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index d46e0c7..6b6316c 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -9,6 +9,7 @@ import copy import imp +import json import logging from os import listdir import os @@ -170,6 +171,7 @@ class TestExecutor(object): self.job_db.create_job_id() self.job_db.record_workload_params(metadata) self.metadata = metadata + self.metadata['metrics'] = {} self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") @@ -310,8 +312,11 @@ class TestExecutor(object): self.end_time = time.time() self._terminated = True + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) self.broadcast_event() self.unregister(data_handler.data_event) + self.job_db.job_id = None def execute_on_node(self, workload): diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index a4c9ae4..d95d6fa 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -37,6 +37,10 @@ class DataHandler(object): if executor.terminated: self._push_to_db(executor) else: + workload = '.'.join(executor.current_workload.split('.')[1:6]) + if 'metrics' not in executor.metadata: + executor.metadata['metrics'] = {} + steady_state = True metrics = {} for metric in ('lat.mean', 'iops', 'bw'): @@ -60,14 +64,15 @@ class DataHandler(object): math.slope(treated_data['slope_data']) metrics[metric][io_type]['range'] = \ math.range_value(treated_data['range_data']) - metrics[metric][io_type]['average'] = \ - math.average(treated_data['average_data']) + average = math.average(treated_data['average_data']) + metrics[metric][io_type]['average'] = average + + metrics_key = '%s.%s.%s' % (workload, io_type, metric) + executor.metadata['metrics'][metrics_key] = average if not steady: steady_state = False - workload = '.'.join(executor.current_workload.split('.')[1:6]) - if 'report_data' not in executor.metadata: executor.metadata['report_data'] = {} @@ -168,9 +173,7 @@ class DataHandler(object): payload['timestart'] = executor.start_time payload['duration'] = duration - graphite_db = GraphiteDB() - payload['metrics'] = graphite_db.fetch_averages( - executor.job_db.job_id) + if steady_state: criteria = 'PASS' else: diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py index 333bed0..4630d54 100644 --- a/tests/utilities_tests/data_handler_test.py +++ b/tests/utilities_tests/data_handler_test.py @@ -18,13 +18,8 @@ from storperf.utilities.data_handler import DataHandler class MockGraphiteDB(object): def __init__(self): - self.called = False self.series = [] - def fetch_averages(self, job_id): - self.called = True - return None - def fetch_series(self, job_id, timeframe): return self.series |