diff options
author | Ross Brattain <ross.b.brattain@intel.com> | 2017-10-02 15:16:54 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2017-10-02 15:16:54 +0000 |
commit | 0cfce062a9c19751243ef7590ea9a11050bc7a6c (patch) | |
tree | bbe1770ceca73d7a45e4a541d60843fa7dbb6262 | |
parent | def87da57962d932086840fc20950dc90de5c567 (diff) | |
parent | 7826320fa41ad213ac36d369b1e4d71e4cabf176 (diff) |
Merge "drain runner queue and undo cancel_join_thread"
-rw-r--r-- | tests/unit/benchmark/runner/test_base.py | 43 | ||||
-rw-r--r-- | yardstick/benchmark/core/task.py | 31 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/arithmetic.py | 17 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 37 | ||||
-rw-r--r-- | yardstick/benchmark/runners/duration.py | 8 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/dynamictp.py | 13 | ||||
-rw-r--r-- | yardstick/benchmark/runners/iteration.py | 24 | ||||
-rw-r--r-- | yardstick/benchmark/runners/sequence.py | 12 | ||||
-rw-r--r-- | yardstick/benchmark/scenarios/availability/monitor/basemonitor.py | 1 |
9 files changed, 137 insertions, 49 deletions
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py index 956762c40..f47b88e95 100644 --- a/tests/unit/benchmark/runner/test_base.py +++ b/tests/unit/benchmark/runner/test_base.py @@ -17,10 +17,31 @@ import time from mock import mock -from yardstick.benchmark.runners.base import Runner +from yardstick.benchmark.runners import base from yardstick.benchmark.runners.iteration import IterationRunner +class ActionTestCase(unittest.TestCase): + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__execute_shell_command(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + self.assertEqual(base._execute_shell_command("")[0], -1) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__single_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._single_action(0, "echo", mock.MagicMock()) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__periodic_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._periodic_action(0, "echo", mock.MagicMock()) + + class RunnerTestCase(unittest.TestCase): @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") @@ -41,8 +62,26 @@ class RunnerTestCase(unittest.TestCase): actual_result = runner.get_output() self.assertEqual(idle_result, actual_result) + @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") + def test_get_result(self, mock_process): + runner = IterationRunner({}) + runner.result_queue.put({'case': 'opnfv_yardstick_tc002'}) + runner.result_queue.put({'criteria': 'PASS'}) + + idle_result = [ + {'case': 'opnfv_yardstick_tc002'}, + {'criteria': 'PASS'} + ] + + for retries in range(1000): + time.sleep(0.01) + if not runner.result_queue.empty(): + break + actual_result = runner.get_result() + self.assertEqual(idle_result, actual_result) + def test__run_benchmark(self): - runner = Runner(mock.Mock()) + runner = base.Runner(mock.Mock()) with self.assertRaises(NotImplementedError): runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c175a950b..53298d8d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -260,25 +260,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: LOG.error('Scenario NO.%s: "%s" ERROR!', scenarios.index(scenario) + 1, scenario.get('type')) - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) # Abort background runners @@ -287,15 +285,13 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - status = runner.join(JOIN_TIMEOUT) + status = runner.join(self.outputs, result, JOIN_TIMEOUT) if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner.join(JOIN_TIMEOUT) + runner.join(self.outputs, result, JOIN_TIMEOUT) base_runner.Runner.release(runner) - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) print("Background task ended") return result @@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes): return networks -def runner_join(runner): - """join (wait for) a runner, exit process at runner failure""" - status = runner.join() +def runner_join(runner, outputs, result): + """join (wait for) a runner, exit process at runner failure + :param outputs: + :type outputs: dict + :param result: + :type result: list + """ + status = runner.join(outputs, result) base_runner.Runner.release(runner) return status diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 974fb21b3..3ff064ae1 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -46,11 +46,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence = 1 - # if we don't do this we can hang waiting for the queue to drain - # have to do this in the subprocess - queue.cancel_join_thread() - output_queue.cancel_join_thread() - runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) @@ -143,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if errors and sla_action is None: break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class ArithmeticRunner(base.Runner): diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 57903ebb9..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -47,7 +50,6 @@ def _execute_shell_command(command): def _single_action(seconds, command, queue): """entrypoint for the single action process""" - queue.cancel_join_thread() log.debug("single action, fires after %d seconds (from now)", seconds) time.sleep(seconds) log.debug("single action: executing command: '%s'", command) @@ -62,7 +64,6 @@ def _single_action(seconds, command, queue): def _periodic_action(interval, command, queue): """entrypoint for the periodic action process""" - queue.cancel_join_thread() log.debug("periodic action, fires every: %d seconds", interval) time_spent = 0 while True: @@ -118,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -139,9 +140,7 @@ class Runner(object): self.config = config self.periodic_action_process = None self.output_queue = multiprocessing.Queue() - self.output_queue.cancel_join_thread() self.result_queue = multiprocessing.Queue() - self.result_queue.cancel_join_thread() self.process = None self.aborted = multiprocessing.Event() Runner.runners.append(self) @@ -209,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -221,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 6a09131e1..75942766d 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -36,11 +36,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence = 1 - # if we don't do this we can hang waiting for the queue to drain - # have to do this in the subprocess - queue.cancel_join_thread() - output_queue.cancel_join_thread() - runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) @@ -116,6 +111,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception("") raise SystemExit(1) + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) + class DurationRunner(base.Runner): """Run a scenario for a certain amount of time diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index 2f5f7e4f4..01e76c6f4 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -33,7 +33,6 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg, aborted): # pragma: no cover - queue.cancel_join_thread() runner_cfg = scenario_cfg['runner'] iterations = runner_cfg.get("iterations", 1) interval = runner_cfg.get("interval", 1) @@ -142,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.debug("iterator: %s iterations: %s", iterator, iterations) if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 822e67723..4a7439588 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -36,11 +36,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence = 1 - # if we don't do this we can hang waiting for the queue to drain - # have to do this in the subprocess - queue.cancel_join_thread() - output_queue.cancel_join_thread() - runner_cfg = scenario_cfg['runner'] interval = runner_cfg.get("interval", 1) @@ -95,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception(e) else: if result: - output_queue.put(result) + LOG.debug("output_queue.put %s", result) + output_queue.put(result, True, 1) time.sleep(interval) @@ -106,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - queue.put(benchmark_output) + LOG.debug("queue.put, %s", benchmark_output) + queue.put(benchmark_output, True, 1) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], @@ -119,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.info("worker END") break if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 68e272c57..f08ca5dde 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if (errors and sla_action is None) or aborted.is_set(): break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class SequenceRunner(base.Runner): diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py index 871f13f88..a6c1a28bd 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py +++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py @@ -90,7 +90,6 @@ class BaseMonitor(multiprocessing.Process): self._config = config self._context = context self._queue = multiprocessing.Queue() - self._queue.cancel_join_thread() self._event = multiprocessing.Event() self.monitor_data = data self.setup_done = False |