diff options
Diffstat (limited to 'docker/storperf-master/storperf/utilities')
7 files changed, 504 insertions, 0 deletions
diff --git a/docker/storperf-master/storperf/utilities/__init__.py b/docker/storperf-master/storperf/utilities/__init__.py new file mode 100644 index 0000000..73444b6 --- /dev/null +++ b/docker/storperf-master/storperf/utilities/__init__.py @@ -0,0 +1,8 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py new file mode 100644 index 0000000..1da869c --- /dev/null +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -0,0 +1,205 @@ +############################################################################## +# Copyright (c) 2016 Dell EMC and others. +# +# All rights reserved. 
# This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################

import logging
import os
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
from storperf.utilities import math as math  # project helpers, not stdlib math
from storperf.utilities import steady_state as SteadyState
from time import sleep
import time


class DataHandler(object):
    """Receives workload events from an executor, evaluates steady state
    per metric/io_type, and pushes the final results to the test DB once
    the executor terminates."""

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.job_db = JobDB()

    def data_event(self, executor):
        """Handle one data event.

        When the executor has terminated, publish the collected metadata;
        otherwise evaluate steady state for every (metric, io_type) pair of
        the current workload and, if all are steady, end the current run
        early (warm-up workloads whose name starts with '_' are never
        terminated early).
        """
        self.logger.debug("Event received")

        if executor.terminated:
            self._push_to_db(executor)
        else:
            # Workload key = elements 1..5 of the dotted workload name.
            workload = '.'.join(executor.current_workload.split('.')[1:6])
            if 'metrics' not in executor.metadata:
                executor.metadata['metrics'] = {}

            steady_state = True
            metrics = {}
            for metric in ('lat.mean', 'iops', 'bw'):
                metrics[metric] = {}
                for io_type in ('read', 'write'):
                    metrics[metric][io_type] = {}

                    series = self._lookup_prior_data(executor, metric,
                                                     io_type)
                    series = self._convert_timestamps_to_samples(
                        executor, series)
                    steady = self._evaluate_prior_data(
                        series, executor.steady_state_samples)

                    self.logger.debug("Steady state for %s %s: %s"
                                      % (io_type, metric, steady))

                    metrics[metric][io_type]['series'] = series
                    metrics[metric][io_type]['steady_state'] = steady
                    treated_data = DataTreatment.data_treatment(series)

                    metrics[metric][io_type]['slope'] = \
                        math.slope(treated_data['slope_data'])
                    metrics[metric][io_type]['range'] = \
                        math.range_value(treated_data['range_data'])
                    average = math.average(treated_data['average_data'])
                    metrics[metric][io_type]['average'] = average

                    metrics_key = '%s.%s.%s' % (workload, io_type, metric)
                    executor.metadata['metrics'][metrics_key] = average

                    if not steady:
                        steady_state = False

            executor.metadata.setdefault('report_data', {})
            executor.metadata.setdefault('steady_state', {})

            executor.metadata['report_data'][workload] = metrics
            executor.metadata['steady_state'][workload] = steady_state

            workload_name = executor.current_workload.split('.')[1]

            # Workloads prefixed with '_' appear to be warm-up jobs that
            # must run to completion -- TODO confirm against the executor.
            if steady_state and not workload_name.startswith('_'):
                executor.terminate_current_run()

    def _lookup_prior_data(self, executor, metric, io_type):
        """Fetch the recent Graphite series for (workload, metric, io_type).

        Polls while the newest datapoint is under 5s old or looks stalled
        (60-120s old), since Carbon may not have flushed yet.
        """
        workload = executor.current_workload
        graphite_db = GraphiteDB()

        # A bit of a hack here as Carbon might not be finished storing the
        # data we just sent to it
        now = int(time.time())
        backtime = 60 * (executor.steady_state_samples + 2)
        data_series = graphite_db.fetch_series(workload,
                                               metric,
                                               io_type,
                                               now,
                                               backtime)
        most_recent_time = now
        if len(data_series) > 0:
            most_recent_time = data_series[-1][0]

        delta = now - most_recent_time
        self.logger.debug("Last update to graphite was %s ago" % delta)

        while (delta < 5 or (delta > 60 and delta < 120)):
            sleep(5)
            data_series = graphite_db.fetch_series(workload,
                                                   metric,
                                                   io_type,
                                                   now,
                                                   backtime)
            if len(data_series) > 0:
                most_recent_time = data_series[-1][0]
            delta = time.time() - most_recent_time
            self.logger.debug("Last update to graphite was %s ago" % delta)

        return data_series

    def _convert_timestamps_to_samples(self, executor, series):
        """Convert [timestamp, value] pairs into [sample#, value] pairs.

        A sample number is the number of whole minutes (rounded) since the
        workload's recorded start time.
        """
        workload_record = self.job_db.fetch_workloads(
            executor.current_workload)
        start_time = int(workload_record[0][1])

        normalized_series = []
        for timestamp, value in series:
            elapsed = timestamp - start_time
            sample_number = int(round(float(elapsed) / 60))
            normalized_series.append([sample_number, value])

        return normalized_series

    def _evaluate_prior_data(self, data_series, samples):
        """Return the steady-state verdict for a normalized series.

        False when the series is empty or shorter than the required sample
        count; otherwise defer to the SNIA-style steady state check.
        """
        self.logger.debug("Data series: %s" % data_series)
        number_of_samples = len(data_series)

        if number_of_samples == 0:
            return False
        if number_of_samples < samples:
            self.logger.debug("Only %s samples, ignoring" % number_of_samples)
            return False

        return SteadyState.steady_state(data_series)

    def _push_to_db(self, executor):
        """Publish the finished run's metadata to the results database.

        No-op unless the TEST_DB_URL environment variable is set.
        """
        pod_name = dictionary.get_key_from_dict(executor.metadata,
                                                'pod_name',
                                                'Unknown')
        version = dictionary.get_key_from_dict(executor.metadata,
                                               'version',
                                               'Unknown')
        scenario = dictionary.get_key_from_dict(executor.metadata,
                                                'scenario_name',
                                                'Unknown')
        build_tag = dictionary.get_key_from_dict(executor.metadata,
                                                 'build_tag',
                                                 'Unknown')
        test_case = dictionary.get_key_from_dict(executor.metadata,
                                                 'test_case',
                                                 'Unknown')
        duration = executor.end_time - executor.start_time

        payload = executor.metadata

        # Overall PASS only if every workload reached steady state.
        steady_state = all(executor.metadata['steady_state'].values())

        payload['timestart'] = executor.start_time
        payload['duration'] = duration

        criteria = 'PASS' if steady_state else 'FAIL'

        start_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                   time.gmtime(executor.start_time))

        end_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                 time.gmtime(executor.end_time))

        test_db = os.environ.get('TEST_DB_URL')
        if test_db is not None:
            self.logger.info("Pushing results to %s" % (test_db))
            try:
                response = test_results_db.push_results_to_db(test_db,
                                                              "storperf",
                                                              test_case,
                                                              start_time,
                                                              end_time,
                                                              self.logger,
                                                              pod_name,
                                                              version,
                                                              scenario,
                                                              criteria,
                                                              build_tag,
                                                              payload)
                executor.result_url = response['href']
            except Exception:
                # Was a bare 'except:'; narrowed so SystemExit and
                # KeyboardInterrupt are no longer swallowed.
                self.logger.exception("Error pushing results into Database")


# ---- diff: new file docker/storperf-master/storperf/utilities/data_treatment.py
# (new file mode 100644 index 0000000..2368fd9; diff continues in next chunk)
# ---- docker/storperf-master/storperf/utilities/data_treatment.py ----
##############################################################################
# Copyright (c) 2016 CENGN and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################


def data_treatment(data_series):
    """Pre-treat data_series for the steady state math helpers.

    data_series is assumed to follow [[x1, y1], [x2, y2], ..., [xm, ym]].
    Returns a dict with keys 'slope_data' (the raw series) and
    'range_data'/'average_data' (the y values only), matching what the
    slope, range and average utilities expect.  If the input pattern -- or
    a math helper's expected input -- ever changes, this function is the
    single place in the steady state detection module to adapt.
    """
    # Only the y values are needed by range/average; slope consumes the
    # raw series untouched.  (The original also collected the x values
    # into a list it never used -- dead code, removed.)
    y_values = [point[1] for point in data_series]

    return {
        'slope_data': data_series,  # no treatment necessary so far
        'range_data': y_values,
        'average_data': y_values,
    }


# ---- docker/storperf-master/storperf/utilities/dictionary.py ----
##############################################################################
# Copyright (c) 2016 EMC and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################


def get_key_from_dict(dictionary, key, default_value=None):
    """Return dictionary[key] if present, else default_value."""
    # dict.get performs a single lookup instead of the original
    # 'if key in ...' double lookup; behavior is identical.
    return dictionary.get(key, default_value)


# ---- diff: new file docker/storperf-master/storperf/utilities/math.py ----
# Apache-2.0 header (CENGN, 2016); continues in the next chunk.
# This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import copy  # kept for compatibility; slope() no longer needs a deepcopy


def slope(data_series):
    """Return the linear least-squares slope of data_series as a float.

    Implements the normal-equations solution for m points and 2 unknowns:
    https://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)
    data_series follows [[x1, y1], ..., [xm, ym]] where x is time and y
    the performance variable under test (IOPS, latency, ...).
    Returns None for an empty series or when all x values are equal
    (infinite slope).  If the data pattern changes, adjust
    data_treatment(), not this function.
    """
    m = len(data_series)
    if m == 0:
        return None

    sum_xi = 0.0
    sum_xi_sq = 0.0
    sum_yi = 0
    sum_yi_xi = 0.0
    for xi, yi in data_series:
        # Coerce to float per element; the original deep-copied the whole
        # series just to float the first x -- same numeric result, no copy.
        xi = float(xi)
        sum_xi += xi
        sum_xi_sq += xi ** 2
        sum_yi_xi += xi * yi
        sum_yi += yi

    denominator = sum_xi ** 2 - m * sum_xi_sq
    if denominator == 0:
        return None  # vertical line: infinite slope

    # beta1 (the y-intercept), if ever needed:
    # beta1 = (sum_yi_xi - beta2 * sum_xi_sq) / sum_xi
    return (sum_yi * sum_xi - m * sum_yi_xi) / denominator


def range_value(data_series):
    """Return max - min of data_series ([y1, ..., ym]), or None if empty.

    The conversion from [[x, y], ...] to the flat y list happens in
    data_treatment(), outside this function.
    """
    if len(data_series) == 0:
        return None
    return max(data_series) - min(data_series)


def average(data_series):
    """Return the arithmetic mean of data_series ([y1, ..., ym]) as a
    float, or None for an empty series."""
    m = len(data_series)
    if m == 0:
        return None
    # float(m) keeps true division under Python 2 as well.
    return sum(data_series) / float(m)


# ---- diff: new file docker/storperf-master/storperf/utilities/steady_state.py
# Apache-2.0 header (CENGN, 2016); steady_state() and its docstring open
# here and continue in the next chunk.
# ---- docker/storperf-master/storperf/utilities/steady_state.py (cont.) ----
import logging

from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import math as math


def steady_state(data_series):
    """Decide whether a measurement window has reached steady state.

    data_series follows [[x1, y1], ..., [xm, ym]] where x is time and y
    the performance variable under test (IOPS, latency, ...).  Returns
    True only when both SNIA-style conditions hold: the range of the
    window is within 20% of its average and the slope within 10%.
    """
    logger = logging.getLogger('storperf.utilities.steady_state')

    # Pre-condition the raw series into the shapes the math helpers expect.
    treated_data = DataTreatment.data_treatment(data_series)

    slope_value = math.slope(treated_data['slope_data'])
    range_value = math.range_value(treated_data['range_data'])
    average_value = math.average(treated_data['average_data'])

    # Any None means the series was empty or degenerate: not steady.
    if slope_value is None or range_value is None or average_value is None:
        return False

    range_condition = abs(range_value) <= 0.20 * abs(average_value)
    slope_condition = abs(slope_value) <= 0.10 * abs(average_value)
    steady_state = range_condition and slope_condition

    logger.debug("Range %s <= %s?" % (abs(range_value),
                                      (0.20 * abs(average_value))))
    logger.debug("Slope %s <= %s?" % (abs(slope_value),
                                      (0.10 * abs(average_value))))
    logger.debug("Steady State? %s" % steady_state)

    return steady_state


# ---- diff: new file docker/storperf-master/storperf/utilities/thread_gate.py
# Apache-2.0 header (Dell EMC, 2016); module docstring opens here --
# "Creates a gate object that allows synchronization between an arbitrary
# number of callers." -- and closes in the next chunk.
"""
Creates a gate object that allows synchronization between an arbitrary
number of callers.
"""
import logging
import time
from threading import Lock


class FailureToReportException(Exception):
    """Raised when registrants stay silent for more than twice the gate
    timeout (moved from a comment-free bare class to a documented one)."""
    pass


class ThreadGate(object):
    """Rendezvous point for a fixed number of reporting callers."""

    def __init__(self, size, timeout=60):
        # size: number of distinct registrants required to open the gate.
        # timeout: seconds after which a registrant's report is stale.
        self.logger = logging.getLogger(__name__)
        self._gate_size = size
        self._timeout = timeout
        self._registrants = {}  # gate_id -> wall-clock time of last report
        # NOTE(review): wall-clock time; a clock jump can skew the timeout
        # math -- time.monotonic() would be safer (behavior kept as-is).
        self._creation_time = time.time()
        self._lock = Lock()

    def report(self, gate_id):
        """Record a report from gate_id; return True when the gate opens.

        The gate opens (and resets) once all registrants have reported
        within the timeout.  Raises FailureToReportException when
        registrants are still missing after twice the timeout.
        (This description was a dangling class-body string in the
        original; it is now a proper docstring.)
        """
        with self._lock:
            now = time.time()
            self._registrants[gate_id] = now
            ready = True
            self.logger.debug("Gate report for %s", gate_id)

            total_missing = self._gate_size - len(self._registrants)
            if total_missing > 0:
                self.logger.debug("Not all registrants have reported in")
                time_since_creation = now - self._creation_time
                if time_since_creation > (self._timeout * 2):
                    self.logger.error(
                        "%s registrant(s) have never reported in",
                        total_missing)
                    raise FailureToReportException
                return False

            # Everyone has registered; check nobody's report is stale.
            for gate, last_report in self._registrants.items():
                time_since_last_report = now - last_report
                if time_since_last_report > self._timeout:
                    self.logger.debug("Registrant %s last reported %s ago",
                                      gate, time_since_last_report)
                    ready = False

            self.logger.debug("Gate pass? %s", ready)

            if ready:
                # Reset so the same gate can be reused for the next cycle.
                self._registrants.clear()

            return ready