summary | refs | log | tree | commit | diff | stats
path: root/storperf
diff options
context:
space:
mode:
Diffstat (limited to 'storperf')
-rw-r--r--  storperf/carbon/emitter.py           5
-rw-r--r--  storperf/db/graphite_db.py          36
-rw-r--r--  storperf/db/test_results_db.py       3
-rw-r--r--  storperf/fio/fio_invoker.py         23
-rw-r--r--  storperf/logging.json               29
-rw-r--r--  storperf/storperf_master.py          4
-rw-r--r--  storperf/test_executor.py           34
-rw-r--r--  storperf/utilities/data_handler.py  91
-rw-r--r--  storperf/utilities/math.py           7
-rw-r--r--  storperf/utilities/steady_state.py   9
-rw-r--r--  storperf/utilities/thread_gate.py    5
-rw-r--r--  storperf/workloads/_base_workload.py 2
12 files changed, 203 insertions, 45 deletions
diff --git a/storperf/carbon/emitter.py b/storperf/carbon/emitter.py
index c9af8a6..e23dc79 100644
--- a/storperf/carbon/emitter.py
+++ b/storperf/carbon/emitter.py
@@ -22,9 +22,8 @@ class CarbonMetricTransmitter():
def transmit_metrics(self, metrics):
if 'timestamp' in metrics:
- timestamp = metrics.pop('timestamp')
- else:
- timestamp = str(calendar.timegm(time.gmtime()))
+ metrics.pop('timestamp')
+ timestamp = str(calendar.timegm(time.gmtime()))
carbon_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
carbon_socket.connect((self.carbon_host, self.carbon_port))
diff --git a/storperf/db/graphite_db.py b/storperf/db/graphite_db.py
index c44d2aa..8be91c8 100644
--- a/storperf/db/graphite_db.py
+++ b/storperf/db/graphite_db.py
@@ -1,4 +1,3 @@
-from storperf.db.job_db import JobDB
import calendar
import json
import logging
@@ -6,6 +5,8 @@ import time
import requests
+from storperf.db.job_db import JobDB
+
class GraphiteDB(object):
@@ -15,6 +16,27 @@ class GraphiteDB(object):
self._job_db = JobDB()
self.logger = logging.getLogger(__name__)
+ def fetch_series(self, workload, metric, io_type, time, duration):
+
+ series = []
+ end = time
+ start = end - duration
+
+ request = ("http://127.0.0.1:8000/render/?target="
+ "averageSeries(%s.*.jobs.1.%s.%s)"
+ "&format=json"
+ "&from=%s"
+ "&until=%s" %
+ (workload, io_type, metric,
+ start, end))
+ self.logger.debug("Calling %s" % (request))
+
+ response = requests.get(request)
+ if (response.status_code == 200):
+ series = self._series_results(json.loads(response.content))
+
+ return series
+
def fetch_averages(self, workload):
workload_executions = self._job_db.fetch_workloads(workload)
@@ -115,6 +137,18 @@ class GraphiteDB(object):
return average
+ def _series_results(self, results):
+
+ series = []
+
+ for item in results:
+ datapoints = item['datapoints']
+ for datapoint in datapoints:
+ if datapoint[0] is not None:
+ series.append([datapoint[1], datapoint[0]])
+
+ return series
+
def make_fullname_pattern(self, workload):
parts = workload.split('.')
wildcards_needed = 7 - len(parts)
diff --git a/storperf/db/test_results_db.py b/storperf/db/test_results_db.py
index 4ee7a52..049b0b8 100644
--- a/storperf/db/test_results_db.py
+++ b/storperf/db/test_results_db.py
@@ -44,7 +44,8 @@ def push_results_to_db(db_url, project, case_name,
headers = {'Content-Type': 'application/json'}
try:
if logger:
- logger.debug("Pushing results to %s" % (url))
+ logger.info("Pushing results to %s" % (url))
+ logger.debug("Parameters: %s" % params)
r = requests.post(url, data=json.dumps(params), headers=headers)
if logger:
logger.debug(r)
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index 315b243..2febf25 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -54,12 +54,14 @@ class FIOInvoker(object):
for event_listener in self.event_listeners:
try:
+ self.logger.debug(
+ "Event listener callback")
event_listener(self.callback_id, json_metric)
except Exception, e:
self.logger.exception(
"Notifying listener %s: %s",
self.callback_id, e)
- self.logger.info(
+ self.logger.debug(
"Event listener callback complete")
except Exception, e:
self.logger.error("Error parsing JSON: %s", e)
@@ -78,7 +80,6 @@ class FIOInvoker(object):
self.logger.debug("Finished")
def execute(self, args=[]):
-
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(self.remote_host, username='storperf',
@@ -87,7 +88,12 @@ class FIOInvoker(object):
command = "sudo ./fio " + ' '.join(args)
self.logger.debug("Remote command: %s" % command)
- (_, stdout, stderr) = ssh.exec_command(command)
+
+ chan = ssh.get_transport().open_session(timeout=None)
+ chan.settimeout(None)
+ chan.exec_command(command)
+ stdout = chan.makefile('r', -1)
+ stderr = chan.makefile_stderr('r', -1)
tout = Thread(target=self.stdout_handler, args=(stdout,),
name="%s stdout" % self._remote_host)
@@ -100,9 +106,18 @@ class FIOInvoker(object):
terr.start()
self.logger.info("Started fio on " + self.remote_host)
+ exit_status = chan.recv_exit_status()
+ self.logger.info("Finished fio on %s with exit code %s" %
+ (self.remote_host, exit_status))
+
+ stdout.close()
+ stderr.close()
+
+ self.logger.debug("Joining stderr handler")
terr.join()
+ self.logger.debug("Joining stdout handler")
tout.join()
- self.logger.info("Finished fio on " + self.remote_host)
+ self.logger.debug("Ended")
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)
diff --git a/storperf/logging.json b/storperf/logging.json
index 6d6026e..74df494 100644
--- a/storperf/logging.json
+++ b/storperf/logging.json
@@ -37,26 +37,15 @@
},
"loggers": {
- "my_module": {
- "level": "ERROR",
- "handlers": ["console"],
- "propagate": "no"
+ "": {
+ "level": "INFO",
+ "handlers": ["console", "file_handler", "error_file_handler"]
+ },
+ "storperf": {
+ "level": "DEBUG"
+ },
+ "storperf.carbon.emitter": {
+ "level": "INFO"
}
- },
-
- "root": {
- "level": "WARN",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf": {
- "level": "DEBUG",
- "handlers": ["console", "file_handler", "error_file_handler"]
- },
-
- "storperf.carbon.emitter": {
- "level": "INFO",
- "handlers": ["console", "file_handler", "error_file_handler"]
}
-
} \ No newline at end of file
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index 35cba72..2975afd 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -378,9 +378,9 @@ class StorPerfMaster(object):
(_, stdout, stderr) = ssh.exec_command(cmd)
for line in stdout.readlines():
- logger.debug(line.decode('utf-8').strip())
+ logger.debug(line.strip())
for line in stderr.readlines():
- logger.error(line.decode('utf-8').strip())
+ logger.error(line.strip())
def _make_parameters(self):
heat_parameters = {}
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 530ce80..8350e43 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -41,6 +41,7 @@ class TestExecutor(object):
self.metadata = {}
self.start_time = None
self.end_time = None
+ self.current_workload = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -106,7 +107,7 @@ class TestExecutor(object):
try:
event_listener(self)
except Exception, e:
- self.logger.error("Notifying listener: %s", e)
+ self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
self.workload_modules = []
@@ -166,15 +167,17 @@ class TestExecutor(object):
self.job_db.record_workload_params(metadata)
self.metadata = metadata
self._workload_thread = Thread(target=self.execute_workloads,
- args=())
+ args=(),
+ name="Workload thread")
self._workload_thread.start()
return self.job_db.job_id
def terminate(self):
self._terminated = True
- return self.terminate_current_run()
+ self.end_time = time.time()
+ return self._terminate_current_run()
- def terminate_current_run(self):
+ def _terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
@@ -222,14 +225,23 @@ class TestExecutor(object):
for blocksize in blocksizes:
for iodepth in iodepths:
- scheduler = sched.scheduler(time.time, time.sleep)
if self._terminated:
return
+ self.current_workload = (
+ "%s.%s.queue-depth.%s.block-size.%s" %
+ (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
+ self.logger.info("Starting run %s" % self.current_workload)
+
+ scheduler = sched.scheduler(time.time, time.sleep)
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
- self.terminate_current_run, ())
+ self._terminate_current_run,
+ ())
t = Thread(target=scheduler.run, args=())
t.start()
@@ -244,13 +256,16 @@ class TestExecutor(object):
self._workload_executors.append(slave_workload)
t = Thread(target=self.execute_on_node,
- args=(slave_workload,))
+ args=(slave_workload,),
+ name="%s worker" % slave)
t.daemon = False
t.start()
slave_threads.append(t)
for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
if not scheduler.empty():
try:
@@ -258,7 +273,10 @@ class TestExecutor(object):
except:
pass
+ self.logger.info("Completed run %s" %
+ self.current_workload)
self._workload_executors = []
+ self.current_workload = None
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
index 03c764c..ebf715a 100644
--- a/storperf/utilities/data_handler.py
+++ b/storperf/utilities/data_handler.py
@@ -9,27 +9,112 @@
import logging
import os
+from time import sleep
+import time
from storperf.db import test_results_db
from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import dictionary
+from storperf.utilities import math as math
+from storperf.utilities import steady_state as SteadyState
class DataHandler(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
+ self.samples = 11
"""
"""
def data_event(self, executor):
- self.logger.info("Event received")
-
- # Data lookup
+ self.logger.debug("Event received")
if executor.terminated:
self._push_to_db(executor)
+ else:
+ steady_state = True
+ metrics = {}
+ for metric in ('lat.mean', 'iops', 'bw'):
+ metrics[metric] = {}
+ for io_type in ('read', 'write'):
+ metrics[metric][io_type] = {}
+
+ series = self._lookup_prior_data(executor, metric, io_type)
+ steady = self._evaluate_prior_data(series)
+
+ self.logger.debug("Steady state for %s %s: %s"
+ % (io_type, metric, steady))
+
+ metrics[metric][io_type]['series'] = series
+ metrics[metric][io_type]['steady_state'] = steady
+ treated_data = DataTreatment.data_treatment(series)
+
+ metrics[metric][io_type]['slope'] = \
+ math.slope(treated_data['slope_data'])
+ metrics[metric][io_type]['range'] = \
+ math.range_value(treated_data['range_data'])
+ metrics[metric][io_type]['average'] = \
+ math.average(treated_data['average_data'])
+
+ if not steady:
+ steady_state = False
+
+ executor.metadata['report_data'] = metrics
+ executor.metadata['steady_state'] = steady_state
+
+ if steady_state:
+ executor.terminate()
+
+ def _lookup_prior_data(self, executor, metric, io_type):
+ workload = executor.current_workload
+ graphite_db = GraphiteDB()
+
+ # A bit of a hack here as Carbon might not be finished storing the
+ # data we just sent to it
+ now = int(time.time())
+ backtime = 60 * (self.samples + 2)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ most_recent_time = now
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+
+ delta = now - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ while (delta < 5 or (delta > 60 and delta < 120)):
+ sleep(5)
+ data_series = graphite_db.fetch_series(workload,
+ metric,
+ io_type,
+ now,
+ backtime)
+ if len(data_series) > 0:
+ most_recent_time = data_series[-1][0]
+ delta = time.time() - most_recent_time
+ self.logger.debug("Last update to graphite was %s ago" % delta)
+
+ return data_series
+
+ def _evaluate_prior_data(self, data_series):
+ self.logger.debug("Data series: %s" % data_series)
+ if len(data_series) == 0:
+ return False
+ earliest_timestamp = data_series[0][0]
+ latest_timestamp = data_series[-1][0]
+ duration = latest_timestamp - earliest_timestamp
+ if (duration < 60 * self.samples):
+ self.logger.debug("Only %s minutes of samples, ignoring" %
+ (duration / 60,))
+ return False
+
+ return SteadyState.steady_state(data_series)
def _push_to_db(self, executor):
test_db = os.environ.get('TEST_DB_URL')
diff --git a/storperf/utilities/math.py b/storperf/utilities/math.py
index a11ec19..4ddddca 100644
--- a/storperf/utilities/math.py
+++ b/storperf/utilities/math.py
@@ -52,8 +52,11 @@ def slope(data_series):
sum_yi_xi += xi * yi
sum_yi += yi
- beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / \
- (sum_xi**2 - m * sum_xi_sq) # The slope
+ over = (sum_xi**2 - m * sum_xi_sq)
+ if over == 0:
+ beta2 = None # Infinite slope
+ else:
+ beta2 = (sum_yi * sum_xi - m * sum_yi_xi) / over # The slope
# beta1 = (sum_yi_xi - beta2*sum_xi_sq)/sum_xi #The y-intercept if
# needed
diff --git a/storperf/utilities/steady_state.py b/storperf/utilities/steady_state.py
index 233bc78..0bbe21e 100644
--- a/storperf/utilities/steady_state.py
+++ b/storperf/utilities/steady_state.py
@@ -6,6 +6,8 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import logging
+
from storperf.utilities import data_treatment as DataTreatment
from storperf.utilities import math as math
@@ -22,6 +24,8 @@ def steady_state(data_series):
has been reached with the data that is passed to it.
"""
+ logger = logging.getLogger('storperf.utilities.steady_state')
+
# Pre conditioning the data to match the algorithms
treated_data = DataTreatment.data_treatment(data_series)
@@ -39,6 +43,11 @@ def steady_state(data_series):
steady_state = range_condition and slope_condition
+ logger.debug("Range %s < %s?" % (abs(range_value),
+ (0.20 * abs(average_value))))
+ logger.debug("Slope %s < %s?" % (abs(slope_value),
+ (0.10 * abs(average_value))))
+ logger.debug("Steady State? %s" % steady_state)
else:
steady_state = False
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
index b0dde50..295b8be 100644
--- a/storperf/utilities/thread_gate.py
+++ b/storperf/utilities/thread_gate.py
@@ -55,4 +55,9 @@ class ThreadGate(object):
k, time_since_last_report)
ready = False
+ self.logger.debug("Gate pass? %s", ready)
+
+ if ready:
+ self._registrants.clear()
+
return ready
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index 874e99c..3896cc6 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -23,7 +23,7 @@ class _base_workload(object):
'bs': '64k',
'iodepth': '1',
'numjobs': '1',
- 'loops': '1',
+ 'loops': '20',
'output-format': 'json',
'status-interval': '60'
}