diff options
author | mbeierl <mark.beierl@dell.com> | 2018-07-05 15:22:43 -0400 |
---|---|---|
committer | mbeierl <mark.beierl@dell.com> | 2018-07-06 16:40:31 -0400 |
commit | ff389f460711b17db2070ca90417f7ebbc0eff56 (patch) | |
tree | 6292493f1acad91b406bfc126885c5bf8c6532ca /docker/storperf-master | |
parent | 3de258b9d0f6d4249a5e7f42eec41fcb0080bc66 (diff) |
Support Custom Workloads
Refactors interaction with test_executor to clean up the
tight coupling.
Adds ability to specify custom workloads.
Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503
JIRA: STORPERF-246
Co-Authored-By: Ameed.Ashour.Ext@Nokia.com
Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master')
-rw-r--r-- | docker/storperf-master/Dockerfile | 7 | ||||
-rw-r--r-- | docker/storperf-master/rest_server.py | 92 | ||||
-rw-r--r-- | docker/storperf-master/storperf/fio/fio_invoker.py | 9 | ||||
-rw-r--r-- | docker/storperf-master/storperf/storperf_master.py | 101 | ||||
-rw-r--r-- | docker/storperf-master/storperf/test_executor.py | 263 | ||||
-rw-r--r-- | docker/storperf-master/storperf/utilities/data_handler.py | 14 | ||||
-rw-r--r-- | docker/storperf-master/storperf/workloads/_base_workload.py | 12 | ||||
-rw-r--r-- | docker/storperf-master/storperf/workloads/_custom_workload.py | 36 | ||||
-rw-r--r-- | docker/storperf-master/storperf/workloads/_ssd_preconditioning.py | 17 |
9 files changed, 388 insertions, 163 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile index fec3931..c95e3ca 100644 --- a/docker/storperf-master/Dockerfile +++ b/docker/storperf-master/Dockerfile @@ -21,7 +21,7 @@ FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder RUN ulimit -n 1024 -LABEL version="5.0" description="OPNFV Storperf Docker container" +LABEL version="7.0" description="OPNFV Storperf Docker container" ARG BRANCH=master @@ -30,6 +30,7 @@ ENV repos_dir /home/opnfv/repos RUN apk --no-cache add --update \ git \ alpine-sdk \ + coreutils \ linux-headers \ libaio \ libaio-dev \ @@ -38,8 +39,8 @@ RUN apk --no-cache add --update \ # Third party git fetches RUN git config --global http.sslVerify false RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio -RUN cd ${repos_dir}/fio && git checkout tags/fio-2.99 -RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make install +RUN cd ${repos_dir}/fio && git checkout tags/fio-3.7 +RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make -j $(grep -c ^processor /proc/cpuinfo) install # Build StorPerf diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py index 939e91f..9d9b7fa 100644 --- a/docker/storperf-master/rest_server.py +++ b/docker/storperf-master/rest_server.py @@ -350,10 +350,10 @@ for any single test iteration. storperf.queue_depths = request.json['queue_depths'] if ('block_sizes' in request.json): storperf.block_sizes = request.json['block_sizes'] + storperf.workloads = None + storperf.custom_workloads = None if ('workload' in request.json): storperf.workloads = request.json['workload'] - else: - storperf.workloads = None if ('metadata' in request.json): metadata = request.json['metadata'] else: @@ -378,7 +378,6 @@ for any single test iteration. ) def delete(self): self.logger.info("Threads: %s" % sys._current_frames()) - print sys._current_frames() try: return jsonify({'Slaves': storperf.terminate_workloads()}) except Exception as e: @@ -386,6 +385,92 @@ for any single test iteration. @swagger.model +class WorkloadV2Model: + resource_fields = { + 'target': fields.String, + 'deadline': fields.Integer, + "steady_state_samples": fields.Integer, + 'workloads': fields.Nested, + 'queue_depths': fields.String, + 'block_sizes': fields.String + } + + +class Job_v2(Resource): + + """Job API""" + + def __init__(self): + self.logger = logging.getLogger(__name__) + + @swagger.operation( + parameters=[ + { + "name": "body", + "description": """Start execution of a workload with the +following parameters: + +"target": The target device to profile", + +"deadline": if specified, the maximum duration in minutes +for any single test iteration. + +"workloads":if specified, the workload to run. Defaults to all. + """, + "required": True, + "type": "WorkloadV2Model", + "paramType": "body" + } + ], + type=WorkloadResponseModel.__name__, + responseMessages=[ + { + "code": 200, + "message": "Job submitted" + }, + { + "code": 400, + "message": "Missing configuration data" + } + ] + ) + def post(self): + if not request.json: + abort(400, "ERROR: Missing configuration data") + + self.logger.info(request.json) + + try: + if ('target' in request.json): + storperf.filename = request.json['target'] + if ('deadline' in request.json): + storperf.deadline = request.json['deadline'] + if ('steady_state_samples' in request.json): + storperf.steady_state_samples = request.json[ + 'steady_state_samples'] + if ('queue_depths' in request.json): + storperf.queue_depths = request.json['queue_depths'] + if ('block_sizes' in request.json): + storperf.block_sizes = request.json['block_sizes'] + storperf.workloads = None + storperf.custom_workloads = None + if ('workloads' in request.json): + storperf.custom_workloads = request.json['workloads'] + if ('metadata' in request.json): + metadata = request.json['metadata'] + else: + metadata = {} + + job_id = storperf.execute_workloads(metadata) + + return jsonify({'job_id': job_id}) + + except Exception as e: + self.logger.exception(e) + abort(400, str(e)) + + +@swagger.model class QuotaModel: resource_fields = { @@ -433,6 +518,7 @@ def setup_logging(default_path='logging.json', api.add_resource(Configure, "/api/v1.0/configurations") api.add_resource(Quota, "/api/v1.0/quotas") api.add_resource(Job, "/api/v1.0/jobs") +api.add_resource(Job_v2, "/api/v2.0/jobs") api.add_resource(Logs, "/api/v1.0/logs") if __name__ == "__main__": diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py index 0360ea2..a361eec 100644 --- a/docker/storperf-master/storperf/fio/fio_invoker.py +++ b/docker/storperf-master/storperf/fio/fio_invoker.py @@ -23,6 +23,7 @@ class FIOInvoker(object): self.callback_id = None self.terminated = False self.metadata = var_dict + self.stderr = [] @property def remote_host(self): @@ -60,13 +61,13 @@ class FIOInvoker(object): "Event listener callback") event_listener( self.callback_id, json_metric) - except Exception, e: + except Exception as e: self.logger.exception( "Notifying listener %s: %s", self.callback_id, e) self.logger.debug( "Event listener callback complete") - except Exception, e: + except Exception as e: self.logger.error("Error parsing JSON: %s", e) except IOError: pass # We might have read from the closed socket, ignore it @@ -78,6 +79,7 @@ class FIOInvoker(object): self.logger.debug("Started") for line in iter(stderr.readline, b''): self.logger.error("FIO Error: %s", line.rstrip()) + self.stderr.append(line.rstrip()) # Sometime, FIO gets stuck and will give us this message: # fio: job 'sequential_read' hasn't exited in 60 seconds, @@ -125,6 +127,9 @@ class FIOInvoker(object): self.logger.debug("Joining stdout handler") tout.join() self.logger.debug("Ended") + if exit_status != 0: + return self.stderr + return None def terminate(self): self.logger.debug("Terminating fio on " + self.remote_host) diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py index 7a1444e..1025789 100644 --- a/docker/storperf-master/storperf/storperf_master.py +++ b/docker/storperf-master/storperf/storperf_master.py @@ -59,7 +59,7 @@ class StorPerfMaster(object): self.stack_settings) self.username = None self.password = None - self._test_executor = TestExecutor() + self._test_executor = None self._agent_count = 1 self._agent_image = "Ubuntu 14.04" self._agent_flavor = "storperf" @@ -72,6 +72,13 @@ class StorPerfMaster(object): self._last_snaps_check_time = None self._slave_addresses = [] self._thread_pool = worker_pool(20) + self._filename = None + self._deadline = None + self._steady_state_samples = 10 + self._queue_depths = [1, 4, 8] + self._block_sizes = [512, 4096, 16384] + self._workload_modules = [] + self._custom_workloads = [] @property def volume_count(self): @@ -245,57 +252,68 @@ class StorPerfMaster(object): @property def filename(self): - return self._test_executor.filename + return self._filename @filename.setter def filename(self, value): - self._test_executor.filename = value + self._filename = value @property def deadline(self): - return self._test_executor.deadline + return self._deadline @deadline.setter def deadline(self, value): - self._test_executor.deadline = value + self._deadline = value @property def steady_state_samples(self): - return self._test_executor.steady_state_samples + return self._steady_state_samples @steady_state_samples.setter def steady_state_samples(self, value): - self._test_executor.steady_state_samples = value + self._steady_state_samples = value @property def queue_depths(self): - return self._test_executor.queue_depths + return self._queue_depths @queue_depths.setter def queue_depths(self, value): - self._test_executor.queue_depths = value + self._queue_depths = value @property def block_sizes(self): - return self._test_executor.block_sizes + return self._block_sizes @block_sizes.setter def block_sizes(self, value): - self._test_executor.block_sizes = value - - @property - def is_stack_created(self): - return (self.stack_id is not None and - (self.heat_stack.get_status() == u'CREATE_COMPLETE' or - self.heat_stack.get_status() == u'UPDATE_COMPLETE')) + self._block_sizes = value @property def workloads(self): - return str(self._test_executor.workload_modules) + return self._workload_modules @workloads.setter def workloads(self, value): - self._test_executor.register_workloads(value) + executor = TestExecutor() + executor.register_workloads(value) + self._workload_modules = value + + @property + def custom_workloads(self): + return self._custom_workloads + + @custom_workloads.setter + def custom_workloads(self, value): + self.logger.info("Custom workloads = %s" % value) + self._custom_workloads = value + + @property + def is_stack_created(self): + return (self.stack_id is not None and + (self.heat_stack.get_status() == u'CREATE_COMPLETE' or + self.heat_stack.get_status() == u'UPDATE_COMPLETE')) def get_logs(self, lines=None): LOG_DIR = './storperf.log' @@ -358,14 +376,29 @@ class StorPerfMaster(object): self.heat_stack.clean() return stack_id + def executor_event(self, executor): + if executor.terminated: + self._test_executor = None + def execute_workloads(self, metadata={}): + if (self._test_executor is not None and + (not self._test_executor.terminated and + self._test_executor.job_id is not None)): + raise Exception("ERROR: Job {} is already running".format( + self._test_executor.job_id)) + if (self.stack_id is None): raise ParameterError("ERROR: Stack does not exist") - if (not self._test_executor.terminated and - self._test_executor.job_id is not None): - raise Exception("ERROR: Job {} is already running".format( - self._test_executor.job_id)) + self._test_executor = TestExecutor() + self._test_executor.register(self.executor_event) + self._test_executor.register_workloads(self._workload_modules) + self._test_executor.custom_workloads = self.custom_workloads + self._test_executor.block_sizes = self._block_sizes + self._test_executor.filename = self._filename + self._test_executor.deadline = self._deadline + self._test_executor.steady_state_samples = self._steady_state_samples + self._test_executor.queue_depths = self._queue_depths slaves = self._slave_addresses @@ -397,10 +430,14 @@ class StorPerfMaster(object): return job_id def terminate_workloads(self): - return self._test_executor.terminate() + if self._test_executor is not None: + return self._test_executor.terminate() + else: + return True def fetch_results(self, job_id): - if self._test_executor.job_db.job_id == job_id: + if (self._test_executor is not None and + self._test_executor.job_db.job_id == job_id): return self._test_executor.metadata['details']['metrics'] workload_params = self.job_db.fetch_workload_params(job_id) @@ -413,7 +450,19 @@ class StorPerfMaster(object): return self.job_db.fetch_workload_params(job_id) def fetch_job_status(self, job_id): - return self._test_executor.execution_status(job_id) + results = {} + + if (self._test_executor is not None and + self._test_executor.job_id == job_id): + results['Status'] = 'Running' + results['Workloads'] = self._test_executor.workload_status + else: + jobs = self.job_db.fetch_jobs() + for job in jobs: + if job == job_id: + results['Status'] = "Completed" + + return results def fetch_all_jobs(self, metrics_type): job_list = self.job_db.fetch_jobs() diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py index 2ed6a9e..38e052e 100644 --- a/docker/storperf-master/storperf/test_executor.py +++ b/docker/storperf-master/storperf/test_executor.py @@ -11,6 +11,7 @@ import copy import imp import json import logging +from multiprocessing.pool import ThreadPool from os import listdir import os from os.path import isfile, join @@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker from storperf.utilities.data_handler import DataHandler from storperf.utilities.thread_gate import ThreadGate +from storperf.workloads._custom_workload import _custom_workload class UnknownWorkload(Exception): pass +class InvalidWorkloadName(Exception): + pass + + class TestExecutor(object): def __init__(self): self.logger = logging.getLogger(__name__) self.workload_modules = [] + self._custom_workloads = {} self.filename = None self.deadline = None self.steady_state_samples = 10 @@ -43,7 +50,6 @@ class TestExecutor(object): self.end_time = None self.current_workload = None self.workload_status = {} - self.result_url = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -98,6 +104,16 @@ class TestExecutor(object): self._volume_count = volume_count @property + def custom_workloads(self): + return self._custom_workloads + + @custom_workloads.setter + def custom_workloads(self, custom_workloads): + self.logger.debug("Set custom workloads to: %s " % + custom_workloads) + self._custom_workloads = custom_workloads + + @property def queue_depths(self): return ','.join(self._queue_depths) @@ -152,7 +168,7 @@ class TestExecutor(object): self.logger.debug("Notifying event listener %s", event_listener) event_listener(self) - except Exception, e: + except Exception as e: self.logger.exception("While notifying listener %s", e) def register_workloads(self, workloads): @@ -185,7 +201,7 @@ class TestExecutor(object): "ERROR: Unknown workload: " + workload) if workload_module not in self.workload_modules: self.workload_modules.append(workload_module) - except ImportError, err: + except ImportError as err: raise UnknownWorkload("ERROR: " + str(err)) def load_from_file(self, uri): @@ -193,23 +209,26 @@ class TestExecutor(object): path, fname = os.path.split(uri) mname, _ = os.path.splitext(fname) no_ext = os.path.join(path, mname) - self.logger.debug("Looking for: " + no_ext) if os.path.exists(no_ext + '.pyc'): - self.logger.debug("Loading compiled: " + mname + " from " + no_ext) return imp.load_compiled(mname, no_ext + '.pyc') if os.path.exists(no_ext + '.py'): - self.logger.debug("Compiling: " + mname + " from " + no_ext) return imp.load_source(mname, no_ext + '.py') return None def execute(self, metadata): self.job_db.create_job_id() + try: + self.test_params() + except Exception as e: + self.terminate() + raise e self.job_db.record_workload_params(metadata) self._setup_metadata(metadata) self._workload_thread = Thread(target=self.execute_workloads, args=(), name="Workload thread") self._workload_thread.start() + # seems to be hanging here return self.job_db.job_id def terminate(self): @@ -225,38 +244,61 @@ class TestExecutor(object): terminated_hosts.append(workload.remote_host) return terminated_hosts - def execution_status(self, job_id): - - result = {} - status = "Completed" - - if self.job_db.job_id == job_id and self._terminated is False: - status = "Running" - - result['Status'] = status - result['Workloads'] = self.workload_status - result['TestResultURL'] = self.result_url - - else: - jobs = self.job_db.fetch_jobs() - self.logger.info("Jobs") - self.logger.info(jobs) - for job in jobs: - if self.job_db.job_id == job_id and self._terminated is False: - status = "Running" - result['Status'] = status - result['Workloads'] = self.workload_status - result['TestResultURL'] = self.result_url - else: - result[job] = {} - result[job]['Status'] = "Completed" - - return result + def test_params(self): + workloads = self._create_workload_matrix() + for current_workload in workloads: + workload = current_workload['workload'] + self.logger.info("Testing FIO parameters for %s" + % current_workload) + result = self._execute_workload(current_workload, + workload, + parse_only=True) + if result: + message = result[0] + self.logger.error("FIO parameter validation failed") + raise Exception("Workload parameter validation failed %s" + % message) + pass + + def _execute_workload(self, current_workload, workload, parse_only=False): + workload.options['iodepth'] = str(current_workload['queue-depth']) + workload.options['bs'] = str(current_workload['blocksize']) + slave_threads = [] + thread_pool = ThreadPool(processes=len(self.slaves) * + self.volume_count) + + for slave in self.slaves: + volume_number = 0 + while volume_number < self.volume_count: + slave_workload = copy.copy(current_workload['workload']) + slave_workload.remote_host = slave + last_char_of_filename = chr( + ord(slave_workload.filename[-1:]) + volume_number) + slave_workload.filename = ("%s%s" % + (slave_workload.filename[:-1], + last_char_of_filename)) + self.logger.debug("Device to profile on %s: %s" % + (slave, slave_workload.filename)) + self._workload_executors.append(slave_workload) + + worker = thread_pool.apply_async( + self.execute_on_node, (slave_workload, parse_only)) + slave_threads.append(worker) + volume_number += 1 + + final_result = None + for slave_thread in slave_threads: + self.logger.debug("Waiting on %s" % slave_thread) + result = slave_thread.get() + self.logger.debug("Done waiting for %s, exit status %s" % + (slave_thread, result)) + if result: + final_result = result + return final_result def execute_workloads(self): self._terminated = False self.logger.info("Starting job %s" % (self.job_db.job_id)) - self.event_listeners.clear() data_handler = DataHandler() self.register(data_handler.data_event) @@ -267,12 +309,13 @@ class TestExecutor(object): workloads = self._create_workload_matrix() for current_workload in workloads: + if self._terminated: + continue + workload = current_workload['workload'] self._thread_gate = ThreadGate(len(self.slaves), workload.options['status-interval']) - if self._terminated: - return self.current_workload = current_workload['name'] self.logger.info("Starting run %s" % self.current_workload) @@ -287,34 +330,7 @@ class TestExecutor(object): t = Thread(target=scheduler.run, args=()) t.start() - workload.options['iodepth'] = str(current_workload['queue-depth']) - workload.options['bs'] = str(current_workload['blocksize']) - - slave_threads = [] - for slave in self.slaves: - volume_number = 0 - while volume_number < self.volume_count: - slave_workload = copy.copy(current_workload['workload']) - slave_workload.remote_host = slave - last_char_of_filename = chr(ord( - slave_workload.filename[-1:]) + volume_number) - slave_workload.filename = "%s%s" % \ - (slave_workload.filename[:-1], last_char_of_filename) - self.logger.debug("Device to profile: %s" % - slave_workload.filename) - self._workload_executors.append(slave_workload) - t = Thread(target=self.execute_on_node, - args=(slave_workload,), - name="%s worker" % slave) - t.daemon = False - t.start() - slave_threads.append(t) - volume_number += 1 - - for slave_thread in slave_threads: - self.logger.debug("Waiting on %s" % slave_thread) - slave_thread.join() - self.logger.debug("Done waiting for %s" % slave_thread) + self._execute_workload(current_workload, workload) if not scheduler.empty(): try: @@ -337,59 +353,106 @@ class TestExecutor(object): report = {'report': json.dumps(self.metadata)} self.job_db.record_workload_params(report) self.job_db.job_id = None - if self.result_url is not None: - self.logger.info("Results can be found at %s" % self.result_url) def _create_workload_matrix(self): workloads = [] - for workload_module in self.workload_modules: - workload_name = getattr(workload_module, "__name__") - - constructorMethod = getattr(workload_module, workload_name) - workload = constructorMethod() - if (self.filename is not None): - workload.filename = self.filename - workload.id = self.job_db.job_id - - if (workload_name.startswith("_")): - iodepths = [8, ] - blocksizes = [16384, ] - else: - iodepths = self._queue_depths - blocksizes = self._block_sizes + if self._custom_workloads: + for workload_name in self._custom_workloads.iterkeys(): + if not workload_name.isalnum(): + raise InvalidWorkloadName( + "Workload name must be alphanumeric only: %s" % + workload_name) + workload = _custom_workload() + workload.options['name'] = workload_name + workload.name = workload_name + if (self.filename is not None): + workload.filename = self.filename + workload.id = self.job_db.job_id + + workload_params = self._custom_workloads[workload_name] + for param, value in workload_params.iteritems(): + if param == "readwrite": + param = "rw" + if param in workload.fixed_options: + self.logger.warn("Skipping fixed option %s" % param) + continue + workload.options[param] = value + + for blocksize in self._block_sizes: + for iodepth in self._queue_depths: + + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, + blocksize) + self.workload_status[name] = "Pending" + + workload.options['bs'] = blocksize + workload.options['iodepth'] = iodepth + + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} + + self.logger.info("Workload %s=%s" % + (name, workload.options)) + + workloads.append(parameters) + else: + for workload_module in self.workload_modules: + workload_name = getattr(workload_module, "__name__") + + constructorMethod = getattr(workload_module, workload_name) + workload = constructorMethod() + if (self.filename is not None): + workload.filename = self.filename + workload.id = self.job_db.job_id + + if (workload_name.startswith("_")): + iodepths = [8, ] + blocksizes = [16384, ] + else: + iodepths = self._queue_depths + blocksizes = self._block_sizes - for blocksize in blocksizes: - for iodepth in iodepths: + for blocksize in blocksizes: + for iodepth in iodepths: - name = '%s.%s.queue-depth.%s.block-size.%s' % \ - (self.job_db.job_id, workload_name, iodepth, blocksize) - self.workload_status[name] = "Pending" + name = '%s.%s.queue-depth.%s.block-size.%s' % \ + (self.job_db.job_id, workload_name, iodepth, + blocksize) + self.workload_status[name] = "Pending" - parameters = {'queue-depth': iodepth, - 'blocksize': blocksize, - 'name': name, - 'workload_name': workload_name, - 'status': 'Pending', - 'workload': workload} + parameters = {'queue-depth': iodepth, + 'blocksize': blocksize, + 'name': name, + 'workload_name': workload_name, + 'status': 'Pending', + 'workload': workload} - self.logger.info("Workload %s=%s" % (name, parameters)) + self.logger.info("Workload %s=%s" % (name, parameters)) - workloads.append(parameters) + workloads.append(parameters) return workloads - def execute_on_node(self, workload): + def execute_on_node(self, workload, parse_only=False): invoker = FIOInvoker(self.metadata) - invoker.register(self.event) workload.invoker = invoker self.logger.info("Starting " + workload.fullname) - self.job_db.start_workload(workload) - workload.execute() - self.job_db.end_workload(workload) - invoker.unregister(self.event) + if not parse_only: + invoker.register(self.event) + self.job_db.start_workload(workload) + result = workload.execute(parse_only) + if not parse_only: + self.job_db.end_workload(workload) + invoker.unregister(self.event) self.logger.info("Ended " + workload.fullname) + return result diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py index b85517f..c7d70a7 100644 --- a/docker/storperf-master/storperf/utilities/data_handler.py +++ b/docker/storperf-master/storperf/utilities/data_handler.py @@ -151,14 +151,12 @@ class DataHandler(object): test_db = os.environ.get('TEST_DB_URL') if test_db is not None: self.logger.info("Pushing results to %s" % (test_db)) - try: - response = test_results_db.push_results_to_db( - test_db, - executor.metadata, - self.logger) - executor.result_url = response['href'] - except Exception: - self.logger.exception("Error pushing results into Database") + response = test_results_db.push_results_to_db( + test_db, + executor.metadata, + self.logger) + if response: + self.logger.info("Results reference: %s" % response['href']) def _determine_criteria(self, metadata): steady_state = True diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py index c2c7b7b..9b04314 100644 --- a/docker/storperf-master/storperf/workloads/_base_workload.py +++ b/docker/storperf-master/storperf/workloads/_base_workload.py @@ -30,8 +30,9 @@ class _base_workload(object): self.invoker = None self.remote_host = None self.id = None + self.name = self.__class__.__name__ - def execute(self): + def execute(self, parse_only=False): if self.invoker is None: raise ValueError("No invoker has been set") @@ -55,7 +56,10 @@ class _base_workload(object): for key, value in self.options.iteritems(): args.append('--' + key + "=" + value) - self.invoker.execute(args) + if parse_only: + args.append('--parse-only') + + return self.invoker.execute(args) def terminate(self): if self.invoker is not None: @@ -74,11 +78,11 @@ class _base_workload(object): @property def fullname(self): - host_file = self.remote_host+"."+self.filename + host_file = self.remote_host + "." + self.filename host_file = host_file.replace(".", "-").replace("/", "-") return ("%s.%s.queue-depth.%s.block-size.%s.%s" % (str(self.id), - self.__class__.__name__, + self.name, str(self.options['iodepth']), str(self.options['bs']), host_file)) diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py new file mode 100644 index 0000000..9e0100d --- /dev/null +++ b/docker/storperf-master/storperf/workloads/_custom_workload.py @@ -0,0 +1,36 @@ +############################################################################## +# Copyright (c) 2015 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import logging +from storperf.workloads import _base_workload + + +class _custom_workload(_base_workload._base_workload): + + def __init__(self): + self.logger = logging.getLogger(self.__class__.__name__) + self.default_filesize = "1G" + self.filename = '/dev/vdb' + self.fixed_options = { + 'loops': '200', + 'output-format': 'json', + 'status-interval': '60' + } + self.options = { + 'ioengine': 'libaio', + 'direct': '1', + 'numjobs': '1', + 'rw': 'read', + 'bs': '64k', + 'iodepth': '1' + } + self.options.update(self.fixed_options) + self.invoker = None + self.remote_host = None + self.id = None diff --git a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py deleted file mode 100644 index cce3c31..0000000 --- a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py +++ /dev/null @@ -1,17 +0,0 @@ -############################################################################## -# Copyright (c) 2015 EMC and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -from storperf.workloads import _base_workload - - -class _ssd_preconditioning(_base_workload._base_workload): - - def setup(self): - self.options['name'] = 'ssd_preconditioning' - self.options['rw'] = 'randwrite' - self.options['loops'] = '1' |