Diffstat (limited to 'docker/storperf-master/storperf/test_executor.py')
-rw-r--r--  docker/storperf-master/storperf/test_executor.py  282
1 file changed, 184 insertions, 98 deletions
diff --git a/docker/storperf-master/storperf/test_executor.py b/docker/storperf-master/storperf/test_executor.py
index 9ed6386..cb7e478 100644
--- a/docker/storperf-master/storperf/test_executor.py
+++ b/docker/storperf-master/storperf/test_executor.py
@@ -11,6 +11,7 @@ import copy
import imp
import json
import logging
+from multiprocessing.pool import ThreadPool
from os import listdir
import os
from os.path import isfile, join
@@ -25,17 +26,23 @@ from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
from storperf.utilities.data_handler import DataHandler
from storperf.utilities.thread_gate import ThreadGate
+from storperf.workloads._custom_workload import _custom_workload
class UnknownWorkload(Exception):
pass
+class InvalidWorkloadName(Exception):
+ pass
+
+
class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
+ self._custom_workloads = {}
self.filename = None
self.deadline = None
self.steady_state_samples = 10
@@ -43,7 +50,6 @@ class TestExecutor(object):
self.end_time = None
self.current_workload = None
self.workload_status = {}
- self.result_url = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -53,6 +59,7 @@ class TestExecutor(object):
self.job_db = JobDB()
self._slaves = []
self._terminated = False
+ self._volume_count = 1
self._workload_executors = []
self._workload_thread = None
self._thread_gate = None
@@ -62,7 +69,7 @@ class TestExecutor(object):
try:
installer = os.environ['INSTALLER_TYPE']
except KeyError:
- self.logger.error("Cannot determine installer")
+ self.logger.warn("Cannot determine installer")
installer = "Unknown_installer"
self.metadata = {}
@@ -88,6 +95,25 @@ class TestExecutor(object):
self._slaves = slaves
@property
+ def volume_count(self):
+ return self._volume_count
+
+ @volume_count.setter
+ def volume_count(self, volume_count):
+ self.logger.debug("Set volume count to: " + str(volume_count))
+ self._volume_count = volume_count
+
+ @property
+ def custom_workloads(self):
+ return self._custom_workloads
+
+ @custom_workloads.setter
+ def custom_workloads(self, custom_workloads):
+ self.logger.debug("Set custom workloads to: %s " %
+ custom_workloads)
+ self._custom_workloads = custom_workloads
+
+ @property
def queue_depths(self):
return ','.join(self._queue_depths)
@@ -142,7 +168,7 @@ class TestExecutor(object):
self.logger.debug("Notifying event listener %s",
event_listener)
event_listener(self)
- except Exception, e:
+ except Exception as e:
self.logger.exception("While notifying listener %s", e)
def register_workloads(self, workloads):
@@ -175,7 +201,7 @@ class TestExecutor(object):
"ERROR: Unknown workload: " + workload)
if workload_module not in self.workload_modules:
self.workload_modules.append(workload_module)
- except ImportError, err:
+ except ImportError as err:
raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
@@ -183,19 +209,23 @@ class TestExecutor(object):
path, fname = os.path.split(uri)
mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
- self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
- self.logger.debug("Loading compiled: " + mname + " from " + no_ext)
return imp.load_compiled(mname, no_ext + '.pyc')
if os.path.exists(no_ext + '.py'):
- self.logger.debug("Compiling: " + mname + " from " + no_ext)
return imp.load_source(mname, no_ext + '.py')
return None
def execute(self, metadata):
self.job_db.create_job_id()
- self.job_db.record_workload_params(metadata)
self._setup_metadata(metadata)
+ try:
+ self.test_params()
+ except Exception as e:
+ self.terminate()
+ raise e
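+ # Drop the SSH key before persisting, so credentials are never
+ # written to the job database with the other workload parameters.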
+ stripped_metadata = metadata.copy()
+ stripped_metadata.pop('ssh_key', None)
+ self.job_db.record_workload_params(stripped_metadata)
self._workload_thread = Thread(target=self.execute_workloads,
args=(),
name="Workload thread")
@@ -215,38 +245,63 @@ class TestExecutor(object):
terminated_hosts.append(workload.remote_host)
return terminated_hosts
- def execution_status(self, job_id):
-
- result = {}
- status = "Completed"
-
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
-
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
+ def test_params(self):
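+ # Dry-run every workload in parse-only mode so fio can validate
+ # the parameters before any real I/O is issued.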
+ workloads = self._create_workload_matrix()
+ for current_workload in workloads:
+ workload = current_workload['workload']
+ self.logger.info("Testing FIO parameters for %s"
+ % current_workload)
+ result = self._execute_workload(current_workload,
+ workload,
+ parse_only=True)
+ if result:
+ message = result[0]
+ self.logger.error("FIO parameter validation failed")
+ raise Exception("Workload parameter validation failed %s"
+ % message)
+ pass
+
+ def _execute_workload(self, current_workload, workload, parse_only=False):
+ workload.options['iodepth'] = str(current_workload['queue-depth'])
+ workload.options['bs'] = str(current_workload['blocksize'])
+ self._workload_executors = []
+ slave_threads = []
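+ # Size the pool so every slave/volume pair gets its own worker
+ # and all fio jobs for this matrix cell run concurrently.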
+ thread_pool = ThreadPool(processes=len(self.slaves) *
+ self.volume_count)
- else:
- jobs = self.job_db.fetch_jobs()
- self.logger.info("Jobs")
- self.logger.info(jobs)
- for job in jobs:
- if self.job_db.job_id == job_id and self._terminated is False:
- status = "Running"
- result['Status'] = status
- result['Workloads'] = self.workload_status
- result['TestResultURL'] = self.result_url
- else:
- result[job] = {}
- result[job]['Status'] = "Completed"
+ self._workload_executors = []
+ for slave in self.slaves:
+ volume_number = 0
+ while volume_number < self.volume_count:
+ slave_workload = copy.copy(current_workload['workload'])
+ slave_workload.remote_host = slave
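+ # Give each volume its own device by advancing the last
+ # character of the base filename (e.g. vdb, vdc, vdd, ...).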
+ last_char_of_filename = chr(
+ ord(slave_workload.filename[-1:]) + volume_number)
+ slave_workload.filename = ("%s%s" %
+ (slave_workload.filename[:-1],
+ last_char_of_filename))
+ self.logger.debug("Device to profile on %s: %s" %
+ (slave, slave_workload.filename))
+ self._workload_executors.append(slave_workload)
- return result
+ worker = thread_pool.apply_async(
+ self.execute_on_node, (slave_workload, parse_only))
+ slave_threads.append(worker)
+ volume_number += 1
+
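+ # Wait for every worker; a non-empty result is a failure reported
+ # by the remote fio run, and the last one seen is returned.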
+ final_result = None
+ for slave_thread in slave_threads:
+ self.logger.debug("Waiting on %s" % slave_thread)
+ result = slave_thread.get()
+ self.logger.debug("Done waiting for %s, exit status %s" %
+ (slave_thread, result))
+ if result:
+ final_result = result
+ return final_result
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
- self.event_listeners.clear()
data_handler = DataHandler()
self.register(data_handler.data_event)
@@ -257,12 +312,14 @@ class TestExecutor(object):
workloads = self._create_workload_matrix()
for current_workload in workloads:
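+ # On termination, skip the remaining workloads rather than
+ # returning, so the post-run bookkeeping below still executes.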
+ if self._terminated:
+ continue
+
workload = current_workload['workload']
- self._thread_gate = ThreadGate(len(self.slaves),
- workload.options['status-interval'])
+ self._thread_gate = ThreadGate(
+ len(self.slaves) * min(1, self.volume_count),
+ float(workload.options['status-interval']))
- if self._terminated:
- return
self.current_workload = current_workload['name']
self.logger.info("Starting run %s" % self.current_workload)
@@ -277,27 +334,7 @@ class TestExecutor(object):
t = Thread(target=scheduler.run, args=())
t.start()
- workload.options['iodepth'] = str(current_workload['queue-depth'])
- workload.options['bs'] = str(current_workload['blocksize'])
-
- slave_threads = []
- for slave in self.slaves:
- slave_workload = copy.copy(current_workload['workload'])
- slave_workload.remote_host = slave
-
- self._workload_executors.append(slave_workload)
-
- t = Thread(target=self.execute_on_node,
- args=(slave_workload,),
- name="%s worker" % slave)
- t.daemon = False
- t.start()
- slave_threads.append(t)
-
- for slave_thread in slave_threads:
- self.logger.debug("Waiting on %s" % slave_thread)
- slave_thread.join()
- self.logger.debug("Done waiting for %s" % slave_thread)
+ self._execute_workload(current_workload, workload)
if not scheduler.empty():
try:
@@ -320,62 +357,111 @@ class TestExecutor(object):
report = {'report': json.dumps(self.metadata)}
self.job_db.record_workload_params(report)
self.job_db.job_id = None
- if self.result_url is not None:
- self.logger.info("Results can be found at %s" % self.result_url)
def _create_workload_matrix(self):
workloads = []
- for workload_module in self.workload_modules:
- workload_name = getattr(workload_module, "__name__")
-
- constructorMethod = getattr(workload_module, workload_name)
- workload = constructorMethod()
- if (self.filename is not None):
- workload.filename = self.filename
- workload.id = self.job_db.job_id
-
- if (self.filename is not None):
- workload.filename = self.filename
-
- if (workload_name.startswith("_")):
- iodepths = [8, ]
- blocksizes = [16384, ]
- else:
- iodepths = self._queue_depths
- blocksizes = self._block_sizes
+ if self._custom_workloads:
+ for workload_name in self._custom_workloads.keys():
+ real_name = workload_name
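+ # Strip underscores from internal-style names; note that
+ # replace() removes every underscore, not just the leading one.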
+ if real_name.startswith('_'):
+ real_name = real_name.replace('_', '')
+ self.logger.info("--- real_name: %s" % real_name)
+
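+ # The sanitized name becomes the fio job name, so restrict it
+ # to alphanumeric characters.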
+ if not real_name.isalnum():
+ raise InvalidWorkloadName(
+ "Workload name must be alphanumeric only: %s" %
+ real_name)
+ workload = _custom_workload()
+ workload.options['name'] = real_name
+ workload.name = workload_name
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ workload_params = self._custom_workloads[workload_name]
+ for param, value in workload_params.items():
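+ # Accept the REST-friendly 'readwrite' alias for fio's 'rw'
+ # option, and refuse to override options the workload has fixed.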
+ if param == "readwrite":
+ param = "rw"
+ if param in workload.fixed_options:
+ self.logger.warn("Skipping fixed option %s" % param)
+ continue
+ workload.options[param] = value
+
+ for blocksize in self._block_sizes:
+ for iodepth in self._queue_depths:
+
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
+
+ workload.options['bs'] = blocksize
+ workload.options['iodepth'] = iodepth
+
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
+
+ self.logger.info("Workload %s=%s" %
+ (name, workload.options))
+
+ workloads.append(parameters)
+ else:
+ for workload_module in self.workload_modules:
+ workload_name = getattr(workload_module, "__name__")
+
+ constructorMethod = getattr(workload_module, workload_name)
+ workload = constructorMethod()
+ if (self.filename is not None):
+ workload.filename = self.filename
+ workload.id = self.job_db.job_id
+
+ if (workload_name.startswith("_")):
+ iodepths = [8, ]
+ blocksizes = [16384, ]
+ else:
+ iodepths = self._queue_depths
+ blocksizes = self._block_sizes
- for blocksize in blocksizes:
- for iodepth in iodepths:
+ for blocksize in blocksizes:
+ for iodepth in iodepths:
- name = '%s.%s.queue-depth.%s.block-size.%s' % \
- (self.job_db.job_id, workload_name, iodepth, blocksize)
- self.workload_status[name] = "Pending"
+ name = '%s.%s.queue-depth.%s.block-size.%s' % \
+ (self.job_db.job_id, workload_name, iodepth,
+ blocksize)
+ self.workload_status[name] = "Pending"
- parameters = {'queue-depth': iodepth,
- 'blocksize': blocksize,
- 'name': name,
- 'workload_name': workload_name,
- 'status': 'Pending',
- 'workload': workload}
+ parameters = {'queue-depth': iodepth,
+ 'blocksize': blocksize,
+ 'name': name,
+ 'workload_name': workload_name,
+ 'status': 'Pending',
+ 'workload': workload}
- self.logger.info("Workload %s=%s" % (name, parameters))
+ self.logger.info("Workload %s=%s" % (name, parameters))
- workloads.append(parameters)
+ workloads.append(parameters)
return workloads
- def execute_on_node(self, workload):
+ def execute_on_node(self, workload, parse_only=False):
invoker = FIOInvoker(self.metadata)
- invoker.register(self.event)
workload.invoker = invoker
self.logger.info("Starting " + workload.fullname)
- self.job_db.start_workload(workload)
- workload.execute()
- self.job_db.end_workload(workload)
- invoker.unregister(self.event)
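+ # In parse-only mode, skip event registration and job DB
+ # bookkeeping: the job file is validated but never executed.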
+ if not parse_only:
+ invoker.register(self.event)
+ self.job_db.start_workload(workload)
+ result = workload.execute(parse_only)
+ if not parse_only:
+ self.job_db.end_workload(workload)
+ invoker.unregister(self.event)
self.logger.info("Ended " + workload.fullname)
+ return result