From 700c35d7419b8ee94059c14fab0f1fbc80ca1c60 Mon Sep 17 00:00:00 2001 From: mbeierl Date: Wed, 6 Sep 2017 22:17:52 -0400 Subject: Fix Job Status Report Changes the loop for creating job status and list of job workloads to run down to a single function instead of two loops. Uses single function to drive both status and job workloads to execute. Change-Id: Ia173b8450a857d032a862d03c62bfc1b248583da JIRA: STORPERF-186 Signed-off-by: mbeierl --- docker/storperf-master/storperf/test_executor.py | 171 ++++++++++++----------- 1 file changed, 90 insertions(+), 81 deletions(-) diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 0e3fce0..4c2c972 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -235,23 +235,88 @@ class TestExecutor(object): self.start_time = time.time() self.workload_status = {} - # Prepare stats list - for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") - blocksizes = self._block_sizes - iodepths = self._queue_depths - for blocksize in blocksizes: - for iodepth in iodepths: - name = '%s.%s.queue-depth.%s.block-size.%s' % \ - (self.job_db.job_id, workload_name, iodepth, blocksize) - self.workload_status[name] = "Pending" + + workloads = self._create_workload_matrix() + + for current_workload in workloads: + workload = current_workload['workload'] + self._thread_gate = ThreadGate(len(self.slaves), + workload.options['status-interval']) + + if self._terminated: + return + self.current_workload = current_workload['name'] + + self.logger.info("Starting run %s" % self.current_workload) + self.workload_status[self.current_workload] = "Running" + + scheduler = sched.scheduler(time.time, time.sleep) + if self.deadline is not None \ + and not current_workload['workload_name'].startswith("_"): + event = scheduler.enter(self.deadline * 60, 1, + self.terminate_current_run, + ()) + t = Thread(target=scheduler.run, args=()) + t.start() + + workload.options['iodepth'] = str(current_workload['queue-depth']) + workload.options['bs'] = str(current_workload['blocksize']) + + slave_threads = [] + for slave in self.slaves: + slave_workload = copy.copy(current_workload['workload']) + slave_workload.remote_host = slave + + self._workload_executors.append(slave_workload) + + t = Thread(target=self.execute_on_node, + args=(slave_workload,), + name="%s worker" % slave) + t.daemon = False + t.start() + slave_threads.append(t) + + for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) + slave_thread.join() + self.logger.debug("Done waiting for %s" % slave_thread) + + if not scheduler.empty(): + try: + scheduler.cancel(event) + except ValueError: + pass + + self.logger.info("Completed run %s" + % self.current_workload) + self.workload_status[self.current_workload] = "Completed" + self._workload_executors = [] + self.current_workload = None + + self.logger.info("Completed job %s" % (self.job_db.job_id)) + + self.end_time = time.time() + self._terminated = True + self.broadcast_event() + self.unregister(data_handler.data_event) + report = {'report': json.dumps(self.metadata)} + self.job_db.record_workload_params(report) + self.job_db.job_id = None + if self.result_url is not None: + self.logger.info("Results can be found at %s" % self.result_url) + + def _create_workload_matrix(self): + workloads = [] for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") - self.logger.info("Starting workload %s" % (workload_name)) constructorMethod = getattr(workload_module, workload_name) workload = constructorMethod() + if (self.filename is not None): + workload.filename = self.filename + workload.id = self.job_db.job_id + if (self.filename is not None): workload.filename = self.filename @@ -262,81 +327,25 @@ class TestExecutor(object): iodepths = self._queue_depths blocksizes = self._block_sizes - workload.id = self.job_db.job_id - self._thread_gate = ThreadGate(len(self.slaves), - workload.options['status-interval']) - for blocksize in blocksizes: for iodepth in iodepths: - if self._terminated: - return - self.current_workload = ( - "%s.%s.queue-depth.%s.block-size.%s" - % (self.job_db.job_id, - workload_name, - iodepth, - blocksize)) - - self.logger.info("Starting run %s" % self.current_workload) - self.workload_status[self.current_workload] = "Running" - - scheduler = sched.scheduler(time.time, time.sleep) - if self.deadline is not None \ - and not workload_name.startswith("_"): - event = scheduler.enter(self.deadline * 60, 1, - self.terminate_current_run, - ()) - t = Thread(target=scheduler.run, args=()) - t.start() - - workload.options['iodepth'] = str(iodepth) - workload.options['bs'] = str(blocksize) - - slave_threads = [] - for slave in self.slaves: - slave_workload = copy.copy(workload) - slave_workload.remote_host = slave - - self._workload_executors.append(slave_workload) - - t = Thread(target=self.execute_on_node, - args=(slave_workload,), - name="%s worker" % slave) - t.daemon = False - t.start() - slave_threads.append(t) - - for slave_thread in slave_threads: - self.logger.debug("Waiting on %s" % slave_thread) - slave_thread.join() - self.logger.debug("Done waiting for %s" % slave_thread) - - if not scheduler.empty(): - try: - scheduler.cancel(event) - except ValueError: - pass - - self.logger.info("Completed run %s" - % self.current_workload) - self.workload_status[self.current_workload] = "Completed" - self._workload_executors = [] - self.current_workload = None - - self.logger.info("Completed workload %s" % (workload_name)) - self.logger.info("Completed job %s" % (self.job_db.job_id)) + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, blocksize) + self.workload_status[name] = "Pending" - if self.result_url is not None: - self.logger.info("Results can be found at %s" % self.result_url) + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} - self.end_time = time.time() - self._terminated = True - self.broadcast_event() - self.unregister(data_handler.data_event) - report = {'report': json.dumps(self.metadata)} - self.job_db.record_workload_params(report) - self.job_db.job_id = None + self.logger.info("Workload %s=%s" % (name, parameters)) + + workloads.append(parameters) + + return workloads def execute_on_node(self, workload): -- cgit 1.2.3-korg