From 9f5b776c1b5a54d2ca5942424111f3ff55d5737c Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Fri, 2 Dec 2016 22:25:48 -0500 Subject: Steady state detection Detection of steady state after 10+ samples of data Change-Id: I29368b06188c6370d17b3d567fece6486d171235 JIRA: STORPERF-72 STORPERF-73 Signed-off-by: Mark Beierl --- rest_server.py | 10 +- storperf/carbon/emitter.py | 5 +- storperf/db/graphite_db.py | 36 +++++- storperf/db/test_results_db.py | 3 +- storperf/fio/fio_invoker.py | 23 +++- storperf/logging.json | 29 ++--- storperf/storperf_master.py | 4 +- storperf/test_executor.py | 34 ++++-- storperf/utilities/data_handler.py | 91 ++++++++++++++- storperf/utilities/math.py | 7 +- storperf/utilities/steady_state.py | 9 ++ storperf/utilities/thread_gate.py | 5 + storperf/workloads/_base_workload.py | 2 +- tests/carbon_tests/emitter_test.py | 18 ++- tests/db_tests/graphite_db_test.py | 59 +++++++++- tests/utilities_tests/data_handler_test.py | 170 +++++++++++++++++++++++++++-- tests/utilities_tests/math_slope_test.py | 5 + 17 files changed, 447 insertions(+), 63 deletions(-) diff --git a/rest_server.py b/rest_server.py index d852bbb..40b9b77 100644 --- a/rest_server.py +++ b/rest_server.py @@ -12,15 +12,15 @@ import json import logging import logging.config import os +from storperf.db.job_db import JobDB +from storperf.plot.barchart import Barchart +from storperf.storperf_master import StorPerfMaster +import sys from flask import abort, Flask, request, jsonify, send_from_directory from flask_restful import Resource, Api, fields from flask_restful_swagger import swagger -from storperf.db.job_db import JobDB -from storperf.plot.barchart import Barchart -from storperf.storperf_master import StorPerfMaster - app = Flask(__name__, static_url_path="") api = swagger.docs(Api(app), apiVersion='1.0') @@ -372,6 +372,8 @@ prior to running any further tests, ] ) def delete(self): + self.logger.info("Threads: %s" % sys._current_frames()) + print sys._current_frames() try: return jsonify({'Slaves': storperf.terminate_workloads()}) except Exception as e: diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index c9af8a6..e23dc79 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -22,9 +22,8 @@ class CarbonMetricTransmitter(): def transmit_metrics(self, metrics): if 'timestamp' in metrics: - timestamp = metrics.pop('timestamp') - else: - timestamp = str(calendar.timegm(time.gmtime())) + metrics.pop('timestamp') + timestamp = str(calendar.timegm(time.gmtime())) carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) carbon_socket.connect((self.carbon_host, self.carbon_port)) diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py index c44d2aa..8be91c8 100644 --- a/storperf/db/graphite_db.py +++ b/storperf/db/graphite_db.py @@ -1,4 +1,3 @@ -from storperf.db.job_db import JobDB import calendar import json import logging @@ -6,6 +5,8 @@ import time import requests +from storperf.db.job_db import JobDB + class GraphiteDB(object): @@ -15,6 +16,27 @@ class GraphiteDB(object): self._job_db = JobDB() self.logger = logging.getLogger(__name__) + def fetch_series(self, workload, metric, io_type, time, duration): + + series = [] + end = time + start = end - duration + + request = ("http://127.0.0.1:8000/render/?target=" + "averageSeries(%s.*.jobs.1.%s.%s)" + "&format=json" + "&from=%s" + "&until=%s" % + (workload, io_type, metric, + start, end)) + self.logger.debug("Calling %s" % (request)) + + response = requests.get(request) + if (response.status_code == 200): + series = self._series_results(json.loads(response.content)) + + return series + def fetch_averages(self, workload): workload_executions = self._job_db.fetch_workloads(workload) @@ -115,6 +137,18 @@ class GraphiteDB(object): return average + def _series_results(self, results): + + series = [] + + for item in results: + datapoints = item['datapoints'] + for datapoint in datapoints: + if datapoint[0] is not None: + series.append([datapoint[1], datapoint[0]]) + + return series + def make_fullname_pattern(self, workload): parts = workload.split('.') wildcards_needed = 7 - len(parts) diff --git a/storperf/db/test_results_db.py b/storperf/db/test_results_db.py index 4ee7a52..049b0b8 100644 --- a/storperf/db/test_results_db.py +++ b/storperf/db/test_results_db.py @@ -44,7 +44,8 @@ def push_results_to_db(db_url, project, case_name, headers = {'Content-Type': 'application/json'} try: if logger: - logger.debug("Pushing results to %s" % (url)) + logger.info("Pushing results to %s" % (url)) + logger.debug("Parameters: %s" % params) r = requests.post(url, data=json.dumps(params), headers=headers) if logger: logger.debug(r) diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 315b243..2febf25 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -54,12 +54,14 @@ class FIOInvoker(object): for event_listener in self.event_listeners: try: + self.logger.debug( + "Event listener callback") event_listener(self.callback_id, json_metric) except Exception, e: self.logger.exception( "Notifying listener %s: %s", self.callback_id, e) - self.logger.info( + self.logger.debug( "Event listener callback complete") except Exception, e: self.logger.error("Error parsing JSON: %s", e) @@ -78,7 +80,6 @@ class FIOInvoker(object): self.logger.debug("Finished") def execute(self, args=[]): - ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.remote_host, username='storperf', @@ -87,7 +88,12 @@ class FIOInvoker(object): command = "sudo ./fio " + ' '.join(args) self.logger.debug("Remote command: %s" % command) - (_, stdout, stderr) = ssh.exec_command(command) + + chan = ssh.get_transport().open_session(timeout=None) + chan.settimeout(None) + chan.exec_command(command) + stdout = chan.makefile('r', -1) + stderr = chan.makefile_stderr('r', -1) tout = Thread(target=self.stdout_handler, args=(stdout,), name="%s stdout" % self._remote_host) @@ -100,9 +106,18 @@ class FIOInvoker(object): terr.start() self.logger.info("Started fio on " + self.remote_host) + exit_status = chan.recv_exit_status() + self.logger.info("Finished fio on %s with exit code %s" % + (self.remote_host, exit_status)) + + stdout.close() + stderr.close() + + self.logger.debug("Joining stderr handler") terr.join() + self.logger.debug("Joining stdout handler") tout.join() - self.logger.info("Finished fio on " + self.remote_host) + self.logger.debug("Ended") def terminate(self): self.logger.debug("Terminating fio on " + self.remote_host) diff --git a/storperf/logging.json b/storperf/logging.json index 6d6026e..74df494 100644 --- a/storperf/logging.json +++ b/storperf/logging.json @@ -37,26 +37,15 @@ }, "loggers": { - "my_module": { - "level": "ERROR", - "handlers": ["console"], - "propagate": "no" + "": { + "level": "INFO", + "handlers": ["console", "file_handler", "error_file_handler"] + }, + "storperf": { + "level": "DEBUG" + }, + "storperf.carbon.emitter": { + "level": "INFO" } - }, - - "root": { - "level": "WARN", - "handlers": ["console", "file_handler", "error_file_handler"] - }, - - "storperf": { - "level": "DEBUG", - "handlers": ["console", "file_handler", "error_file_handler"] - }, - - "storperf.carbon.emitter": { - "level": "INFO", - "handlers": ["console", "file_handler", "error_file_handler"] } - } \ No newline at end of file diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index 35cba72..2975afd 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -378,9 +378,9 @@ class StorPerfMaster(object): (_, stdout, stderr) = ssh.exec_command(cmd) for line in stdout.readlines(): - logger.debug(line.decode('utf-8').strip()) + logger.debug(line.strip()) for line in stderr.readlines(): - logger.error(line.decode('utf-8').strip()) + logger.error(line.strip()) def _make_parameters(self): heat_parameters = {} diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 530ce80..8350e43 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker +from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate -from utilities.data_handler import DataHandler class UnknownWorkload(Exception): @@ -41,6 +41,7 @@ class TestExecutor(object): self.metadata = {} self.start_time = None self.end_time = None + self.current_workload = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -106,7 +107,7 @@ class TestExecutor(object): try: event_listener(self) except Exception, e: - self.logger.error("Notifying listener: %s", e) + self.logger.exception("While notifying listener %s", e) def register_workloads(self, workloads): self.workload_modules = [] @@ -166,15 +167,17 @@ class TestExecutor(object): self.job_db.record_workload_params(metadata) self.metadata = metadata self._workload_thread = Thread(target=self.execute_workloads, - args=()) + args=(), + name="Workload thread") self._workload_thread.start() return self.job_db.job_id def terminate(self): self._terminated = True - return self.terminate_current_run() + self.end_time = time.time() + return self._terminate_current_run() - def terminate_current_run(self): + def _terminate_current_run(self): self.logger.info("Terminating current run") terminated_hosts = [] for workload in self._workload_executors: @@ -222,14 +225,23 @@ class TestExecutor(object): for blocksize in blocksizes: for iodepth in iodepths: - scheduler = sched.scheduler(time.time, time.sleep) if self._terminated: return + self.current_workload = ( + "%s.%s.queue-depth.%s.block-size.%s" % + (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) + self.logger.info("Starting run %s" % self.current_workload) + + scheduler = sched.scheduler(time.time, time.sleep) if self.deadline is not None \ and not workload_name.startswith("_"): event = scheduler.enter(self.deadline * 60, 1, - self.terminate_current_run, ()) + self._terminate_current_run, + ()) t = Thread(target=scheduler.run, args=()) t.start() @@ -244,13 +256,16 @@ class TestExecutor(object): self._workload_executors.append(slave_workload) t = Thread(target=self.execute_on_node, - args=(slave_workload,)) + args=(slave_workload,), + name="%s worker" % slave) t.daemon = False t.start() slave_threads.append(t) for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) if not scheduler.empty(): try: @@ -258,7 +273,10 @@ class TestExecutor(object): except: pass + self.logger.info("Completed run %s" % + self.current_workload) self._workload_executors = [] + self.current_workload = None self.logger.info("Completed workload %s" % (workload_name)) self.logger.info("Completed job %s" % (self.job_db.job_id)) diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index 03c764c..ebf715a 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -9,27 +9,112 @@ import logging import os +from time import sleep +import time from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB +from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import dictionary +from storperf.utilities import math as math +from storperf.utilities import steady_state as SteadyState class DataHandler(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.samples = 11 """ """ def data_event(self, executor): - self.logger.info("Event received") - - # Data lookup + self.logger.debug("Event received") if executor.terminated: self._push_to_db(executor) + else: + steady_state = True + metrics = {} + for metric in ('lat.mean', 'iops', 'bw'): + metrics[metric] = {} + for io_type in ('read', 'write'): + metrics[metric][io_type] = {} + + series = self._lookup_prior_data(executor, metric, io_type) + steady = self._evaluate_prior_data(series) + + self.logger.debug("Steady state for %s %s: %s" + % (io_type, metric, steady)) + + metrics[metric][io_type]['series'] = series + metrics[metric][io_type]['steady_state'] = steady + treated_data = DataTreatment.data_treatment(series) + + metrics[metric][io_type]['slope'] = \ + math.slope(treated_data['slope_data']) + metrics[metric][io_type]['range'] = \ + math.range_value(treated_data['range_data']) + metrics[metric][io_type]['average'] = \ + math.average(treated_data['average_data']) + + if not steady: + steady_state = False + + executor.metadata['report_data'] = metrics + executor.metadata['steady_state'] = steady_state + + if steady_state: + executor.terminate() + + def _lookup_prior_data(self, executor, metric, io_type): + workload = executor.current_workload + graphite_db = GraphiteDB() + + # A bit of a hack here as Carbon might not be finished storing the + # data we just sent to it + now = int(time.time()) + backtime = 60 * (self.samples + 2) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + most_recent_time = now + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + + delta = now - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + while (delta < 5 or (delta > 60 and delta < 120)): + sleep(5) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + delta = time.time() - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + return data_series + + def _evaluate_prior_data(self, data_series): + self.logger.debug("Data series: %s" % data_series) + if len(data_series) == 0: + return False + earliest_timestamp = data_series[0][0] + latest_timestamp = data_series[-1][0] + duration = latest_timestamp - earliest_timestamp + if (duration < 60 * self.samples): + self.logger.debug("Only %s minutes of samples, ignoring" % + (duration / 60,)) + return False + + return SteadyState.steady_state(data_series) def _push_to_db(self, executor): test_db = os.environ.get('TEST_DB_URL') diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py index a11ec19..4ddddca 100644 --- a/storperf/utilities/math.py +++ b/storperf/utilities/math.py @@ -52,8 +52,11 @@ def slope(data_series): sum_yi_xi += xi * yi sum_yi += yi - beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \ - (sum_xi**2 - m * sum_xi_sq) # The slope + over = (sum_xi**2 - m * sum_xi_sq) + if over == 0: + beta2 = None # Infinite slope + else: + beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if # needed diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py index 233bc78..0bbe21e 100644 --- a/storperf/utilities/steady_state.py +++ b/storperf/utilities/steady_state.py @@ -6,6 +6,8 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import logging + from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import math as math @@ -22,6 +24,8 @@ def steady_state(data_series): has been reached with the data that is passed to it. """ + logger = logging.getLogger('storperf.utilities.steady_state') + # Pre conditioning the data to match the algorithms treated_data = DataTreatment.data_treatment(data_series) @@ -39,6 +43,11 @@ def steady_state(data_series): steady_state = range_condition and slope_condition + logger.debug("Range %s < %s?" % (abs(range_value), + (0.20 * abs(average_value)))) + logger.debug("Slope %s < %s?" % (abs(slope_value), + (0.10 * abs(average_value)))) + logger.debug("Steady State? %s" % steady_state) else: steady_state = False diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py index b0dde50..295b8be 100644 --- a/storperf/utilities/thread_gate.py +++ b/storperf/utilities/thread_gate.py @@ -55,4 +55,9 @@ class ThreadGate(object): k, time_since_last_report) ready = False + self.logger.debug("Gate pass? %s", ready) + + if ready: + self._registrants.clear() + return ready diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index 874e99c..3896cc6 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -23,7 +23,7 @@ class _base_workload(object): 'bs': '64k', 'iodepth': '1', 'numjobs': '1', - 'loops': '1', + 'loops': '20', 'output-format': 'json', 'status-interval': '60' } diff --git a/tests/carbon_tests/emitter_test.py b/tests/carbon_tests/emitter_test.py index fe19ed2..7f61049 100644 --- a/tests/carbon_tests/emitter_test.py +++ b/tests/carbon_tests/emitter_test.py @@ -7,14 +7,16 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from storperf.carbon import converter -from storperf.carbon.emitter import CarbonMetricTransmitter -from time import sleep import SocketServer import json +from storperf.carbon import converter +from storperf.carbon.emitter import CarbonMetricTransmitter import threading +from time import sleep, strptime import unittest +import mock + class MetricsHandler(SocketServer.BaseRequestHandler): @@ -42,10 +44,14 @@ class CarbonMetricTransmitterTest(unittest.TestCase): t.setDaemon(True) t.start() - def test_transmit_metrics(self): + @mock.patch("time.gmtime") + def test_transmit_metrics(self, mock_time): + + mock_time.return_value = strptime("30 Nov 00", "%d %b %y") testconv = converter.Converter() - json_object = json.loads("""{"timestamp" : "12345", "key":"value" }""") + json_object = json.loads( + """{"timestamp" : "975542400", "key":"value" }""") result = testconv.convert_json_to_flat(json_object, "host.run-name") emitter = CarbonMetricTransmitter() @@ -58,7 +64,7 @@ class CarbonMetricTransmitterTest(unittest.TestCase): count += 1 sleep(0.1) - self.assertEqual("host.run-name.key value 12345\n", + self.assertEqual("host.run-name.key value 975542400\n", CarbonMetricTransmitterTest.response, CarbonMetricTransmitterTest.response) diff --git a/tests/db_tests/graphite_db_test.py b/tests/db_tests/graphite_db_test.py index e13545b..d4c6fb6 100644 --- a/tests/db_tests/graphite_db_test.py +++ b/tests/db_tests/graphite_db_test.py @@ -7,9 +7,19 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from storperf.db.graphite_db import GraphiteDB import unittest +import mock + +from storperf.db.graphite_db import GraphiteDB + + +class MockResponse(): + + def __init__(self): + self.content = "" + self.status_code = 200 + class GraphiteDBTest(unittest.TestCase): @@ -32,6 +42,53 @@ class GraphiteDBTest(unittest.TestCase): # self.graphdb.fetch_averages(u'32d31724-fac1-44f3-9033-ca8e00066a36') pass + @mock.patch("requests.get") + def test_fetch_series(self, mock_requests): + + response = MockResponse() + response.content = """ +[ + { + "datapoints": [ + [null,1480455880], + [null,1480455890], + [null,1480455900], + [205.345,1480455910], + [201.59,1480455920], + [205.76,1480455930], + [null,1480455940], + [null,1480455950], + [null,1480455960], + [215.655,1480455970], + [214.16,1480455980], + [213.57,1480455990], + [null,1480456000], + [null,1480456010], + [null,1480456020], + [219.37,1480456030], + [219.28,1480456040], + [217.75,1480456050], + [null,1480456060] + ], + "target":"averageSeries(.8192.*.jobs.1.write.iops)" + } +]""" + expected = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + + mock_requests.side_effect = (response, ) + + actual = self.graphdb.fetch_series("workload", "iops", + "write", 0, 600) + self.assertEqual(expected, actual) + def fetch_workloads(self, workload): workloads = [[u'32d31724-fac1-44f3-9033-ca8e00066a36.' u'_warm_up.queue-depth.32.block-size.8192.10-9-15-151', diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py index 482b98e..b175c87 100644 --- a/tests/utilities_tests/data_handler_test.py +++ b/tests/utilities_tests/data_handler_test.py @@ -8,22 +8,25 @@ ############################################################################## import os +from storperf.utilities.data_handler import DataHandler import unittest import mock -from storperf.utilities.data_handler import DataHandler - class MockGraphiteDB(object): def __init__(self): self.called = False + self.series = [] def fetch_averages(self, job_id): self.called = True return None + def fetch_series(self, job_id, timeframe): + return self.series + class DataHandlerTest(unittest.TestCase): @@ -40,6 +43,7 @@ class DataHandlerTest(unittest.TestCase): mock.job_id = "1" self.job_db = mock self.pushed = False + self.current_workload = None pass @property @@ -50,8 +54,68 @@ class DataHandlerTest(unittest.TestCase): self.pushed = True pass - def test_not_terminated_report(self): - self.data_handler.data_event(self) + def terminate(self): + self._terminated = True + + @mock.patch("time.time") + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + def test_lookup_prior_data(self, mock_graphite_db, mock_time): + self._terminated = False + expected = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + mock_graphite_db.return_value = expected + mock_time.return_value = expected[-1][0] + 10 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) + + actual = self.data_handler._lookup_prior_data(self, 'read', 'iops') + self.assertEqual(expected, actual) + + def test_short_sample(self): + series = [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + + actual = self.data_handler._evaluate_prior_data(series) + self.assertEqual(False, actual) + + def test_long_not_steady_sample(self): + series = [[4804559100, 205345], + [4804559200, 20159], + [4804559300, 20576], + [4804560300, 21937], + [4804560400, 21928], + [4804560500, 21775]] + actual = self.data_handler._evaluate_prior_data(series) + self.assertEqual(False, actual) + + def test_long_steady_sample(self): + series = [[4804559100, 205.345], + [4804559200, 201.59], + [4804559300, 205.76], + [4804560300, 219.37], + [4804560400, 219.28], + [4804560500, 217.75]] + actual = self.data_handler._evaluate_prior_data(series) + self.assertEqual(True, actual) @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) @mock.patch("storperf.db.test_results_db.push_results_to_db") @@ -65,12 +129,104 @@ class DataHandlerTest(unittest.TestCase): self.assertEqual(True, self.pushed) @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("time.time") @mock.patch("storperf.db.test_results_db.push_results_to_db") - @mock.patch("storperf.utilities.data_handler.GraphiteDB") - def test_non_terminated_report(self, mock_graphite_db, mock_results_db): + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + def test_non_terminated_report(self, mock_graphite_db, mock_results_db, + mock_time): self._terminated = False mock_results_db.side_effect = self.push_results_to_db - mock_graphite_db.side_effect = MockGraphiteDB + series = \ + [[1480455910, 205.345], + [1480455920, 201.59], + [1480455930, 205.76], + [1480455970, 215.655], + [1480455980, 214.16], + [1480455990, 213.57], + [1480456030, 219.37], + [1480456040, 219.28], + [1480456050, 217.75]] + mock_graphite_db.return_value = series + mock_time.return_value = series[-1][0] + 10 + expected_slope = 0.1185333530108134 + expected_range = 17.78 + expected_average = 212.49777777777774 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) self.data_handler.data_event(self) self.assertEqual(False, self.pushed) + self.assertEqual(False, self._terminated) + + self.assertEqual(expected_slope, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['slope']) + self.assertEqual(expected_range, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['range']) + self.assertEqual(expected_average, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['average']) + self.assertEqual(series, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['series']) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("time.time") + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series") + def test_report_that_causes_termination(self, + mock_graphite_db, + mock_results_db, + mock_time): + self._terminated = False + mock_results_db.side_effect = self.push_results_to_db + series = [[4804559100, 205.345], + [4804559200, 201.59], + [4804559300, 205.76], + [4804560300, 219.37], + [4804560400, 219.28], + [4804560500, 217.75]] + mock_graphite_db.return_value = series + mock_time.return_value = 4804560500 + 10 + + expected_slope = 0.011830471529818998 + expected_range = 17.78 + expected_average = 211.51583333333335 + + self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" % + ("job_id", + "rw", + 8, + 8192)) + + self.data_handler.data_event(self) + + self.assertEqual(expected_slope, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['slope']) + self.assertEqual(expected_range, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['range']) + self.assertEqual(expected_average, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['average']) + self.assertEqual(series, self.metadata['report_data'] + ['lat.mean'] + ['read'] + ['series']) + self.assertEqual(True, self._terminated) + + self.assertEqual(False, self.pushed) + self.assertEqual(True, self._terminated) diff --git a/tests/utilities_tests/math_slope_test.py b/tests/utilities_tests/math_slope_test.py index 6c05aa4..24d5cd7 100644 --- a/tests/utilities_tests/math_slope_test.py +++ b/tests/utilities_tests/math_slope_test.py @@ -65,3 +65,8 @@ class MathSlopeTest(unittest.TestCase): expected = 1.5 actual = Slope.slope([[0.0, 0], [1, 1], [2, 3]]) self.assertEqual(expected, actual) + + def test_infinte_slope(self): + expected = None + actual = Slope.slope([[1480623510, 1295.87], [1480623520, 1380.79]]) + self.assertEqual(expected, actual) -- cgit 1.2.3-korg