summaryrefslogtreecommitdiffstats
path: root/storperf/test_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--storperf/test_executor.py34
1 files changed, 26 insertions, 8 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 530ce80..8350e43 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
+from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
-from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -41,6 +41,7 @@ class TestExecutor(object):
self.metadata = {}
self.start_time = None
self.end_time = None
+ self.current_workload = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -106,7 +107,7 @@ class TestExecutor(object):
try:
event_listener(self)
except Exception, e:
- self.logger.error("Notifying listener: %s", e)
+ self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
self.workload_modules = []
@@ -166,15 +167,17 @@ class TestExecutor(object):
self.job_db.record_workload_params(metadata)
self.metadata = metadata
self._workload_thread = Thread(target=self.execute_workloads,
- args=())
+ args=(),
+ name="Workload thread")
self._workload_thread.start()
return self.job_db.job_id
def terminate(self):
self._terminated = True
- return self.terminate_current_run()
+ self.end_time = time.time()
+ return self._terminate_current_run()
- def terminate_current_run(self):
+ def _terminate_current_run(self):
self.logger.info("Terminating current run")
terminated_hosts = []
for workload in self._workload_executors:
@@ -222,14 +225,23 @@ class TestExecutor(object):
for blocksize in blocksizes:
for iodepth in iodepths:
- scheduler = sched.scheduler(time.time, time.sleep)
if self._terminated:
return
+ self.current_workload = (
+ "%s.%s.queue-depth.%s.block-size.%s" %
+ (self.job_db.job_id,
+ workload_name,
+ iodepth,
+ blocksize))
+ self.logger.info("Starting run %s" % self.current_workload)
+
+ scheduler = sched.scheduler(time.time, time.sleep)
if self.deadline is not None \
and not workload_name.startswith("_"):
event = scheduler.enter(self.deadline * 60, 1,
- self.terminate_current_run, ())
+ self._terminate_current_run,
+ ())
t = Thread(target=scheduler.run, args=())
t.start()
@@ -244,13 +256,16 @@ class TestExecutor(object):
self._workload_executors.append(slave_workload)
t = Thread(target=self.execute_on_node,
- args=(slave_workload,))
+ args=(slave_workload,),
+ name="%s worker" % slave)
t.daemon = False
t.start()
slave_threads.append(t)
for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
slave_thread.join()
+ self.logger.debug("Done waiting for %s" % slave_thread)
if not scheduler.empty():
try:
@@ -258,7 +273,10 @@ class TestExecutor(object):
except:
pass
+ self.logger.info("Completed run %s" %
+ self.current_workload)
self._workload_executors = []
+ self.current_workload = None
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))