summaryrefslogtreecommitdiffstats
path: root/storperf/utilities
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/utilities')
-rw-r--r--storperf/utilities/data_handler.py91
-rw-r--r--storperf/utilities/math.py7
-rw-r--r--storperf/utilities/steady_state.py9
-rw-r--r--storperf/utilities/thread_gate.py5
4 files changed, 107 insertions, 5 deletions
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index 03c764c..ebf715a 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -9,27 +9,112 @@
import logging
import os
+from time import sleep
+import time
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
class DataHandler(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.samples = 11
"""
"""
def data_event(self, executor):
- self.logger.info("Event received")
-
- # Data lookup
+ self.logger.debug("Event received")
if executor.terminated:
self._push_to_db(executor)
+ else:
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ steady = self._evaluate_prior_data(series)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ metrics[metric][io_type]['average'] = \
+ math.average(treated_data['average_data'])
+
+ if not steady:
+ steady_state = False
+
+ executor.metadata['report_data'] = metrics
+ executor.metadata['steady_state'] = steady_state
+
+ if steady_state:
+ executor.terminate()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (self.samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _evaluate_prior_data(self, data_series):
+ self.logger.debug("Data series: %s" % data_series)
+ if len(data_series) == 0:
+ return False
+ earliest_timestamp = data_series[0][0]
+ latest_timestamp = data_series[-1][0]
+ duration = latest_timestamp - earliest_timestamp
+ if (duration < 60 * self.samples):
+ self.logger.debug("Only %s minutes of samples, ignoring" %
+ (duration / 60,))
+ return False
+
+ return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
test_db = os.environ.get('TEST_DB_URL')
diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py
index a11ec19..4ddddca 100644
--- a/storperf/utilities/math.py
+++ b/storperf/utilities/math.py
@@ -52,8 +52,11 @@ def slope(data_series):
sum_yi_xi += xi * yi
sum_yi += yi
- beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \
- (sum_xi**2 - m * sum_xi_sq) # The slope
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
# beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
# needed
diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py
index 233bc78..0bbe21e 100644
--- a/storperf/utilities/steady_state.py
+++ b/storperf/utilities/steady_state.py
@@ -6,6 +6,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import logging
+
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import math as math
@@ -22,6 +24,8 @@ def steady_state(data_series):
has been reached with the data that is passed to it.
"""
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
# Pre conditioning the data to match the algorithms
treated_data = DataTreatment.data_treatment(data_series)
@@ -39,6 +43,11 @@ def steady_state(data_series):
steady_state = range_condition and slope_condition
+ logger.debug("Range %s < %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s < %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
else:
steady_state = False
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index b0dde50..295b8be 100644
--- a/storperf/utilities/thread_gate.py
+++ b/storperf/utilities/thread_gate.py
@@ -55,4 +55,9 @@ class ThreadGate(object):
k, time_since_last_report)
ready = False
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
return ready