path: root/docker/storperf-master
author    mbeierl <mark.beierl@dell.com>  2017-09-06 22:17:52 -0400
committer mbeierl <mark.beierl@dell.com>  2017-09-06 22:21:59 -0400
commit    700c35d7419b8ee94059c14fab0f1fbc80ca1c60 (patch)
tree      3bd2b90d0c9265941b2eeb2f5683120d783a1162 /docker/storperf-master
parent    4b9014d25065f9578e0a4535086e89b48958c543 (diff)
Fix Job Status Report
Consolidates the two loops that built the job status list and the list of
job workloads into a single function. That one function now seeds the
status report and drives workload execution, so both are derived from the
same source.

Change-Id: Ia173b8450a857d032a862d03c62bfc1b248583da
JIRA: STORPERF-186
Signed-off-by: mbeierl <mark.beierl@dell.com>
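In essence, the refactor enumerates every (workload, queue-depth, block-size)
combination once into a list of parameter dictionaries, and a single loop then
both tracks status and executes runs. A minimal standalone sketch of that
pattern follows; the function names and the dict-based status tracking here
are illustrative only, not StorPerf's actual API:

import itertools

def create_workload_matrix(job_id, workload_names, queue_depths, block_sizes):
    # Build one entry per (workload, queue-depth, block-size) combination,
    # using the same run-naming scheme as the diff below.
    workloads = []
    for name, iodepth, blocksize in itertools.product(
            workload_names, queue_depths, block_sizes):
        run_name = '%s.%s.queue-depth.%s.block-size.%s' % (
            job_id, name, iodepth, blocksize)
        workloads.append({'name': run_name,
                          'workload_name': name,
                          'queue-depth': iodepth,
                          'blocksize': blocksize,
                          'status': 'Pending'})
    return workloads

def run_job(job_id, workload_names, queue_depths, block_sizes):
    # Hypothetical driver: the same matrix seeds the status report and
    # drives execution, so the two views cannot disagree.
    workloads = create_workload_matrix(
        job_id, workload_names, queue_depths, block_sizes)
    status = {w['name']: w['status'] for w in workloads}
    for w in workloads:
        status[w['name']] = 'Running'
        # ... run the workload here ...
        status[w['name']] = 'Completed'
    return status

Because the status report and the execution loop consume the same list, a run
can no longer appear in one but not the other, which appears to be the
inconsistency behind this job status report fix.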
Diffstat (limited to 'docker/storperf-master')
-rw-r--r--  docker/storperf-master/storperf/test_executor.py | 171
1 file changed, 90 insertions(+), 81 deletions(-)
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 0e3fce0..4c2c972 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -235,25 +235,90 @@ class TestExecutor(object):
         self.start_time = time.time()
         self.workload_status = {}
-        # Prepare stats list
-        for workload_module in self.workload_modules:
-            workload_name = getattr(workload_module, "__name__")
-            blocksizes = self._block_sizes
-            iodepths = self._queue_depths
-            for blocksize in blocksizes:
-                for iodepth in iodepths:
-                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
-                        (self.job_db.job_id, workload_name, iodepth, blocksize)
-                    self.workload_status[name] = "Pending"
+
+        workloads = self._create_workload_matrix()
+
+        for current_workload in workloads:
+            workload = current_workload['workload']
+            self._thread_gate = ThreadGate(len(self.slaves),
+                                           workload.options['status-interval'])
+
+            if self._terminated:
+                return
+            self.current_workload = current_workload['name']
+
+            self.logger.info("Starting run %s" % self.current_workload)
+            self.workload_status[self.current_workload] = "Running"
+
+            scheduler = sched.scheduler(time.time, time.sleep)
+            if self.deadline is not None \
+                    and not current_workload['workload_name'].startswith("_"):
+                event = scheduler.enter(self.deadline * 60, 1,
+                                        self.terminate_current_run,
+                                        ())
+                t = Thread(target=scheduler.run, args=())
+                t.start()
+
+            workload.options['iodepth'] = str(current_workload['queue-depth'])
+            workload.options['bs'] = str(current_workload['blocksize'])
+
+            slave_threads = []
+            for slave in self.slaves:
+                slave_workload = copy.copy(current_workload['workload'])
+                slave_workload.remote_host = slave
+
+                self._workload_executors.append(slave_workload)
+
+                t = Thread(target=self.execute_on_node,
+                           args=(slave_workload,),
+                           name="%s worker" % slave)
+                t.daemon = False
+                t.start()
+                slave_threads.append(t)
+
+            for slave_thread in slave_threads:
+                self.logger.debug("Waiting on %s" % slave_thread)
+                slave_thread.join()
+                self.logger.debug("Done waiting for %s" % slave_thread)
+
+            if not scheduler.empty():
+                try:
+                    scheduler.cancel(event)
+                except ValueError:
+                    pass
+
+            self.logger.info("Completed run %s"
+                             % self.current_workload)
+            self.workload_status[self.current_workload] = "Completed"
+            self._workload_executors = []
+            self.current_workload = None
+
+        self.logger.info("Completed job %s" % (self.job_db.job_id))
+
+        self.end_time = time.time()
+        self._terminated = True
+        self.broadcast_event()
+        self.unregister(data_handler.data_event)
+        report = {'report': json.dumps(self.metadata)}
+        self.job_db.record_workload_params(report)
+        self.job_db.job_id = None
+        if self.result_url is not None:
+            self.logger.info("Results can be found at %s" % self.result_url)
+
+    def _create_workload_matrix(self):
+        workloads = []
         for workload_module in self.workload_modules:
             workload_name = getattr(workload_module, "__name__")
-            self.logger.info("Starting workload %s" % (workload_name))
             constructorMethod = getattr(workload_module, workload_name)
             workload = constructorMethod()
             if (self.filename is not None):
                 workload.filename = self.filename
+            workload.id = self.job_db.job_id
+
+            if (self.filename is not None):
+                workload.filename = self.filename
             if (workload_name.startswith("_")):
                 iodepths = [8, ]
@@ -262,81 +327,25 @@ class TestExecutor(object):
                 iodepths = self._queue_depths
                 blocksizes = self._block_sizes
-            workload.id = self.job_db.job_id
-            self._thread_gate = ThreadGate(len(self.slaves),
-                                           workload.options['status-interval'])
-
             for blocksize in blocksizes:
                 for iodepth in iodepths:
-                    if self._terminated:
-                        return
-                    self.current_workload = (
-                        "%s.%s.queue-depth.%s.block-size.%s"
-                        % (self.job_db.job_id,
-                           workload_name,
-                           iodepth,
-                           blocksize))
-
-                    self.logger.info("Starting run %s" % self.current_workload)
-                    self.workload_status[self.current_workload] = "Running"
-
-                    scheduler = sched.scheduler(time.time, time.sleep)
-                    if self.deadline is not None \
-                            and not workload_name.startswith("_"):
-                        event = scheduler.enter(self.deadline * 60, 1,
-                                                self.terminate_current_run,
-                                                ())
-                        t = Thread(target=scheduler.run, args=())
-                        t.start()
-
-                    workload.options['iodepth'] = str(iodepth)
-                    workload.options['bs'] = str(blocksize)
-
-                    slave_threads = []
-                    for slave in self.slaves:
-                        slave_workload = copy.copy(workload)
-                        slave_workload.remote_host = slave
-
-                        self._workload_executors.append(slave_workload)
-
-                        t = Thread(target=self.execute_on_node,
-                                   args=(slave_workload,),
-                                   name="%s worker" % slave)
-                        t.daemon = False
-                        t.start()
-                        slave_threads.append(t)
-
-                    for slave_thread in slave_threads:
-                        self.logger.debug("Waiting on %s" % slave_thread)
-                        slave_thread.join()
-                        self.logger.debug("Done waiting for %s" % slave_thread)
-
-                    if not scheduler.empty():
-                        try:
-                            scheduler.cancel(event)
-                        except ValueError:
-                            pass
-
-                    self.logger.info("Completed run %s"
-                                     % self.current_workload)
-                    self.workload_status[self.current_workload] = "Completed"
-                    self._workload_executors = []
-                    self.current_workload = None
-
-            self.logger.info("Completed workload %s" % (workload_name))
-        self.logger.info("Completed job %s" % (self.job_db.job_id))
+                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                        (self.job_db.job_id, workload_name, iodepth, blocksize)
+                    self.workload_status[name] = "Pending"
-        if self.result_url is not None:
-            self.logger.info("Results can be found at %s" % self.result_url)
+                    parameters = {'queue-depth': iodepth,
+                                  'blocksize': blocksize,
+                                  'name': name,
+                                  'workload_name': workload_name,
+                                  'status': 'Pending',
+                                  'workload': workload}
-        self.end_time = time.time()
-        self._terminated = True
-        self.broadcast_event()
-        self.unregister(data_handler.data_event)
-        report = {'report': json.dumps(self.metadata)}
-        self.job_db.record_workload_params(report)
-        self.job_db.job_id = None
+                    self.logger.info("Workload %s=%s" % (name, parameters))
+
+                    workloads.append(parameters)
+
+        return workloads
 
     def execute_on_node(self, workload):
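After this change, workload_status is keyed by run name and seeded to
"Pending" for every planned run before anything executes, then flipped to
"Running" and "Completed" as the driver loop proceeds. A job-level summary
could therefore be derived from that dict at any point; for example, a
hypothetical helper (not part of this commit) might aggregate it like so:

def summarize_job_status(workload_status):
    # Collapse per-run states ('Pending' / 'Running' / 'Completed'),
    # as maintained by TestExecutor above, into one job-level state.
    states = set(workload_status.values())
    if states == {'Completed'}:
        return 'Completed'
    if 'Running' in states or 'Completed' in states:
        return 'Running'
    return 'Pending'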