diff options
Diffstat (limited to 'docker/storperf-master/storperf/test_executor.py')
-rw-r--r-- | docker/storperf-master/storperf/test_executor.py | 282 |
1 files changed, 184 insertions, 98 deletions
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 9ed6386..cb7e478 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -11,6 +11,7 @@ import copy import imp import json import logging +from multiprocessing.pool import ThreadPool from os import listdir import os from os.path import isfile, join @@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate +from storperf.workloads._custom_workload import _custom_workload class UnknownWorkload(Exception): pass +class InvalidWorkloadName(Exception): + pass + + class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] + self._custom_workloads = {} self.filename = None self.deadline = None self.steady_state_samples = 10 @@ -43,7 +50,6 @@ class TestExecutor(object): self.end_time = None self.current_workload = None self.workload_status = {} - self.result_url = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -53,6 +59,7 @@ class TestExecutor(object): self.job_db = JobDB() self._slaves = [] self._terminated = False + self._volume_count = 1 self._workload_executors = [] self._workload_thread = None self._thread_gate = None @@ -62,7 +69,7 @@ class TestExecutor(object): try: installer = os.environ['INSTALLER_TYPE'] except KeyError: - self.logger.error("Cannot determine installer") + self.logger.warn("Cannot determine installer") installer = "Unknown_installer" self.metadata = {} @@ -88,6 +95,25 @@ class TestExecutor(object): self._slaves = slaves @property + def volume_count(self): + return self._volume_count + + @volume_count.setter + def volume_count(self, volume_count): + self.logger.debug("Set volume count to: " + str(volume_count)) + self._volume_count = volume_count + + @property + def custom_workloads(self): + return self._custom_workloads + + @custom_workloads.setter + def custom_workloads(self, custom_workloads): + self.logger.debug("Set custom workloads to: %s " % + custom_workloads) + self._custom_workloads = custom_workloads + + @property def queue_depths(self): return ','.join(self._queue_depths) @@ -142,7 +168,7 @@ class TestExecutor(object): self.logger.debug("Notifying event listener %s", event_listener) event_listener(self) - except Exception, e: + except Exception as e: self.logger.exception("While notifying listener %s", e) def register_workloads(self, workloads): @@ -175,7 +201,7 @@ class TestExecutor(object): "ERROR: Unknown workload: " + workload) if workload_module not in self.workload_modules: self.workload_modules.append(workload_module) - except ImportError, err: + except ImportError as err: raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): @@ -183,19 +209,23 @@ class TestExecutor(object): path, fname = os.path.split(uri) mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) - self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): - self.logger.debug("Loading compiled: " + mname + " from " + no_ext) return imp.load_compiled(mname, no_ext + '.pyc') if os.path.exists(no_ext + '.py'): - self.logger.debug("Compiling: " + mname + " from " + no_ext) return imp.load_source(mname, no_ext + '.py') return None def execute(self, metadata): self.job_db.create_job_id() - self.job_db.record_workload_params(metadata) self._setup_metadata(metadata) + try: + self.test_params() + except Exception as e: + self.terminate() + raise e + stripped_metadata = metadata.copy() + stripped_metadata.pop('ssh_key', None) + self.job_db.record_workload_params(stripped_metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") @@ -215,38 +245,63 @@ class TestExecutor(object): terminated_hosts.append(workload.remote_host) return terminated_hosts - def execution_status(self, job_id): - - result = {} - status = "Completed" - - if self.job_db.job_id == job_id and self._terminated is False: - status = "Running" - - result['Status'] = status - result['Workloads'] = self.workload_status - result['TestResultURL'] = self.result_url + def test_params(self): + workloads = self._create_workload_matrix() + for current_workload in workloads: + workload = current_workload['workload'] + self.logger.info("Testing FIO parameters for %s" + % current_workload) + result = self._execute_workload(current_workload, + workload, + parse_only=True) + if result: + message = result[0] + self.logger.error("FIO parameter validation failed") + raise Exception("Workload parameter validation failed %s" + % message) + pass + + def _execute_workload(self, current_workload, workload, parse_only=False): + workload.options['iodepth'] = str(current_workload['queue-depth']) + workload.options['bs'] = str(current_workload['blocksize']) + self._workload_executors = [] + slave_threads = [] + thread_pool = ThreadPool(processes=len(self.slaves) * + self.volume_count) - else: - jobs = self.job_db.fetch_jobs() - self.logger.info("Jobs") - self.logger.info(jobs) - for job in jobs: - if self.job_db.job_id == job_id and self._terminated is False: - status = "Running" - result['Status'] = status - result['Workloads'] = self.workload_status - result['TestResultURL'] = self.result_url - else: - result[job] = {} - result[job]['Status'] = "Completed" + self._workload_executors = [] + for slave in self.slaves: + volume_number = 0 + while volume_number < self.volume_count: + slave_workload = copy.copy(current_workload['workload']) + slave_workload.remote_host = slave + last_char_of_filename = chr( + ord(slave_workload.filename[-1:]) + volume_number) + slave_workload.filename = ("%s%s" % + (slave_workload.filename[:-1], + last_char_of_filename)) + self.logger.debug("Device to profile on %s: %s" % + (slave, slave_workload.filename)) + self._workload_executors.append(slave_workload) - return result + worker = thread_pool.apply_async( + self.execute_on_node, (slave_workload, parse_only)) + slave_threads.append(worker) + volume_number += 1 + + final_result = None + for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) + result = slave_thread.get() + self.logger.debug("Done waiting for %s, exit status %s" % + (slave_thread, result)) + if result: + final_result = result + return final_result def execute_workloads(self): self._terminated = False self.logger.info("Starting job %s" % (self.job_db.job_id)) - self.event_listeners.clear() data_handler = DataHandler() self.register(data_handler.data_event) @@ -257,12 +312,14 @@ class TestExecutor(object): workloads = self._create_workload_matrix() for current_workload in workloads: + if self._terminated: + continue + workload = current_workload['workload'] - self._thread_gate = ThreadGate(len(self.slaves), - workload.options['status-interval']) + self._thread_gate = ThreadGate( + len(self.slaves) * min(1, self.volume_count), + float(workload.options['status-interval'])) - if self._terminated: - return self.current_workload = current_workload['name'] self.logger.info("Starting run %s" % self.current_workload) @@ -277,27 +334,7 @@ class TestExecutor(object): t = Thread(target=scheduler.run, args=()) t.start() - workload.options['iodepth'] = str(current_workload['queue-depth']) - workload.options['bs'] = str(current_workload['blocksize']) - - slave_threads = [] - for slave in self.slaves: - slave_workload = copy.copy(current_workload['workload']) - slave_workload.remote_host = slave - - self._workload_executors.append(slave_workload) - - t = Thread(target=self.execute_on_node, - args=(slave_workload,), - name="%s worker" % slave) - t.daemon = False - t.start() - slave_threads.append(t) - - for slave_thread in slave_threads: - self.logger.debug("Waiting on %s" % slave_thread) - slave_thread.join() - self.logger.debug("Done waiting for %s" % slave_thread) + self._execute_workload(current_workload, workload) if not scheduler.empty(): try: @@ -320,62 +357,111 @@ class TestExecutor(object): report = {'report': json.dumps(self.metadata)} self.job_db.record_workload_params(report) self.job_db.job_id = None - if self.result_url is not None: - self.logger.info("Results can be found at %s" % self.result_url) def _create_workload_matrix(self): workloads = [] - for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") - - constructorMethod = getattr(workload_module, workload_name) - workload = constructorMethod() - if (self.filename is not None): - workload.filename = self.filename - workload.id = self.job_db.job_id - - if (self.filename is not None): - workload.filename = self.filename - - if (workload_name.startswith("_")): - iodepths = [8, ] - blocksizes = [16384, ] - else: - iodepths = self._queue_depths - blocksizes = self._block_sizes + if self._custom_workloads: + for workload_name in self._custom_workloads.keys(): + real_name = workload_name + if real_name.startswith('_'): + real_name = real_name.replace('_', '') + self.logger.info("--- real_name: %s" % real_name) + + if not real_name.isalnum(): + raise InvalidWorkloadName( + "Workload name must be alphanumeric only: %s" % + real_name) + workload = _custom_workload() + workload.options['name'] = real_name + workload.name = workload_name + if (self.filename is not None): + workload.filename = self.filename + workload.id = self.job_db.job_id + + workload_params = self._custom_workloads[workload_name] + for param, value in workload_params.items(): + if param == "readwrite": + param = "rw" + if param in workload.fixed_options: + self.logger.warn("Skipping fixed option %s" % param) + continue + workload.options[param] = value + + for blocksize in self._block_sizes: + for iodepth in self._queue_depths: + + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, + blocksize) + self.workload_status[name] = "Pending" + + workload.options['bs'] = blocksize + workload.options['iodepth'] = iodepth + + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} + + self.logger.info("Workload %s=%s" % + (name, workload.options)) + + workloads.append(parameters) + else: + for workload_module in self.workload_modules: + workload_name = getattr(workload_module, "__name__") + + constructorMethod = getattr(workload_module, workload_name) + workload = constructorMethod() + if (self.filename is not None): + workload.filename = self.filename + workload.id = self.job_db.job_id + + if (workload_name.startswith("_")): + iodepths = [8, ] + blocksizes = [16384, ] + else: + iodepths = self._queue_depths + blocksizes = self._block_sizes - for blocksize in blocksizes: - for iodepth in iodepths: + for blocksize in blocksizes: + for iodepth in iodepths: - name = '%s.%s.queue-depth.%s.block-size.%s' % \ - (self.job_db.job_id, workload_name, iodepth, blocksize) - self.workload_status[name] = "Pending" + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, + blocksize) + self.workload_status[name] = "Pending" - parameters = {'queue-depth': iodepth, - 'blocksize': blocksize, - 'name': name, - 'workload_name': workload_name, - 'status': 'Pending', - 'workload': workload} + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} - self.logger.info("Workload %s=%s" % (name, parameters)) + self.logger.info("Workload %s=%s" % (name, parameters)) - workloads.append(parameters) + workloads.append(parameters) return workloads - def execute_on_node(self, workload): + def execute_on_node(self, workload, parse_only=False): invoker = FIOInvoker(self.metadata) - invoker.register(self.event) workload.invoker = invoker self.logger.info("Starting " + workload.fullname) - self.job_db.start_workload(workload) - workload.execute() - self.job_db.end_workload(workload) - invoker.unregister(self.event) + if not parse_only: + invoker.register(self.event) + self.job_db.start_workload(workload) + result = workload.execute(parse_only) + if not parse_only: + self.job_db.end_workload(workload) + invoker.unregister(self.event) self.logger.info("Ended " + workload.fullname) + return result |