summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@emc.com>2016-04-26 13:18:23 -0400
committerMark Beierl <mark.beierl@emc.com>2016-04-26 13:42:25 -0400
commit7617c6f079a2926c5d0640c40801fa5cb14023ee (patch)
tree1b8228f87a7afd090782e2dd4e3eb6d36fad85bd
parent311eee3bec00d5acc32b6eba76a7ff0d1990f4b2 (diff)
Cancel Job API
Add the ability to terminate a running job via the API JIRA: STORPERF-20 Change-Id: I73a701cff9712207f5e14cfcc6b8fb7e0ab59aed Signed-off-by: Mark Beierl <mark.beierl@emc.com>
-rw-r--r--cli.py16
-rw-r--r--rest_server.py11
-rw-r--r--storperf/fio/fio_invoker.py30
-rw-r--r--storperf/storperf_master.py4
-rw-r--r--storperf/test_executor.py16
-rw-r--r--storperf/workloads/_base_workload.py4
6 files changed, 73 insertions, 8 deletions
diff --git a/cli.py b/cli.py
index 5595314..1f20e31 100644
--- a/cli.py
+++ b/cli.py
@@ -64,6 +64,7 @@ def main(argv=None):
debug = False
report = None
erase = False
+ terminate = False
options = {}
storperf = StorPerfMaster()
@@ -72,7 +73,7 @@ def main(argv=None):
argv = sys.argv
try:
try:
- opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdh",
+ opts, args = getopt.getopt(argv[1:], "t:w:r:f:escvdTh",
["target=",
"workload=",
"report=",
@@ -82,6 +83,7 @@ def main(argv=None):
"nowarm",
"verbose",
"debug",
+ "terminate",
"help",
])
except getopt.error, msg:
@@ -110,6 +112,8 @@ def main(argv=None):
report = a
elif o in ("-e", "--erase"):
erase = True
+ elif o in ("-T", "--terminate"):
+ terminate = True
elif o in ("-f", "--configure"):
configuration = dict(x.split('=') for x in a.split(','))
@@ -134,6 +138,14 @@ def main(argv=None):
raise Usage(content['message'])
return 0
+ if (terminate):
+ response = requests.delete(
+ 'http://127.0.0.1:5000/api/v1.0/job')
+ if (response.status_code == 400):
+ content = json.loads(response.content)
+ raise Usage(content['message'])
+ return 0
+
if (configuration is not None):
response = requests.post(
'http://127.0.0.1:5000/api/v1.0/configure', json=configuration)
@@ -146,7 +158,7 @@ def main(argv=None):
else:
print "Calling start..."
response = requests.post(
- 'http://127.0.0.1:5000/api/v1.0/start', json=options)
+ 'http://127.0.0.1:5000/api/v1.0/job', json=options)
if (response.status_code == 400):
content = json.loads(response.content)
raise Usage(content['message'])
diff --git a/rest_server.py b/rest_server.py
index ffb750e..1194ab5 100644
--- a/rest_server.py
+++ b/rest_server.py
@@ -64,7 +64,7 @@ class Configure(Resource):
abort(400, str(e))
-class StartJob(Resource):
+class Job(Resource):
def __init__(self):
self.logger = logging.getLogger(__name__)
@@ -97,6 +97,13 @@ class StartJob(Resource):
except Exception as e:
abort(400, str(e))
+ def delete(self):
+ try:
+ storperf.terminate_workloads()
+ return True
+ except Exception as e:
+ abort(400, str(e))
+
class Quota(Resource):
@@ -129,7 +136,7 @@ def setup_logging(default_path='storperf/logging.json',
api.add_resource(Configure, "/api/v1.0/configure")
api.add_resource(Quota, "/api/v1.0/quota")
-api.add_resource(StartJob, "/api/v1.0/start")
+api.add_resource(Job, "/api/v1.0/job")
if __name__ == "__main__":
setup_logging()
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index fad2546..4f39eb7 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -7,10 +7,10 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from threading import Thread
import json
import logging
import subprocess
+from threading import Thread
class FIOInvoker(object):
@@ -88,11 +88,35 @@ class FIOInvoker(object):
stderr=subprocess.PIPE)
t = Thread(target=self.stdout_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
t = Thread(target=self.stderr_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
+ self.logger.debug("Started fio on " + self.remote_host)
t.join()
+ self.logger.debug("Finished fio on " + self.remote_host)
+
+ def terminate(self):
+ self.logger.debug("Terminating fio on " + self.remote_host)
+ cmd = ['ssh', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ 'storperf@' + self.remote_host,
+ 'sudo', 'killall', '-9', 'fio']
+
+ kill_process = subprocess.Popen(cmd,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ for line in iter(kill_process.stdout.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stdout.close()
+
+ for line in iter(kill_process.stderr.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stderr.close()
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index b4fef7f..c684ce6 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -232,7 +232,6 @@ class StorPerfMaster(object):
self.stack_id = None
def execute_workloads(self):
-
if (self.stack_id is None):
raise ParameterError("ERROR: Stack does not exist")
@@ -255,6 +254,9 @@ class StorPerfMaster(object):
self._test_executor.slaves = slaves
return self._test_executor.execute()
+ def terminate_workloads(self):
+ return self._test_executor.terminate()
+
def _setup_slave(self, slave):
logger = logging.getLogger(__name__ + ":" + slave)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 497d17c..aa8a415 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -38,6 +38,8 @@ class TestExecutor(object):
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._terminated = False
+ self._workload_executors = []
self._workload_thread = None
@property
@@ -118,7 +120,13 @@ class TestExecutor(object):
self._workload_thread.start()
return self.job_db.job_id
+ def terminate(self):
+ self._terminated = True
+ for workload in self._workload_executors:
+ workload.terminate()
+
def execute_workloads(self):
+ self._terminated = False
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
@@ -137,6 +145,9 @@ class TestExecutor(object):
for blocksize in blocksizes:
for iodepth in iodepths:
+ if self._terminated:
+ return
+
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
@@ -144,6 +155,9 @@ class TestExecutor(object):
for slave in self.slaves:
slave_workload = copy.copy(workload)
slave_workload.remote_host = slave
+
+ self._workload_executors.append(slave_workload)
+
t = Thread(target=self.execute_on_node,
args=(slave_workload,))
t.daemon = False
@@ -153,6 +167,8 @@ class TestExecutor(object):
for slave_thread in slave_threads:
slave_thread.join()
+ self._workload_executors = []
+
def execute_on_node(self, workload):
invoker = FIOInvoker()
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index 4eccc08..050a15c 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -57,6 +57,10 @@ class _base_workload(object):
self.invoker.execute(args)
+ def terminate(self):
+ if self.invoker is not None:
+ self.invoker.terminate()
+
def setup(self):
pass