summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@dell.com>2016-11-28 16:17:05 -0500
committerMark Beierl <mark.beierl@dell.com>2016-11-30 21:45:08 +0000
commit30a2f3f91a04d69368a3591f28874a8da6948e36 (patch)
tree4035307d6956b2441ddd36a5a74896025abcaf81
parentb20e463faa9990a2bdfe13683f7fb76d32e2fd65 (diff)
Data Handling Refactoring
Break out test db interaction into new module and make the push event driven instead of the sleep that was there before. Change-Id: I9485aba1405f6c3b4ee5000168fbc037efa87c81 JIRA: STORPERF-90 Signed-off-by: Mark Beierl <mark.beierl@dell.com>
-rw-r--r--storperf/test_executor.py81
-rw-r--r--storperf/utilities/data_handler.py79
-rw-r--r--tests/db_tests/job_db_test.py5
-rw-r--r--tests/utilities_tests/data_handler_test.py76
4 files changed, 185 insertions, 56 deletions
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 3c456a6..530ce80 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -19,11 +19,10 @@ import time
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
-from storperf.db import test_results_db
-from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
-from storperf.utilities import dictionary
+from storperf.utilities.thread_gate import ThreadGate
+from utilities.data_handler import DataHandler
class UnknownWorkload(Exception):
@@ -39,7 +38,9 @@ class TestExecutor(object):
self.precondition = True
self.deadline = None
self.warm = True
- self.metadata = None
+ self.metadata = {}
+ self.start_time = None
+ self.end_time = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -51,6 +52,7 @@ class TestExecutor(object):
self._terminated = False
self._workload_executors = []
self._workload_thread = None
+ self._thread_gate = None
@property
def slaves(self):
@@ -74,6 +76,10 @@ class TestExecutor(object):
def block_sizes(self):
return ','.join(self._block_sizes)
+ @property
+ def terminated(self):
+ return self._terminated
+
@block_sizes.setter
def block_sizes(self, block_sizes):
self.logger.debug("Set block_sizes to: " + str(block_sizes))
@@ -92,6 +98,16 @@ class TestExecutor(object):
self.metrics_emitter.transmit_metrics(carbon_metrics)
+ if self._thread_gate.report(callback_id):
+ self.broadcast_event()
+
+ def broadcast_event(self):
+ for event_listener in self.event_listeners:
+ try:
+ event_listener(self)
+ except Exception, e:
+ self.logger.error("Notifying listener: %s", e)
+
def register_workloads(self, workloads):
self.workload_modules = []
@@ -178,8 +194,10 @@ class TestExecutor(object):
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
+ data_handler = DataHandler()
+ self.register(data_handler.data_event)
- start_time = time.time()
+ self.start_time = time.time()
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
@@ -198,6 +216,8 @@ class TestExecutor(object):
blocksizes = self._block_sizes
workload.id = self.job_db.job_id
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
for blocksize in blocksizes:
for iodepth in iodepths:
@@ -243,56 +263,9 @@ class TestExecutor(object):
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
- end_time = time.time()
- pod_name = dictionary.get_key_from_dict(self.metadata,
- 'pod_name',
- 'Unknown')
- version = dictionary.get_key_from_dict(self.metadata,
- 'version',
- 'Unknown')
- scenario = dictionary.get_key_from_dict(self.metadata,
- 'scenario_name',
- 'Unknown')
- build_tag = dictionary.get_key_from_dict(self.metadata,
- 'build_tag',
- 'Unknown')
- duration = end_time - start_time
- test_db = os.environ.get('TEST_DB_URL')
-
- if test_db is not None:
- # I really do not like doing this. As our threads just
- # terminated, their final results are still being spooled
- # off to Carbon. Need to give that a little time to finish
- time.sleep(5)
- self.logger.info("Pushing results to %s" % (test_db))
-
- payload = self.metadata
- payload['timestart'] = start_time
- payload['duration'] = duration
- payload['status'] = 'OK'
- graphite_db = GraphiteDB()
- payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
- criteria = {}
- criteria['block_sizes'] = self.block_sizes
- criteria['queue_depths'] = self.queue_depths
-
- try:
- test_results_db.push_results_to_db(test_db,
- "storperf",
- "Latency Test",
- start_time,
- end_time,
- self.logger,
- pod_name,
- version,
- scenario,
- criteria,
- build_tag,
- payload)
- except:
- self.logger.exception("Error pushing results into Database")
-
+ self.end_time = time.time()
self._terminated = True
+ self.broadcast_event()
def execute_on_node(self, workload):
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..03c764c
--- /dev/null
+++ b/storperf/utilities/data_handler.py
@@ -0,0 +1,79 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import dictionary
+
+
+class DataHandler(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ """
+ """
+
+ def data_event(self, executor):
+ self.logger.info("Event received")
+
+ # Data lookup
+
+ if executor.terminated:
+ self._push_to_db(executor)
+
+ def _push_to_db(self, executor):
+ test_db = os.environ.get('TEST_DB_URL')
+
+ if test_db is not None:
+ pod_name = dictionary.get_key_from_dict(executor.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(executor.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(executor.metadata,
+ 'scenario_name',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(executor.metadata,
+ 'build_tag',
+ 'Unknown')
+ duration = executor.end_time - executor.start_time
+
+ self.logger.info("Pushing results to %s" % (test_db))
+
+ payload = executor.metadata
+ payload['timestart'] = executor.start_time
+ payload['duration'] = duration
+ payload['status'] = 'OK'
+ graphite_db = GraphiteDB()
+ payload['metrics'] = graphite_db.fetch_averages(
+ executor.job_db.job_id)
+ criteria = {}
+ criteria['block_sizes'] = executor.block_sizes
+ criteria['queue_depths'] = executor.queue_depths
+
+ try:
+ test_results_db.push_results_to_db(test_db,
+ "storperf",
+ "Latency Test",
+ executor.start_time,
+ executor.end_time,
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ except:
+ self.logger.exception("Error pushing results into Database")
diff --git a/tests/db_tests/job_db_test.py b/tests/db_tests/job_db_test.py
index fe3d9f1..ccfb9cc 100644
--- a/tests/db_tests/job_db_test.py
+++ b/tests/db_tests/job_db_test.py
@@ -7,14 +7,15 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.db.job_db import JobDB
-from storperf.workloads.rr import rr
import os
import sqlite3
import unittest
import mock
+from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
+
class JobDBTest(unittest.TestCase):
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
new file mode 100644
index 0000000..482b98e
--- /dev/null
+++ b/tests/utilities_tests/data_handler_test.py
@@ -0,0 +1,76 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import unittest
+
+import mock
+
+from storperf.utilities.data_handler import DataHandler
+
+
+class MockGraphiteDB(object):
+
+ def __init__(self):
+ self.called = False
+
+ def fetch_averages(self, job_id):
+ self.called = True
+ return None
+
+
+class DataHandlerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.event_listeners = set()
+ self.data_handler = DataHandler()
+ self._terminated = False
+ self.args = None
+ self.start_time = 0
+ self.end_time = 1
+ self.metadata = {}
+ self.block_sizes = "1"
+ self.queue_depths = "1"
+ mock.job_id = "1"
+ self.job_db = mock
+ self.pushed = False
+ pass
+
+ @property
+ def terminated(self):
+ return self._terminated
+
+ def push_results_to_db(self, *args):
+ self.pushed = True
+ pass
+
+ def test_not_terminated_report(self):
+ self.data_handler.data_event(self)
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+ def test_terminated_report(self, mock_graphite_db, mock_results_db):
+ self._terminated = True
+ mock_results_db.side_effect = self.push_results_to_db
+ mock_graphite_db.side_effect = MockGraphiteDB
+
+ self.data_handler.data_event(self)
+ self.assertEqual(True, self.pushed)
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+ def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+ self._terminated = False
+ mock_results_db.side_effect = self.push_results_to_db
+ mock_graphite_db.side_effect = MockGraphiteDB
+
+ self.data_handler.data_event(self)
+ self.assertEqual(False, self.pushed)