From 9f5b776c1b5a54d2ca5942424111f3ff55d5737c Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Fri, 2 Dec 2016 22:25:48 -0500 Subject: Steady state detection Detection of steady state after 10+ samples of data Change-Id: I29368b06188c6370d17b3d567fece6486d171235 JIRA: STORPERF-72 STORPERF-73 Signed-off-by: Mark Beierl --- storperf/carbon/emitter.py | 5 +- storperf/db/graphite_db.py | 36 +++++++++++++- storperf/db/test_results_db.py | 3 +- storperf/fio/fio_invoker.py | 23 +++++++-- storperf/logging.json | 29 ++++-------- storperf/storperf_master.py | 4 +- storperf/test_executor.py | 34 ++++++++++---- storperf/utilities/data_handler.py | 91 ++++++++++++++++++++++++++++++++++-- storperf/utilities/math.py | 7 ++- storperf/utilities/steady_state.py | 9 ++++ storperf/utilities/thread_gate.py | 5 ++ storperf/workloads/_base_workload.py | 2 +- 12 files changed, 203 insertions(+), 45 deletions(-) (limited to 'storperf') diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py index c9af8a6..e23dc79 100644 --- a/storperf/carbon/emitter.py +++ b/storperf/carbon/emitter.py @@ -22,9 +22,8 @@ class CarbonMetricTransmitter(): def transmit_metrics(self, metrics): if 'timestamp' in metrics: - timestamp = metrics.pop('timestamp') - else: - timestamp = str(calendar.timegm(time.gmtime())) + metrics.pop('timestamp') + timestamp = str(calendar.timegm(time.gmtime())) carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) carbon_socket.connect((self.carbon_host, self.carbon_port)) diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py index c44d2aa..8be91c8 100644 --- a/storperf/db/graphite_db.py +++ b/storperf/db/graphite_db.py @@ -1,4 +1,3 @@ -from storperf.db.job_db import JobDB import calendar import json import logging @@ -6,6 +5,8 @@ import time import requests +from storperf.db.job_db import JobDB + class GraphiteDB(object): @@ -15,6 +16,27 @@ class GraphiteDB(object): self._job_db = JobDB() self.logger = logging.getLogger(__name__) + def fetch_series(self, workload, metric, io_type, time, duration): + + series = [] + end = time + start = end - duration + + request = ("http://127.0.0.1:8000/render/?target=" + "averageSeries(%s.*.jobs.1.%s.%s)" + "&format=json" + "&from=%s" + "&until=%s" % + (workload, io_type, metric, + start, end)) + self.logger.debug("Calling %s" % (request)) + + response = requests.get(request) + if (response.status_code == 200): + series = self._series_results(json.loads(response.content)) + + return series + def fetch_averages(self, workload): workload_executions = self._job_db.fetch_workloads(workload) @@ -115,6 +137,18 @@ class GraphiteDB(object): return average + def _series_results(self, results): + + series = [] + + for item in results: + datapoints = item['datapoints'] + for datapoint in datapoints: + if datapoint[0] is not None: + series.append([datapoint[1], datapoint[0]]) + + return series + def make_fullname_pattern(self, workload): parts = workload.split('.') wildcards_needed = 7 - len(parts) diff --git a/storperf/db/test_results_db.py b/storperf/db/test_results_db.py index 4ee7a52..049b0b8 100644 --- a/storperf/db/test_results_db.py +++ b/storperf/db/test_results_db.py @@ -44,7 +44,8 @@ def push_results_to_db(db_url, project, case_name, headers = {'Content-Type': 'application/json'} try: if logger: - logger.debug("Pushing results to %s" % (url)) + logger.info("Pushing results to %s" % (url)) + logger.debug("Parameters: %s" % params) r = requests.post(url, data=json.dumps(params), headers=headers) if logger: logger.debug(r) diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index 315b243..2febf25 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -54,12 +54,14 @@ class FIOInvoker(object): for event_listener in self.event_listeners: try: + self.logger.debug( + "Event listener callback") event_listener(self.callback_id, json_metric) except Exception, e: self.logger.exception( "Notifying listener %s: %s", self.callback_id, e) - self.logger.info( + self.logger.debug( "Event listener callback complete") except Exception, e: self.logger.error("Error parsing JSON: %s", e) @@ -78,7 +80,6 @@ class FIOInvoker(object): self.logger.debug("Finished") def execute(self, args=[]): - ssh = paramiko.SSHClient() ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(self.remote_host, username='storperf', @@ -87,7 +88,12 @@ class FIOInvoker(object): command = "sudo ./fio " + ' '.join(args) self.logger.debug("Remote command: %s" % command) - (_, stdout, stderr) = ssh.exec_command(command) + + chan = ssh.get_transport().open_session(timeout=None) + chan.settimeout(None) + chan.exec_command(command) + stdout = chan.makefile('r', -1) + stderr = chan.makefile_stderr('r', -1) tout = Thread(target=self.stdout_handler, args=(stdout,), name="%s stdout" % self._remote_host) @@ -100,9 +106,18 @@ class FIOInvoker(object): terr.start() self.logger.info("Started fio on " + self.remote_host) + exit_status = chan.recv_exit_status() + self.logger.info("Finished fio on %s with exit code %s" % + (self.remote_host, exit_status)) + + stdout.close() + stderr.close() + + self.logger.debug("Joining stderr handler") terr.join() + self.logger.debug("Joining stdout handler") tout.join() - self.logger.info("Finished fio on " + self.remote_host) + self.logger.debug("Ended") def terminate(self): self.logger.debug("Terminating fio on " + self.remote_host) diff --git a/storperf/logging.json b/storperf/logging.json index 6d6026e..74df494 100644 --- a/storperf/logging.json +++ b/storperf/logging.json @@ -37,26 +37,15 @@ }, "loggers": { - "my_module": { - "level": "ERROR", - "handlers": ["console"], - "propagate": "no" + "": { + "level": "INFO", + "handlers": ["console", "file_handler", "error_file_handler"] + }, + "storperf": { + "level": "DEBUG" + }, + "storperf.carbon.emitter": { + "level": "INFO" } - }, - - "root": { - "level": "WARN", - "handlers": ["console", "file_handler", "error_file_handler"] - }, - - "storperf": { - "level": "DEBUG", - "handlers": ["console", "file_handler", "error_file_handler"] - }, - - "storperf.carbon.emitter": { - "level": "INFO", - "handlers": ["console", "file_handler", "error_file_handler"] } - } \ No newline at end of file diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index 35cba72..2975afd 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -378,9 +378,9 @@ class StorPerfMaster(object): (_, stdout, stderr) = ssh.exec_command(cmd) for line in stdout.readlines(): - logger.debug(line.decode('utf-8').strip()) + logger.debug(line.strip()) for line in stderr.readlines(): - logger.error(line.decode('utf-8').strip()) + logger.error(line.strip()) def _make_parameters(self): heat_parameters = {} diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 530ce80..8350e43 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker +from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate -from utilities.data_handler import DataHandler class UnknownWorkload(Exception): @@ -41,6 +41,7 @@ class TestExecutor(object): self.metadata = {} self.start_time = None self.end_time = None + self.current_workload = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -106,7 +107,7 @@ class TestExecutor(object): try: event_listener(self) except Exception, e: - self.logger.error("Notifying listener: %s", e) + self.logger.exception("While notifying listener %s", e) def register_workloads(self, workloads): self.workload_modules = [] @@ -166,15 +167,17 @@ class TestExecutor(object): self.job_db.record_workload_params(metadata) self.metadata = metadata self._workload_thread = Thread(target=self.execute_workloads, - args=()) + args=(), + name="Workload thread") self._workload_thread.start() return self.job_db.job_id def terminate(self): self._terminated = True - return self.terminate_current_run() + self.end_time = time.time() + return self._terminate_current_run() - def terminate_current_run(self): + def _terminate_current_run(self): self.logger.info("Terminating current run") terminated_hosts = [] for workload in self._workload_executors: @@ -222,14 +225,23 @@ class TestExecutor(object): for blocksize in blocksizes: for iodepth in iodepths: - scheduler = sched.scheduler(time.time, time.sleep) if self._terminated: return + self.current_workload = ( + "%s.%s.queue-depth.%s.block-size.%s" % + (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) + self.logger.info("Starting run %s" % self.current_workload) + + scheduler = sched.scheduler(time.time, time.sleep) if self.deadline is not None \ and not workload_name.startswith("_"): event = scheduler.enter(self.deadline * 60, 1, - self.terminate_current_run, ()) + self._terminate_current_run, + ()) t = Thread(target=scheduler.run, args=()) t.start() @@ -244,13 +256,16 @@ class TestExecutor(object): self._workload_executors.append(slave_workload) t = Thread(target=self.execute_on_node, - args=(slave_workload,)) + args=(slave_workload,), + name="%s worker" % slave) t.daemon = False t.start() slave_threads.append(t) for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) if not scheduler.empty(): try: @@ -258,7 +273,10 @@ class TestExecutor(object): except: pass + self.logger.info("Completed run %s" % + self.current_workload) self._workload_executors = [] + self.current_workload = None self.logger.info("Completed workload %s" % (workload_name)) self.logger.info("Completed job %s" % (self.job_db.job_id)) diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py index 03c764c..ebf715a 100644 --- a/storperf/utilities/data_handler.py +++ b/storperf/utilities/data_handler.py @@ -9,27 +9,112 @@ import logging import os +from time import sleep +import time from storperf.db import test_results_db from storperf.db.graphite_db import GraphiteDB +from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import dictionary +from storperf.utilities import math as math +from storperf.utilities import steady_state as SteadyState class DataHandler(object): def __init__(self): self.logger = logging.getLogger(__name__) + self.samples = 11 """ """ def data_event(self, executor): - self.logger.info("Event received") - - # Data lookup + self.logger.debug("Event received") if executor.terminated: self._push_to_db(executor) + else: + steady_state = True + metrics = {} + for metric in ('lat.mean', 'iops', 'bw'): + metrics[metric] = {} + for io_type in ('read', 'write'): + metrics[metric][io_type] = {} + + series = self._lookup_prior_data(executor, metric, io_type) + steady = self._evaluate_prior_data(series) + + self.logger.debug("Steady state for %s %s: %s" + % (io_type, metric, steady)) + + metrics[metric][io_type]['series'] = series + metrics[metric][io_type]['steady_state'] = steady + treated_data = DataTreatment.data_treatment(series) + + metrics[metric][io_type]['slope'] = \ + math.slope(treated_data['slope_data']) + metrics[metric][io_type]['range'] = \ + math.range_value(treated_data['range_data']) + metrics[metric][io_type]['average'] = \ + math.average(treated_data['average_data']) + + if not steady: + steady_state = False + + executor.metadata['report_data'] = metrics + executor.metadata['steady_state'] = steady_state + + if steady_state: + executor.terminate() + + def _lookup_prior_data(self, executor, metric, io_type): + workload = executor.current_workload + graphite_db = GraphiteDB() + + # A bit of a hack here as Carbon might not be finished storing the + # data we just sent to it + now = int(time.time()) + backtime = 60 * (self.samples + 2) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + most_recent_time = now + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + + delta = now - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + while (delta < 5 or (delta > 60 and delta < 120)): + sleep(5) + data_series = graphite_db.fetch_series(workload, + metric, + io_type, + now, + backtime) + if len(data_series) > 0: + most_recent_time = data_series[-1][0] + delta = time.time() - most_recent_time + self.logger.debug("Last update to graphite was %s ago" % delta) + + return data_series + + def _evaluate_prior_data(self, data_series): + self.logger.debug("Data series: %s" % data_series) + if len(data_series) == 0: + return False + earliest_timestamp = data_series[0][0] + latest_timestamp = data_series[-1][0] + duration = latest_timestamp - earliest_timestamp + if (duration < 60 * self.samples): + self.logger.debug("Only %s minutes of samples, ignoring" % + (duration / 60,)) + return False + + return SteadyState.steady_state(data_series) def _push_to_db(self, executor): test_db = os.environ.get('TEST_DB_URL') diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py index a11ec19..4ddddca 100644 --- a/storperf/utilities/math.py +++ b/storperf/utilities/math.py @@ -52,8 +52,11 @@ def slope(data_series): sum_yi_xi += xi * yi sum_yi += yi - beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \ - (sum_xi**2 - m * sum_xi_sq) # The slope + over = (sum_xi**2 - m * sum_xi_sq) + if over == 0: + beta2 = None # Infinite slope + else: + beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope # beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if # needed diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py index 233bc78..0bbe21e 100644 --- a/storperf/utilities/steady_state.py +++ b/storperf/utilities/steady_state.py @@ -6,6 +6,8 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import logging + from storperf.utilities import data_treatment as DataTreatment from storperf.utilities import math as math @@ -22,6 +24,8 @@ def steady_state(data_series): has been reached with the data that is passed to it. """ + logger = logging.getLogger('storperf.utilities.steady_state') + # Pre conditioning the data to match the algorithms treated_data = DataTreatment.data_treatment(data_series) @@ -39,6 +43,11 @@ def steady_state(data_series): steady_state = range_condition and slope_condition + logger.debug("Range %s < %s?" % (abs(range_value), + (0.20 * abs(average_value)))) + logger.debug("Slope %s < %s?" % (abs(slope_value), + (0.10 * abs(average_value)))) + logger.debug("Steady State? %s" % steady_state) else: steady_state = False diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py index b0dde50..295b8be 100644 --- a/storperf/utilities/thread_gate.py +++ b/storperf/utilities/thread_gate.py @@ -55,4 +55,9 @@ class ThreadGate(object): k, time_since_last_report) ready = False + self.logger.debug("Gate pass? %s", ready) + + if ready: + self._registrants.clear() + return ready diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index 874e99c..3896cc6 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -23,7 +23,7 @@ class _base_workload(object): 'bs': '64k', 'iodepth': '1', 'numjobs': '1', - 'loops': '1', + 'loops': '20', 'output-format': 'json', 'status-interval': '60' } -- cgit 1.2.3-korg