summary refs log tree commit diff stats
path: root/storperf/utilities/data_handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/utilities/data_handler.py')
-rw-r--r-- storperf/utilities/data_handler.py 91
1 files changed, 88 insertions, 3 deletions
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index 03c764c..ebf715a 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -9,27 +9,112 @@
import logging
import os
+from time import sleep
+import time
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
class DataHandler(object):
def __init__(self):
    # Module-scoped logger for this handler.
    self.logger = logging.getLogger(__name__)
    # Number of samples required for steady-state evaluation.
    # Presumably one sample per minute — the 60-second factors in
    # _lookup_prior_data/_evaluate_prior_data suggest this; TODO confirm.
    self.samples = 11
"""
"""
def data_event(self, executor):
    """Handle a data event raised by the workload executor.

    If the executor has already terminated, push the collected results
    to the test results database.  Otherwise fetch the recent series
    for every metric / IO-type pair, attach the analysis (series,
    steady-state flag, slope, range, average) to the executor metadata,
    and terminate the executor once every pair has reached steady state.
    """
    self.logger.debug("Event received")

    if executor.terminated:
        self._push_to_db(executor)
        return

    all_steady = True
    report = {}

    for metric in ('lat.mean', 'iops', 'bw'):
        report[metric] = {}
        for io_type in ('read', 'write'):
            series = self._lookup_prior_data(executor, metric, io_type)
            steady = self._evaluate_prior_data(series)

            self.logger.debug("Steady state for %s %s: %s"
                              % (io_type, metric, steady))

            treated = DataTreatment.data_treatment(series)

            report[metric][io_type] = {
                'series': series,
                'steady_state': steady,
                'slope': math.slope(treated['slope_data']),
                'range': math.range_value(treated['range_data']),
                'average': math.average(treated['average_data']),
            }

            all_steady = all_steady and steady

    executor.metadata['report_data'] = report
    executor.metadata['steady_state'] = all_steady

    if all_steady:
        executor.terminate()
+
def _lookup_prior_data(self, executor, metric, io_type):
    """Fetch the recent Graphite series for one metric / IO type.

    Returns the series for the executor's current workload covering
    roughly the last (samples + 2) minutes, as returned by
    GraphiteDB.fetch_series.  Blocks in 5-second steps until the data
    in Graphite looks settled (see the polling loop below).
    """
    workload = executor.current_workload
    graphite_db = GraphiteDB()

    # A bit of a hack here as Carbon might not be finished storing the
    # data we just sent to it
    now = int(time.time())
    # Look back far enough to cover the full sample window plus two
    # minutes of slack.
    backtime = 60 * (self.samples + 2)
    data_series = graphite_db.fetch_series(workload,
                                           metric,
                                           io_type,
                                           now,
                                           backtime)
    # If nothing came back, treat "now" as the last-update time so the
    # loop below waits for data to appear.
    most_recent_time = now
    if len(data_series) > 0:
        most_recent_time = data_series[-1][0]

    delta = now - most_recent_time
    self.logger.debug("Last update to graphite was %s ago" % delta)

    # Poll while the newest point is younger than 5s (Carbon may still
    # be flushing) or falls in the 60-120s band.
    # NOTE(review): the 60-120s window presumably re-waits across a
    # missed one-minute flush interval — confirm against Carbon's
    # storage schema.  Note the query window stays anchored at the
    # original `now` while polling.
    while (delta < 5 or (delta > 60 and delta < 120)):
        sleep(5)
        data_series = graphite_db.fetch_series(workload,
                                               metric,
                                               io_type,
                                               now,
                                               backtime)
        if len(data_series) > 0:
            most_recent_time = data_series[-1][0]
        delta = time.time() - most_recent_time
        self.logger.debug("Last update to graphite was %s ago" % delta)

    return data_series
+
def _evaluate_prior_data(self, data_series):
    """Decide whether a fetched series demonstrates steady state.

    An empty series, or one whose first and last timestamps span less
    than `samples` minutes, is never considered steady; otherwise the
    decision is delegated to SteadyState.steady_state.
    """
    self.logger.debug("Data series: %s" % data_series)
    if not data_series:
        return False

    oldest = data_series[0][0]
    newest = data_series[-1][0]
    covered = newest - oldest
    if covered < 60 * self.samples:
        self.logger.debug("Only %s minutes of samples, ignoring" %
                          (covered / 60,))
        return False

    return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
test_db = os.environ.get('TEST_DB_URL')