diff options
Diffstat (limited to 'storperf/utilities/data_handler.py')
-rw-r--r-- | storperf/utilities/data_handler.py | 91 |
1 files changed, 88 insertions, 3 deletions
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index 03c764c..ebf715a 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -9,27 +9,112 @@ import logging import os +from time import sleep +import time from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB +from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import dictionary +from storperf.utilities import math as math +from storperf.utilities import steady_state as SteadyState class DataHandler(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.samples = 11 """ """ def data_event(self, executor): - self.logger.info("Event received") - - # Data lookup + self.logger.debug("Event received") if executor.terminated: self._push_to_db(executor) + else: + steady_state = True + metrics = {} + for metric in ('lat.mean', 'iops', 'bw'): + metrics[metric] = {} + for io_type in ('read', 'write'): + metrics[metric][io_type] = {} + + series = self._lookup_prior_data(executor, metric, io_type) + steady = self._evaluate_prior_data(series) + + self.logger.debug("Steady state for %s %s: %s" + % (io_type, metric, steady)) + + metrics[metric][io_type]['series'] = series + metrics[metric][io_type]['steady_state'] = steady + treated_data = DataTreatment.data_treatment(series) + + metrics[metric][io_type]['slope'] = \ + math.slope(treated_data['slope_data']) + metrics[metric][io_type]['range'] = \ + math.range_value(treated_data['range_data']) + metrics[metric][io_type]['average'] = \ + math.average(treated_data['average_data']) + + if not steady: + steady_state = False + + executor.metadata['report_data'] = metrics + executor.metadata['steady_state'] = steady_state + + if steady_state: + executor.terminate() + + def _lookup_prior_data(self, executor, metric, io_type): + workload = executor.current_workload + graphite_db = GraphiteDB() + + # A bit of a hack here as Carbon might not be finished storing the + # data we just sent to it + now = int(time.time()) + backtime = 60 * (self.samples + 2) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + most_recent_time = now + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + + delta = now - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + while (delta < 5 or (delta > 60 and delta < 120)): + sleep(5) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + delta = time.time() - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + return data_series + + def _evaluate_prior_data(self, data_series): + self.logger.debug("Data series: %s" % data_series) + if len(data_series) == 0: + return False + earliest_timestamp = data_series[0][0] + latest_timestamp = data_series[-1][0] + duration = latest_timestamp - earliest_timestamp + if (duration < 60 * self.samples): + self.logger.debug("Only %s minutes of samples, ignoring" % + (duration / 60,)) + return False + + return SteadyState.steady_state(data_series) def _push_to_db(self, executor): test_db = os.environ.get('TEST_DB_URL') |