From 7617c6f079a2926c5d0640c40801fa5cb14023ee Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Tue, 26 Apr 2016 13:18:23 -0400 Subject: Cancel Job API Add the ability to terminate a running job via the API JIRA: STORPERF-20 Change-Id: I73a701cff9712207f5e14cfcc6b8fb7e0ab59aed Signed-off-by: Mark Beierl --- cli.py | 16 ++++++++++++++-- rest_server.py | 11 +++++++++-- storperf/fio/fio_invoker.py | 30 +++++++++++++++++++++++++++--- storperf/storperf_master.py | 4 +++- storperf/test_executor.py | 16 ++++++++++++++++ storperf/workloads/_base_workload.py | 4 ++++ 6 files changed, 73 insertions(+), 8 deletions(-) diff --git a/cli.py b/cli.py index 5595314..1f20e31 100644 --- a/cli.py +++ b/cli.py @@ -64,6 +64,7 @@ def main(argv=None): debug = False report = None erase = False + terminate = False options = {} storperf = StorPerfMaster() @@ -72,7 +73,7 @@ def main(argv=None): argv = sys.argv try: try: - opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh", + opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdTh", ["target=", "workload=", "report=", @@ -82,6 +83,7 @@ def main(argv=None): "nowarm", "verbose", "debug", + "terminate", "help", ]) except getopt.error, msg: @@ -110,6 +112,8 @@ def main(argv=None): report = a elif o in ("-e", "--erase"): erase = True + elif o in ("-T", "--terminate"): + terminate = True elif o in ("-f", "--configure"): configuration = dict(x.split('=') for x in a.split(',')) @@ -134,6 +138,14 @@ def main(argv=None): raise Usage(content['message']) return 0 + if (terminate): + response = requests.delete( + 'http://127.0.0.1:5000/api/v1.0/job') + if (response.status_code == 400): + content = json.loads(response.content) + raise Usage(content['message']) + return 0 + if (configuration is not None): response = requests.post( 'http://127.0.0.1:5000/api/v1.0/configure', json=configuration) @@ -146,7 +158,7 @@ def main(argv=None): else: print "Calling start..." response = requests.post( - 'http://127.0.0.1:5000/api/v1.0/start', json=options) + 'http://127.0.0.1:5000/api/v1.0/job', json=options) if (response.status_code == 400): content = json.loads(response.content) raise Usage(content['message']) diff --git a/rest_server.py b/rest_server.py index ffb750e..1194ab5 100644 --- a/rest_server.py +++ b/rest_server.py @@ -64,7 +64,7 @@ class Configure(Resource): abort(400, str(e)) -class StartJob(Resource): +class Job(Resource): def __init__(self): self.logger = logging.getLogger(__name__) @@ -97,6 +97,13 @@ class StartJob(Resource): except Exception as e: abort(400, str(e)) + def delete(self): + try: + storperf.terminate_workloads() + return True + except Exception as e: + abort(400, str(e)) + class Quota(Resource): @@ -129,7 +136,7 @@ def setup_logging(default_path='storperf/logging.json', api.add_resource(Configure, "/api/v1.0/configure") api.add_resource(Quota, "/api/v1.0/quota") -api.add_resource(StartJob, "/api/v1.0/start") +api.add_resource(Job, "/api/v1.0/job") if __name__ == "__main__": setup_logging() diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py index fad2546..4f39eb7 100644 --- a/storperf/fio/fio_invoker.py +++ b/storperf/fio/fio_invoker.py @@ -7,10 +7,10 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from threading import Thread import json import logging import subprocess +from threading import Thread class FIOInvoker(object): @@ -88,11 +88,35 @@ class FIOInvoker(object): stderr=subprocess.PIPE) t = Thread(target=self.stdout_handler, args=()) - t.daemon = False + t.daemon = True t.start() t = Thread(target=self.stderr_handler, args=()) - t.daemon = False + t.daemon = True t.start() + self.logger.debug("Started fio on " + self.remote_host) t.join() + self.logger.debug("Finished fio on " + self.remote_host) + + def terminate(self): + self.logger.debug("Terminating fio on " + self.remote_host) + cmd = ['ssh', '-o', 'StrictHostKeyChecking=no', + '-i', 'storperf/resources/ssh/storperf_rsa', + 'storperf@' + self.remote_host, + 'sudo', 'killall', '-9', 'fio'] + + kill_process = subprocess.Popen(cmd, + universal_newlines=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + for line in iter(kill_process.stdout.readline, b''): + self.logger.debug("FIO Termination: " + line) + + kill_process.stdout.close() + + for line in iter(kill_process.stderr.readline, b''): + self.logger.debug("FIO Termination: " + line) + + kill_process.stderr.close() diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py index b4fef7f..c684ce6 100644 --- a/storperf/storperf_master.py +++ b/storperf/storperf_master.py @@ -232,7 +232,6 @@ class StorPerfMaster(object): self.stack_id = None def execute_workloads(self): - if (self.stack_id is None): raise ParameterError("ERROR: Stack does not exist") @@ -255,6 +254,9 @@ class StorPerfMaster(object): self._test_executor.slaves = slaves return self._test_executor.execute() + def terminate_workloads(self): + return self._test_executor.terminate() + def _setup_slave(self, slave): logger = logging.getLogger(__name__ + ":" + slave) diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 497d17c..aa8a415 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -38,6 +38,8 @@ class TestExecutor(object): self.prefix = None self.job_db = JobDB() self._slaves = [] + self._terminated = False + self._workload_executors = [] self._workload_thread = None @property @@ -118,7 +120,13 @@ class TestExecutor(object): self._workload_thread.start() return self.job_db.job_id + def terminate(self): + self._terminated = True + for workload in self._workload_executors: + workload.terminate() + def execute_workloads(self): + self._terminated = False for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") constructorMethod = getattr(workload_module, workload_name) @@ -137,6 +145,9 @@ class TestExecutor(object): for blocksize in blocksizes: for iodepth in iodepths: + if self._terminated: + return + workload.options['iodepth'] = str(iodepth) workload.options['bs'] = str(blocksize) @@ -144,6 +155,9 @@ class TestExecutor(object): for slave in self.slaves: slave_workload = copy.copy(workload) slave_workload.remote_host = slave + + self._workload_executors.append(slave_workload) + t = Thread(target=self.execute_on_node, args=(slave_workload,)) t.daemon = False @@ -153,6 +167,8 @@ class TestExecutor(object): for slave_thread in slave_threads: slave_thread.join() + self._workload_executors = [] + def execute_on_node(self, workload): invoker = FIOInvoker() diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py index 4eccc08..050a15c 100644 --- a/storperf/workloads/_base_workload.py +++ b/storperf/workloads/_base_workload.py @@ -57,6 +57,10 @@ class _base_workload(object): self.invoker.execute(args) + def terminate(self): + if self.invoker is not None: + self.invoker.terminate() + def setup(self): pass -- cgit 1.2.3-korg