From 776ff48bc805fbca34811b54ebd7a3626a20f817 Mon Sep 17 00:00:00 2001 From: Mark Beierl Date: Fri, 25 Nov 2016 13:28:40 -0500 Subject: Data Reporting Gate Module that allows passing of a gate only once all other peers have also reported in within a specified time period Change-Id: If4baf1d4377026c7833a6f30bfc2e36698f675e8 JIRA: STORPERF-71 Signed-off-by: Mark Beierl --- storperf/utilities/thread_gate.py | 58 +++++++++++++++++++++++++++++++ tests/utilities_tests/__init__.py | 3 ++ tests/utilities_tests/thread_gate_test.py | 57 ++++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 storperf/utilities/thread_gate.py create mode 100644 tests/utilities_tests/thread_gate_test.py diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py new file mode 100644 index 0000000..b0dde50 --- /dev/null +++ b/storperf/utilities/thread_gate.py @@ -0,0 +1,58 @@ +############################################################################## +# Copyright (c) 2016 Dell EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +""" +Creates a gate object that allows synchronization between an arbitrary +number of callers. +""" +import logging +import time + + +class FailureToReportException(Exception): + pass + + +class ThreadGate(object): + + def __init__(self, size, timeout=60): + self.logger = logging.getLogger(__name__) + self._gate_size = size + self._timeout = timeout + self._registrants = {} + self._creation_time = time.time() + + """ + Calling this method returns a true or false, indicating that enough + of the other registrants have reported in + """ + + def report(self, gate_id): + now = time.time() + self._registrants[gate_id] = now + ready = True + self.logger.debug("Gate report for %s", gate_id) + + total_missing = self._gate_size - len(self._registrants) + if total_missing > 0: + self.logger.debug("Not all registrants have reported in") + time_since_creation = now - self._creation_time + if (time_since_creation > (self._timeout * 2)): + self.logger.error("%s registrant(s) have never reported in", + total_missing) + raise FailureToReportException + return False + + for k, v in self._registrants.items(): + time_since_last_report = now - v + if time_since_last_report > self._timeout: + self.logger.debug("Registrant %s last reported %s ago", + k, time_since_last_report) + ready = False + + return ready diff --git a/tests/utilities_tests/__init__.py b/tests/utilities_tests/__init__.py index 73444b6..6218fe3 100644 --- a/tests/utilities_tests/__init__.py +++ b/tests/utilities_tests/__init__.py @@ -6,3 +6,6 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## +import logging + +logging.basicConfig(level=logging.DEBUG) diff --git a/tests/utilities_tests/thread_gate_test.py b/tests/utilities_tests/thread_gate_test.py new file mode 100644 index 0000000..de8b15a --- /dev/null +++ b/tests/utilities_tests/thread_gate_test.py @@ -0,0 +1,57 @@ +############################################################################## +# Copyright (c) 2016 EMC and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import time +import unittest + +from storperf.utilities.thread_gate import FailureToReportException +from storperf.utilities.thread_gate import ThreadGate + + +class ThreadGateTest(unittest.TestCase): + + def setUp(self): + pass + + def test_one_one_report(self): + gate = ThreadGate(1) + self.assertEqual(True, gate.report(1)) + + def test_two_one_report(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + + def test_two_two_reports(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + self.assertEqual(True, gate.report(2)) + + def test_two_one_duplicate_reports(self): + gate = ThreadGate(2) + self.assertEqual(False, gate.report(1)) + self.assertEqual(False, gate.report(1)) + self.assertEqual(True, gate.report(2)) + + def test_two_old_old_report(self): + timeout = 5 + gate = ThreadGate(2, timeout) + report_time = time.time() - (timeout * 2) + gate._registrants[2] = report_time + self.assertEqual(False, gate.report(1)) + + def test_two_never_report(self): + timeout = 5 + gate = ThreadGate(2, timeout) + report_time = time.time() - (timeout * 3) + gate._creation_time = report_time + try: + gate.report(1) + self.fail() + except FailureToReportException: + pass -- cgit 1.2.3-korg