summaryrefslogtreecommitdiffstats
path: root/docker/storperf-master/storperf/test_executor.py
diff options
context:
space:
mode:
authormbeierl <mark.beierl@dell.com>2018-07-05 15:22:43 -0400
committermbeierl <mark.beierl@dell.com>2018-07-06 16:40:31 -0400
commitff389f460711b17db2070ca90417f7ebbc0eff56 (patch)
tree6292493f1acad91b406bfc126885c5bf8c6532ca /docker/storperf-master/storperf/test_executor.py
parent3de258b9d0f6d4249a5e7f42eec41fcb0080bc66 (diff)
Support Custom Workloads
Refactors interaction with test_executor to clean up the tight coupling. Adds ability to specify custom workloads. Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503 JIRA: STORPERF-246 Co-Authored-By: Ameed.Ashour.Ext@Nokia.com Signed-off-by: mbeierl <mark.beierl@dell.com>
Diffstat (limited to 'docker/storperf-master/storperf/test_executor.py')
-rw-r--r--docker/storperf-master/storperf/test_executor.py263
1 files changed, 163 insertions, 100 deletions
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 2ed6a9e..38e052e 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -11,6 +11,7 @@ import copy
import imp
import json
import logging
+from multiprocessing.pool import ThreadPool
from os import listdir
import os
from os.path import isfile, join
@@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
+from storperf.workloads._custom_workload import _custom_workload
class UnknownWorkload(Exception):
pass
+class InvalidWorkloadName(Exception):
+ pass
+
+
class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
+ self._custom_workloads = {}
self.filename = None
self.deadline = None
self.steady_state_samples = 10
@@ -43,7 +50,6 @@ class TestExecutor(object):
self.end_time = None
self.current_workload = None
self.workload_status = {}
- self.result_url = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -98,6 +104,16 @@ class TestExecutor(object):
self._volume_count = volume_count
@property
+ def custom_workloads(self):
+ return self._custom_workloads
+
+ @custom_workloads.setter
+ def custom_workloads(self, custom_workloads):
+ self.logger.debug("Set custom workloads to: %s " %
+ custom_workloads)
+ self._custom_workloads = custom_workloads
+
+ @property
def queue_depths(self):
return ','.join(self._queue_depths)
@@ -152,7 +168,7 @@ class TestExecutor(object):
self.logger.debug("Notifying event listener %s",
event_listener)
event_listener(self)
- except Exception, e:
+ except Exception as e:
self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
@@ -185,7 +201,7 @@ class TestExecutor(object):
"ERROR: Unknown workload: " + workload)
if workload_module not in self.workload_modules:
self.workload_modules.append(workload_module)
- except ImportError, err:
+ except ImportError as err:
raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
@@ -193,23 +209,26 @@ class TestExecutor(object):
path, fname = os.path.split(uri)
mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
- self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
- self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
return imp.load_compiled(mname, no_ext + '.pyc')
if os.path.exists(no_ext + '.py'):
- self.logger.debug("Compiling: " + mname + " from " + no_ext)
return imp.load_source(mname, no_ext + '.py')
return None
def execute(self, metadata):
self.job_db.create_job_id()
+ try:
+ self.test_params()
+ except Exception as e:
+ self.terminate()
+ raise e
self.job_db.record_workload_params(metadata)
self._setup_metadata(metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
self._workload_thread.start()
+ # seems to be hanging here
return self.job_db.job_id
def terminate(self):
@@ -225,38 +244,61 @@ class TestExecutor(object):
terminated_hosts.append(workload.remote_host)
return terminated_hosts
- def execution_status(self, job_id):
-
- result = {}
- status = "Completed"
-
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
-
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
-
- else:
- jobs = self.job_db.fetch_jobs()
- self.logger.info("Jobs")
- self.logger.info(jobs)
- for job in jobs:
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
- else:
- result[job] = {}
- result[job]['Status'] = "Completed"
-
- return result
+ def test_params(self):
+ workloads = self._create_workload_matrix()
+ for current_workload in workloads:
+ workload = current_workload['workload']
+ self.logger.info("Testing FIO parameters for %s"
+ % current_workload)
+ result = self._execute_workload(current_workload,
+ workload,
+ parse_only=True)
+ if result:
+ message = result[0]
+ self.logger.error("FIO parameter validation failed")
+ raise Exception("Workload parameter validation failed %s"
+ % message)
+ pass
+
+ def _execute_workload(self, current_workload, workload, parse_only=False):
+ workload.options['iodepth'] = str(current_workload['queue-depth'])
+ workload.options['bs'] = str(current_workload['blocksize'])
+ slave_threads = []
+ thread_pool = ThreadPool(processes=len(self.slaves) *
+ self.volume_count)
+
+ for slave in self.slaves:
+ volume_number = 0
+ while volume_number < self.volume_count:
+ slave_workload = copy.copy(current_workload['workload'])
+ slave_workload.remote_host = slave
+ last_char_of_filename = chr(
+ ord(slave_workload.filename[-1:]) + volume_number)
+ slave_workload.filename = ("%s%s" %
+ (slave_workload.filename[:-1],
+ last_char_of_filename))
+ self.logger.debug("Device to profile on %s: %s" %
+ (slave, slave_workload.filename))
+ self._workload_executors.append(slave_workload)
+
+ worker = thread_pool.apply_async(
+ self.execute_on_node, (slave_workload, parse_only))
+ slave_threads.append(worker)
+ volume_number += 1
+
+ final_result = None
+ for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
+ result = slave_thread.get()
+ self.logger.debug("Done waiting for %s, exit status %s" %
+ (slave_thread, result))
+ if result:
+ final_result = result
+ return final_result
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
- self.event_listeners.clear()
data_handler = DataHandler()
self.register(data_handler.data_event)
@@ -267,12 +309,13 @@ class TestExecutor(object):
workloads = self._create_workload_matrix()
for current_workload in workloads:
+ if self._terminated:
+ continue
+
workload = current_workload['workload']
self._thread_gate = ThreadGate(len(self.slaves),
workload.options['status-interval'])
- if self._terminated:
- return
self.current_workload = current_workload['name']
self.logger.info("Starting run %s" % self.current_workload)
@@ -287,34 +330,7 @@ class TestExecutor(object):
t = Thread(target=scheduler.run, args=())
t.start()
- workload.options['iodepth'] = str(current_workload['queue-depth'])
- workload.options['bs'] = str(current_workload['blocksize'])
-
- slave_threads = []
- for slave in self.slaves:
- volume_number = 0
- while volume_number < self.volume_count:
- slave_workload = copy.copy(current_workload['workload'])
- slave_workload.remote_host = slave
- last_char_of_filename = chr(ord(
- slave_workload.filename[-1:]) + volume_number)
- slave_workload.filename = "%s%s" % \
- (slave_workload.filename[:-1], last_char_of_filename)
- self.logger.debug("Device to profile: %s" %
- slave_workload.filename)
- self._workload_executors.append(slave_workload)
- t = Thread(target=self.execute_on_node,
- args=(slave_workload,),
- name="%s worker" % slave)
- t.daemon = False
- t.start()
- slave_threads.append(t)
- volume_number += 1
-
- for slave_thread in slave_threads:
- self.logger.debug("Waiting on %s" % slave_thread)
- slave_thread.join()
- self.logger.debug("Done waiting for %s" % slave_thread)
+ self._execute_workload(current_workload, workload)
if not scheduler.empty():
try:
@@ -337,59 +353,106 @@ class TestExecutor(object):
report = {'report': json.dumps(self.metadata)}
self.job_db.record_workload_params(report)
self.job_db.job_id = None
- if self.result_url is not None:
- self.logger.info("Results can be found at %s" % self.result_url)
def _create_workload_matrix(self):
workloads = []
- for workload_module in self.workload_modules:
- workload_name = getattr(workload_module, "__name__")
-
- constructorMethod = getattr(workload_module, workload_name)
- workload = constructorMethod()
- if (self.filename is not None):
- workload.filename = self.filename
- workload.id = self.job_db.job_id
-
- if (workload_name.startswith("_")):
- iodepths = [8, ]
- blocksizes = [16384, ]
- else:
- iodepths = self._queue_depths
- blocksizes = self._block_sizes
+ if self._custom_workloads:
+ for workload_name in self._custom_workloads.iterkeys():
+ if not workload_name.isalnum():
+ raise InvalidWorkloadName(
+ "Workload name must be alphanumeric only: %s" %
+ workload_name)
+ workload = _custom_workload()
+ workload.options['name'] = workload_name
+ workload.name = workload_name
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ workload_params = self._custom_workloads[workload_name]
+ for param, value in workload_params.iteritems():
+ if param == "readwrite":
+ param = "rw"
+ if param in workload.fixed_options:
+ self.logger.warn("Skipping fixed option %s" % param)
+ continue
+ workload.options[param] = value
+
+ for blocksize in self._block_sizes:
+ for iodepth in self._queue_depths:
+
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
+
+ workload.options['bs'] = blocksize
+ workload.options['iodepth'] = iodepth
+
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
+
+ self.logger.info("Workload %s=%s" %
+ (name, workload.options))
+
+ workloads.append(parameters)
+ else:
+ for workload_module in self.workload_modules:
+ workload_name = getattr(workload_module, "__name__")
+
+ constructorMethod = getattr(workload_module, workload_name)
+ workload = constructorMethod()
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ if (workload_name.startswith("_")):
+ iodepths = [8, ]
+ blocksizes = [16384, ]
+ else:
+ iodepths = self._queue_depths
+ blocksizes = self._block_sizes
- for blocksize in blocksizes:
- for iodepth in iodepths:
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
- name = '%s.%s.queue-depth.%s.block-size.%s' % \
- (self.job_db.job_id, workload_name, iodepth, blocksize)
- self.workload_status[name] = "Pending"
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
- parameters = {'queue-depth': iodepth,
- 'blocksize': blocksize,
- 'name': name,
- 'workload_name': workload_name,
- 'status': 'Pending',
- 'workload': workload}
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
- self.logger.info("Workload %s=%s" % (name, parameters))
+ self.logger.info("Workload %s=%s" % (name, parameters))
- workloads.append(parameters)
+ workloads.append(parameters)
return workloads
- def execute_on_node(self, workload):
+ def execute_on_node(self, workload, parse_only=False):
invoker = FIOInvoker(self.metadata)
- invoker.register(self.event)
workload.invoker = invoker
self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(workload)
- workload.execute()
- self.job_db.end_workload(workload)
- invoker.unregister(self.event)
+ if not parse_only:
+ invoker.register(self.event)
+ self.job_db.start_workload(workload)
+ result = workload.execute(parse_only)
+ if not parse_only:
+ self.job_db.end_workload(workload)
+ invoker.unregister(self.event)
self.logger.info("Ended " + workload.fullname)
+ return result