diff options
author | Mark Beierl <mark.beierl@emc.com> | 2016-01-19 20:58:35 -0500 |
---|---|---|
committer | Mark Beierl <mark.beierl@emc.com> | 2016-01-29 13:43:04 -0500 |
commit | 488a47d945d3ef3dfa9ee37ca0aac3b480ffc800 (patch) | |
tree | 295ea3f6df99884675ba8f21c207bf892f0170bd /storperf/test_executor.py | |
parent | 9960601b321f10a11257832a2ecacb91acf03c53 (diff) |
Remote slave agent workload
Add storperf master object to manage stack
lifecycle.
Add configuration db.
Creation of CLI vs. main so that ReST API
and CLI API can be kept clear.
Fixed License in files.
Changed DB objects to be thread safe.
Added ssh server to container if desired
for CLI.
Change-Id: Idfe84bfb7920e6ce19d27462593c21ea86e7b406
JIRA: STORPERF-29
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r-- | storperf/test_executor.py | 83 |
1 files changed, 54 insertions, 29 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 462f06b..a1a817e 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -7,23 +7,20 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +from os import listdir +from os.path import isfile, join +from storperf.carbon.converter import JSONToCarbon +from storperf.carbon.emitter import CarbonMetricTransmitter +from storperf.db.job_db import JobDB +from storperf.fio.fio_invoker import FIOInvoker +from threading import Thread import imp import logging -from os import listdir import os -from os.path import isfile, join -import socket - -from carbon.converter import JSONToCarbon -from carbon.emitter import CarbonMetricTransmitter -from db.job_db import JobDB -from fio.fio_invoker import FIOInvoker class UnknownWorkload(Exception): - - def __init__(self, msg): - self.msg = msg + pass class TestExecutor(object): @@ -31,7 +28,7 @@ class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] - self.filename = "storperf.dat" + self.filename = None self.precondition = True self.warm = True self.event_listeners = set() @@ -39,6 +36,16 @@ class TestExecutor(object): self.metrics_emitter = CarbonMetricTransmitter() self.prefix = None self.job_db = JobDB() + self._slaves = [] + + @property + def slaves(self): + return self._slaves + + @slaves.setter + def slaves(self, slaves): + self.logger.debug("Set slaves to: " + str(slaves)) + self._slaves = slaves def register(self, event_listener): self.event_listeners.add(event_listener) @@ -46,15 +53,15 @@ class TestExecutor(object): def unregister(self, event_listener): self.event_listeners.discard(event_listener) - def event(self, metric): + def event(self, callback_id, metric): carbon_metrics = self.metrics_converter.convert_to_dictionary( metric, - self.prefix) + callback_id) - read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"] - write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"] - read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"] - write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"] + read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"] + write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"] + read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"] + write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"] message = "Average Latency us Read/Write: " + read_latency \ + "/" + write_latency + " IOPS r/w: " + \ @@ -78,9 +85,11 @@ class TestExecutor(object): workloads = [] for filename in workload_files: - mname, ext = os.path.splitext(filename) + mname, _ = os.path.splitext(filename) if (not mname.startswith('_')): workloads.append(mname) + else: + workloads = workloads.split(',') if (self.warm is True): workloads.insert(0, "_warm_up") @@ -94,15 +103,16 @@ class TestExecutor(object): workload + ".py") self.logger.debug("Found: " + str(workload_module)) if(workload_module is None): - raise UnknownWorkload("Unknown workload: " + workload) + raise UnknownWorkload( + "ERROR: Unknown workload: " + workload) self.workload_modules.append(workload_module) except ImportError, err: - raise UnknownWorkload(err) + raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri)) path, fname = os.path.split(uri) - mname, ext = os.path.splitext(fname) + mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): @@ -115,21 +125,34 @@ class TestExecutor(object): def execute(self): - shortname = socket.getfqdn().split('.')[0] + self.job_db.create_job_id() + for slave in self.slaves: + t = Thread(target=self.execute_on_node, args=(slave,)) + t.daemon = False + t.start() + + return self.job_db.job_id + + def execute_on_node(self, remote_host): + + logger = logging.getLogger(__name__ + ":" + remote_host) invoker = FIOInvoker() + invoker.remote_host = remote_host invoker.register(self.event) - self.job_db.create_job_id() - self.logger.info("Starting job " + self.job_db.job_id) + + logger.info( + "Starting job " + self.job_db.job_id + " on " + remote_host) for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) - self.logger.debug( + logger.debug( "Found workload: " + str(constructorMethod)) workload = constructorMethod() - workload.filename = self.filename + if (self.filename is not None): + workload.filename = self.filename workload.invoker = invoker if (workload_name.startswith("_")): @@ -143,6 +166,7 @@ class TestExecutor(object): for iodepth in iodepths: full_workload_name = workload_name + \ + ".host." + remote_host + \ ".queue-depth." + str(iodepth) + \ ".block-size." + str(blocksize) @@ -151,14 +175,15 @@ class TestExecutor(object): self.logger.info( "Executing workload: " + full_workload_name) - self.prefix = shortname + "." + self.job_db.job_id + \ + invoker.callback_id = self.job_db.job_id + \ "." + full_workload_name self.job_db.start_workload(full_workload_name) workload.execute() self.job_db.end_workload(full_workload_name) - self.logger.info("Finished job " + self.job_db.job_id) + logger.info( + "Finished job " + self.job_db.job_id + " on " + remote_host) def fetch_results(self, job, workload_name=""): self.job_db.job_id = job |