summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMark Beierl <mark.beierl@dell.com>2016-11-25 13:28:40 -0500
committerMark Beierl <mark.beierl@dell.com>2016-11-25 13:40:34 -0500
commit776ff48bc805fbca34811b54ebd7a3626a20f817 (patch)
treeda3c2a91453a4cd80614891a2e61d8511bb0c71a
parent227bb19115eaab4f689cb570f0a5574cd5e318bb (diff)
Data Reporting Gate
Module that allows passing of a gate only once all other peers have also reported in within a specified time period Change-Id: If4baf1d4377026c7833a6f30bfc2e36698f675e8 JIRA: STORPERF-71 Signed-off-by: Mark Beierl <mark.beierl@dell.com>
-rw-r--r--storperf/utilities/thread_gate.py58
-rw-r--r--tests/utilities_tests/__init__.py3
-rw-r--r--tests/utilities_tests/thread_gate_test.py57
3 files changed, 118 insertions, 0 deletions
diff --git a/storperf/utilities/thread_gate.py b/storperf/utilities/thread_gate.py
new file mode 100644
index 0000000..b0dde50
--- /dev/null
+++ b/storperf/utilities/thread_gate.py
@@ -0,0 +1,58 @@
+##############################################################################
+# Copyright (c) 2016 Dell EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+"""
+Creates a gate object that allows synchronization between an arbitrary
+number of callers.
+"""
+import logging
+import time
+
+
+class FailureToReportException(Exception):
+ pass
+
+
+class ThreadGate(object):
+
+ def __init__(self, size, timeout=60):
+ self.logger = logging.getLogger(__name__)
+ self._gate_size = size
+ self._timeout = timeout
+ self._registrants = {}
+ self._creation_time = time.time()
+
+ """
+ Calling this method returns a true or false, indicating that enough
+ of the other registrants have reported in
+ """
+
+ def report(self, gate_id):
+ now = time.time()
+ self._registrants[gate_id] = now
+ ready = True
+ self.logger.debug("Gate report for %s", gate_id)
+
+ total_missing = self._gate_size - len(self._registrants)
+ if total_missing > 0:
+ self.logger.debug("Not all registrants have reported in")
+ time_since_creation = now - self._creation_time
+ if (time_since_creation > (self._timeout * 2)):
+ self.logger.error("%s registrant(s) have never reported in",
+ total_missing)
+ raise FailureToReportException
+ return False
+
+ for k, v in self._registrants.items():
+ time_since_last_report = now - v
+ if time_since_last_report > self._timeout:
+ self.logger.debug("Registrant %s last reported %s ago",
+ k, time_since_last_report)
+ ready = False
+
+ return ready
diff --git a/tests/utilities_tests/__init__.py b/tests/utilities_tests/__init__.py
index 73444b6..6218fe3 100644
--- a/tests/utilities_tests/__init__.py
+++ b/tests/utilities_tests/__init__.py
@@ -6,3 +6,6 @@
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
+import logging
+
+logging.basicConfig(level=logging.DEBUG)
diff --git a/tests/utilities_tests/thread_gate_test.py b/tests/utilities_tests/thread_gate_test.py
new file mode 100644
index 0000000..de8b15a
--- /dev/null
+++ b/tests/utilities_tests/thread_gate_test.py
@@ -0,0 +1,57 @@
+##############################################################################
+# Copyright (c) 2016 EMC and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+import time
+import unittest
+
+from storperf.utilities.thread_gate import FailureToReportException
+from storperf.utilities.thread_gate import ThreadGate
+
+
+class ThreadGateTest(unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def test_one_one_report(self):
+ gate = ThreadGate(1)
+ self.assertEqual(True, gate.report(1))
+
+ def test_two_one_report(self):
+ gate = ThreadGate(2)
+ self.assertEqual(False, gate.report(1))
+
+ def test_two_two_reports(self):
+ gate = ThreadGate(2)
+ self.assertEqual(False, gate.report(1))
+ self.assertEqual(True, gate.report(2))
+
+ def test_two_one_duplicate_reports(self):
+ gate = ThreadGate(2)
+ self.assertEqual(False, gate.report(1))
+ self.assertEqual(False, gate.report(1))
+ self.assertEqual(True, gate.report(2))
+
+ def test_two_old_old_report(self):
+ timeout = 5
+ gate = ThreadGate(2, timeout)
+ report_time = time.time() - (timeout * 2)
+ gate._registrants[2] = report_time
+ self.assertEqual(False, gate.report(1))
+
+ def test_two_never_report(self):
+ timeout = 5
+ gate = ThreadGate(2, timeout)
+ report_time = time.time() - (timeout * 3)
+ gate._creation_time = report_time
+ try:
+ gate.report(1)
+ self.fail()
+ except FailureToReportException:
+ pass