diff options
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r-- | storperf/test_executor.py | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 530ce80..8350e43 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -21,8 +21,8 @@ from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker +from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate -from utilities.data_handler import DataHandler class UnknownWorkload(Exception): @@ -41,6 +41,7 @@ class TestExecutor(object): self.metadata = {} self.start_time = None self.end_time = None + self.current_workload = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -106,7 +107,7 @@ class TestExecutor(object): try: event_listener(self) except Exception, e: - self.logger.error("Notifying listener: %s", e) + self.logger.exception("While notifying listener %s", e) def register_workloads(self, workloads): self.workload_modules = [] @@ -166,15 +167,17 @@ class TestExecutor(object): self.job_db.record_workload_params(metadata) self.metadata = metadata self._workload_thread = Thread(target=self.execute_workloads, - args=()) + args=(), + name="Workload thread") self._workload_thread.start() return self.job_db.job_id def terminate(self): self._terminated = True - return self.terminate_current_run() + self.end_time = time.time() + return self._terminate_current_run() - def terminate_current_run(self): + def _terminate_current_run(self): self.logger.info("Terminating current run") terminated_hosts = [] for workload in self._workload_executors: @@ -222,14 +225,23 @@ class TestExecutor(object): for blocksize in blocksizes: for iodepth in iodepths: - scheduler = sched.scheduler(time.time, time.sleep) if self._terminated: return + self.current_workload = ( + "%s.%s.queue-depth.%s.block-size.%s" % + (self.job_db.job_id, + workload_name, + iodepth, + blocksize)) + self.logger.info("Starting run %s" % self.current_workload) + + scheduler = sched.scheduler(time.time, time.sleep) if self.deadline is not None \ and not workload_name.startswith("_"): event = scheduler.enter(self.deadline * 60, 1, - self.terminate_current_run, ()) + self._terminate_current_run, + ()) t = Thread(target=scheduler.run, args=()) t.start() @@ -244,13 +256,16 @@ class TestExecutor(object): self._workload_executors.append(slave_workload) t = Thread(target=self.execute_on_node, - args=(slave_workload,)) + args=(slave_workload,), + name="%s worker" % slave) t.daemon = False t.start() slave_threads.append(t) for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) if not scheduler.empty(): try: @@ -258,7 +273,10 @@ class TestExecutor(object): except: pass + self.logger.info("Completed run %s" % + self.current_workload) self._workload_executors = [] + self.current_workload = None self.logger.info("Completed workload %s" % (workload_name)) self.logger.info("Completed job %s" % (self.job_db.job_id)) |