From 7617c6f079a2926c5d0640c40801fa5cb14023ee Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Tue, 26 Apr 2016 13:18:23 -0400 Subject: Cancel Job API Add the ability to terminate a running job via the API JIRA: STORPERF-20 Change-Id: I73a701cff9712207f5e14cfcc6b8fb7e0ab59aed Signed-off-by: Mark Beierl --- storperf/fio/fio_invoker.py | 30 +++++++++++++++++++++++++++--- storperf/storperf_master.py | 4 +++- storperf/test_executor.py | 16 ++++++++++++++++ storperf/workloads/_base_workload.py | 4 ++++ 4 files changed, 50 insertions(+), 4 deletions(-) (limited to 'storperf') diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index fad2546..4f39eb7 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -7,10 +7,10 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from threading import Thread import json import logging import subprocess +from threading import Thread class FIOInvoker(object): @@ -88,11 +88,35 @@ class FIOInvoker(object): stderr=subprocess.PIPE) t = Thread(target=self.stdout_handler, args=()) - t.daemon = False + t.daemon = True t.start() t = Thread(target=self.stderr_handler, args=()) - t.daemon = False + t.daemon = True t.start() + self.logger.debug("Started fio on " + self.remote_host) t.join() + self.logger.debug("Finished fio on " + self.remote_host) + + def terminate(self): + self.logger.debug("Terminating fio on " + self.remote_host) + cmd = ['ssh', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'storperf@' + self.remote_host, + 'sudo', 'killall', '-9', 'fio'] + + kill_process = subprocess.Popen(cmd, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + for line in iter(kill_process.stdout.readline, b''): + self.logger.debug("FIO Termination: " + line) + + kill_process.stdout.close() + + for line in iter(kill_process.stderr.readline, b''): + self.logger.debug("FIO Termination: " + line) + + kill_process.stderr.close() diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index b4fef7f..c684ce6 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -232,7 +232,6 @@ class StorPerfMaster(object): self.stack_id = None def execute_workloads(self): - if (self.stack_id is None): raise ParameterError("ERROR: Stack does not exist") @@ -255,6 +254,9 @@ class StorPerfMaster(object): self._test_executor.slaves = slaves return self._test_executor.execute() + def terminate_workloads(self): + return self._test_executor.terminate() + def _setup_slave(self, slave): logger = logging.getLogger(__name__ + ":" + slave) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 497d17c..aa8a415 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -38,6 +38,8 @@ class TestExecutor(object): self.prefix = None self.job_db = JobDB() self._slaves = [] + self._terminated = False + self._workload_executors = [] self._workload_thread = None @property @@ -118,7 +120,13 @@ class TestExecutor(object): self._workload_thread.start() return self.job_db.job_id + def terminate(self): + self._terminated = True + for workload in self._workload_executors: + workload.terminate() + def execute_workloads(self): + self._terminated = False for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) @@ -137,6 +145,9 @@ class TestExecutor(object): for blocksize in blocksizes: for iodepth in iodepths: + if self._terminated: + return + workload.options['iodepth'] = str(iodepth) workload.options['bs'] = str(blocksize) @@ -144,6 +155,9 @@ class TestExecutor(object): for slave in self.slaves: slave_workload = copy.copy(workload) slave_workload.remote_host = slave + + self._workload_executors.append(slave_workload) + t = Thread(target=self.execute_on_node, args=(slave_workload,)) t.daemon = False @@ -153,6 +167,8 @@ class TestExecutor(object): for slave_thread in slave_threads: slave_thread.join() + self._workload_executors = [] + def execute_on_node(self, workload): invoker = FIOInvoker() diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index 4eccc08..050a15c 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -57,6 +57,10 @@ class _base_workload(object): self.invoker.execute(args) + def terminate(self): + if self.invoker is not None: + self.invoker.terminate() + def setup(self): pass -- cgit 1.2.3-korg