From 30a2f3f91a04d69368a3591f28874a8da6948e36 Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Mon, 28 Nov 2016 16:17:05 -0500 Subject: Data Handling Refactoring Break out test db interaction into new module and make the push event driven instead of the sleep that was there before. Change-Id: I9485aba1405f6c3b4ee5000168fbc037efa87c81 JIRA: STORPERF-90 Signed-off-by: Mark Beierl --- storperf/test_executor.py | 81 ++++++++++-------------------- storperf/utilities/data_handler.py | 79 +++++++++++++++++++++++++++++ tests/db_tests/job_db_test.py | 5 +- tests/utilities_tests/data_handler_test.py | 76 ++++++++++++++++++++++++++++ 4 files changed, 185 insertions(+), 56 deletions(-) create mode 100644 storperf/utilities/data_handler.py create mode 100644 tests/utilities_tests/data_handler_test.py diff --git a/storperf/test_executor.py b/storperf/test_executor.py index 3c456a6..530ce80 100644 --- a/storperf/test_executor.py +++ b/storperf/test_executor.py @@ -19,11 +19,10 @@ import time from storperf.carbon.converter import Converter from storperf.carbon.emitter import CarbonMetricTransmitter -from storperf.db import test_results_db -from storperf.db.graphite_db import GraphiteDB from storperf.db.job_db import JobDB from storperf.fio.fio_invoker import FIOInvoker -from storperf.utilities import dictionary +from storperf.utilities.thread_gate import ThreadGate +from utilities.data_handler import DataHandler class UnknownWorkload(Exception): @@ -39,7 +38,9 @@ class TestExecutor(object): self.precondition = True self.deadline = None self.warm = True - self.metadata = None + self.metadata = {} + self.start_time = None + self.end_time = None self._queue_depths = [1, 4, 8] self._block_sizes = [512, 4096, 16384] self.event_listeners = set() @@ -51,6 +52,7 @@ class TestExecutor(object): self._terminated = False self._workload_executors = [] self._workload_thread = None + self._thread_gate = None @property def slaves(self): @@ -74,6 +76,10 @@ class TestExecutor(object): def block_sizes(self): return ','.join(self._block_sizes) + @property + def terminated(self): + return self._terminated + @block_sizes.setter def block_sizes(self, block_sizes): self.logger.debug("Set block_sizes to: " + str(block_sizes)) @@ -92,6 +98,16 @@ class TestExecutor(object): self.metrics_emitter.transmit_metrics(carbon_metrics) + if self._thread_gate.report(callback_id): + self.broadcast_event() + + def broadcast_event(self): + for event_listener in self.event_listeners: + try: + event_listener(self) + except Exception, e: + self.logger.error("Notifying listener: %s", e) + def register_workloads(self, workloads): self.workload_modules = [] @@ -178,8 +194,10 @@ class TestExecutor(object): def execute_workloads(self): self._terminated = False self.logger.info("Starting job %s" % (self.job_db.job_id)) + data_handler = DataHandler() + self.register(data_handler.data_event) - start_time = time.time() + self.start_time = time.time() for workload_module in self.workload_modules: workload_name = getattr(workload_module, "__name__") @@ -198,6 +216,8 @@ class TestExecutor(object): blocksizes = self._block_sizes workload.id = self.job_db.job_id + self._thread_gate = ThreadGate(len(self.slaves), + workload.options['status-interval']) for blocksize in blocksizes: for iodepth in iodepths: @@ -243,56 +263,9 @@ class TestExecutor(object): self.logger.info("Completed workload %s" % (workload_name)) self.logger.info("Completed job %s" % (self.job_db.job_id)) - end_time = time.time() - pod_name = dictionary.get_key_from_dict(self.metadata, - 'pod_name', - 'Unknown') - version = dictionary.get_key_from_dict(self.metadata, - 'version', - 'Unknown') - scenario = dictionary.get_key_from_dict(self.metadata, - 'scenario_name', - 'Unknown') - build_tag = dictionary.get_key_from_dict(self.metadata, - 'build_tag', - 'Unknown') - duration = end_time - start_time - test_db = os.environ.get('TEST_DB_URL') - - if test_db is not None: - # I really do not like doing this. As our threads just - # terminated, their final results are still being spooled - # off to Carbon. Need to give that a little time to finish - time.sleep(5) - self.logger.info("Pushing results to %s" % (test_db)) - - payload = self.metadata - payload['timestart'] = start_time - payload['duration'] = duration - payload['status'] = 'OK' - graphite_db = GraphiteDB() - payload['metrics'] = graphite_db.fetch_averages(self.job_db.job_id) - criteria = {} - criteria['block_sizes'] = self.block_sizes - criteria['queue_depths'] = self.queue_depths - - try: - test_results_db.push_results_to_db(test_db, - "storperf", - "Latency Test", - start_time, - end_time, - self.logger, - pod_name, - version, - scenario, - criteria, - build_tag, - payload) - except: - self.logger.exception("Error pushing results into Database") - + self.end_time = time.time() self._terminated = True + self.broadcast_event() def execute_on_node(self, workload): diff --git a/storperf/utilities/data_handler.py b/storperf/utilities/data_handler.py new file mode 100644 index 0000000..03c764c --- /dev/null +++ b/storperf/utilities/data_handler.py @@ -0,0 +1,79 @@ +############################################################################## +# Copyright (c) 2016 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import logging +import os + +from storperf.db import test_results_db +from storperf.db.graphite_db import GraphiteDB +from storperf.utilities import dictionary + + +class DataHandler(object): + + def __init__(self): + self.logger = logging.getLogger(__name__) + + """ + """ + + def data_event(self, executor): + self.logger.info("Event received") + + # Data lookup + + if executor.terminated: + self._push_to_db(executor) + + def _push_to_db(self, executor): + test_db = os.environ.get('TEST_DB_URL') + + if test_db is not None: + pod_name = dictionary.get_key_from_dict(executor.metadata, + 'pod_name', + 'Unknown') + version = dictionary.get_key_from_dict(executor.metadata, + 'version', + 'Unknown') + scenario = dictionary.get_key_from_dict(executor.metadata, + 'scenario_name', + 'Unknown') + build_tag = dictionary.get_key_from_dict(executor.metadata, + 'build_tag', + 'Unknown') + duration = executor.end_time - executor.start_time + + self.logger.info("Pushing results to %s" % (test_db)) + + payload = executor.metadata + payload['timestart'] = executor.start_time + payload['duration'] = duration + payload['status'] = 'OK' + graphite_db = GraphiteDB() + payload['metrics'] = graphite_db.fetch_averages( + executor.job_db.job_id) + criteria = {} + criteria['block_sizes'] = executor.block_sizes + criteria['queue_depths'] = executor.queue_depths + + try: + test_results_db.push_results_to_db(test_db, + "storperf", + "Latency Test", + executor.start_time, + executor.end_time, + self.logger, + pod_name, + version, + scenario, + criteria, + build_tag, + payload) + except: + self.logger.exception("Error pushing results into Database") diff --git a/tests/db_tests/job_db_test.py b/tests/db_tests/job_db_test.py index fe3d9f1..ccfb9cc 100644 --- a/tests/db_tests/job_db_test.py +++ b/tests/db_tests/job_db_test.py @@ -7,14 +7,15 @@ # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -from storperf.db.job_db import JobDB -from storperf.workloads.rr import rr import os import sqlite3 import unittest import mock +from storperf.db.job_db import JobDB +from storperf.workloads.rr import rr + class JobDBTest(unittest.TestCase): diff --git a/tests/utilities_tests/data_handler_test.py b/tests/utilities_tests/data_handler_test.py new file mode 100644 index 0000000..482b98e --- /dev/null +++ b/tests/utilities_tests/data_handler_test.py @@ -0,0 +1,76 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import os +import unittest + +import mock + +from storperf.utilities.data_handler import DataHandler + + +class MockGraphiteDB(object): + + def __init__(self): + self.called = False + + def fetch_averages(self, job_id): + self.called = True + return None + + +class DataHandlerTest(unittest.TestCase): + + def setUp(self): + self.event_listeners = set() + self.data_handler = DataHandler() + self._terminated = False + self.args = None + self.start_time = 0 + self.end_time = 1 + self.metadata = {} + self.block_sizes = "1" + self.queue_depths = "1" + mock.job_id = "1" + self.job_db = mock + self.pushed = False + pass + + @property + def terminated(self): + return self._terminated + + def push_results_to_db(self, *args): + self.pushed = True + pass + + def test_not_terminated_report(self): + self.data_handler.data_event(self) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.utilities.data_handler.GraphiteDB") + def test_terminated_report(self, mock_graphite_db, mock_results_db): + self._terminated = True + mock_results_db.side_effect = self.push_results_to_db + mock_graphite_db.side_effect = MockGraphiteDB + + self.data_handler.data_event(self) + self.assertEqual(True, self.pushed) + + @mock.patch.dict(os.environ, {'TEST_DB_URL': 'mock'}) + @mock.patch("storperf.db.test_results_db.push_results_to_db") + @mock.patch("storperf.utilities.data_handler.GraphiteDB") + def test_non_terminated_report(self, mock_graphite_db, mock_results_db): + self._terminated = False + mock_results_db.side_effect = self.push_results_to_db + mock_graphite_db.side_effect = MockGraphiteDB + + self.data_handler.data_event(self) + self.assertEqual(False, self.pushed) -- cgit 1.2.3-korg