summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/utilities
diff options
context:
space:
mode:
Diffstat (limited to 'docker/storperf-master/storperf/utilities')
-rw-r--r--docker/storperf-master/storperf/utilities/__init__.py8
-rw-r--r--docker/storperf-master/storperf/utilities/data_handler.py205
-rw-r--r--docker/storperf-master/storperf/utilities/data_treatment.py39
-rw-r--r--docker/storperf-master/storperf/utilities/dictionary.py15
-rw-r--r--docker/storperf-master/storperf/utilities/math.py116
-rw-r--r--docker/storperf-master/storperf/utilities/steady_state.py54
-rw-r--r--docker/storperf-master/storperf/utilities/thread_gate.py67
7 files changed, 504 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/utilities/__init__.py b/docker/storperf-master/storperf/utilities/__init__.py
new file mode 100644
index 0000000..73444b6
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..1da869c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -0,0 +1,205 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
+from time import sleep
+import time
+
+
+class DataHandler(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+ self.job_db = JobDB()
+
+ """
+ """
+
+ def data_event(self, executor):
+ self.logger.debug("Event received")
+
+ if executor.terminated:
+ self._push_to_db(executor)
+ else:
+ workload = '.'.join(executor.current_workload.split('.')[1:6])
+ if 'metrics' not in executor.metadata:
+ executor.metadata['metrics'] = {}
+
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ series = self._convert_timestamps_to_samples(
+ executor, series)
+ steady = self._evaluate_prior_data(
+ series, executor.steady_state_samples)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ average = math.average(treated_data['average_data'])
+ metrics[metric][io_type]['average'] = average
+
+ metrics_key = '%s.%s.%s' % (workload, io_type, metric)
+ executor.metadata['metrics'][metrics_key] = average
+
+ if not steady:
+ steady_state = False
+
+ if 'report_data' not in executor.metadata:
+ executor.metadata['report_data'] = {}
+
+ if 'steady_state' not in executor.metadata:
+ executor.metadata['steady_state'] = {}
+
+ executor.metadata['report_data'][workload] = metrics
+ executor.metadata['steady_state'][workload] = steady_state
+
+ workload_name = executor.current_workload.split('.')[1]
+
+ if steady_state and not workload_name.startswith('_'):
+ executor.terminate_current_run()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (executor.steady_state_samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _convert_timestamps_to_samples(self, executor, series):
+ workload_record = self.job_db.fetch_workloads(
+ executor.current_workload)
+ start_time = int(workload_record[0][1])
+
+ normalized_series = []
+
+ for item in series:
+ elapsed = (item[0] - start_time)
+ sample_number = int(round(float(elapsed) / 60))
+ normalized_series.append([sample_number, item[1]])
+
+ return normalized_series
+
+ def _evaluate_prior_data(self, data_series, samples):
+ self.logger.debug("Data series: %s" % data_series)
+ number_of_samples = len(data_series)
+
+ if number_of_samples == 0:
+ return False
+ if (number_of_samples < samples):
+ self.logger.debug("Only %s samples, ignoring" % number_of_samples)
+ return False
+
+ return SteadyState.steady_state(data_series)
+
+ def _push_to_db(self, executor):
+ pod_name = dictionary.get_key_from_dict(executor.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(executor.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(executor.metadata,
+ 'scenario_name',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(executor.metadata,
+ 'build_tag',
+ 'Unknown')
+ test_case = dictionary.get_key_from_dict(executor.metadata,
+ 'test_case',
+ 'Unknown')
+ duration = executor.end_time - executor.start_time
+
+ payload = executor.metadata
+
+ steady_state = True
+ for _, value in executor.metadata['steady_state'].items():
+ steady_state = steady_state and value
+
+ payload['timestart'] = executor.start_time
+ payload['duration'] = duration
+
+ if steady_state:
+ criteria = 'PASS'
+ else:
+ criteria = 'FAIL'
+
+ start_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.start_time))
+
+ end_time = time.strftime('%Y-%m-%d %H:%M:%S',
+ time.gmtime(executor.end_time))
+
+ test_db = os.environ.get('TEST_DB_URL')
+ if test_db is not None:
+ self.logger.info("Pushing results to %s" % (test_db))
+ try:
+ response = test_results_db.push_results_to_db(test_db,
+ "storperf",
+ test_case,
+ start_time,
+ end_time,
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ executor.result_url = response['href']
+ except:
+ self.logger.exception("Error pushing results into Database")
diff --git a/docker/storperf-master/storperf/utilities/data_treatment.py b/docker/storperf-master/storperf/utilities/data_treatment.py
new file mode 100644
index 0000000..2368fd9
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_treatment.py
@@ -0,0 +1,39 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+def data_treatment(data_series):
+ """
+ This function aims at performing any necessary pre treatment on the
+ data_series passed to the steady_state function before being passed
+ under to the different math utilities (slope, range and average)
+ so the data can match the requirements of each algorithm.
+ The function returns a dictionary composed of three values that can be
+ accessed with the following keys : 'slope_data', 'range_data' and
+ 'average_data'.
+ The data_series is currently assumed to follow the pattern :
+ [[x1,y1], [x2,y2], ..., [xm,ym]]. If this pattern were to change, or
+ the input data pattern of one of the math module, this data_treatment
+ function should be the only part of the Steady State detection module
+ that would need to be modified too.
+ """
+
+ x_values = []
+ y_values = []
+ for l in data_series:
+ x_values.append(l[0])
+ y_values.append(l[1])
+
+ treated_data = {
+ 'slope_data': data_series, # No treatment necessary so far
+ 'range_data': y_values, # The y_values only
+ 'average_data': y_values
+ }
+
+ return treated_data
diff --git a/docker/storperf-master/storperf/utilities/dictionary.py b/docker/storperf-master/storperf/utilities/dictionary.py
new file mode 100644
index 0000000..95f625c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/dictionary.py
@@ -0,0 +1,15 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
+def get_key_from_dict(dictionary, key, default_value=None):
+ if key in dictionary:
+ return dictionary[key]
+ else:
+ return default_value
diff --git a/docker/storperf-master/storperf/utilities/math.py b/docker/storperf-master/storperf/utilities/math.py
new file mode 100644
index 0000000..8e04134
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/math.py
@@ -0,0 +1,116 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+
+
+def slope(data_series):
+ """
+ This function implements the linear least squares algorithm described in
+ the following wikipedia article :
+ https://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)
+ in the case of m equations (provided by m data points) and 2 unknown
+ variables (x and y, which represent the time and the Volume performance
+ variable being tested e.g. IOPS, latency...).
+ The data_series is currently assumed to follow the pattern :
+ [[x1,y1], [x2,y2], ..., [xm,ym]].
+ If this data pattern were to change, the data_treatement function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Detection module.
+ """
+
+ # In the particular case of an empty data series
+ if len(data_series) == 0:
+ beta2 = None
+
+ else: # The general case
+ data_series = copy.deepcopy(data_series)
+ m = len(data_series)
+ # To make sure at least one element is a float number so the result
+ # of the algorithm be a float number
+ data_series[0][0] = float(data_series[0][0])
+
+ """
+ It consists in solving the normal equations system (2 equations,
+ 2 unknowns) by calculating the value of beta2 (slope).
+ The formula of beta1 (the y-intercept) is given as a comment in
+ case it is needed later.
+ """
+ sum_xi = 0
+ sum_xi_sq = 0
+ sum_yi_xi = 0
+ sum_yi = 0
+ for i in range(0, m):
+ xi = data_series[i][0]
+ yi = data_series[i][1]
+
+ sum_xi += xi
+ sum_xi_sq += xi**2
+ sum_yi_xi += xi * yi
+ sum_yi += yi
+
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
+ # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
+ # needed
+
+ return beta2
+
+
+def range_value(data_series):
+ """
+ This function implements a range algorithm that returns a float number
+ representing the range of the data_series that is passed to it.
+ The data_series being passed is assumed to follow the following data
+ pattern : [y1, y2, y3, ..., ym] where yi represents the ith
+ measuring point of the y variable. The y variable represents the
+ Volume performance being tested (e.g. IOPS, latency...).
+ If this data pattern were to change, the data_treatment function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Dectection module.
+ The conversion of the data series from the original pattern to the
+ [y1, y2, y3, ..., ym] pattern is done outside this function
+ so the original pattern can be changed without breaking this function.
+ """
+
+ # In the particular case of an empty data series
+ if len(data_series) == 0:
+ range_value = None
+
+ else: # The general case
+ max_value = max(data_series)
+ min_value = min(data_series)
+ range_value = max_value - min_value
+
+ return range_value
+
+
+def average(data_series):
+ """
+ This function seeks to calculate the average value of the data series
+ given a series following the pattern : [y1, y2, y3, ..., ym].
+ If this data pattern were to change, the data_treatment function
+ should be adjusted to ensure compatibility with the rest of the
+ Steady State Dectection module.
+ The function returns a float number corresponding to the average of the yi.
+ """
+ m = len(data_series)
+
+ if m == 0: # In the particular case of an empty data series
+ average = None
+
+ else:
+ data_sum = 0
+ for value in data_series:
+ data_sum += value
+ average = data_sum / float(m)
+
+ return average
diff --git a/docker/storperf-master/storperf/utilities/steady_state.py b/docker/storperf-master/storperf/utilities/steady_state.py
new file mode 100644
index 0000000..13f609d
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/steady_state.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import logging
+
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import math as math
+
+
+def steady_state(data_series):
+ """
+ This function seeks to detect steady state given on a measurement
+ window given the data series of that measurement window following
+ the pattern : [[x1,y1], [x2,y2], ..., [xm,ym]]. m represents the number
+ of points recorded in the measurement window, x which represents the
+ time, and y which represents the Volume performance variable being
+ tested e.g. IOPS, latency...
+ The function returns a boolean describing wether or not steady state
+ has been reached with the data that is passed to it.
+ """
+
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
+ # Pre conditioning the data to match the algorithms
+ treated_data = DataTreatment.data_treatment(data_series)
+
+ # Calculating useful values invoking dedicated functions
+ slope_value = math.slope(treated_data['slope_data'])
+ range_value = math.range_value(treated_data['range_data'])
+ average_value = math.average(treated_data['average_data'])
+
+ if (slope_value is not None and range_value is not None and
+ average_value is not None):
+ # Verification of the Steady State conditions following the SNIA
+ # definition
+ range_condition = abs(range_value) <= 0.20 * abs(average_value)
+ slope_condition = abs(slope_value) <= 0.10 * abs(average_value)
+
+ steady_state = range_condition and slope_condition
+
+ logger.debug("Range %s <= %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s <= %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
+ else:
+ steady_state = False
+
+ return steady_state
diff --git a/docker/storperf-master/storperf/utilities/thread_gate.py b/docker/storperf-master/storperf/utilities/thread_gate.py
new file mode 100644
index 0000000..38acbb1
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/thread_gate.py
@@ -0,0 +1,67 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+Creates a gate object that allows synchronization between an arbitrary
+number of callers.
+"""
+import logging
+import time
+from threading import Lock
+
+
+class FailureToReportException(Exception):
+ pass
+
+
+class ThreadGate(object):
+
+ def __init__(self, size, timeout=60):
+ self.logger = logging.getLogger(__name__)
+ self._gate_size = size
+ self._timeout = timeout
+ self._registrants = {}
+ self._creation_time = time.time()
+ self._lock = Lock()
+
+ """
+ Calling this method returns a true or false, indicating that enough
+ of the other registrants have reported in
+ """
+
+ def report(self, gate_id):
+ with self._lock:
+ now = time.time()
+ self._registrants[gate_id] = now
+ ready = True
+ self.logger.debug("Gate report for %s", gate_id)
+
+ total_missing = self._gate_size - len(self._registrants)
+ if total_missing > 0:
+ self.logger.debug("Not all registrants have reported in")
+ time_since_creation = now - self._creation_time
+ if (time_since_creation > (self._timeout * 2)):
+ self.logger.error(
+ "%s registrant(s) have never reported in",
+ total_missing)
+ raise FailureToReportException
+ return False
+
+ for k, v in self._registrants.items():
+ time_since_last_report = now - v
+ if time_since_last_report > self._timeout:
+ self.logger.debug("Registrant %s last reported %s ago",
+ k, time_since_last_report)
+ ready = False
+
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
+ return ready