summaryrefslogtreecommitdiffstats
path: root/storperf
diff options
context:
space:
mode:
Diffstat (limited to 'storperf')
-rw-r--r--storperf/fio/fio_invoker.py30
-rw-r--r--storperf/storperf_master.py4
-rw-r--r--storperf/test_executor.py16
-rw-r--r--storperf/workloads/_base_workload.py4
4 files changed, 50 insertions, 4 deletions
diff --git a/storperf/fio/fio_invoker.py b/storperf/fio/fio_invoker.py
index fad2546..4f39eb7 100644
--- a/storperf/fio/fio_invoker.py
+++ b/storperf/fio/fio_invoker.py
@@ -7,10 +7,10 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from threading import Thread
import json
import logging
import subprocess
+from threading import Thread
class FIOInvoker(object):
@@ -88,11 +88,35 @@ class FIOInvoker(object):
stderr=subprocess.PIPE)
t = Thread(target=self.stdout_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
t = Thread(target=self.stderr_handler, args=())
- t.daemon = False
+ t.daemon = True
t.start()
+ self.logger.debug("Started fio on " + self.remote_host)
t.join()
+ self.logger.debug("Finished fio on " + self.remote_host)
+
+ def terminate(self):
+ self.logger.debug("Terminating fio on " + self.remote_host)
+ cmd = ['ssh', '-o', 'StrictHostKeyChecking=no',
+ '-i', 'storperf/resources/ssh/storperf_rsa',
+ 'storperf@' + self.remote_host,
+ 'sudo', 'killall', '-9', 'fio']
+
+ kill_process = subprocess.Popen(cmd,
+ universal_newlines=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ for line in iter(kill_process.stdout.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stdout.close()
+
+ for line in iter(kill_process.stderr.readline, b''):
+ self.logger.debug("FIO Termination: " + line)
+
+ kill_process.stderr.close()
diff --git a/storperf/storperf_master.py b/storperf/storperf_master.py
index b4fef7f..c684ce6 100644
--- a/storperf/storperf_master.py
+++ b/storperf/storperf_master.py
@@ -232,7 +232,6 @@ class StorPerfMaster(object):
self.stack_id = None
def execute_workloads(self):
-
if (self.stack_id is None):
raise ParameterError("ERROR: Stack does not exist")
@@ -255,6 +254,9 @@ class StorPerfMaster(object):
self._test_executor.slaves = slaves
return self._test_executor.execute()
+ def terminate_workloads(self):
+ return self._test_executor.terminate()
+
def _setup_slave(self, slave):
logger = logging.getLogger(__name__ + ":" + slave)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 497d17c..aa8a415 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -38,6 +38,8 @@ class TestExecutor(object):
self.prefix = None
self.job_db = JobDB()
self._slaves = []
+ self._terminated = False
+ self._workload_executors = []
self._workload_thread = None
@property
@@ -118,7 +120,13 @@ class TestExecutor(object):
self._workload_thread.start()
return self.job_db.job_id
+ def terminate(self):
+ self._terminated = True
+ for workload in self._workload_executors:
+ workload.terminate()
+
def execute_workloads(self):
+ self._terminated = False
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
@@ -137,6 +145,9 @@ class TestExecutor(object):
for blocksize in blocksizes:
for iodepth in iodepths:
+ if self._terminated:
+ return
+
workload.options['iodepth'] = str(iodepth)
workload.options['bs'] = str(blocksize)
@@ -144,6 +155,9 @@ class TestExecutor(object):
for slave in self.slaves:
slave_workload = copy.copy(workload)
slave_workload.remote_host = slave
+
+ self._workload_executors.append(slave_workload)
+
t = Thread(target=self.execute_on_node,
args=(slave_workload,))
t.daemon = False
@@ -153,6 +167,8 @@ class TestExecutor(object):
for slave_thread in slave_threads:
slave_thread.join()
+ self._workload_executors = []
+
def execute_on_node(self, workload):
invoker = FIOInvoker()
diff --git a/storperf/workloads/_base_workload.py b/storperf/workloads/_base_workload.py
index 4eccc08..050a15c 100644
--- a/storperf/workloads/_base_workload.py
+++ b/storperf/workloads/_base_workload.py
@@ -57,6 +57,10 @@ class _base_workload(object):
self.invoker.execute(args)
+ def terminate(self):
+ if self.invoker is not None:
+ self.invoker.terminate()
+
def setup(self):
pass