diff options
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r-- | storperf/test_executor.py | 84 |
1 files changed, 33 insertions, 51 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 734b514..497d17c 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from threading import Thread +import copy import imp import logging import os @@ -37,6 +38,7 @@ class TestExecutor(object): self.prefix = None self.job_db = JobDB() self._slaves = [] + self._workload_thread = None @property def slaves(self): @@ -58,22 +60,9 @@ class TestExecutor(object): metric, callback_id) - read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] - write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] - - message = "Average Latency us Read/Write: " + read_latency \ - + "/" + write_latency + " IOPS r/w: " + \ - read_iops + "/" + write_iops - - for event_listener in self.event_listeners: - event_listener(message) - self.metrics_emitter.transmit_metrics(carbon_metrics) def register_workloads(self, workloads): - if (workloads is None or len(workloads) == 0): workload_dir = os.path.normpath( os.path.join(os.path.dirname(__file__), "workloads")) @@ -124,66 +113,59 @@ class TestExecutor(object): return None def execute(self): - self.job_db.create_job_id() - for slave in self.slaves: - t = Thread(target=self.execute_on_node, args=(slave,)) - t.daemon = False - t.start() - + self._workload_thread = Thread(target=self.execute_workloads, args=()) + self._workload_thread.start() return self.job_db.job_id - def execute_on_node(self, remote_host): - - logger = logging.getLogger(__name__ + ":" + remote_host) - - invoker = FIOInvoker() - invoker.remote_host = remote_host - invoker.register(self.event) - - logger.info( - "Starting job " + self.job_db.job_id + " on " + remote_host) - + def execute_workloads(self): for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - logger.debug( - "Found workload: " + str(constructorMethod)) workload = constructorMethod() if (self.filename is not None): workload.filename = self.filename - workload.invoker = invoker if (workload_name.startswith("_")): iodepths = [2, ] - blocksizes = [65536, ] + blocksizes = [8192, ] else: iodepths = [1, 16, 128] - blocksizes = [4096, 65536, 1048576] + blocksizes = [8192, 4096, 512] + + workload.id = self.job_db.job_id for blocksize in blocksizes: for iodepth in iodepths: - - full_workload_name = workload_name + \ - ".host." + remote_host + \ - ".queue-depth." + str(iodepth) + \ - ".block-size." + str(blocksize) - workload.options['iodepth'] = str(iodepth) workload.options['bs'] = str(blocksize) - self.logger.info( - "Executing workload: " + full_workload_name) - invoker.callback_id = self.job_db.job_id + \ - "." + full_workload_name + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(workload) + slave_workload.remote_host = slave + t = Thread(target=self.execute_on_node, + args=(slave_workload,)) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + slave_thread.join() + + def execute_on_node(self, workload): + + invoker = FIOInvoker() + invoker.register(self.event) + workload.invoker = invoker + + self.logger.info("Starting " + workload.fullname) - self.job_db.start_workload(full_workload_name) - workload.execute() - self.job_db.end_workload(full_workload_name) + self.job_db.start_workload(workload) + workload.execute() + self.job_db.end_workload(workload) - logger.info( - "Finished job " + self.job_db.job_id + " on " + remote_host) + self.logger.info("Ended " + workload.fullname) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job |