author    Mark Beierl <mark.beierl@emc.com>  2016-04-25 09:55:03 -0400
committer Mark Beierl <mark.beierl@emc.com>  2016-04-25 10:51:31 -0400
commit    311eee3bec00d5acc32b6eba76a7ff0d1990f4b2 (patch)
tree      9fe4ee0f33687ebaed1c82ff3bea0d569039cc78 /storperf/test_executor.py
parent    07a375f25ee831100ecf21e7bb9c1bdcd3b960f5 (diff)
Job run lifecycle rework
Change the way slave jobs are managed so that they are in step with each
other, and we can track the overall thread that is running them. This lays
groundwork for STORPERF-20 and STORPERF-44.

JIRA: STORPERF-33 STORPERF-43
Change-Id: Iaff48a2823ba85d6512e9782fd9091a72639835c
Signed-off-by: Mark Beierl <mark.beierl@emc.com>
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--  storperf/test_executor.py  84
1 file changed, 33 insertions(+), 51 deletions(-)
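In outline, the commit moves thread management out of execute(): it now just
creates the job ID and starts a single background worker, and that worker fans
out one thread per slave for each workload/block-size/queue-depth combination,
joining them all before moving to the next combination so the slaves stay in
step. A minimal sketch of that pattern, not the committed code; run_on_slave
and param_matrix are hypothetical stand-ins:

    from threading import Thread

    def run_in_step(slaves, param_matrix, run_on_slave):
        """Run each parameter combination on every slave at once,
        joining all slave threads before the next combination starts."""
        for params in param_matrix:
            threads = []
            for slave in slaves:
                # One non-daemon thread per slave, as in execute_workloads()
                t = Thread(target=run_on_slave, args=(slave, params))
                t.daemon = False
                t.start()
                threads.append(t)
            # Barrier: no slave begins the next combination until all
            # slaves have finished the current one.
            for t in threads:
                t.join()

The join inside the loop is what keeps the slaves "in step"; daemon=False
additionally keeps the interpreter alive until every slave thread finishes.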
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 734b514..497d17c 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -14,6 +14,7 @@ from storperf.carbon.emitter import CarbonMetricTransmitter
 from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from threading import Thread
+import copy
 import imp
 import logging
 import os
@@ -37,6 +38,7 @@ class TestExecutor(object):
         self.prefix = None
         self.job_db = JobDB()
         self._slaves = []
+        self._workload_thread = None
 
     @property
     def slaves(self):
@@ -58,22 +60,9 @@
             metric,
             callback_id)
 
-        read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
-        write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
-        read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
-        write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
-
-        message = "Average Latency us Read/Write: " + read_latency \
-            + "/" + write_latency + " IOPS r/w: " + \
-            read_iops + "/" + write_iops
-
-        for event_listener in self.event_listeners:
-            event_listener(message)
-
         self.metrics_emitter.transmit_metrics(carbon_metrics)
 
     def register_workloads(self, workloads):
-
         if (workloads is None or len(workloads) == 0):
             workload_dir = os.path.normpath(
                 os.path.join(os.path.dirname(__file__), "workloads"))
@@ -124,66 +113,59 @@
         return None
 
     def execute(self):
-
         self.job_db.create_job_id()
-        for slave in self.slaves:
-            t = Thread(target=self.execute_on_node, args=(slave,))
-            t.daemon = False
-            t.start()
-
+        self._workload_thread = Thread(target=self.execute_workloads, args=())
+        self._workload_thread.start()
         return self.job_db.job_id
 
-    def execute_on_node(self, remote_host):
-
-        logger = logging.getLogger(__name__ + ":" + remote_host)
-
-        invoker = FIOInvoker()
-        invoker.remote_host = remote_host
-        invoker.register(self.event)
-
-        logger.info(
-            "Starting job " + self.job_db.job_id + " on " + remote_host)
-
+    def execute_workloads(self):
         for workload_module in self.workload_modules:
-
             workload_name = getattr(workload_module, "__name__")
             constructorMethod = getattr(workload_module, workload_name)
-            logger.debug(
-                "Found workload: " + str(constructorMethod))
             workload = constructorMethod()
             if (self.filename is not None):
                 workload.filename = self.filename
-            workload.invoker = invoker
 
             if (workload_name.startswith("_")):
                 iodepths = [2, ]
-                blocksizes = [65536, ]
+                blocksizes = [8192, ]
             else:
                 iodepths = [1, 16, 128]
-                blocksizes = [4096, 65536, 1048576]
+                blocksizes = [8192, 4096, 512]
+
+            workload.id = self.job_db.job_id
 
             for blocksize in blocksizes:
                 for iodepth in iodepths:
-
-                    full_workload_name = workload_name + \
-                        ".host." + remote_host + \
-                        ".queue-depth." + str(iodepth) + \
-                        ".block-size." + str(blocksize)
-
                     workload.options['iodepth'] = str(iodepth)
                     workload.options['bs'] = str(blocksize)
-                    self.logger.info(
-                        "Executing workload: " + full_workload_name)
-                    invoker.callback_id = self.job_db.job_id + \
-                        "." + full_workload_name
+                    slave_threads = []
+                    for slave in self.slaves:
+                        slave_workload = copy.copy(workload)
+                        slave_workload.remote_host = slave
+                        t = Thread(target=self.execute_on_node,
+                                   args=(slave_workload,))
+                        t.daemon = False
+                        t.start()
+                        slave_threads.append(t)
+
+                    for slave_thread in slave_threads:
+                        slave_thread.join()
+
+    def execute_on_node(self, workload):
+
+        invoker = FIOInvoker()
+        invoker.register(self.event)
+        workload.invoker = invoker
+
+        self.logger.info("Starting " + workload.fullname)
 
-                    self.job_db.start_workload(full_workload_name)
-                    workload.execute()
-                    self.job_db.end_workload(full_workload_name)
+        self.job_db.start_workload(workload)
+        workload.execute()
+        self.job_db.end_workload(workload)
 
-        logger.info(
-            "Finished job " + self.job_db.job_id + " on " + remote_host)
+        self.logger.info("Ended " + workload.fullname)
 
     def fetch_results(self, job, workload_name=""):
         self.job_db.job_id = job
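A note on the copy.copy() call added in execute_workloads(): each slave thread
receives its own shallow copy of the workload, so per-host state (remote_host,
invoker) set on one copy cannot race with another thread's. Shallow copying is
enough here because the shared options dict is mutated before the fan-out and
all threads are joined before the next mutation. A small illustration of that
property, using a hypothetical Workload class rather than the one shipped in
storperf.workloads:

    import copy

    class Workload(object):
        def __init__(self, options):
            self.options = options      # shared across shallow copies
            self.remote_host = None     # per-slave, set on each copy
            self.invoker = None         # per-slave, set on each copy

    base = Workload({'iodepth': '16', 'bs': '8192'})
    clones = []
    for host in ('192.168.1.2', '192.168.1.3'):
        w = copy.copy(base)             # new object, same options dict
        w.remote_host = host            # safe: set on the copy only
        clones.append(w)

    assert clones[0].options is clones[1].options       # dict is shared
    assert clones[0].remote_host != clones[1].remote_host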