summary refs log tree commit diff stats
path: root/storperf/test_executor.py
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@emc.com>2016-01-19 20:58:35 -0500
committerMark Beierl <mark.beierl@emc.com>2016-01-29 18:54:44 +0000
commitc5f233d3ab0818d8dd598d835742db2ec4c3a890 (patch)
tree295ea3f6df99884675ba8f21c207bf892f0170bd /storperf/test_executor.py
parenta8e5c72b09f829b729515d24ec2a553fa330155a (diff)
Remote slave agent workload
Add storperf master object to manage stack lifecycle. Add configuration db. Creation of CLI vs. main so that ReST API and CLI API can be kept clear. Fixed License in files. Changed DB objects to be thread safe. Added ssh server to container if desired for CLI. Change-Id: Idfe84bfb7920e6ce19d27462593c21ea86e7b406 JIRA: STORPERF-29 Signed-off-by: Mark Beierl <mark.beierl@emc.com> (cherry picked from commit 488a47d945d3ef3dfa9ee37ca0aac3b480ffc800)
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r-- storperf/test_executor.py | 83
1 files changed, 54 insertions, 29 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 462f06b..a1a817e 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -7,23 +7,20 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+from os import listdir
+from os.path import isfile, join
+from storperf.carbon.converter import JSONToCarbon
+from storperf.carbon.emitter import CarbonMetricTransmitter
+from storperf.db.job_db import JobDB
+from storperf.fio.fio_invoker import FIOInvoker
+from threading import Thread
import imp
import logging
-from os import listdir
import os
-from os.path import isfile, join
-import socket
-
-from carbon.converter import JSONToCarbon
-from carbon.emitter import CarbonMetricTransmitter
-from db.job_db import JobDB
-from fio.fio_invoker import FIOInvoker
class UnknownWorkload(Exception):
-
- def __init__(self, msg):
- self.msg = msg
+ pass
class TestExecutor(object):
@@ -31,7 +28,7 @@ class TestExecutor(object):
def __init__(self):
self.logger = logging.getLogger(__name__)
self.workload_modules = []
- self.filename = "storperf.dat"
+ self.filename = None
self.precondition = True
self.warm = True
self.event_listeners = set()
@@ -39,6 +36,16 @@ class TestExecutor(object):
self.metrics_emitter = CarbonMetricTransmitter()
self.prefix = None
self.job_db = JobDB()
+ self._slaves = []
+
+ @property
+ def slaves(self):
+ return self._slaves
+
+ @slaves.setter
+ def slaves(self, slaves):
+ self.logger.debug("Set slaves to: " + str(slaves))
+ self._slaves = slaves
def register(self, event_listener):
self.event_listeners.add(event_listener)
@@ -46,15 +53,15 @@ class TestExecutor(object):
def unregister(self, event_listener):
self.event_listeners.discard(event_listener)
- def event(self, metric):
+ def event(self, callback_id, metric):
carbon_metrics = self.metrics_converter.convert_to_dictionary(
metric,
- self.prefix)
+ callback_id)
- read_latency = carbon_metrics[self.prefix + ".jobs.1.read.lat.mean"]
- write_latency = carbon_metrics[self.prefix + ".jobs.1.write.lat.mean"]
- read_iops = carbon_metrics[self.prefix + ".jobs.1.read.iops"]
- write_iops = carbon_metrics[self.prefix + ".jobs.1.write.iops"]
+ read_latency = carbon_metrics[callback_id + ".jobs.1.read.lat.mean"]
+ write_latency = carbon_metrics[callback_id + ".jobs.1.write.lat.mean"]
+ read_iops = carbon_metrics[callback_id + ".jobs.1.read.iops"]
+ write_iops = carbon_metrics[callback_id + ".jobs.1.write.iops"]
message = "Average Latency us Read/Write: " + read_latency \
+ "/" + write_latency + " IOPS r/w: " + \
@@ -78,9 +85,11 @@ class TestExecutor(object):
workloads = []
for filename in workload_files:
- mname, ext = os.path.splitext(filename)
+ mname, _ = os.path.splitext(filename)
if (not mname.startswith('_')):
workloads.append(mname)
+ else:
+ workloads = workloads.split(',')
if (self.warm is True):
workloads.insert(0, "_warm_up")
@@ -94,15 +103,16 @@ class TestExecutor(object):
workload + ".py")
self.logger.debug("Found: " + str(workload_module))
if(workload_module is None):
- raise UnknownWorkload("Unknown workload: " + workload)
+ raise UnknownWorkload(
+ "ERROR: Unknown workload: " + workload)
self.workload_modules.append(workload_module)
except ImportError, err:
- raise UnknownWorkload(err)
+ raise UnknownWorkload("ERROR: " + str(err))
def load_from_file(self, uri):
uri = os.path.normpath(os.path.join(os.path.dirname(__file__), uri))
path, fname = os.path.split(uri)
- mname, ext = os.path.splitext(fname)
+ mname, _ = os.path.splitext(fname)
no_ext = os.path.join(path, mname)
self.logger.debug("Looking for: " + no_ext)
if os.path.exists(no_ext + '.pyc'):
@@ -115,21 +125,34 @@ class TestExecutor(object):
def execute(self):
- shortname = socket.getfqdn().split('.')[0]
+ self.job_db.create_job_id()
+ for slave in self.slaves:
+ t = Thread(target=self.execute_on_node, args=(slave,))
+ t.daemon = False
+ t.start()
+
+ return self.job_db.job_id
+
+ def execute_on_node(self, remote_host):
+
+ logger = logging.getLogger(__name__ + ":" + remote_host)
invoker = FIOInvoker()
+ invoker.remote_host = remote_host
invoker.register(self.event)
- self.job_db.create_job_id()
- self.logger.info("Starting job " + self.job_db.job_id)
+
+ logger.info(
+ "Starting job " + self.job_db.job_id + " on " + remote_host)
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
constructorMethod = getattr(workload_module, workload_name)
- self.logger.debug(
+ logger.debug(
"Found workload: " + str(constructorMethod))
workload = constructorMethod()
- workload.filename = self.filename
+ if (self.filename is not None):
+ workload.filename = self.filename
workload.invoker = invoker
if (workload_name.startswith("_")):
@@ -143,6 +166,7 @@ class TestExecutor(object):
for iodepth in iodepths:
full_workload_name = workload_name + \
+ ".host." + remote_host + \
".queue-depth." + str(iodepth) + \
".block-size." + str(blocksize)
@@ -151,14 +175,15 @@ class TestExecutor(object):
self.logger.info(
"Executing workload: " + full_workload_name)
- self.prefix = shortname + "." + self.job_db.job_id + \
+ invoker.callback_id = self.job_db.job_id + \
"." + full_workload_name
self.job_db.start_workload(full_workload_name)
workload.execute()
self.job_db.end_workload(full_workload_name)
- self.logger.info("Finished job " + self.job_db.job_id)
+ logger.info(
+ "Finished job " + self.job_db.job_id + " on " + remote_host)
def fetch_results(self, job, workload_name=""):
self.job_db.job_id = job