summaryrefslogtreecommitdiffstats
path: root/docker
diff options
context:
space:
mode:
authormbeierl <mark.beierl@dell.com>2018-07-05 15:22:43 -0400
committermbeierl <mark.beierl@dell.com>2018-07-06 16:40:31 -0400
commitff389f460711b17db2070ca90417f7ebbc0eff56 (patch)
tree6292493f1acad91b406bfc126885c5bf8c6532ca /docker
parent3de258b9d0f6d4249a5e7f42eec41fcb0080bc66 (diff)
Support Custom Workloads
Refactors interaction with test_executor to clean up the tight coupling. Adds ability to specify custom workloads. Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503 JIRA: STORPERF-246 Co-Authored-By: Ameed.Ashour.Ext@Nokia.com Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker')
-rw-r--r--docker/storperf-master/Dockerfile7
-rw-r--r--docker/storperf-master/rest_server.py92
-rw-r--r--docker/storperf-master/storperf/fio/fio_invoker.py9
-rw-r--r--docker/storperf-master/storperf/storperf_master.py101
-rw-r--r--docker/storperf-master/storperf/test_executor.py263
-rw-r--r--docker/storperf-master/storperf/utilities/data_handler.py14
-rw-r--r--docker/storperf-master/storperf/workloads/_base_workload.py12
-rw-r--r--docker/storperf-master/storperf/workloads/_custom_workload.py36
-rw-r--r--docker/storperf-master/storperf/workloads/_ssd_preconditioning.py17
9 files changed, 388 insertions, 163 deletions
diff --git a/docker/storperf-master/Dockerfile b/docker/storperf-master/Dockerfile
index fec3931..c95e3ca 100644
--- a/docker/storperf-master/Dockerfile
+++ b/docker/storperf-master/Dockerfile
@@ -21,7 +21,7 @@ FROM multiarch/alpine:$ARCH-$ALPINE_VERSION as storperf-builder
RUN ulimit -n 1024
-LABEL version="5.0" description="OPNFV Storperf Docker container"
+LABEL version="7.0" description="OPNFV Storperf Docker container"
ARG BRANCH=master
@@ -30,6 +30,7 @@ ENV repos_dir /home/opnfv/repos
RUN apk --no-cache add --update \
git \
alpine-sdk \
+ coreutils \
linux-headers \
libaio \
libaio-dev \
@@ -38,8 +39,8 @@ RUN apk --no-cache add --update \
# Third party git fetches
RUN git config --global http.sslVerify false
RUN git clone http://git.kernel.dk/fio.git ${repos_dir}/fio
-RUN cd ${repos_dir}/fio && git checkout tags/fio-2.99
-RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make install
+RUN cd ${repos_dir}/fio && git checkout tags/fio-3.7
+RUN cd ${repos_dir}/fio && EXTFLAGS="-static" make -j $(grep -c ^processor /proc/cpuinfo) install
# Build StorPerf
diff --git a/docker/storperf-master/rest_server.py b/docker/storperf-master/rest_server.py
index 939e91f..9d9b7fa 100644
--- a/docker/storperf-master/rest_server.py
+++ b/docker/storperf-master/rest_server.py
@@ -350,10 +350,10 @@ for any single test iteration.
storperf.queue_depths = request.json['queue_depths']
if ('block_sizes' in request.json):
storperf.block_sizes = request.json['block_sizes']
+ storperf.workloads = None
+ storperf.custom_workloads = None
if ('workload' in request.json):
storperf.workloads = request.json['workload']
- else:
- storperf.workloads = None
if ('metadata' in request.json):
metadata = request.json['metadata']
else:
@@ -378,7 +378,6 @@ for any single test iteration.
)
def delete(self):
self.logger.info("Threads: %s" % sys._current_frames())
- print sys._current_frames()
try:
return jsonify({'Slaves': storperf.terminate_workloads()})
except Exception as e:
@@ -386,6 +385,92 @@ for any single test iteration.
@swagger.model
+class WorkloadV2Model:
+ resource_fields = {
+ 'target': fields.String,
+ 'deadline': fields.Integer,
+ "steady_state_samples": fields.Integer,
+ 'workloads': fields.Nested,
+ 'queue_depths': fields.String,
+ 'block_sizes': fields.String
+ }
+
+
+class Job_v2(Resource):
+
+ """Job API"""
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ @swagger.operation(
+ parameters=[
+ {
+ "name": "body",
+ "description": """Start execution of a workload with the
+following parameters:
+
+"target": The target device to profile",
+
+"deadline": if specified, the maximum duration in minutes
+for any single test iteration.
+
+"workloads":if specified, the workload to run. Defaults to all.
+ """,
+ "required": True,
+ "type": "WorkloadV2Model",
+ "paramType": "body"
+ }
+ ],
+ type=WorkloadResponseModel.__name__,
+ responseMessages=[
+ {
+ "code": 200,
+ "message": "Job submitted"
+ },
+ {
+ "code": 400,
+ "message": "Missing configuration data"
+ }
+ ]
+ )
+ def post(self):
+ if not request.json:
+ abort(400, "ERROR: Missing configuration data")
+
+ self.logger.info(request.json)
+
+ try:
+ if ('target' in request.json):
+ storperf.filename = request.json['target']
+ if ('deadline' in request.json):
+ storperf.deadline = request.json['deadline']
+ if ('steady_state_samples' in request.json):
+ storperf.steady_state_samples = request.json[
+ 'steady_state_samples']
+ if ('queue_depths' in request.json):
+ storperf.queue_depths = request.json['queue_depths']
+ if ('block_sizes' in request.json):
+ storperf.block_sizes = request.json['block_sizes']
+ storperf.workloads = None
+ storperf.custom_workloads = None
+ if ('workloads' in request.json):
+ storperf.custom_workloads = request.json['workloads']
+ if ('metadata' in request.json):
+ metadata = request.json['metadata']
+ else:
+ metadata = {}
+
+ job_id = storperf.execute_workloads(metadata)
+
+ return jsonify({'job_id': job_id})
+
+ except Exception as e:
+ self.logger.exception(e)
+ abort(400, str(e))
+
+
+@swagger.model
class QuotaModel:
resource_fields = {
@@ -433,6 +518,7 @@ def setup_logging(default_path='logging.json',
api.add_resource(Configure, "/api/v1.0/configurations")
api.add_resource(Quota, "/api/v1.0/quotas")
api.add_resource(Job, "/api/v1.0/jobs")
+api.add_resource(Job_v2, "/api/v2.0/jobs")
api.add_resource(Logs, "/api/v1.0/logs")
if __name__ == "__main__":
diff --git a/docker/storperf-master/storperf/fio/fio_invoker.py b/docker/storperf-master/storperf/fio/fio_invoker.py
index 0360ea2..a361eec 100644
--- a/docker/storperf-master/storperf/fio/fio_invoker.py
+++ b/docker/storperf-master/storperf/fio/fio_invoker.py
@@ -23,6 +23,7 @@ class FIOInvoker(object):
self.callback_id = None
self.terminated = False
self.metadata = var_dict
+ self.stderr = []
@property
def remote_host(self):
@@ -60,13 +61,13 @@ class FIOInvoker(object):
"Event listener callback")
event_listener(
self.callback_id, json_metric)
- except Exception, e:
+ except Exception as e:
self.logger.exception(
"Notifying listener %s: %s",
self.callback_id, e)
self.logger.debug(
"Event listener callback complete")
- except Exception, e:
+ except Exception as e:
self.logger.error("Error parsing JSON: %s", e)
except IOError:
pass # We might have read from the closed socket, ignore it
@@ -78,6 +79,7 @@ class FIOInvoker(object):
self.logger.debug("Started")
for line in iter(stderr.readline, b''):
self.logger.error("FIO Error: %s", line.rstrip())
+ self.stderr.append(line.rstrip())
# Sometime, FIO gets stuck and will give us this message:
# fio: job 'sequential_read' hasn't exited in 60 seconds,
@@ -125,6 +127,9 @@ class FIOInvoker(object):
self.logger.debug("Joining stdout handler")
tout.join()
self.logger.debug("Ended")
+ if exit_status != 0:
+ return self.stderr
+ return None
def terminate(self):
self.logger.debug("Terminating fio on " + self.remote_host)
diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 7a1444e..1025789 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -59,7 +59,7 @@ class StorPerfMaster(object):
self.stack_settings)
self.username = None
self.password = None
- self._test_executor = TestExecutor()
+ self._test_executor = None
self._agent_count = 1
self._agent_image = "Ubuntu 14.04"
self._agent_flavor = "storperf"
@@ -72,6 +72,13 @@ class StorPerfMaster(object):
self._last_snaps_check_time = None
self._slave_addresses = []
self._thread_pool = worker_pool(20)
+ self._filename = None
+ self._deadline = None
+ self._steady_state_samples = 10
+ self._queue_depths = [1, 4, 8]
+ self._block_sizes = [512, 4096, 16384]
+ self._workload_modules = []
+ self._custom_workloads = []
@property
def volume_count(self):
@@ -245,57 +252,68 @@ class StorPerfMaster(object):
@property
def filename(self):
- return self._test_executor.filename
+ return self._filename
@filename.setter
def filename(self, value):
- self._test_executor.filename = value
+ self._filename = value
@property
def deadline(self):
- return self._test_executor.deadline
+ return self._deadline
@deadline.setter
def deadline(self, value):
- self._test_executor.deadline = value
+ self._deadline = value
@property
def steady_state_samples(self):
- return self._test_executor.steady_state_samples
+ return self._steady_state_samples
@steady_state_samples.setter
def steady_state_samples(self, value):
- self._test_executor.steady_state_samples = value
+ self._steady_state_samples = value
@property
def queue_depths(self):
- return self._test_executor.queue_depths
+ return self._queue_depths
@queue_depths.setter
def queue_depths(self, value):
- self._test_executor.queue_depths = value
+ self._queue_depths = value
@property
def block_sizes(self):
- return self._test_executor.block_sizes
+ return self._block_sizes
@block_sizes.setter
def block_sizes(self, value):
- self._test_executor.block_sizes = value
-
- @property
- def is_stack_created(self):
- return (self.stack_id is not None and
- (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
- self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
+ self._block_sizes = value
@property
def workloads(self):
- return str(self._test_executor.workload_modules)
+ return self._workload_modules
@workloads.setter
def workloads(self, value):
- self._test_executor.register_workloads(value)
+ executor = TestExecutor()
+ executor.register_workloads(value)
+ self._workload_modules = value
+
+ @property
+ def custom_workloads(self):
+ return self._custom_workloads
+
+ @custom_workloads.setter
+ def custom_workloads(self, value):
+ self.logger.info("Custom workloads = %s" % value)
+ self._custom_workloads = value
+
+ @property
+ def is_stack_created(self):
+ return (self.stack_id is not None and
+ (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
+ self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
def get_logs(self, lines=None):
LOG_DIR = './storperf.log'
@@ -358,14 +376,29 @@ class StorPerfMaster(object):
self.heat_stack.clean()
return stack_id
+ def executor_event(self, executor):
+ if executor.terminated:
+ self._test_executor = None
+
def execute_workloads(self, metadata={}):
+ if (self._test_executor is not None and
+ (not self._test_executor.terminated and
+ self._test_executor.job_id is not None)):
+ raise Exception("ERROR: Job {} is already running".format(
+ self._test_executor.job_id))
+
if (self.stack_id is None):
raise ParameterError("ERROR: Stack does not exist")
- if (not self._test_executor.terminated and
- self._test_executor.job_id is not None):
- raise Exception("ERROR: Job {} is already running".format(
- self._test_executor.job_id))
+ self._test_executor = TestExecutor()
+ self._test_executor.register(self.executor_event)
+ self._test_executor.register_workloads(self._workload_modules)
+ self._test_executor.custom_workloads = self.custom_workloads
+ self._test_executor.block_sizes = self._block_sizes
+ self._test_executor.filename = self._filename
+ self._test_executor.deadline = self._deadline
+ self._test_executor.steady_state_samples = self._steady_state_samples
+ self._test_executor.queue_depths = self._queue_depths
slaves = self._slave_addresses
@@ -397,10 +430,14 @@ class StorPerfMaster(object):
return job_id
def terminate_workloads(self):
- return self._test_executor.terminate()
+ if self._test_executor is not None:
+ return self._test_executor.terminate()
+ else:
+ return True
def fetch_results(self, job_id):
- if self._test_executor.job_db.job_id == job_id:
+ if (self._test_executor is not None and
+ self._test_executor.job_db.job_id == job_id):
return self._test_executor.metadata['details']['metrics']
workload_params = self.job_db.fetch_workload_params(job_id)
@@ -413,7 +450,19 @@ class StorPerfMaster(object):
return self.job_db.fetch_workload_params(job_id)
def fetch_job_status(self, job_id):
- return self._test_executor.execution_status(job_id)
+ results = {}
+
+ if (self._test_executor is not None and
+ self._test_executor.job_id == job_id):
+ results['Status'] = 'Running'
+ results['Workloads'] = self._test_executor.workload_status
+ else:
+ jobs = self.job_db.fetch_jobs()
+ for job in jobs:
+ if job == job_id:
+ results['Status'] = "Completed"
+
+ return results
def fetch_all_jobs(self, metrics_type):
job_list = self.job_db.fetch_jobs()
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 2ed6a9e..38e052e 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -11,6 +11,7 @@ import copy
import imp
import json
import logging
+from multiprocessing.pool import ThreadPool
from os import listdir
import os
from os.path import isfile, join
@@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
+from storperf.workloads._custom_workload import _custom_workload
class UnknownWorkload(Exception):
pass
+class InvalidWorkloadName(Exception):
+ pass
+
+
class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
+ self._custom_workloads = {}
self.filename = None
self.deadline = None
self.steady_state_samples = 10
@@ -43,7 +50,6 @@ class TestExecutor(object):
self.end_time = None
self.current_workload = None
self.workload_status = {}
- self.result_url = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -98,6 +104,16 @@ class TestExecutor(object):
self._volume_count = volume_count
@property
+ def custom_workloads(self):
+ return self._custom_workloads
+
+ @custom_workloads.setter
+ def custom_workloads(self, custom_workloads):
+ self.logger.debug("Set custom workloads to: %s " %
+ custom_workloads)
+ self._custom_workloads = custom_workloads
+
+ @property
def queue_depths(self):
return ','.join(self._queue_depths)
@@ -152,7 +168,7 @@ class TestExecutor(object):
self.logger.debug("Notifying event listener %s",
event_listener)
event_listener(self)
- except Exception, e:
+ except Exception as e:
self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
@@ -185,7 +201,7 @@ class TestExecutor(object):
"ERROR: Unknown workload: " + workload)
if workload_module not in self.workload_modules:
self.workload_modules.append(workload_module)
- except ImportError, err:
+ except ImportError as err:
raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
@@ -193,23 +209,26 @@ class TestExecutor(object):
path, fname = os.path.split(uri)
mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
- self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
- self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
return imp.load_compiled(mname, no_ext + '.pyc')
if os.path.exists(no_ext + '.py'):
- self.logger.debug("Compiling: " + mname + " from " + no_ext)
return imp.load_source(mname, no_ext + '.py')
return None
def execute(self, metadata):
self.job_db.create_job_id()
+ try:
+ self.test_params()
+ except Exception as e:
+ self.terminate()
+ raise e
self.job_db.record_workload_params(metadata)
self._setup_metadata(metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
self._workload_thread.start()
+ # seems to be hanging here
return self.job_db.job_id
def terminate(self):
@@ -225,38 +244,61 @@ class TestExecutor(object):
terminated_hosts.append(workload.remote_host)
return terminated_hosts
- def execution_status(self, job_id):
-
- result = {}
- status = "Completed"
-
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
-
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
-
- else:
- jobs = self.job_db.fetch_jobs()
- self.logger.info("Jobs")
- self.logger.info(jobs)
- for job in jobs:
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
- else:
- result[job] = {}
- result[job]['Status'] = "Completed"
-
- return result
+ def test_params(self):
+ workloads = self._create_workload_matrix()
+ for current_workload in workloads:
+ workload = current_workload['workload']
+ self.logger.info("Testing FIO parameters for %s"
+ % current_workload)
+ result = self._execute_workload(current_workload,
+ workload,
+ parse_only=True)
+ if result:
+ message = result[0]
+ self.logger.error("FIO parameter validation failed")
+ raise Exception("Workload parameter validation failed %s"
+ % message)
+ pass
+
+ def _execute_workload(self, current_workload, workload, parse_only=False):
+ workload.options['iodepth'] = str(current_workload['queue-depth'])
+ workload.options['bs'] = str(current_workload['blocksize'])
+ slave_threads = []
+ thread_pool = ThreadPool(processes=len(self.slaves) *
+ self.volume_count)
+
+ for slave in self.slaves:
+ volume_number = 0
+ while volume_number < self.volume_count:
+ slave_workload = copy.copy(current_workload['workload'])
+ slave_workload.remote_host = slave
+ last_char_of_filename = chr(
+ ord(slave_workload.filename[-1:]) + volume_number)
+ slave_workload.filename = ("%s%s" %
+ (slave_workload.filename[:-1],
+ last_char_of_filename))
+ self.logger.debug("Device to profile on %s: %s" %
+ (slave, slave_workload.filename))
+ self._workload_executors.append(slave_workload)
+
+ worker = thread_pool.apply_async(
+ self.execute_on_node, (slave_workload, parse_only))
+ slave_threads.append(worker)
+ volume_number += 1
+
+ final_result = None
+ for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
+ result = slave_thread.get()
+ self.logger.debug("Done waiting for %s, exit status %s" %
+ (slave_thread, result))
+ if result:
+ final_result = result
+ return final_result
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
- self.event_listeners.clear()
data_handler = DataHandler()
self.register(data_handler.data_event)
@@ -267,12 +309,13 @@ class TestExecutor(object):
workloads = self._create_workload_matrix()
for current_workload in workloads:
+ if self._terminated:
+ continue
+
workload = current_workload['workload']
self._thread_gate = ThreadGate(len(self.slaves),
workload.options['status-interval'])
- if self._terminated:
- return
self.current_workload = current_workload['name']
self.logger.info("Starting run %s" % self.current_workload)
@@ -287,34 +330,7 @@ class TestExecutor(object):
t = Thread(target=scheduler.run, args=())
t.start()
- workload.options['iodepth'] = str(current_workload['queue-depth'])
- workload.options['bs'] = str(current_workload['blocksize'])
-
- slave_threads = []
- for slave in self.slaves:
- volume_number = 0
- while volume_number < self.volume_count:
- slave_workload = copy.copy(current_workload['workload'])
- slave_workload.remote_host = slave
- last_char_of_filename = chr(ord(
- slave_workload.filename[-1:]) + volume_number)
- slave_workload.filename = "%s%s" % \
- (slave_workload.filename[:-1], last_char_of_filename)
- self.logger.debug("Device to profile: %s" %
- slave_workload.filename)
- self._workload_executors.append(slave_workload)
- t = Thread(target=self.execute_on_node,
- args=(slave_workload,),
- name="%s worker" % slave)
- t.daemon = False
- t.start()
- slave_threads.append(t)
- volume_number += 1
-
- for slave_thread in slave_threads:
- self.logger.debug("Waiting on %s" % slave_thread)
- slave_thread.join()
- self.logger.debug("Done waiting for %s" % slave_thread)
+ self._execute_workload(current_workload, workload)
if not scheduler.empty():
try:
@@ -337,59 +353,106 @@ class TestExecutor(object):
report = {'report': json.dumps(self.metadata)}
self.job_db.record_workload_params(report)
self.job_db.job_id = None
- if self.result_url is not None:
- self.logger.info("Results can be found at %s" % self.result_url)
def _create_workload_matrix(self):
workloads = []
- for workload_module in self.workload_modules:
- workload_name = getattr(workload_module, "__name__")
-
- constructorMethod = getattr(workload_module, workload_name)
- workload = constructorMethod()
- if (self.filename is not None):
- workload.filename = self.filename
- workload.id = self.job_db.job_id
-
- if (workload_name.startswith("_")):
- iodepths = [8, ]
- blocksizes = [16384, ]
- else:
- iodepths = self._queue_depths
- blocksizes = self._block_sizes
+ if self._custom_workloads:
+ for workload_name in self._custom_workloads.iterkeys():
+ if not workload_name.isalnum():
+ raise InvalidWorkloadName(
+ "Workload name must be alphanumeric only: %s" %
+ workload_name)
+ workload = _custom_workload()
+ workload.options['name'] = workload_name
+ workload.name = workload_name
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ workload_params = self._custom_workloads[workload_name]
+ for param, value in workload_params.iteritems():
+ if param == "readwrite":
+ param = "rw"
+ if param in workload.fixed_options:
+ self.logger.warn("Skipping fixed option %s" % param)
+ continue
+ workload.options[param] = value
+
+ for blocksize in self._block_sizes:
+ for iodepth in self._queue_depths:
+
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
+
+ workload.options['bs'] = blocksize
+ workload.options['iodepth'] = iodepth
+
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
+
+ self.logger.info("Workload %s=%s" %
+ (name, workload.options))
+
+ workloads.append(parameters)
+ else:
+ for workload_module in self.workload_modules:
+ workload_name = getattr(workload_module, "__name__")
+
+ constructorMethod = getattr(workload_module, workload_name)
+ workload = constructorMethod()
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ if (workload_name.startswith("_")):
+ iodepths = [8, ]
+ blocksizes = [16384, ]
+ else:
+ iodepths = self._queue_depths
+ blocksizes = self._block_sizes
- for blocksize in blocksizes:
- for iodepth in iodepths:
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
- name = '%s.%s.queue-depth.%s.block-size.%s' % \
- (self.job_db.job_id, workload_name, iodepth, blocksize)
- self.workload_status[name] = "Pending"
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
- parameters = {'queue-depth': iodepth,
- 'blocksize': blocksize,
- 'name': name,
- 'workload_name': workload_name,
- 'status': 'Pending',
- 'workload': workload}
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
- self.logger.info("Workload %s=%s" % (name, parameters))
+ self.logger.info("Workload %s=%s" % (name, parameters))
- workloads.append(parameters)
+ workloads.append(parameters)
return workloads
- def execute_on_node(self, workload):
+ def execute_on_node(self, workload, parse_only=False):
invoker = FIOInvoker(self.metadata)
- invoker.register(self.event)
workload.invoker = invoker
self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(workload)
- workload.execute()
- self.job_db.end_workload(workload)
- invoker.unregister(self.event)
+ if not parse_only:
+ invoker.register(self.event)
+ self.job_db.start_workload(workload)
+ result = workload.execute(parse_only)
+ if not parse_only:
+ self.job_db.end_workload(workload)
+ invoker.unregister(self.event)
self.logger.info("Ended " + workload.fullname)
+ return result
diff --git a/docker/storperf-master/storperf/utilities/data_handler.py b/docker/storperf-master/storperf/utilities/data_handler.py
index b85517f..c7d70a7 100644
--- a/docker/storperf-master/storperf/utilities/data_handler.py
+++ b/docker/storperf-master/storperf/utilities/data_handler.py
@@ -151,14 +151,12 @@ class DataHandler(object):
test_db = os.environ.get('TEST_DB_URL')
if test_db is not None:
self.logger.info("Pushing results to %s" % (test_db))
- try:
- response = test_results_db.push_results_to_db(
- test_db,
- executor.metadata,
- self.logger)
- executor.result_url = response['href']
- except Exception:
- self.logger.exception("Error pushing results into Database")
+ response = test_results_db.push_results_to_db(
+ test_db,
+ executor.metadata,
+ self.logger)
+ if response:
+ self.logger.info("Results reference: %s" % response['href'])
def _determine_criteria(self, metadata):
steady_state = True
diff --git a/docker/storperf-master/storperf/workloads/_base_workload.py b/docker/storperf-master/storperf/workloads/_base_workload.py
index c2c7b7b..9b04314 100644
--- a/docker/storperf-master/storperf/workloads/_base_workload.py
+++ b/docker/storperf-master/storperf/workloads/_base_workload.py
@@ -30,8 +30,9 @@ class _base_workload(object):
self.invoker = None
self.remote_host = None
self.id = None
+ self.name = self.__class__.__name__
- def execute(self):
+ def execute(self, parse_only=False):
if self.invoker is None:
raise ValueError("No invoker has been set")
@@ -55,7 +56,10 @@ class _base_workload(object):
for key, value in self.options.iteritems():
args.append('--' + key + "=" + value)
- self.invoker.execute(args)
+ if parse_only:
+ args.append('--parse-only')
+
+ return self.invoker.execute(args)
def terminate(self):
if self.invoker is not None:
@@ -74,11 +78,11 @@ class _base_workload(object):
@property
def fullname(self):
- host_file = self.remote_host+"."+self.filename
+ host_file = self.remote_host + "." + self.filename
host_file = host_file.replace(".", "-").replace("/", "-")
return ("%s.%s.queue-depth.%s.block-size.%s.%s"
% (str(self.id),
- self.__class__.__name__,
+ self.name,
str(self.options['iodepth']),
str(self.options['bs']),
host_file))
diff --git a/docker/storperf-master/storperf/workloads/_custom_workload.py b/docker/storperf-master/storperf/workloads/_custom_workload.py
new file mode 100644
index 0000000..9e0100d
--- /dev/null
+++ b/docker/storperf-master/storperf/workloads/_custom_workload.py
@@ -0,0 +1,36 @@
+##############################################################################
+# Copyright (c) 2015 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+from storperf.workloads import _base_workload
+
+
+class _custom_workload(_base_workload._base_workload):
+
+ def __init__(self):
+ self.logger = logging.getLogger(self.__class__.__name__)
+ self.default_filesize = "1G"
+ self.filename = '/dev/vdb'
+ self.fixed_options = {
+ 'loops': '200',
+ 'output-format': 'json',
+ 'status-interval': '60'
+ }
+ self.options = {
+ 'ioengine': 'libaio',
+ 'direct': '1',
+ 'numjobs': '1',
+ 'rw': 'read',
+ 'bs': '64k',
+ 'iodepth': '1'
+ }
+ self.options.update(self.fixed_options)
+ self.invoker = None
+ self.remote_host = None
+ self.id = None
diff --git a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py b/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py
deleted file mode 100644
index cce3c31..0000000
--- a/docker/storperf-master/storperf/workloads/_ssd_preconditioning.py
+++ /dev/null
@@ -1,17 +0,0 @@
-##############################################################################
-# Copyright (c) 2015 EMC and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-from storperf.workloads import _base_workload
-
-
-class _ssd_preconditioning(_base_workload._base_workload):
-
- def setup(self):
- self.options['name'] = 'ssd_preconditioning'
- self.options['rw'] = 'randwrite'
- self.options['loops'] = '1'