summaryrefslogtreecommitdiffstats
path: root/storperf/test_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--storperf/test_executor.py84
1 files changed, 33 insertions, 51 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 734b514..497d17c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from threading import Thread
+import copy
import imp
import logging
import os
@@ -37,6 +38,7 @@ class TestExecutor(object):
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._workload_thread = None
@property
def slaves(self):
@@ -58,22 +60,9 @@ class TestExecutor(object):
metric,
callback_id)
- read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
- message = "Average Latency us Read/Write: " + read_latency \
- + "/" + write_latency + " IOPS r/w: " + \
- read_iops + "/" + write_iops
-
- for event_listener in self.event_listeners:
- event_listener(message)
-
self.metrics_emitter.transmit_metrics(carbon_metrics)
def register_workloads(self, workloads):
-
if (workloads is None or len(workloads) == 0):
workload_dir = os.path.normpath(
os.path.join(os.path.dirname(__file__), "workloads"))
@@ -124,66 +113,59 @@ class TestExecutor(object):
return None
def execute(self):
-
self.job_db.create_job_id()
- for slave in self.slaves:
- t = Thread(target=self.execute_on_node, args=(slave,))
- t.daemon = False
- t.start()
-
+ self._workload_thread = Thread(target=self.execute_workloads, args=())
+ self._workload_thread.start()
return self.job_db.job_id
- def execute_on_node(self, remote_host):
-
- logger = logging.getLogger(__name__ + ":" + remote_host)
-
- invoker = FIOInvoker()
- invoker.remote_host = remote_host
- invoker.register(self.event)
-
- logger.info(
- "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+ def execute_workloads(self):
for workload_module in self.workload_modules:
-
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- logger.debug(
- "Found workload: " + str(constructorMethod))
workload = constructorMethod()
if (self.filename is not None):
workload.filename = self.filename
- workload.invoker = invoker
if (workload_name.startswith("_")):
iodepths = [2, ]
- blocksizes = [65536, ]
+ blocksizes = [8192, ]
else:
iodepths = [1, 16, 128]
- blocksizes = [4096, 65536, 1048576]
+ blocksizes = [8192, 4096, 512]
+
+ workload.id = self.job_db.job_id
for blocksize in blocksizes:
for iodepth in iodepths:
-
- full_workload_name = workload_name + \
- ".host." + remote_host + \
- ".queue-depth." + str(iodepth) + \
- ".block-size." + str(blocksize)
-
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
- self.logger.info(
- "Executing workload: " + full_workload_name)
- invoker.callback_id = self.job_db.job_id + \
- "." + full_workload_name
+ slave_threads = []
+ for slave in self.slaves:
+ slave_workload = copy.copy(workload)
+ slave_workload.remote_host = slave
+ t = Thread(target=self.execute_on_node,
+ args=(slave_workload,))
+ t.daemon = False
+ t.start()
+ slave_threads.append(t)
+
+ for slave_thread in slave_threads:
+ slave_thread.join()
+
+ def execute_on_node(self, workload):
+
+ invoker = FIOInvoker()
+ invoker.register(self.event)
+ workload.invoker = invoker
+
+ self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(full_workload_name)
- workload.execute()
- self.job_db.end_workload(full_workload_name)
+ self.job_db.start_workload(workload)
+ workload.execute()
+ self.job_db.end_workload(workload)
- logger.info(
- "Finished job " + self.job_db.job_id + " on " + remote_host)
+ self.logger.info("Ended " + workload.fullname)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job