From ff389f460711b17db2070ca90417f7ebbc0eff56 Mon Sep 17 00:00:00 2001
From: mbeierl
Date: Thu, 5 Jul 2018 15:22:43 -0400
Subject: Support Custom Workloads

Refactors interaction with test_executor to clean up the tight
coupling.  Adds ability to specify custom workloads.

Change-Id: Idbadcec1f42714e96c5f96d1e45c05982a531503
JIRA: STORPERF-246
Co-Authored-By: Ameed.Ashour.Ext@Nokia.com
Signed-off-by: mbeierl
---
 docker/storperf-master/storperf/storperf_master.py | 101 +++++++++++++++------
 1 file changed, 75 insertions(+), 26 deletions(-)

diff --git a/docker/storperf-master/storperf/storperf_master.py b/docker/storperf-master/storperf/storperf_master.py
index 7a1444e..1025789 100644
--- a/docker/storperf-master/storperf/storperf_master.py
+++ b/docker/storperf-master/storperf/storperf_master.py
@@ -59,7 +59,7 @@ class StorPerfMaster(object):
             self.stack_settings)
         self.username = None
         self.password = None
-        self._test_executor = TestExecutor()
+        self._test_executor = None
         self._agent_count = 1
         self._agent_image = "Ubuntu 14.04"
         self._agent_flavor = "storperf"
@@ -72,6 +72,13 @@ class StorPerfMaster(object):
         self._last_snaps_check_time = None
         self._slave_addresses = []
         self._thread_pool = worker_pool(20)
+        self._filename = None
+        self._deadline = None
+        self._steady_state_samples = 10
+        self._queue_depths = [1, 4, 8]
+        self._block_sizes = [512, 4096, 16384]
+        self._workload_modules = []
+        self._custom_workloads = []
 
     @property
     def volume_count(self):
@@ -245,57 +252,68 @@ class StorPerfMaster(object):
 
     @property
     def filename(self):
-        return self._test_executor.filename
+        return self._filename
 
     @filename.setter
     def filename(self, value):
-        self._test_executor.filename = value
+        self._filename = value
 
     @property
     def deadline(self):
-        return self._test_executor.deadline
+        return self._deadline
 
     @deadline.setter
     def deadline(self, value):
-        self._test_executor.deadline = value
+        self._deadline = value
 
     @property
     def steady_state_samples(self):
-        return self._test_executor.steady_state_samples
+        return self._steady_state_samples
 
     @steady_state_samples.setter
     def steady_state_samples(self, value):
-        self._test_executor.steady_state_samples = value
+        self._steady_state_samples = value
 
     @property
     def queue_depths(self):
-        return self._test_executor.queue_depths
+        return self._queue_depths
 
     @queue_depths.setter
     def queue_depths(self, value):
-        self._test_executor.queue_depths = value
+        self._queue_depths = value
 
     @property
     def block_sizes(self):
-        return self._test_executor.block_sizes
+        return self._block_sizes
 
     @block_sizes.setter
     def block_sizes(self, value):
-        self._test_executor.block_sizes = value
-
-    @property
-    def is_stack_created(self):
-        return (self.stack_id is not None and
-                (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
-                 self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
+        self._block_sizes = value
 
     @property
     def workloads(self):
-        return str(self._test_executor.workload_modules)
+        return self._workload_modules
 
     @workloads.setter
     def workloads(self, value):
-        self._test_executor.register_workloads(value)
+        executor = TestExecutor()
+        executor.register_workloads(value)
+        self._workload_modules = value
+
+    @property
+    def custom_workloads(self):
+        return self._custom_workloads
+
+    @custom_workloads.setter
+    def custom_workloads(self, value):
+        self.logger.info("Custom workloads = %s" % value)
+        self._custom_workloads = value
+
+    @property
+    def is_stack_created(self):
+        return (self.stack_id is not None and
+                (self.heat_stack.get_status() == u'CREATE_COMPLETE' or
+                 self.heat_stack.get_status() == u'UPDATE_COMPLETE'))
 
     def get_logs(self, lines=None):
         LOG_DIR = './storperf.log'
@@ -358,14 +376,29 @@ class StorPerfMaster(object):
         self.heat_stack.clean()
 
         return stack_id
 
+    def executor_event(self, executor):
+        if executor.terminated:
+            self._test_executor = None
+
     def execute_workloads(self, metadata={}):
+        if (self._test_executor is not None and
+                (not self._test_executor.terminated and
+                 self._test_executor.job_id is not None)):
+            raise Exception("ERROR: Job {} is already running".format(
+                self._test_executor.job_id))
+
         if (self.stack_id is None):
             raise ParameterError("ERROR: Stack does not exist")
 
-        if (not self._test_executor.terminated and
-                self._test_executor.job_id is not None):
-            raise Exception("ERROR: Job {} is already running".format(
-                self._test_executor.job_id))
+        self._test_executor = TestExecutor()
+        self._test_executor.register(self.executor_event)
+        self._test_executor.register_workloads(self._workload_modules)
+        self._test_executor.custom_workloads = self.custom_workloads
+        self._test_executor.block_sizes = self._block_sizes
+        self._test_executor.filename = self._filename
+        self._test_executor.deadline = self._deadline
+        self._test_executor.steady_state_samples = self._steady_state_samples
+        self._test_executor.queue_depths = self._queue_depths
 
         slaves = self._slave_addresses
 
@@ -397,10 +430,14 @@ class StorPerfMaster(object):
         return job_id
 
     def terminate_workloads(self):
-        return self._test_executor.terminate()
+        if self._test_executor is not None:
+            return self._test_executor.terminate()
+        else:
+            return True
 
     def fetch_results(self, job_id):
-        if self._test_executor.job_db.job_id == job_id:
+        if (self._test_executor is not None and
+                self._test_executor.job_db.job_id == job_id):
             return self._test_executor.metadata['details']['metrics']
 
         workload_params = self.job_db.fetch_workload_params(job_id)
@@ -413,7 +450,19 @@ class StorPerfMaster(object):
         return self.job_db.fetch_workload_params(job_id)
 
     def fetch_job_status(self, job_id):
-        return self._test_executor.execution_status(job_id)
+        results = {}
+
+        if (self._test_executor is not None and
+                self._test_executor.job_id == job_id):
+            results['Status'] = 'Running'
+            results['Workloads'] = self._test_executor.workload_status
+        else:
+            jobs = self.job_db.fetch_jobs()
+            for job in jobs:
+                if job == job_id:
+                    results['Status'] = "Completed"
+
+        return results
 
     def fetch_all_jobs(self, metrics_type):
         job_list = self.job_db.fetch_jobs()
-- 
cgit 1.2.3-korg
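
Usage sketch (editor's note, not part of the commit): after this change StorPerfMaster no
longer holds a long-lived TestExecutor.  Workload parameters are cached on the master,
execute_workloads() builds a fresh TestExecutor per run and copies those parameters onto
it, and executor_event() clears the reference once the run terminates, which is why
terminate_workloads() and fetch_job_status() now tolerate a missing executor.  The driver
below uses only names that appear in this patch; the bare StorPerfMaster() construction,
the '/dev/vdb' target, and the shape of the custom workload value are assumptions for
illustration, and the snippet presumes a Heat stack and OpenStack credentials are already
in place.

    from storperf.storperf_master import StorPerfMaster

    master = StorPerfMaster()  # assumes stack settings/credentials are already configured

    # Parameters are cached on the master and applied to a new
    # TestExecutor inside execute_workloads().
    master.filename = '/dev/vdb'              # hypothetical target device on the agents
    master.queue_depths = [1, 4, 8]           # patch defaults shown here
    master.block_sizes = [512, 4096, 16384]
    master.steady_state_samples = 10

    # The accepted structure of a custom workload definition is determined by
    # TestExecutor.custom_workloads; this value is purely illustrative.
    master.custom_workloads = {'custom_1': {'rw': 'randrw'}}

    job_id = master.execute_workloads()       # raises if a job is already running

    # While the executor is alive, status comes from it; after it terminates,
    # the job database reports the run as 'Completed'.
    print(master.fetch_job_status(job_id))    # e.g. {'Status': 'Running', 'Workloads': {...}}

    # Safe even when no executor exists any more; returns True in that case.
    master.terminate_workloads()

Creating a fresh executor per run keeps each job's state isolated; the workloads setter
also appears to instantiate a temporary TestExecutor only so register_workloads() can
check the requested modules before the names are stored on the master.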