From ff389f460711b17db2070ca90417f7ebbc0eff56 Mon Sep 17 00:00:00 2001
From: mbeierl
Date: Thu, 5 Jul 2018 15:22:43 -0400
Subject: Support Custom Workloads

Refactors interaction with test_executor to clean up the tight
coupling. Adds ability to specify custom workloads.

Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503
JIRA: STORPERF-246
Co-Authored-By: Ameed.Ashour.Ext@Nokia.com
Signed-off-by: mbeierl
---
 docker/storperf-master/storperf/test_executor.py | 263 ++++++++++++++---------
 1 file changed, 163 insertions(+), 100 deletions(-)

diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 2ed6a9e..38e052e 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -11,6 +11,7 @@ import copy
 import imp
 import json
 import logging
+from multiprocessing.pool import ThreadPool
 from os import listdir
 import os
 from os.path import isfile, join
@@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB
 from storperf.fio.fio_invoker import FIOInvoker
 from storperf.utilities.data_handler import DataHandler
 from storperf.utilities.thread_gate import ThreadGate
+from storperf.workloads._custom_workload import _custom_workload
 
 
 class UnknownWorkload(Exception):
     pass
 
 
+class InvalidWorkloadName(Exception):
+    pass
+
+
 class TestExecutor(object):
 
     def __init__(self):
         self.logger = logging.getLogger(__name__)
         self.workload_modules = []
+        self._custom_workloads = {}
         self.filename = None
         self.deadline = None
         self.steady_state_samples = 10
@@ -43,7 +50,6 @@ class TestExecutor(object):
         self.end_time = None
         self.current_workload = None
         self.workload_status = {}
-        self.result_url = None
         self._queue_depths = [1, 4, 8]
         self._block_sizes = [512, 4096, 16384]
         self.event_listeners = set()
@@ -97,6 +103,16 @@ class TestExecutor(object):
         self.logger.debug("Set volume count to: " + str(volume_count))
         self._volume_count = volume_count
 
+    @property
+    def custom_workloads(self):
+        return self._custom_workloads
+
+    @custom_workloads.setter
+    def custom_workloads(self, custom_workloads):
+        self.logger.debug("Set custom workloads to: %s " %
+                          custom_workloads)
+        self._custom_workloads = custom_workloads
+
     @property
     def queue_depths(self):
         return ','.join(self._queue_depths)
@@ -152,7 +168,7 @@
                 self.logger.debug("Notifying event listener %s",
                                   event_listener)
                 event_listener(self)
-            except Exception, e:
+            except Exception as e:
                 self.logger.exception("While notifying listener %s", e)
 
     def register_workloads(self, workloads):
@@ -185,7 +201,7 @@
                         "ERROR: Unknown workload: " + workload)
                 if workload_module not in self.workload_modules:
                     self.workload_modules.append(workload_module)
-            except ImportError, err:
+            except ImportError as err:
                 raise UnknownWorkload("ERROR: " + str(err))
 
     def load_from_file(self, uri):
@@ -193,23 +209,26 @@
         path, fname = os.path.split(uri)
         mname, _ = os.path.splitext(fname)
         no_ext = os.path.join(path, mname)
-        self.logger.debug("Looking for: " + no_ext)
         if os.path.exists(no_ext + '.pyc'):
-            self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
             return imp.load_compiled(mname, no_ext + '.pyc')
         if os.path.exists(no_ext + '.py'):
-            self.logger.debug("Compiling: " + mname + " from " + no_ext)
             return imp.load_source(mname, no_ext + '.py')
         return None
 
     def execute(self, metadata):
         self.job_db.create_job_id()
+        try:
+            self.test_params()
+        except Exception as e:
+            self.terminate()
+            raise e
         self.job_db.record_workload_params(metadata)
         self._setup_metadata(metadata)
         self._workload_thread = Thread(target=self.execute_workloads,
                                        args=(),
                                        name="Workload thread")
         self._workload_thread.start()
+        # seems to be hanging here
         return self.job_db.job_id
 
     def terminate(self):
@@ -225,38 +244,61 @@
                 terminated_hosts.append(workload.remote_host)
         return terminated_hosts
 
-    def execution_status(self, job_id):
-
-        result = {}
-        status = "Completed"
-
-        if self.job_db.job_id == job_id and self._terminated is False:
-            status = "Running"
-
-            result['Status'] = status
-            result['Workloads'] = self.workload_status
-            result['TestResultURL'] = self.result_url
-
-        else:
-            jobs = self.job_db.fetch_jobs()
-            self.logger.info("Jobs")
-            self.logger.info(jobs)
-            for job in jobs:
-                if self.job_db.job_id == job_id and self._terminated is False:
-                    status = "Running"
-                    result['Status'] = status
-                    result['Workloads'] = self.workload_status
-                    result['TestResultURL'] = self.result_url
-                else:
-                    result[job] = {}
-                    result[job]['Status'] = "Completed"
-
-        return result
+    def test_params(self):
+        workloads = self._create_workload_matrix()
+        for current_workload in workloads:
+            workload = current_workload['workload']
+            self.logger.info("Testing FIO parameters for %s"
+                             % current_workload)
+            result = self._execute_workload(current_workload,
+                                            workload,
+                                            parse_only=True)
+            if result:
+                message = result[0]
+                self.logger.error("FIO parameter validation failed")
+                raise Exception("Workload parameter validation failed %s"
+                                % message)
+        pass
+
+    def _execute_workload(self, current_workload, workload, parse_only=False):
+        workload.options['iodepth'] = str(current_workload['queue-depth'])
+        workload.options['bs'] = str(current_workload['blocksize'])
+        slave_threads = []
+        thread_pool = ThreadPool(processes=len(self.slaves) *
+                                 self.volume_count)
+
+        for slave in self.slaves:
+            volume_number = 0
+            while volume_number < self.volume_count:
+                slave_workload = copy.copy(current_workload['workload'])
+                slave_workload.remote_host = slave
+                last_char_of_filename = chr(
+                    ord(slave_workload.filename[-1:]) + volume_number)
+                slave_workload.filename = ("%s%s" %
+                                           (slave_workload.filename[:-1],
+                                            last_char_of_filename))
+                self.logger.debug("Device to profile on %s: %s" %
+                                  (slave, slave_workload.filename))
+                self._workload_executors.append(slave_workload)
+
+                worker = thread_pool.apply_async(
+                    self.execute_on_node, (slave_workload, parse_only))
+                slave_threads.append(worker)
+                volume_number += 1
+
+        final_result = None
+        for slave_thread in slave_threads:
+            self.logger.debug("Waiting on %s" % slave_thread)
+            result = slave_thread.get()
+            self.logger.debug("Done waiting for %s, exit status %s" %
+                              (slave_thread, result))
+            if result:
+                final_result = result
+        return final_result
 
     def execute_workloads(self):
         self._terminated = False
         self.logger.info("Starting job %s" % (self.job_db.job_id))
-        self.event_listeners.clear()
 
         data_handler = DataHandler()
         self.register(data_handler.data_event)
@@ -267,12 +309,13 @@
         workloads = self._create_workload_matrix()
 
         for current_workload in workloads:
+            if self._terminated:
+                continue
+
             workload = current_workload['workload']
             self._thread_gate = ThreadGate(len(self.slaves),
                                            workload.options['status-interval'])
 
-            if self._terminated:
-                return
             self.current_workload = current_workload['name']
 
             self.logger.info("Starting run %s" % self.current_workload)
@@ -287,34 +330,7 @@
             t = Thread(target=scheduler.run, args=())
             t.start()
 
-            workload.options['iodepth'] = str(current_workload['queue-depth'])
-            workload.options['bs'] = str(current_workload['blocksize'])
-
-            slave_threads = []
-            for slave in self.slaves:
-                volume_number = 0
-                while volume_number < self.volume_count:
-                    slave_workload = copy.copy(current_workload['workload'])
-                    slave_workload.remote_host = slave
-                    last_char_of_filename = chr(ord(
-                        slave_workload.filename[-1:]) + volume_number)
-                    slave_workload.filename = "%s%s" % \
-                        (slave_workload.filename[:-1], last_char_of_filename)
-                    self.logger.debug("Device to profile: %s" %
-                                      slave_workload.filename)
-                    self._workload_executors.append(slave_workload)
-                    t = Thread(target=self.execute_on_node,
-                               args=(slave_workload,),
-                               name="%s worker" % slave)
-                    t.daemon = False
-                    t.start()
-                    slave_threads.append(t)
-                    volume_number += 1
-
-            for slave_thread in slave_threads:
-                self.logger.debug("Waiting on %s" % slave_thread)
-                slave_thread.join()
-                self.logger.debug("Done waiting for %s" % slave_thread)
+            self._execute_workload(current_workload, workload)
 
             if not scheduler.empty():
                 try:
@@ -337,59 +353,106 @@
             report = {'report': json.dumps(self.metadata)}
             self.job_db.record_workload_params(report)
             self.job_db.job_id = None
-        if self.result_url is not None:
-            self.logger.info("Results can be found at %s" % self.result_url)
 
     def _create_workload_matrix(self):
         workloads = []
 
-        for workload_module in self.workload_modules:
-            workload_name = getattr(workload_module, "__name__")
-
-            constructorMethod = getattr(workload_module, workload_name)
-            workload = constructorMethod()
-            if (self.filename is not None):
-                workload.filename = self.filename
-            workload.id = self.job_db.job_id
-
-            if (workload_name.startswith("_")):
-                iodepths = [8, ]
-                blocksizes = [16384, ]
-            else:
-                iodepths = self._queue_depths
-                blocksizes = self._block_sizes
+        if self._custom_workloads:
+            for workload_name in self._custom_workloads.iterkeys():
+                if not workload_name.isalnum():
+                    raise InvalidWorkloadName(
+                        "Workload name must be alphanumeric only: %s" %
+                        workload_name)
+                workload = _custom_workload()
+                workload.options['name'] = workload_name
+                workload.name = workload_name
+                if (self.filename is not None):
+                    workload.filename = self.filename
+                workload.id = self.job_db.job_id
+
+                workload_params = self._custom_workloads[workload_name]
+                for param, value in workload_params.iteritems():
+                    if param == "readwrite":
+                        param = "rw"
+                    if param in workload.fixed_options:
+                        self.logger.warn("Skipping fixed option %s" % param)
+                        continue
+                    workload.options[param] = value
+
+                for blocksize in self._block_sizes:
+                    for iodepth in self._queue_depths:
+
+                        name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                            (self.job_db.job_id, workload_name, iodepth,
+                             blocksize)
+                        self.workload_status[name] = "Pending"
+
+                        workload.options['bs'] = blocksize
+                        workload.options['iodepth'] = iodepth
+
+                        parameters = {'queue-depth': iodepth,
+                                      'blocksize': blocksize,
+                                      'name': name,
+                                      'workload_name': workload_name,
+                                      'status': 'Pending',
+                                      'workload': workload}
+
+                        self.logger.info("Workload %s=%s" %
+                                         (name, workload.options))
+
+                        workloads.append(parameters)
+        else:
+            for workload_module in self.workload_modules:
+                workload_name = getattr(workload_module, "__name__")
+
+                constructorMethod = getattr(workload_module, workload_name)
+                workload = constructorMethod()
+                if (self.filename is not None):
+                    workload.filename = self.filename
+                workload.id = self.job_db.job_id
+
+                if (workload_name.startswith("_")):
+                    iodepths = [8, ]
+                    blocksizes = [16384, ]
+                else:
+                    iodepths = self._queue_depths
+                    blocksizes = self._block_sizes
 
-            for blocksize in blocksizes:
-                for iodepth in iodepths:
+                for blocksize in blocksizes:
+                    for iodepth in iodepths:
 
-                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
-                        (self.job_db.job_id, workload_name, iodepth, blocksize)
-                    self.workload_status[name] = "Pending"
+                        name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                            (self.job_db.job_id, workload_name, iodepth,
+                             blocksize)
+                        self.workload_status[name] = "Pending"
 
-                    parameters = {'queue-depth': iodepth,
-                                  'blocksize': blocksize,
-                                  'name': name,
-                                  'workload_name': workload_name,
-                                  'status': 'Pending',
-                                  'workload': workload}
+                        parameters = {'queue-depth': iodepth,
+                                      'blocksize': blocksize,
+                                      'name': name,
+                                      'workload_name': workload_name,
+                                      'status': 'Pending',
+                                      'workload': workload}
 
-                    self.logger.info("Workload %s=%s" % (name, parameters))
+                        self.logger.info("Workload %s=%s" % (name, parameters))
 
-                    workloads.append(parameters)
+                        workloads.append(parameters)
 
         return workloads
 
-    def execute_on_node(self, workload):
+    def execute_on_node(self, workload, parse_only=False):
 
         invoker = FIOInvoker(self.metadata)
-        invoker.register(self.event)
         workload.invoker = invoker
 
         self.logger.info("Starting " + workload.fullname)
 
-        self.job_db.start_workload(workload)
-        workload.execute()
-        self.job_db.end_workload(workload)
-        invoker.unregister(self.event)
+        if not parse_only:
+            invoker.register(self.event)
+            self.job_db.start_workload(workload)
+        result = workload.execute(parse_only)
+        if not parse_only:
+            self.job_db.end_workload(workload)
+            invoker.unregister(self.event)
 
         self.logger.info("Ended " + workload.fullname)
+        return result
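
A minimal usage sketch (not part of the commit above) of the custom workload
path this patch introduces. It assumes a deployed StorPerf master whose
TestExecutor can reach its slave agents; the agent address, workload name,
FIO option values, and the metadata dict are illustrative assumptions, while
custom_workloads, volume_count, execute(), and the validation behaviour are
taken from the diff:

    from storperf.test_executor import TestExecutor

    executor = TestExecutor()
    # Hypothetical agent list; 'slaves' is an existing TestExecutor
    # attribute not shown in this diff.
    executor.slaves = ['10.0.0.21']
    executor.volume_count = 1

    # Keys name the workloads (alphanumeric only, otherwise
    # _create_workload_matrix() raises InvalidWorkloadName); values are FIO
    # options. 'readwrite' is rewritten to FIO's 'rw', and any option listed
    # in the workload's fixed_options is skipped with a warning.
    executor.custom_workloads = {
        'mixed70': {
            'readwrite': 'randrw',
            'rwmixread': '70',
        }
    }

    # execute() now dry-runs the whole matrix first: test_params() calls
    # _execute_workload(..., parse_only=True), so FIO only parses the
    # options and a rejected parameter aborts the job before any real I/O
    # is issued across the queue-depth/block-size matrix.
    job_id = executor.execute({'build_tag': 'demo'})  # metadata is illustrative
    print("Submitted job %s" % job_id)

Note that validation reuses the same execution path as the real run; only the
parse_only flag differs, which keeps the dry run and the actual job in step.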