summaryrefslogtreecommitdiffstats
path: root/storperf/test_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'storperf/test_executor.py')
-rw-r--r--storperf/test_executor.py81
1 files changed, 27 insertions, 54 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 3c456a6..530ce80 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -19,11 +19,10 @@ import time
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
-from storperf.db import test_results_db
-from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
-from storperf.utilities import dictionary
+from storperf.utilities.thread_gate import ThreadGate
+from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -39,7 +38,9 @@ class TestExecutor(object):
self.precondition = True
self.deadline = None
self.warm = True
- self.metadata = None
+ self.metadata = {}
+ self.start_time = None
+ self.end_time = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -51,6 +52,7 @@ class TestExecutor(object):
self._terminated = False
self._workload_executors = []
self._workload_thread = None
+ self._thread_gate = None
@property
def slaves(self):
@@ -74,6 +76,10 @@ class TestExecutor(object):
def block_sizes(self):
return ','.join(self._block_sizes)
+ @property
+ def terminated(self):
+ return self._terminated
+
@block_sizes.setter
def block_sizes(self, block_sizes):
self.logger.debug("Set block_sizes to: " + str(block_sizes))
@@ -92,6 +98,16 @@ class TestExecutor(object):
self.metrics_emitter.transmit_metrics(carbon_metrics)
+ if self._thread_gate.report(callback_id):
+ self.broadcast_event()
+
+ def broadcast_event(self):
+ for event_listener in self.event_listeners:
+ try:
+ event_listener(self)
+ except Exception, e:
+ self.logger.error("Notifying listener: %s", e)
+
def register_workloads(self, workloads):
self.workload_modules = []
@@ -178,8 +194,10 @@ class TestExecutor(object):
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
+ data_handler = DataHandler()
+ self.register(data_handler.data_event)
- start_time = time.time()
+ self.start_time = time.time()
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
@@ -198,6 +216,8 @@ class TestExecutor(object):
blocksizes = self._block_sizes
workload.id = self.job_db.job_id
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
for blocksize in blocksizes:
for iodepth in iodepths:
@@ -243,56 +263,9 @@ class TestExecutor(object):
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
- end_time = time.time()
- pod_name = dictionary.get_key_from_dict(self.metadata,
- 'pod_name',
- 'Unknown')
- version = dictionary.get_key_from_dict(self.metadata,
- 'version',
- 'Unknown')
- scenario = dictionary.get_key_from_dict(self.metadata,
- 'scenario_name',
- 'Unknown')
- build_tag = dictionary.get_key_from_dict(self.metadata,
- 'build_tag',
- 'Unknown')
- duration = end_time - start_time
- test_db = os.environ.get('TEST_DB_URL')
-
- if test_db is not None:
- # I really do not like doing this. As our threads just
- # terminated, their final results are still being spooled
- # off to Carbon. Need to give that a little time to finish
- time.sleep(5)
- self.logger.info("Pushing results to %s" % (test_db))
-
- payload = self.metadata
- payload['timestart'] = start_time
- payload['duration'] = duration
- payload['status'] = 'OK'
- graphite_db = GraphiteDB()
- payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
- criteria = {}
- criteria['block_sizes'] = self.block_sizes
- criteria['queue_depths'] = self.queue_depths
-
- try:
- test_results_db.push_results_to_db(test_db,
- "storperf",
- "Latency Test",
- start_time,
- end_time,
- self.logger,
- pod_name,
- version,
- scenario,
- criteria,
- build_tag,
- payload)
- except:
- self.logger.exception("Error pushing results into Database")
-
+ self.end_time = time.time()
self._terminated = True
+ self.broadcast_event()
def execute_on_node(self, workload):