diff options
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r-- | storperf/test_executor.py | 83 |
1 files changed, 54 insertions, 29 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 462f06b..a1a817e 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -7,23 +7,20 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from os import listdir +from os.path import isfile, join +from storperf.carbon.converter import JSONToCarbon +from storperf.carbon.emitter import CarbonMetricTransmitter +from storperf.db.job_db import JobDB +from storperf.fio.fio_invoker import FIOInvoker +from threading import Thread import imp import logging -from os import listdir import os -from os.path import isfile, join -import socket - -from carbon.converter import JSONToCarbon -from carbon.emitter import CarbonMetricTransmitter -from db.job_db import JobDB -from fio.fio_invoker import FIOInvoker class UnknownWorkload(Exception): - - def __init__(self, msg): - self.msg = msg + pass class TestExecutor(object): @@ -31,7 +28,7 @@ class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] - self.filename = "storperf.dat" + self.filename = None self.precondition = True self.warm = True self.event_listeners = set() @@ -39,6 +36,16 @@ class TestExecutor(object): self.metrics_emitter = CarbonMetricTransmitter() self.prefix = None self.job_db = JobDB() + self._slaves = [] + + @property + def slaves(self): + return self._slaves + + @slaves.setter + def slaves(self, slaves): + self.logger.debug("Set slaves to: " + str(slaves)) + self._slaves = slaves def register(self, event_listener): self.event_listeners.add(event_listener) @@ -46,15 +53,15 @@ class TestExecutor(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def event(self, metric): + def event(self, callback_id, metric): carbon_metrics = self.metrics_converter.convert_to_dictionary( metric, - self.prefix) + callback_id) - read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"] - write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"] + read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] + write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] + read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] + write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] message = "Average Latency us Read/Write: " + read_latency \ + "/" + write_latency + " IOPS r/w: " + \ @@ -78,9 +85,11 @@ class TestExecutor(object): workloads = [] for filename in workload_files: - mname, ext = os.path.splitext(filename) + mname, _ = os.path.splitext(filename) if (not mname.startswith('_')): workloads.append(mname) + else: + workloads = workloads.split(',') if (self.warm is True): workloads.insert(0, "_warm_up") @@ -94,15 +103,16 @@ class TestExecutor(object): workload + ".py") self.logger.debug("Found: " + str(workload_module)) if(workload_module is None): - raise UnknownWorkload("Unknown workload: " + workload) + raise UnknownWorkload( + "ERROR: Unknown workload: " + workload) self.workload_modules.append(workload_module) except ImportError, err: - raise UnknownWorkload(err) + raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri)) path, fname = os.path.split(uri) - mname, ext = os.path.splitext(fname) + mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): @@ -115,21 +125,34 @@ class TestExecutor(object): def execute(self): - shortname = socket.getfqdn().split('.')[0] + self.job_db.create_job_id() + for slave in self.slaves: + t = Thread(target=self.execute_on_node, args=(slave,)) + t.daemon = False + t.start() + + return self.job_db.job_id + + def execute_on_node(self, remote_host): + + logger = logging.getLogger(__name__ + ":" + remote_host) invoker = FIOInvoker() + invoker.remote_host = remote_host invoker.register(self.event) - self.job_db.create_job_id() - self.logger.info("Starting job " + self.job_db.job_id) + + logger.info( + "Starting job " + self.job_db.job_id + " on " + remote_host) for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - self.logger.debug( + logger.debug( "Found workload: " + str(constructorMethod)) workload = constructorMethod() - workload.filename = self.filename + if (self.filename is not None): + workload.filename = self.filename workload.invoker = invoker if (workload_name.startswith("_")): @@ -143,6 +166,7 @@ class TestExecutor(object): for iodepth in iodepths: full_workload_name = workload_name + \ + ".host." + remote_host + \ ".queue-depth." + str(iodepth) + \ ".block-size." + str(blocksize) @@ -151,14 +175,15 @@ class TestExecutor(object): self.logger.info( "Executing workload: " + full_workload_name) - self.prefix = shortname + "." + self.job_db.job_id + \ + invoker.callback_id = self.job_db.job_id + \ "." + full_workload_name self.job_db.start_workload(full_workload_name) workload.execute() self.job_db.end_workload(full_workload_name) - self.logger.info("Finished job " + self.job_db.job_id) + logger.info( + "Finished job " + self.job_db.job_id + " on " + remote_host) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job |