-rw-r--r--  storperf/test_executor.py                    81
-rw-r--r--  storperf/utilities/data_handler.py           79
-rw-r--r--  tests/db_tests/job_db_test.py                 5
-rw-r--r--  tests/utilities_tests/data_handler_test.py   76
4 files changed, 185 insertions(+), 56 deletions(-)
diff --git a/storperf/test_executor.py b/storperf/test_executor.py
index 3c456a6..530ce80 100644
--- a/storperf/test_executor.py
+++ b/storperf/test_executor.py
@@ -19,11 +19,10 @@ import time
from storperf.carbon.converter import Converter
from storperf.carbon.emitter import CarbonMetricTransmitter
-from storperf.db import test_results_db
-from storperf.db.graphite_db import GraphiteDB
from storperf.db.job_db import JobDB
from storperf.fio.fio_invoker import FIOInvoker
-from storperf.utilities import dictionary
+from storperf.utilities.data_handler import DataHandler
+from storperf.utilities.thread_gate import ThreadGate
class UnknownWorkload(Exception):
@@ -39,7 +38,9 @@ class TestExecutor(object):
self.precondition = True
self.deadline = None
self.warm = True
- self.metadata = None
+ self.metadata = {}
+ self.start_time = None
+ self.end_time = None
self._queue_depths = [1, 4, 8]
self._block_sizes = [512, 4096, 16384]
self.event_listeners = set()
@@ -51,6 +52,7 @@ class TestExecutor(object):
self._terminated = False
self._workload_executors = []
self._workload_thread = None
+ self._thread_gate = None
@property
def slaves(self):
@@ -74,6 +76,10 @@ class TestExecutor(object):
def block_sizes(self):
return ','.join(self._block_sizes)
+ @property
+ def terminated(self):
+ return self._terminated
+
@block_sizes.setter
def block_sizes(self, block_sizes):
self.logger.debug("Set block_sizes to: " + str(block_sizes))
@@ -92,6 +98,16 @@ class TestExecutor(object):
self.metrics_emitter.transmit_metrics(carbon_metrics)
+ if self._thread_gate.report(callback_id):
+ self.broadcast_event()
+
+ def broadcast_event(self):
+ for event_listener in self.event_listeners:
+ try:
+ event_listener(self)
+ except Exception as e:
+ self.logger.error("Notifying listener: %s", e)
+
def register_workloads(self, workloads):
self.workload_modules = []
@@ -178,8 +194,10 @@ class TestExecutor(object):
def execute_workloads(self):
self._terminated = False
self.logger.info("Starting job %s" % (self.job_db.job_id))
+ data_handler = DataHandler()
+ self.register(data_handler.data_event)
- start_time = time.time()
+ self.start_time = time.time()
for workload_module in self.workload_modules:
workload_name = getattr(workload_module, "__name__")
@@ -198,6 +216,8 @@ class TestExecutor(object):
blocksizes = self._block_sizes
workload.id = self.job_db.job_id
+ self._thread_gate = ThreadGate(len(self.slaves),
+ workload.options['status-interval'])
for blocksize in blocksizes:
for iodepth in iodepths:
@@ -243,56 +263,9 @@ class TestExecutor(object):
self.logger.info("Completed workload %s" % (workload_name))
self.logger.info("Completed job %s" % (self.job_db.job_id))
- end_time = time.time()
- pod_name = dictionary.get_key_from_dict(self.metadata,
- 'pod_name',
- 'Unknown')
- version = dictionary.get_key_from_dict(self.metadata,
- 'version',
- 'Unknown')
- scenario = dictionary.get_key_from_dict(self.metadata,
- 'scenario_name',
- 'Unknown')
- build_tag = dictionary.get_key_from_dict(self.metadata,
- 'build_tag',
- 'Unknown')
- duration = end_time - start_time
- test_db = os.environ.get('TEST_DB_URL')
-
- if test_db is not None:
- # I really do not like doing this. As our threads just
- # terminated, their final results are still being spooled
- # off to Carbon. Need to give that a little time to finish
- time.sleep(5)
- self.logger.info("Pushing results to %s" % (test_db))
-
- payload = self.metadata
- payload['timestart'] = start_time
- payload['duration'] = duration
- payload['status'] = 'OK'
- graphite_db = GraphiteDB()
- payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id)
- criteria = {}
- criteria['block_sizes'] = self.block_sizes
- criteria['queue_depths'] = self.queue_depths
-
- try:
- test_results_db.push_results_to_db(test_db,
- "storperf",
- "Latency Test",
- start_time,
- end_time,
- self.logger,
- pod_name,
- version,
- scenario,
- criteria,
- build_tag,
- payload)
- except:
- self.logger.exception("Error pushing results into Database")
-
+ self.end_time = time.time()
self._terminated = True
+ self.broadcast_event()
def execute_on_node(self, workload):
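
For reference, the executor above relies on only two behaviours from ThreadGate: it is constructed with the number of slaves plus the fio status interval, and report(callback_id) returns True once every slave has checked in for the current reporting round, at which point the event is broadcast. A minimal sketch of a gate with that behaviour follows; it is illustrative only, and the real storperf.utilities.thread_gate.ThreadGate may differ (for example in how the timeout is enforced).

    import threading
    import time


    class SimpleThreadGate(object):
        """Illustrative stand-in for ThreadGate: collects one report per
        participant and signals when a complete round has arrived."""

        def __init__(self, size, timeout=60):
            self._size = size
            self._timeout = timeout      # recorded but not enforced here
            self._lock = threading.Lock()
            self._reports = {}

        def report(self, report_id):
            with self._lock:
                self._reports[report_id] = time.time()
                if len(self._reports) >= self._size:
                    # A full round of reports has arrived; reset for the
                    # next round and tell the caller to broadcast.
                    self._reports.clear()
                    return True
                return False

Under that contract the metrics callback can be invoked concurrently from every slave, yet at most one broadcast_event() fires per round of status reports.
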
diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py
new file mode 100644
index 0000000..03c764c
--- /dev/null
+++ b/storperf/utilities/data_handler.py
@@ -0,0 +1,79 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import logging
+import os
+
+from storperf.db import test_results_db
+from storperf.db.graphite_db import GraphiteDB
+from storperf.utilities import dictionary
+
+
+class DataHandler(object):
+
+ def __init__(self):
+ self.logger = logging.getLogger(__name__)
+
+ # Called back by TestExecutor.broadcast_event() with the executor as
+ # the only argument. Results are pushed to the test results database
+ # once the executor reports that the run has terminated.
+ def data_event(self, executor):
+ self.logger.info("Event received")
+
+ # Placeholder for interim data lookup; nothing is fetched yet.
+
+ if executor.terminated:
+ self._push_to_db(executor)
+
+ def _push_to_db(self, executor):
+ test_db = os.environ.get('TEST_DB_URL')
+
+ if test_db is not None:
+ pod_name = dictionary.get_key_from_dict(executor.metadata,
+ 'pod_name',
+ 'Unknown')
+ version = dictionary.get_key_from_dict(executor.metadata,
+ 'version',
+ 'Unknown')
+ scenario = dictionary.get_key_from_dict(executor.metadata,
+ 'scenario_name',
+ 'Unknown')
+ build_tag = dictionary.get_key_from_dict(executor.metadata,
+ 'build_tag',
+ 'Unknown')
+ duration = executor.end_time - executor.start_time
+
+ self.logger.info("Pushing results to %s" % (test_db))
+
+ payload = executor.metadata
+ payload['timestart'] = executor.start_time
+ payload['duration'] = duration
+ payload['status'] = 'OK'
+ graphite_db = GraphiteDB()
+ payload['metrics'] = graphite_db.fetch_averages(
+ executor.job_db.job_id)
+ criteria = {}
+ criteria['block_sizes'] = executor.block_sizes
+ criteria['queue_depths'] = executor.queue_depths
+
+ try:
+ test_results_db.push_results_to_db(test_db,
+ "storperf",
+ "Latency Test",
+ executor.start_time,
+ executor.end_time,
+ self.logger,
+ pod_name,
+ version,
+ scenario,
+ criteria,
+ build_tag,
+ payload)
+ except:
+ self.logger.exception("Error pushing results into Database")
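
DataHandler is attached purely as an observer: execute_workloads() registers the bound data_event method, and broadcast_event() calls every listener with the executor itself, so the handler reads metadata, start_time, end_time, block_sizes, queue_depths and job_db.job_id straight off the executor. TestExecutor.register() is not part of this diff; the sketch below assumes it simply adds the callable to the event_listeners set shown above, which is all the handler needs.

    class MiniExecutor(object):
        """Cut-down illustration of the listener wiring in TestExecutor."""

        def __init__(self):
            self.event_listeners = set()
            self.terminated = False

        def register(self, event_listener):
            self.event_listeners.add(event_listener)

        def broadcast_event(self):
            for event_listener in self.event_listeners:
                try:
                    event_listener(self)
                except Exception as e:
                    print("Error notifying listener: %s" % e)


    # A listener receives the broadcasting object itself, exactly as
    # DataHandler.data_event receives the executor.
    received = []

    def listener(obj):
        received.append(obj)

    executor = MiniExecutor()
    executor.register(listener)
    executor.broadcast_event()
    assert received == [executor]

Because the handler only pushes when executor.terminated is true and TEST_DB_URL is set, the intermediate broadcasts driven by the thread gate are effectively no-ops until the end of the run.
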
diff --git a/tests/db_tests/job_db_test.py b/tests/db_tests/job_db_test.py
index fe3d9f1..ccfb9cc 100644
--- a/tests/db_tests/job_db_test.py
+++ b/tests/db_tests/job_db_test.py
@@ -7,14 +7,15 @@
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from storperf.db.job_db import JobDB
-from storperf.workloads.rr import rr
import os
import sqlite3
import unittest
import mock
+from storperf.db.job_db import JobDB
+from storperf.workloads.rr import rr
+
class JobDBTest(unittest.TestCase):
diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py
new file mode 100644
index 0000000..482b98e
--- /dev/null
+++ b/tests/utilities_tests/data_handler_test.py
@@ -0,0 +1,76 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import os
+import unittest
+
+import mock
+
+from storperf.utilities.data_handler import DataHandler
+
+
+class MockGraphiteDB(object):
+
+ def __init__(self):
+ self.called = False
+
+ def fetch_averages(self, job_id):
+ self.called = True
+ return None
+
+
+class DataHandlerTest(unittest.TestCase):
+
+ def setUp(self):
+ self.event_listeners = set()
+ self.data_handler = DataHandler()
+ self._terminated = False
+ self.args = None
+ self.start_time = 0
+ self.end_time = 1
+ self.metadata = {}
+ self.block_sizes = "1"
+ self.queue_depths = "1"
+ # Stand-in job_db exposing only the job_id used by the handler.
+ self.job_db = mock.Mock()
+ self.job_db.job_id = "1"
+ self.pushed = False
+
+ @property
+ def terminated(self):
+ return self._terminated
+
+ def push_results_to_db(self, *args):
+ self.pushed = True
+
+ def test_not_terminated_report(self):
+ self.data_handler.data_event(self)
+ self.assertEqual(False, self.pushed)
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+ def test_terminated_report(self, mock_graphite_db, mock_results_db):
+ self._terminated = True
+ mock_results_db.side_effect = self.push_results_to_db
+ mock_graphite_db.side_effect = MockGraphiteDB
+
+ self.data_handler.data_event(self)
+ self.assertEqual(True, self.pushed)
+
+ @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
+ @mock.patch("storperf.db.test_results_db.push_results_to_db")
+ @mock.patch("storperf.utilities.data_handler.GraphiteDB")
+ def test_non_terminated_report(self, mock_graphite_db, mock_results_db):
+ self._terminated = False
+ mock_results_db.side_effect = self.push_results_to_db
+ mock_graphite_db.side_effect = MockGraphiteDB
+
+ self.data_handler.data_event(self)
+ self.assertEqual(False, self.pushed)
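
One detail worth noting in these tests: GraphiteDB is patched as storperf.utilities.data_handler.GraphiteDB, i.e. under the name the module under test imported into its own namespace, not at its defining module storperf.db.graphite_db, while TEST_DB_URL is injected with mock.patch.dict so the environment is restored after each test. The same idiom in isolation is sketched below; the test class is illustrative only and not part of this change.

    import os
    import unittest

    import mock


    class PatchTargetExample(unittest.TestCase):

        @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'})
        @mock.patch("storperf.utilities.data_handler.GraphiteDB")
        def test_name_is_patched_where_it_is_used(self, mock_graphite_db):
            from storperf.utilities import data_handler
            # The name looked up inside data_handler now refers to the mock,
            # and the environment variable exists only for this test.
            self.assertIs(data_handler.GraphiteDB, mock_graphite_db)
            self.assertEqual('mock', os.environ['TEST_DB_URL'])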