diff options
Diffstat (limited to 'storperf/utilities')
-rw-r--r-- | storperf/utilities/data_handler.py | 91 | ||||
-rw-r--r-- | storperf/utilities/math.py | 7 | ||||
-rw-r--r-- | storperf/utilities/steady_state.py | 9 | ||||
-rw-r--r-- | storperf/utilities/thread_gate.py | 5 |
4 files changed, 107 insertions, 5 deletions
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index 03c764c..ebf715a 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -9,27 +9,112 @@ import logging import os +from time import sleep +import time from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB +from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import dictionary +from storperf.utilities import math as math +from storperf.utilities import steady_state as SteadyState class DataHandler(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.samples = 11 """ """ def data_event(self, executor): - self.logger.info("Event received") - - # Data lookup + self.logger.debug("Event received") if executor.terminated: self._push_to_db(executor) + else: + steady_state = True + metrics = {} + for metric in ('lat.mean', 'iops', 'bw'): + metrics[metric] = {} + for io_type in ('read', 'write'): + metrics[metric][io_type] = {} + + series = self._lookup_prior_data(executor, metric, io_type) + steady = self._evaluate_prior_data(series) + + self.logger.debug("Steady state for %s %s: %s" + % (io_type, metric, steady)) + + metrics[metric][io_type]['series'] = series + metrics[metric][io_type]['steady_state'] = steady + treated_data = DataTreatment.data_treatment(series) + + metrics[metric][io_type]['slope'] = \ + math.slope(treated_data['slope_data']) + metrics[metric][io_type]['range'] = \ + math.range_value(treated_data['range_data']) + metrics[metric][io_type]['average'] = \ + math.average(treated_data['average_data']) + + if not steady: + steady_state = False + + executor.metadata['report_data'] = metrics + executor.metadata['steady_state'] = steady_state + + if steady_state: + executor.terminate() + + def _lookup_prior_data(self, executor, metric, io_type): + workload = executor.current_workload + graphite_db = GraphiteDB() + + # A bit of a hack here as Carbon might not be finished storing the + # data we just sent to it + now = int(time.time()) + backtime = 60 * (self.samples + 2) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + most_recent_time = now + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + + delta = now - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + while (delta < 5 or (delta > 60 and delta < 120)): + sleep(5) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + delta = time.time() - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + return data_series + + def _evaluate_prior_data(self, data_series): + self.logger.debug("Data series: %s" % data_series) + if len(data_series) == 0: + return False + earliest_timestamp = data_series[0][0] + latest_timestamp = data_series[-1][0] + duration = latest_timestamp - earliest_timestamp + if (duration < 60 * self.samples): + self.logger.debug("Only %s minutes of samples, ignoring" % + (duration / 60,)) + return False + + return SteadyState.steady_state(data_series) def _push_to_db(self, executor): test_db = os.environ.get('TEST_DB_URL') diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py index a11ec19..4ddddca 100644 --- a/storperf/utilities/math.py +++ b/storperf/utilities/math.py @@ -52,8 +52,11 @@ def slope(data_series): sum_yi_xi += xi * yi sum_yi += yi - beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \ - (sum_xi**2 - m * sum_xi_sq) # The slope + over = (sum_xi**2 - m * sum_xi_sq) + if over == 0: + beta2 = None # Infinite slope + else: + beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if # needed diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py index 233bc78..0bbe21e 100644 --- a/storperf/utilities/steady_state.py +++ b/storperf/utilities/steady_state.py @@ -6,6 +6,8 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import logging + from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import math as math @@ -22,6 +24,8 @@ def steady_state(data_series): has been reached with the data that is passed to it. """ + logger = logging.getLogger('storperf.utilities.steady_state') + # Pre conditioning the data to match the algorithms treated_data = DataTreatment.data_treatment(data_series) @@ -39,6 +43,11 @@ def steady_state(data_series): steady_state = range_condition and slope_condition + logger.debug("Range %s < %s?" % (abs(range_value), + (0.20 * abs(average_value)))) + logger.debug("Slope %s < %s?" % (abs(slope_value), + (0.10 * abs(average_value)))) + logger.debug("Steady State? %s" % steady_state) else: steady_state = False diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py index b0dde50..295b8be 100644 --- a/storperf/utilities/thread_gate.py +++ b/storperf/utilities/thread_gate.py @@ -55,4 +55,9 @@ class ThreadGate(object): k, time_since_last_report) ready = False + self.logger.debug("Gate pass? %s", ready) + + if ready: + self._registrants.clear() + return ready |