summaryrefslogtreecommitdiffstats
path: root/storperf
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@dell.com>2016-11-28 16:17:05 -0500
committerMark Beierl <mark.beierl@dell.com>2016-11-30 21:45:08 +0000
commit30a2f3f91a04d69368a3591f28874a8da6948e36 (patch)
tree4035307d6956b2441ddd36a5a74896025abcaf81 /storperf
parentb20e463faa9990a2bdfe13683f7fb76d32e2fd65 (diff)
Data Handling Refactoring
Break out test db interaction into new module and make the push event driven instead of the sleep that was there before. Change-Id: I9485aba1405f6c3b4ee5000168fbc037efa87c81 JIRA: STORPERF-90 Signed-off-by: Mark Beierl <mark.beierl@dell.com>
Diffstat (limited to 'storperf')
-rw-r--r--storperf/test_executor.py81
-rw-r--r--storperf/utilities/data_handler.py79
2 files changed, 106 insertions, 54 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 3c456a6..530ce80 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -19,11 +19,10 @@ import time
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
-from storperf.db import test_results_db
-from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
-from storperf.utilities import dictionary
+from storperf.utilities.thread_gate import ThreadGate
+from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -39,7 +38,9 @@ class TestExecutor(object):
self.precondition = True
self.deadline = None
self.warm = True
- self.metadata = None
+ self.metadata = {}
+ self.start_time = None
+ self.end_time = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -51,6 +52,7 @@ class TestExecutor(object):
self._terminated = False
self._workload_executors = []
self._workload_thread = None
+ self._thread_gate = None
@property
def slaves(self):
@@ -74,6 +76,10 @@ class TestExecutor(object):
def block_sizes(self):
return ','.join(self._block_sizes)
+ @property
+ def terminated(self):
+ return self._terminated
+
@block_sizes.setter
def block_sizes(self, block_sizes):
self.logger.debug("Set block_sizes to: " + str(block_sizes))
@@ -92,6 +98,16 @@ class TestExecutor(object):
self.metrics_emitter.transmit_metrics(carbon_metrics)
+ if self._thread_gate.report(callback_id):
+ self.broadcast_event()
+
+ def broadcast_event(self):
+ for event_listener in self.event_listeners:
+ try:
+ event_listener(self)
+ except Exception, e:
+ self.logger.error("Notifying listener: %s", e)
+
def register_workloads(self, workloads):
self.workload_modules = []
@@ -178,8 +194,10 @@ class TestExecutor(object):
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
+ data_handler = DataHandler()
+ self.register(data_handler.data_event)
- start_time = time.time()
+ self.start_time = time.time()
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
@@ -198,6 +216,8 @@ class TestExecutor(object):
blocksizes = self._block_sizes
workload.id = self.job_db.job_id
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
for blocksize in blocksizes:
for iodepth in iodepths:
@@ -243,56 +263,9 @@ class TestExecutor(object):
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
- end_time = time.time()
- pod_name = dictionary.get_key_from_dict(self.metadata,
- 'pod_name',
- 'Unknown')
- version = dictionary.get_key_from_dict(self.metadata,
- 'version',
- 'Unknown')
- scenario = dictionary.get_key_from_dict(self.metadata,
- 'scenario_name',
- 'Unknown')
- build_tag = dictionary.get_key_from_dict(self.metadata,
- 'build_tag',
- 'Unknown')
- duration = end_time - start_time
- test_db = os.environ.get('TEST_DB_URL')
-
- if test_db is not None:
- # I really do not like doing this. As our threads just
- # terminated, their final results are still being spooled
- # off to Carbon. Need to give that a little time to finish
- time.sleep(5)
- self.logger.info("Pushing results to %s" % (test_db))
-
- payload = self.metadata
- payload['timestart'] = start_time
- payload['duration'] = duration
- payload['status'] = 'OK'
- graphite_db = GraphiteDB()
- payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
- criteria = {}
- criteria['block_sizes'] = self.block_sizes
- criteria['queue_depths'] = self.queue_depths
-
- try:
- test_results_db.push_results_to_db(test_db,
- "storperf",
- "Latency Test",
- start_time,
- end_time,
- self.logger,
- pod_name,
- version,
- scenario,
- criteria,
- build_tag,
- payload)
- except:
- self.logger.exception("Error pushing results into Database")
-
+ self.end_time = time.time()
self._terminated = True
+ self.broadcast_event()
def execute_on_node(self, workload):
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..03c764c
--- /dev/null
+++ b/storperf/utilities/data_handler.py
@@ -0,0 +1,79 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import dictionary
+
+
+class DataHandler(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ """
+ """
+
+ def data_event(self, executor):
+ self.logger.info("Event received")
+
+ # Data lookup
+
+ if executor.terminated:
+ self._push_to_db(executor)
+
+ def _push_to_db(self, executor):
+ test_db = os.environ.get('TEST_DB_URL')
+
+ if test_db is not None:
+ pod_name = dictionary.get_key_from_dict(executor.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(executor.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(executor.metadata,
+ 'scenario_name',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(executor.metadata,
+ 'build_tag',
+ 'Unknown')
+ duration = executor.end_time - executor.start_time
+
+ self.logger.info("Pushing results to %s" % (test_db))
+
+ payload = executor.metadata
+ payload['timestart'] = executor.start_time
+ payload['duration'] = duration
+ payload['status'] = 'OK'
+ graphite_db = GraphiteDB()
+ payload['metrics'] = graphite_db.fetch_averages(
+ executor.job_db.job_id)
+ criteria = {}
+ criteria['block_sizes'] = executor.block_sizes
+ criteria['queue_depths'] = executor.queue_depths
+
+ try:
+ test_results_db.push_results_to_db(test_db,
+ "storperf",
+ "Latency Test",
+ executor.start_time,
+ executor.end_time,
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ except:
+ self.logger.exception("Error pushing results into Database")