summaryrefslogtreecommitdiffstats
path: root/storperf/test_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--storperf/test_executor.py83
1 files changed, 54 insertions, 29 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 462f06b..a1a817e 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -7,23 +7,20 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from os import listdir
+from os.path import isfile, join
+from storperf.carbon.converter import JSONToCarbon
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db.job_db import JobDB
+from storperf.fio.fio_invoker import FIOInvoker
+from threading import Thread
import imp
import logging
-from os import listdir
import os
-from os.path import isfile, join
-import socket
-
-from carbon.converter import JSONToCarbon
-from carbon.emitter import CarbonMetricTransmitter
-from db.job_db import JobDB
-from fio.fio_invoker import FIOInvoker
class UnknownWorkload(Exception):
-
- def __init__(self, msg):
- self.msg = msg
+ pass
class TestExecutor(object):
@@ -31,7 +28,7 @@ class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
- self.filename = "storperf.dat"
+ self.filename = None
self.precondition = True
self.warm = True
self.event_listeners = set()
@@ -39,6 +36,16 @@ class TestExecutor(object):
self.metrics_emitter = CarbonMetricTransmitter()
self.prefix = None
self.job_db = JobDB()
+ self._slaves = []
+
+ @property
+ def slaves(self):
+ return self._slaves
+
+ @slaves.setter
+ def slaves(self, slaves):
+ self.logger.debug("Set slaves to: " + str(slaves))
+ self._slaves = slaves
def register(self, event_listener):
self.event_listeners.add(event_listener)
@@ -46,15 +53,15 @@ class TestExecutor(object):
def unregister(self, event_listener):
self.event_listeners.discard(event_listener)
- def event(self, metric):
+ def event(self, callback_id, metric):
carbon_metrics = self.metrics_converter.convert_to_dictionary(
metric,
- self.prefix)
+ callback_id)
- read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"]
+ read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
+ write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
+ read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
+ write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
message = "Average Latency us Read/Write: " + read_latency \
+ "/" + write_latency + " IOPS r/w: " + \
@@ -78,9 +85,11 @@ class TestExecutor(object):
workloads = []
for filename in workload_files:
- mname, ext = os.path.splitext(filename)
+ mname, _ = os.path.splitext(filename)
if (not mname.startswith('_')):
workloads.append(mname)
+ else:
+ workloads = workloads.split(',')
if (self.warm is True):
workloads.insert(0, "_warm_up")
@@ -94,15 +103,16 @@ class TestExecutor(object):
workload + ".py")
self.logger.debug("Found: " + str(workload_module))
if(workload_module is None):
- raise UnknownWorkload("Unknown workload: " + workload)
+ raise UnknownWorkload(
+ "ERROR: Unknown workload: " + workload)
self.workload_modules.append(workload_module)
except ImportError, err:
- raise UnknownWorkload(err)
+ raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
path, fname = os.path.split(uri)
- mname, ext = os.path.splitext(fname)
+ mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
@@ -115,21 +125,34 @@ class TestExecutor(object):
def execute(self):
- shortname = socket.getfqdn().split('.')[0]
+ self.job_db.create_job_id()
+ for slave in self.slaves:
+ t = Thread(target=self.execute_on_node, args=(slave,))
+ t.daemon = False
+ t.start()
+
+ return self.job_db.job_id
+
+ def execute_on_node(self, remote_host):
+
+ logger = logging.getLogger(__name__ + ":" + remote_host)
invoker = FIOInvoker()
+ invoker.remote_host = remote_host
invoker.register(self.event)
- self.job_db.create_job_id()
- self.logger.info("Starting job " + self.job_db.job_id)
+
+ logger.info(
+ "Starting job " + self.job_db.job_id + " on " + remote_host)
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- self.logger.debug(
+ logger.debug(
"Found workload: " + str(constructorMethod))
workload = constructorMethod()
- workload.filename = self.filename
+ if (self.filename is not None):
+ workload.filename = self.filename
workload.invoker = invoker
if (workload_name.startswith("_")):
@@ -143,6 +166,7 @@ class TestExecutor(object):
for iodepth in iodepths:
full_workload_name = workload_name + \
+ ".host." + remote_host + \
".queue-depth." + str(iodepth) + \
".block-size." + str(blocksize)
@@ -151,14 +175,15 @@ class TestExecutor(object):
self.logger.info(
"Executing workload: " + full_workload_name)
- self.prefix = shortname + "." + self.job_db.job_id + \
+ invoker.callback_id = self.job_db.job_id + \
"." + full_workload_name
self.job_db.start_workload(full_workload_name)
workload.execute()
self.job_db.end_workload(full_workload_name)
- self.logger.info("Finished job " + self.job_db.job_id)
+ logger.info(
+ "Finished job " + self.job_db.job_id + " on " + remote_host)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job