summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@dell.com>2016-12-02 22:25:48 -0500
committermbeierl <mark.beierl@dell.com>2016-12-05 13:11:08 -0500
commit9f5b776c1b5a54d2ca5942424111f3ff55d5737c (patch)
tree56bff326b62256d1c3108453dc3ef2d33dd7ea12
parentf56d13f2aac136b3b8762e06ca15688dd88ff502 (diff)
Steady state detection
Detection of steady state after 10+ samples of data Change-Id: I29368b06188c6370d17b3d567fece6486d171235 JIRA: STORPERF-72 STORPERF-73 Signed-off-by: Mark Beierl <mark.beierl@dell.com>
-rw-r--r--rest_server.py10
-rw-r--r--storperf/carbon/emitter.py5
-rw-r--r--storperf/db/graphite_db.py36
-rw-r--r--storperf/db/test_results_db.py3
-rw-r--r--storperf/fio/fio_invoker.py23
-rw-r--r--storperf/logging.json29
-rw-r--r--storperf/storperf_master.py4
-rw-r--r--storperf/test_executor.py34
-rw-r--r--storperf/utilities/data_handler.py91
-rw-r--r--storperf/utilities/math.py7
-rw-r--r--storperf/utilities/steady_state.py9
-rw-r--r--storperf/utilities/thread_gate.py5
-rw-r--r--storperf/workloads/_base_workload.py2
-rw-r--r--tests/carbon_tests/emitter_test.py18
-rw-r--r--tests/db_tests/graphite_db_test.py59
-rw-r--r--tests/utilities_tests/data_handler_test.py170
-rw-r--r--tests/utilities_tests/math_slope_test.py5
17 files changed, 447 insertions, 63 deletions
diff --git a/rest_server.py b/rest_server.py
index d852bbb..40b9b77 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -12,15 +12,15 @@ import json
import logging
import logging.config
import os
+from storperf.db.job_db import JobDB
+from storperf.plot.barchart import Barchart
+from storperf.storperf_master import StorPerfMaster
+import sys
from flask import abort, Flask, request, jsonify, send_from_directory
from flask_restful import Resource, Api, fields
from flask_restful_swagger import swagger
-from storperf.db.job_db import JobDB
-from storperf.plot.barchart import Barchart
-from storperf.storperf_master import StorPerfMaster
-
app = Flask(__name__, static_url_path="")
api = swagger.docs(Api(app), apiVersion='1.0')
@@ -372,6 +372,8 @@ prior to running any further tests,
]
)
def delete(self):
+ self.logger.info("Threads: %s" % sys._current_frames())
+ print sys._current_frames()
try:
return jsonify({'Slaves': storperf.terminate_workloads()})
except Exception as e:
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index c9af8a6..e23dc79 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -22,9 +22,8 @@ class CarbonMetricTransmitter():
def transmit_metrics(self, metrics):
if 'timestamp' in metrics:
- timestamp = metrics.pop('timestamp')
- else:
- timestamp = str(calendar.timegm(time.gmtime()))
+ metrics.pop('timestamp')
+ timestamp = str(calendar.timegm(time.gmtime()))
carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
carbon_socket.connect((self.carbon_host, self.carbon_port))
diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py
index c44d2aa..8be91c8 100644
--- a/storperf/db/graphite_db.py
+++ b/storperf/db/graphite_db.py
@@ -1,4 +1,3 @@
-from storperf.db.job_db import JobDB
import calendar
import json
import logging
@@ -6,6 +5,8 @@ import time
import requests
+from storperf.db.job_db import JobDB
+
class GraphiteDB(object):
@@ -15,6 +16,27 @@ class GraphiteDB(object):
self._job_db = JobDB()
self.logger = logging.getLogger(__name__)
+ def fetch_series(self, workload, metric, io_type, time, duration):
+
+ series = []
+ end = time
+ start = end - duration
+
+ request = ("http://127.0.0.1:8000/render/?target="
+ "averageSeries(%s.*.jobs.1.%s.%s)"
+ "&format=json"
+ "&from=%s"
+ "&until=%s" %
+ (workload, io_type, metric,
+ start, end))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ series = self._series_results(json.loads(response.content))
+
+ return series
+
def fetch_averages(self, workload):
workload_executions = self._job_db.fetch_workloads(workload)
@@ -115,6 +137,18 @@ class GraphiteDB(object):
return average
+ def _series_results(self, results):
+
+ series = []
+
+ for item in results:
+ datapoints = item['datapoints']
+ for datapoint in datapoints:
+ if datapoint[0] is not None:
+ series.append([datapoint[1], datapoint[0]])
+
+ return series
+
def make_fullname_pattern(self, workload):
parts = workload.split('.')
wildcards_needed = 7 - len(parts)
diff --git a/storperf/db/test_results_db.py b/storperf/db/test_results_db.py
index 4ee7a52..049b0b8 100644
--- a/storperf/db/test_results_db.py
+++ b/storperf/db/test_results_db.py
@@ -44,7 +44,8 @@ def push_results_to_db(db_url, project, case_name,
headers = {'Content-Type': 'application/json'}
try:
if logger:
- logger.debug("Pushing results to %s" % (url))
+ logger.info("Pushing results to %s" % (url))
+ logger.debug("Parameters: %s" % params)
r = requests.post(url, data=json.dumps(params), headers=headers)
if logger:
logger.debug(r)
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 315b243..2febf25 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -54,12 +54,14 @@ class FIOInvoker(object):
for event_listener in self.event_listeners:
try:
+ self.logger.debug(
+ "Event listener callback")
event_listener(self.callback_id, json_metric)
except Exception, e:
self.logger.exception(
"Notifying listener %s: %s",
self.callback_id, e)
- self.logger.info(
+ self.logger.debug(
"Event listener callback complete")
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
@@ -78,7 +80,6 @@ class FIOInvoker(object):
self.logger.debug("Finished")
def execute(self, args=[]):
-
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.remote_host, username='storperf',
@@ -87,7 +88,12 @@ class FIOInvoker(object):
command = "sudo ./fio " + ' '.join(args)
self.logger.debug("Remote command: %s" % command)
- (_, stdout, stderr) = ssh.exec_command(command)
+
+ chan = ssh.get_transport().open_session(timeout=None)
+ chan.settimeout(None)
+ chan.exec_command(command)
+ stdout = chan.makefile('r', -1)
+ stderr = chan.makefile_stderr('r', -1)
tout = Thread(target=self.stdout_handler, args=(stdout,),
name="%s stdout" % self._remote_host)
@@ -100,9 +106,18 @@ class FIOInvoker(object):
terr.start()
self.logger.info("Started fio on " + self.remote_host)
+ exit_status = chan.recv_exit_status()
+ self.logger.info("Finished fio on %s with exit code %s" %
+ (self.remote_host, exit_status))
+
+ stdout.close()
+ stderr.close()
+
+ self.logger.debug("Joining stderr handler")
terr.join()
+ self.logger.debug("Joining stdout handler")
tout.join()
- self.logger.info("Finished fio on " + self.remote_host)
+ self.logger.debug("Ended")
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)
diff --git a/storperf/logging.json b/storperf/logging.json
index 6d6026e..74df494 100644
--- a/storperf/logging.json
+++ b/storperf/logging.json
@@ -37,26 +37,15 @@
},
"loggers": {
- "my_module": {
- "level": "ERROR",
- "handlers": ["console"],
- "propagate": "no"
+ "": {
+ "level": "INFO",
+ "handlers": ["console", "file_handler", "error_file_handler"]
+ },
+ "storperf": {
+ "level": "DEBUG"
+ },
+ "storperf.carbon.emitter": {
+ "level": "INFO"
}
- },
-
- "root": {
- "level": "WARN",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf": {
- "level": "DEBUG",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf.carbon.emitter": {
- "level": "INFO",
- "handlers": ["console", "file_handler", "error_file_handler"]
}
-
} \ No newline at end of file
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 35cba72..2975afd 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -378,9 +378,9 @@ class StorPerfMaster(object):
(_, stdout, stderr) = ssh.exec_command(cmd)
for line in stdout.readlines():
- logger.debug(line.decode('utf-8').strip())
+ logger.debug(line.strip())
for line in stderr.readlines():
- logger.error(line.decode('utf-8').strip())
+ logger.error(line.strip())
def _make_parameters(self):
heat_parameters = {}
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 530ce80..8350e43 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -41,6 +41,7 @@ class TestExecutor(object):
self.metadata = {}
self.start_time = None
self.end_time = None
+ self.current_workload = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -106,7 +107,7 @@ class TestExecutor(object):
try:
event_listener(self)
except Exception, e:
- self.logger.error("Notifying listener: %s", e)
+ self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
self.workload_modules = []
@@ -166,15 +167,17 @@ class TestExecutor(object):
self.job_db.record_workload_params(metadata)
self.metadata = metadata
self._workload_thread = Thread(target=self.execute_workloads,
- args=())
+ args=(),
+ name="Workload thread")
self._workload_thread.start()
return self.job_db.job_id
def terminate(self):
self._terminated = True
- return self.terminate_current_run()
+ self.end_time = time.time()
+ return self._terminate_current_run()
- def terminate_current_run(self):
+ def _terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
@@ -222,14 +225,23 @@ class TestExecutor(object):
for blocksize in blocksizes:
for iodepth in iodepths:
- scheduler = sched.scheduler(time.time, time.sleep)
if self._terminated:
return
+ self.current_workload = (
+ "%s.%s.queue-depth.%s.block-size.%s" %
+ (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
+ self.logger.info("Starting run %s" % self.current_workload)
+
+ scheduler = sched.scheduler(time.time, time.sleep)
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
- self.terminate_current_run, ())
+ self._terminate_current_run,
+ ())
t = Thread(target=scheduler.run, args=())
t.start()
@@ -244,13 +256,16 @@ class TestExecutor(object):
self._workload_executors.append(slave_workload)
t = Thread(target=self.execute_on_node,
- args=(slave_workload,))
+ args=(slave_workload,),
+ name="%s worker" % slave)
t.daemon = False
t.start()
slave_threads.append(t)
for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
if not scheduler.empty():
try:
@@ -258,7 +273,10 @@ class TestExecutor(object):
except:
pass
+ self.logger.info("Completed run %s" %
+ self.current_workload)
self._workload_executors = []
+ self.current_workload = None
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index 03c764c..ebf715a 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -9,27 +9,112 @@
import logging
import os
+from time import sleep
+import time
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
class DataHandler(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.samples = 11
"""
"""
def data_event(self, executor):
- self.logger.info("Event received")
-
- # Data lookup
+ self.logger.debug("Event received")
if executor.terminated:
self._push_to_db(executor)
+ else:
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ steady = self._evaluate_prior_data(series)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ metrics[metric][io_type]['average'] = \
+ math.average(treated_data['average_data'])
+
+ if not steady:
+ steady_state = False
+
+ executor.metadata['report_data'] = metrics
+ executor.metadata['steady_state'] = steady_state
+
+ if steady_state:
+ executor.terminate()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (self.samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _evaluate_prior_data(self, data_series):
+ self.logger.debug("Data series: %s" % data_series)
+ if len(data_series) == 0:
+ return False
+ earliest_timestamp = data_series[0][0]
+ latest_timestamp = data_series[-1][0]
+ duration = latest_timestamp - earliest_timestamp
+ if (duration < 60 * self.samples):
+ self.logger.debug("Only %s minutes of samples, ignoring" %
+ (duration / 60,))
+ return False
+
+ return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
test_db = os.environ.get('TEST_DB_URL')
diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py
index a11ec19..4ddddca 100644
--- a/storperf/utilities/math.py
+++ b/storperf/utilities/math.py
@@ -52,8 +52,11 @@ def slope(data_series):
sum_yi_xi += xi * yi
sum_yi += yi
- beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \
- (sum_xi**2 - m * sum_xi_sq) # The slope
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
# beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
# needed
diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py
index 233bc78..0bbe21e 100644
--- a/storperf/utilities/steady_state.py
+++ b/storperf/utilities/steady_state.py
@@ -6,6 +6,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import logging
+
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import math as math
@@ -22,6 +24,8 @@ def steady_state(data_series):
has been reached with the data that is passed to it.
"""
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
# Pre conditioning the data to match the algorithms
treated_data = DataTreatment.data_treatment(data_series)
@@ -39,6 +43,11 @@ def steady_state(data_series):
steady_state = range_condition and slope_condition
+ logger.debug("Range %s < %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s < %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
else:
steady_state = False
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index b0dde50..295b8be 100644
--- a/storperf/utilities/thread_gate.py
+++ b/storperf/utilities/thread_gate.py
@@ -55,4 +55,9 @@ class ThreadGate(object):
k, time_since_last_report)
ready = False
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
return ready
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index 874e99c..3896cc6 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -23,7 +23,7 @@ class _base_workload(object):
'bs': '64k',
'iodepth': '1',
'numjobs': '1',
- 'loops': '1',
+ 'loops': '20',
'output-format': 'json',
'status-interval': '60'
}
diff --git a/tests/carbon_tests/emitter_test.py b/tests/carbon_tests/emitter_test.py
index fe19ed2..7f61049 100644
--- a/tests/carbon_tests/emitter_test.py
+++ b/tests/carbon_tests/emitter_test.py
@@ -7,14 +7,16 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.carbon import converter
-from storperf.carbon.emitter import CarbonMetricTransmitter
-from time import sleep
import SocketServer
import json
+from storperf.carbon import converter
+from storperf.carbon.emitter import CarbonMetricTransmitter
import threading
+from time import sleep, strptime
import unittest
+import mock
+
class MetricsHandler(SocketServer.BaseRequestHandler):
@@ -42,10 +44,14 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
t.setDaemon(True)
t.start()
- def test_transmit_metrics(self):
+ @mock.patch("time.gmtime")
+ def test_transmit_metrics(self, mock_time):
+
+ mock_time.return_value = strptime("30 Nov 00", "%d %b %y")
testconv = converter.Converter()
- json_object = json.loads("""{"timestamp" : "12345", "key":"value" }""")
+ json_object = json.loads(
+ """{"timestamp" : "975542400", "key":"value" }""")
result = testconv.convert_json_to_flat(json_object, "host.run-name")
emitter = CarbonMetricTransmitter()
@@ -58,7 +64,7 @@ class CarbonMetricTransmitterTest(unittest.TestCase):
count += 1
sleep(0.1)
- self.assertEqual("host.run-name.key value 12345\n",
+ self.assertEqual("host.run-name.key value 975542400\n",
CarbonMetricTransmitterTest.response,
CarbonMetricTransmitterTest.response)
diff --git a/tests/db_tests/graphite_db_test.py b/tests/db_tests/graphite_db_test.py
index e13545b..d4c6fb6 100644
--- a/tests/db_tests/graphite_db_test.py
+++ b/tests/db_tests/graphite_db_test.py
@@ -7,9 +7,19 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.db.graphite_db import GraphiteDB
import unittest
+import mock
+
+from storperf.db.graphite_db import GraphiteDB
+
+
+class MockResponse():
+
+ def __init__(self):
+ self.content = ""
+ self.status_code = 200
+
class GraphiteDBTest(unittest.TestCase):
@@ -32,6 +42,53 @@ class GraphiteDBTest(unittest.TestCase):
# self.graphdb.fetch_averages(u'32d31724-fac1-44f3-9033-ca8e00066a36')
pass
+ @mock.patch("requests.get")
+ def test_fetch_series(self, mock_requests):
+
+ response = MockResponse()
+ response.content = """
+[
+ {
+ "datapoints": [
+ [null,1480455880],
+ [null,1480455890],
+ [null,1480455900],
+ [205.345,1480455910],
+ [201.59,1480455920],
+ [205.76,1480455930],
+ [null,1480455940],
+ [null,1480455950],
+ [null,1480455960],
+ [215.655,1480455970],
+ [214.16,1480455980],
+ [213.57,1480455990],
+ [null,1480456000],
+ [null,1480456010],
+ [null,1480456020],
+ [219.37,1480456030],
+ [219.28,1480456040],
+ [217.75,1480456050],
+ [null,1480456060]
+ ],
+ "target":"averageSeries(.8192.*.jobs.1.write.iops)"
+ }
+]"""
+ expected = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+
+ mock_requests.side_effect = (response, )
+
+ actual = self.graphdb.fetch_series("workload", "iops",
+ "write", 0, 600)
+ self.assertEqual(expected, actual)
+
def fetch_workloads(self, workload):
workloads = [[u'32d31724-fac1-44f3-9033-ca8e00066a36.'
u'_warm_up.queue-depth.32.block-size.8192.10-9-15-151',
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
index 482b98e..b175c87 100644
--- a/tests/utilities_tests/data_handler_test.py
+++ b/tests/utilities_tests/data_handler_test.py
@@ -8,22 +8,25 @@
##############################################################################
import os
+from storperf.utilities.data_handler import DataHandler
import unittest
import mock
-from storperf.utilities.data_handler import DataHandler
-
class MockGraphiteDB(object):
def __init__(self):
self.called = False
+ self.series = []
def fetch_averages(self, job_id):
self.called = True
return None
+ def fetch_series(self, job_id, timeframe):
+ return self.series
+
class DataHandlerTest(unittest.TestCase):
@@ -40,6 +43,7 @@ class DataHandlerTest(unittest.TestCase):
mock.job_id = "1"
self.job_db = mock
self.pushed = False
+ self.current_workload = None
pass
@property
@@ -50,8 +54,68 @@ class DataHandlerTest(unittest.TestCase):
self.pushed = True
pass
- def test_not_terminated_report(self):
- self.data_handler.data_event(self)
+ def terminate(self):
+ self._terminated = True
+
+ @mock.patch("time.time")
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_lookup_prior_data(self, mock_graphite_db, mock_time):
+ self._terminated = False
+ expected = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+ mock_graphite_db.return_value = expected
+ mock_time.return_value = expected[-1][0] + 10
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
+
+ actual = self.data_handler._lookup_prior_data(self, 'read', 'iops')
+ self.assertEqual(expected, actual)
+
+ def test_short_sample(self):
+ series = [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(False, actual)
+
+ def test_long_not_steady_sample(self):
+ series = [[4804559100, 205345],
+ [4804559200, 20159],
+ [4804559300, 20576],
+ [4804560300, 21937],
+ [4804560400, 21928],
+ [4804560500, 21775]]
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(False, actual)
+
+ def test_long_steady_sample(self):
+ series = [[4804559100, 205.345],
+ [4804559200, 201.59],
+ [4804559300, 205.76],
+ [4804560300, 219.37],
+ [4804560400, 219.28],
+ [4804560500, 217.75]]
+ actual = self.data_handler._evaluate_prior_data(series)
+ self.assertEqual(True, actual)
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
@mock.patch("storperf.db.test_results_db.push_results_to_db")
@@ -65,12 +129,104 @@ class DataHandlerTest(unittest.TestCase):
self.assertEqual(True, self.pushed)
@mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("time.time")
@mock.patch("storperf.db.test_results_db.push_results_to_db")
- @mock.patch("storperf.utilities.data_handler.GraphiteDB")
- def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_non_terminated_report(self, mock_graphite_db, mock_results_db,
+ mock_time):
self._terminated = False
mock_results_db.side_effect = self.push_results_to_db
- mock_graphite_db.side_effect = MockGraphiteDB
+ series = \
+ [[1480455910, 205.345],
+ [1480455920, 201.59],
+ [1480455930, 205.76],
+ [1480455970, 215.655],
+ [1480455980, 214.16],
+ [1480455990, 213.57],
+ [1480456030, 219.37],
+ [1480456040, 219.28],
+ [1480456050, 217.75]]
+ mock_graphite_db.return_value = series
+ mock_time.return_value = series[-1][0] + 10
+ expected_slope = 0.1185333530108134
+ expected_range = 17.78
+ expected_average = 212.49777777777774
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
self.data_handler.data_event(self)
self.assertEqual(False, self.pushed)
+ self.assertEqual(False, self._terminated)
+
+ self.assertEqual(expected_slope, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['slope'])
+ self.assertEqual(expected_range, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['range'])
+ self.assertEqual(expected_average, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['average'])
+ self.assertEqual(series, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['series'])
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("time.time")
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.db.graphite_db.GraphiteDB.fetch_series")
+ def test_report_that_causes_termination(self,
+ mock_graphite_db,
+ mock_results_db,
+ mock_time):
+ self._terminated = False
+ mock_results_db.side_effect = self.push_results_to_db
+ series = [[4804559100, 205.345],
+ [4804559200, 201.59],
+ [4804559300, 205.76],
+ [4804560300, 219.37],
+ [4804560400, 219.28],
+ [4804560500, 217.75]]
+ mock_graphite_db.return_value = series
+ mock_time.return_value = 4804560500 + 10
+
+ expected_slope = 0.011830471529818998
+ expected_range = 17.78
+ expected_average = 211.51583333333335
+
+ self.current_workload = ("%s.%s.queue-depth.%s.block-size.%s" %
+ ("job_id",
+ "rw",
+ 8,
+ 8192))
+
+ self.data_handler.data_event(self)
+
+ self.assertEqual(expected_slope, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['slope'])
+ self.assertEqual(expected_range, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['range'])
+ self.assertEqual(expected_average, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['average'])
+ self.assertEqual(series, self.metadata['report_data']
+ ['lat.mean']
+ ['read']
+ ['series'])
+ self.assertEqual(True, self._terminated)
+
+ self.assertEqual(False, self.pushed)
+ self.assertEqual(True, self._terminated)
diff --git a/tests/utilities_tests/math_slope_test.py b/tests/utilities_tests/math_slope_test.py
index 6c05aa4..24d5cd7 100644
--- a/tests/utilities_tests/math_slope_test.py
+++ b/tests/utilities_tests/math_slope_test.py
@@ -65,3 +65,8 @@ class MathSlopeTest(unittest.TestCase):
expected = 1.5
actual = Slope.slope([[0.0, 0], [1, 1], [2, 3]])
self.assertEqual(expected, actual)
+
+ def test_infinte_slope(self):
+ expected = None
+ actual = Slope.slope([[1480623510, 1295.87], [1480623520, 1380.79]])
+ self.assertEqual(expected, actual)