summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/utilities
diff options
context:
space:
mode:
authormbeierl <mark.beierl@dell.com>2017-07-11 15:12:35 -0400
committermbeierl <mark.beierl@dell.com>2017-07-11 15:47:46 -0400
commit7602a54309adbe5c5346ee6befecc2e596976504 (patch)
tree60f15026780db30b0b8842ba1a1e2cc021e22625 /docker/storperf-master/storperf/utilities
parentfc09b37e95c19f820ec60db19d98c0dc3d670829 (diff)
Change all paths
Changes the paths of all source code so that it exists under the dockerfile location for each container. This way we can use COPY instead of git clone, as well as use the existing JJB. Change-Id: I883b2957d89659c164fff0a1ebc4d677c534796d JIRA: STORPERF-188 Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master/storperf/utilities')
-rw-r--r--docker/storperf-master/storperf/utilities/__init__.py8
-rw-r--r--docker/storperf-master/storperf/utilities/data_handler.py205
-rw-r--r--docker/storperf-master/storperf/utilities/data_treatment.py39
-rw-r--r--docker/storperf-master/storperf/utilities/dictionary.py15
-rw-r--r--docker/storperf-master/storperf/utilities/math.py116
-rw-r--r--docker/storperf-master/storperf/utilities/steady_state.py54
-rw-r--r--docker/storperf-master/storperf/utilities/thread_gate.py67
7 files changed, 504 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/utilities/__init__.py b/docker/storperf-master/storperf/utilities/__init__.py
new file mode 100644
index 0000000..73444b6
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/__init__.py
@@ -0,0 +1,8 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..1da869c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -0,0 +1,205 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
+from time import sleep
+import time
+
+
class DataHandler(object):
    """Consumes data events from a workload executor.

    While a workload is running, each event triggers a steady-state
    evaluation of the latency, IOPS and bandwidth series (read and
    write) pulled back from Graphite.  Once the executor reports that
    it has terminated, the accumulated metadata is pushed to the test
    results database.
    """

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.job_db = JobDB()

    def data_event(self, executor):
        """Handle a data event from the given executor.

        If the executor has terminated, push the collected results to
        the results database.  Otherwise evaluate steady state for each
        metric/io_type combination of the current workload, record the
        outcome in executor.metadata, and terminate the current run
        early once every series is steady (warm-up workloads, whose
        names start with '_', are never cut short).
        """
        self.logger.debug("Event received")

        if executor.terminated:
            self._push_to_db(executor)
        else:
            # Workload key built from components 1..5 of the current
            # workload name (drops the leading job id component).
            workload = '.'.join(executor.current_workload.split('.')[1:6])
            if 'metrics' not in executor.metadata:
                executor.metadata['metrics'] = {}

            steady_state = True
            metrics = {}
            for metric in ('lat.mean', 'iops', 'bw'):
                metrics[metric] = {}
                for io_type in ('read', 'write'):
                    metrics[metric][io_type] = {}

                    series = self._lookup_prior_data(executor, metric,
                                                     io_type)
                    series = self._convert_timestamps_to_samples(
                        executor, series)
                    steady = self._evaluate_prior_data(
                        series, executor.steady_state_samples)

                    self.logger.debug("Steady state for %s %s: %s"
                                      % (io_type, metric, steady))

                    metrics[metric][io_type]['series'] = series
                    metrics[metric][io_type]['steady_state'] = steady
                    treated_data = DataTreatment.data_treatment(series)

                    metrics[metric][io_type]['slope'] = \
                        math.slope(treated_data['slope_data'])
                    metrics[metric][io_type]['range'] = \
                        math.range_value(treated_data['range_data'])
                    average = math.average(treated_data['average_data'])
                    metrics[metric][io_type]['average'] = average

                    metrics_key = '%s.%s.%s' % (workload, io_type, metric)
                    executor.metadata['metrics'][metrics_key] = average

                    # The workload is steady only if every single
                    # metric/io_type series is steady.
                    if not steady:
                        steady_state = False

            if 'report_data' not in executor.metadata:
                executor.metadata['report_data'] = {}

            if 'steady_state' not in executor.metadata:
                executor.metadata['steady_state'] = {}

            executor.metadata['report_data'][workload] = metrics
            executor.metadata['steady_state'][workload] = steady_state

            workload_name = executor.current_workload.split('.')[1]

            if steady_state and not workload_name.startswith('_'):
                executor.terminate_current_run()

    def _lookup_prior_data(self, executor, metric, io_type):
        """Fetch the recent data series for metric/io_type from Graphite.

        Returns a list of [timestamp, value] pairs covering roughly
        (steady_state_samples + 2) minutes back from now, waiting for
        Carbon to catch up when the data looks too fresh or stale.
        """
        workload = executor.current_workload
        graphite_db = GraphiteDB()

        # A bit of a hack here as Carbon might not be finished storing
        # the data we just sent to it.
        now = int(time.time())
        backtime = 60 * (executor.steady_state_samples + 2)
        data_series = graphite_db.fetch_series(workload,
                                               metric,
                                               io_type,
                                               now,
                                               backtime)
        most_recent_time = now
        if len(data_series) > 0:
            most_recent_time = data_series[-1][0]

        delta = now - most_recent_time
        self.logger.debug("Last update to graphite was %s ago" % delta)

        # Poll until the newest sample is between 5s and 60s old (or
        # older than 120s, in which case no fresher data is expected).
        while (delta < 5 or (delta > 60 and delta < 120)):
            sleep(5)
            data_series = graphite_db.fetch_series(workload,
                                                   metric,
                                                   io_type,
                                                   now,
                                                   backtime)
            if len(data_series) > 0:
                most_recent_time = data_series[-1][0]
            delta = time.time() - most_recent_time
            self.logger.debug("Last update to graphite was %s ago" % delta)

        return data_series

    def _convert_timestamps_to_samples(self, executor, series):
        """Convert absolute timestamps into 1-minute sample numbers
        relative to the workload's recorded start time.
        """
        workload_record = self.job_db.fetch_workloads(
            executor.current_workload)
        start_time = int(workload_record[0][1])

        normalized_series = []

        for item in series:
            elapsed = (item[0] - start_time)
            sample_number = int(round(float(elapsed) / 60))
            normalized_series.append([sample_number, item[1]])

        return normalized_series

    def _evaluate_prior_data(self, data_series, samples):
        """Return True when data_series has at least `samples` points
        and satisfies the steady-state criteria.
        """
        self.logger.debug("Data series: %s" % data_series)
        number_of_samples = len(data_series)

        if number_of_samples == 0:
            return False
        if (number_of_samples < samples):
            self.logger.debug("Only %s samples, ignoring" % number_of_samples)
            return False

        return SteadyState.steady_state(data_series)

    def _push_to_db(self, executor):
        """Push the executor's metadata to the results DB named by the
        TEST_DB_URL environment variable (no-op when it is unset).
        """
        pod_name = dictionary.get_key_from_dict(executor.metadata,
                                                'pod_name',
                                                'Unknown')
        version = dictionary.get_key_from_dict(executor.metadata,
                                               'version',
                                               'Unknown')
        scenario = dictionary.get_key_from_dict(executor.metadata,
                                                'scenario_name',
                                                'Unknown')
        build_tag = dictionary.get_key_from_dict(executor.metadata,
                                                 'build_tag',
                                                 'Unknown')
        test_case = dictionary.get_key_from_dict(executor.metadata,
                                                 'test_case',
                                                 'Unknown')
        duration = executor.end_time - executor.start_time

        payload = executor.metadata

        # Overall verdict: every workload must have reached steady state.
        steady_state = True
        for _, value in executor.metadata['steady_state'].items():
            steady_state = steady_state and value

        payload['timestart'] = executor.start_time
        payload['duration'] = duration

        if steady_state:
            criteria = 'PASS'
        else:
            criteria = 'FAIL'

        start_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.gmtime(executor.start_time))

        end_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                 time.gmtime(executor.end_time))

        test_db = os.environ.get('TEST_DB_URL')
        if test_db is not None:
            self.logger.info("Pushing results to %s" % (test_db))
            try:
                response = test_results_db.push_results_to_db(test_db,
                                                              "storperf",
                                                              test_case,
                                                              start_time,
                                                              end_time,
                                                              self.logger,
                                                              pod_name,
                                                              version,
                                                              scenario,
                                                              criteria,
                                                              build_tag,
                                                              payload)
                executor.result_url = response['href']
            # Narrowed from a bare except: a failed push is logged but
            # must not abort the executor's shutdown path.
            except Exception:
                self.logger.exception("Error pushing results into Database")
diff --git a/docker/storperf-master/storperf/utilities/data_treatment.py b/docker/storperf-master/storperf/utilities/data_treatment.py
new file mode 100644
index 0000000..2368fd9
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/data_treatment.py
@@ -0,0 +1,39 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
def data_treatment(data_series):
    """Pre-treat a data series for the steady-state math utilities.

    Given a series following the pattern [[x1, y1], [x2, y2], ..., [xm, ym]],
    return a dictionary with the keys 'slope_data', 'range_data' and
    'average_data' shaped to match the input requirements of the slope,
    range and average algorithms respectively.  If the input pattern of
    the series (or of one of the math modules) ever changes, this
    function should be the only part of the steady-state detection
    module that needs to change with it.
    """
    # The slope algorithm consumes the raw [x, y] pairs; range and
    # average only need the y values.  (The unused x-value collection
    # present in an earlier revision has been dropped as dead code.)
    y_values = [pair[1] for pair in data_series]

    treated_data = {
        'slope_data': data_series,  # No treatment necessary so far
        'range_data': y_values,     # The y values only
        'average_data': y_values
    }

    return treated_data
diff --git a/docker/storperf-master/storperf/utilities/dictionary.py b/docker/storperf-master/storperf/utilities/dictionary.py
new file mode 100644
index 0000000..95f625c
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/dictionary.py
@@ -0,0 +1,15 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+
def get_key_from_dict(dictionary, key, default_value=None):
    """Return dictionary[key] if the key is present, else default_value.

    Thin convenience wrapper kept for existing callers; delegates to
    the built-in dict.get, which performs a single lookup instead of
    the previous membership-test-plus-index pair.
    """
    return dictionary.get(key, default_value)
diff --git a/docker/storperf-master/storperf/utilities/math.py b/docker/storperf-master/storperf/utilities/math.py
new file mode 100644
index 0000000..8e04134
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/math.py
@@ -0,0 +1,116 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import copy
+
+
def slope(data_series):
    """Return the least-squares slope of a data series.

    Implements the linear least squares algorithm described at
    https://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)
    for m equations (one per data point) in 2 unknowns, where x is the
    time and y the volume performance variable under test (IOPS,
    latency, ...).  The series is expected to follow the pattern
    [[x1, y1], [x2, y2], ..., [xm, ym]]; if that pattern changes, the
    data_treatment function must be adjusted to keep the rest of the
    steady-state detection module compatible.

    Returns None for an empty series or when the slope is infinite
    (all x values identical).
    """
    if not data_series:
        return None

    m = len(data_series)

    # Solve the normal equations (2 equations, 2 unknowns) for beta2,
    # the slope.  The x values are coerced to float so the arithmetic
    # stays in floating point throughout.
    sum_x = 0.0
    sum_x_sq = 0.0
    sum_xy = 0.0
    sum_y = 0.0
    for point in data_series:
        x = float(point[0])
        y = point[1]
        sum_x += x
        sum_x_sq += x * x
        sum_xy += x * y
        sum_y += y

    denominator = sum_x ** 2 - m * sum_x_sq
    if denominator == 0:
        return None  # Infinite slope

    # The y-intercept, if ever needed, would be:
    # beta1 = (sum_xy - beta2 * sum_x_sq) / sum_x
    return (sum_y * sum_x - m * sum_xy) / denominator
+
+
def range_value(data_series):
    """Return the range (max - min) of a data series as a number.

    The series is expected to follow the pattern [y1, y2, ..., ym],
    where yi is the ith measurement of the volume performance variable
    under test (IOPS, latency, ...).  The conversion from the original
    [[x, y], ...] pattern is performed outside this function (by
    data_treatment), so the original pattern can change without
    breaking it.

    Returns None for an empty series.
    """
    if not data_series:
        return None
    return max(data_series) - min(data_series)
+
+
def average(data_series):
    """Return the arithmetic mean of a data series as a float.

    The series is expected to follow the pattern [y1, y2, ..., ym];
    data_treatment performs the conversion from the original
    [[x, y], ...] pattern.  Returns None for an empty series.
    """
    if not data_series:
        return None
    # float() keeps the division in floating point regardless of the
    # element types.
    return sum(data_series) / float(len(data_series))
diff --git a/docker/storperf-master/storperf/utilities/steady_state.py b/docker/storperf-master/storperf/utilities/steady_state.py
new file mode 100644
index 0000000..13f609d
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/steady_state.py
@@ -0,0 +1,54 @@
+##############################################################################
+# Copyright (c) 2016 CENGN and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import logging
+
+from storperf.utilities import data_treatment as DataTreatment
+from storperf.utilities import math as math
+
+
def steady_state(data_series):
    """Return True when the measurement window has reached steady state.

    data_series follows the pattern [[x1, y1], [x2, y2], ..., [xm, ym]]
    where m is the number of points in the window, x is time and y the
    volume performance variable under test (IOPS, latency, ...).  The
    SNIA criteria are applied: both the range and the slope of the
    window must fall within fixed fractions (20% and 10% respectively)
    of the window average.
    """
    logger = logging.getLogger('storperf.utilities.steady_state')

    # Pre-condition the data to match each algorithm's input shape.
    shaped = DataTreatment.data_treatment(data_series)

    slope_value = math.slope(shaped['slope_data'])
    range_value = math.range_value(shaped['range_data'])
    average_value = math.average(shaped['average_data'])

    # Any None means the series was empty or degenerate: not steady.
    if slope_value is None or range_value is None or average_value is None:
        return False

    range_limit = 0.20 * abs(average_value)
    slope_limit = 0.10 * abs(average_value)

    range_condition = abs(range_value) <= range_limit
    slope_condition = abs(slope_value) <= slope_limit
    steady = range_condition and slope_condition

    logger.debug("Range %s <= %s?" % (abs(range_value), range_limit))
    logger.debug("Slope %s <= %s?" % (abs(slope_value), slope_limit))
    logger.debug("Steady State? %s" % steady)

    return steady
diff --git a/docker/storperf-master/storperf/utilities/thread_gate.py b/docker/storperf-master/storperf/utilities/thread_gate.py
new file mode 100644
index 0000000..38acbb1
--- /dev/null
+++ b/docker/storperf-master/storperf/utilities/thread_gate.py
@@ -0,0 +1,67 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+Creates a gate object that allows synchronization between an arbitrary
+number of callers.
+"""
+import logging
+import time
+from threading import Lock
+
+
class FailureToReportException(Exception):
    # Raised by ThreadGate.report when at least one expected registrant
    # has never reported in within twice the gate's timeout.
    pass
+
+
class ThreadGate(object):
    """Gate object allowing synchronization between an arbitrary
    number of callers.

    Each caller reports in with a unique id; the gate opens (report()
    returns True) once `size` distinct registrants have all reported
    within the configured timeout of each other.
    """

    def __init__(self, size, timeout=60):
        """Create a gate expecting `size` registrants.

        timeout: seconds after which a registrant's last report is
        considered stale; registrants that never report within twice
        this value trigger FailureToReportException.
        """
        self.logger = logging.getLogger(__name__)
        self._gate_size = size
        self._timeout = timeout
        self._registrants = {}
        self._creation_time = time.time()
        self._lock = Lock()

    def report(self, gate_id):
        """Record a report from gate_id and check the gate.

        Returns True when enough of the other registrants have reported
        in recently enough, False otherwise.  On a successful pass the
        registrant list is cleared so the gate can be reused.

        Raises FailureToReportException when registrants are still
        missing after twice the timeout since the gate was created.
        """
        # Note: this string was previously a stray class-body literal;
        # it is now the method docstring above.
        with self._lock:
            now = time.time()
            self._registrants[gate_id] = now
            ready = True
            self.logger.debug("Gate report for %s", gate_id)

            total_missing = self._gate_size - len(self._registrants)
            if total_missing > 0:
                self.logger.debug("Not all registrants have reported in")
                time_since_creation = now - self._creation_time
                if (time_since_creation > (self._timeout * 2)):
                    self.logger.error(
                        "%s registrant(s) have never reported in",
                        total_missing)
                    raise FailureToReportException
                return False

            # All registrants known; the gate only passes if every last
            # report is fresh (within the timeout).
            for k, v in self._registrants.items():
                time_since_last_report = now - v
                if time_since_last_report > self._timeout:
                    self.logger.debug("Registrant %s last reported %s ago",
                                      k, time_since_last_report)
                    ready = False

            self.logger.debug("Gate pass? %s", ready)

            if ready:
                self._registrants.clear()

            return ready