From 7496f13e0be433697a74d7156c47d950b2add0a9 Mon Sep 17 00:00:00 2001 From: Ross Brattain Date: Fri, 29 Sep 2017 15:39:04 -0700 Subject: drain runner queue and undo cancel_join_thread Sometimes the runners can hang. Initially debugging lead to the queue join thread, so I thought we could cancel all the join threads and everything would be okay. But it turns out canceling the queue join threads can lead to corruption of the queues, so when we go to drain the queues the task hangs. But it also turns out that we were not properly draining the queues in the task process. We were waiting for all the runners to exit, then draining the queues. This is bad and will cause the queues to fill up and hang and/or drop data or corrupt the queues. The proper fix seems to be to draining the queues in a loop before calling join with a timeout. Also modified the queue drain loops to no block on queue.get() Revert "cancel all queue join threads" This reverts commit 75c0e3a54b8f6e8fd77c7d9d95decab830159929. Revert "duration runner: add teardown and cancel all queue join threads" This reverts commit 7eb6abb6931b24e085b139cc3500f4497cdde57d. Change-Id: Ic4f8e814cf23615621c1250535967716b425ac18 Signed-off-by: Ross Brattain --- tests/unit/benchmark/runner/test_base.py | 43 +++++++++++++++++++++- .../traffic_profile/test_prox_profile.py | 5 ++- yardstick/benchmark/core/task.py | 31 ++++++++-------- yardstick/benchmark/runners/arithmetic.py | 12 +++++- yardstick/benchmark/runners/base.py | 33 ++++++++++++++--- yardstick/benchmark/runners/duration.py | 23 +++++++++--- yardstick/benchmark/runners/dynamictp.py | 12 +++++- yardstick/benchmark/runners/iteration.py | 19 ++++++++-- yardstick/benchmark/runners/sequence.py | 12 +++++- yardstick/network_services/nfvi/collectd.py | 1 + .../traffic_profile/prox_profile.py | 1 + .../vnf_generic/vnf/prox_helpers.py | 1 + .../network_services/vnf_generic/vnf/sample_vnf.py | 3 ++ 13 files changed, 161 insertions(+), 35 deletions(-) diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py index 956762c40..f47b88e95 100644 --- a/tests/unit/benchmark/runner/test_base.py +++ b/tests/unit/benchmark/runner/test_base.py @@ -17,10 +17,31 @@ import time from mock import mock -from yardstick.benchmark.runners.base import Runner +from yardstick.benchmark.runners import base from yardstick.benchmark.runners.iteration import IterationRunner +class ActionTestCase(unittest.TestCase): + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__execute_shell_command(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + self.assertEqual(base._execute_shell_command("")[0], -1) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__single_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._single_action(0, "echo", mock.MagicMock()) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__periodic_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._periodic_action(0, "echo", mock.MagicMock()) + + class RunnerTestCase(unittest.TestCase): @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") @@ -41,8 +62,26 @@ class RunnerTestCase(unittest.TestCase): actual_result = runner.get_output() self.assertEqual(idle_result, actual_result) + @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") + def test_get_result(self, mock_process): + runner = IterationRunner({}) + runner.result_queue.put({'case': 'opnfv_yardstick_tc002'}) + runner.result_queue.put({'criteria': 'PASS'}) + + idle_result = [ + {'case': 'opnfv_yardstick_tc002'}, + {'criteria': 'PASS'} + ] + + for retries in range(1000): + time.sleep(0.01) + if not runner.result_queue.empty(): + break + actual_result = runner.get_result() + self.assertEqual(idle_result, actual_result) + def test__run_benchmark(self): - runner = Runner(mock.Mock()) + runner = base.Runner(mock.Mock()) with self.assertRaises(NotImplementedError): runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py index c32cc2878..078e72b8e 100644 --- a/tests/unit/network_services/traffic_profile/test_prox_profile.py +++ b/tests/unit/network_services/traffic_profile/test_prox_profile.py @@ -63,8 +63,9 @@ class TestProxProfile(unittest.TestCase): } profile = ProxProfile(tp_config) - profile.init(234) - self.assertEqual(profile.queue, 234) + queue = mock.Mock() + profile.init(queue) + self.assertIs(profile.queue, queue) def test_execute_traffic(self): packet_sizes = [ diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c175a950b..53298d8d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -260,25 +260,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: LOG.error('Scenario NO.%s: "%s" ERROR!', scenarios.index(scenario) + 1, scenario.get('type')) - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) # Abort background runners @@ -287,15 +285,13 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - status = runner.join(JOIN_TIMEOUT) + status = runner.join(self.outputs, result, JOIN_TIMEOUT) if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner.join(JOIN_TIMEOUT) + runner.join(self.outputs, result, JOIN_TIMEOUT) base_runner.Runner.release(runner) - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) print("Background task ended") return result @@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes): return networks -def runner_join(runner): - """join (wait for) a runner, exit process at runner failure""" - status = runner.join() +def runner_join(runner, outputs, result): + """join (wait for) a runner, exit process at runner failure + :param outputs: + :type outputs: dict + :param result: + :type result: list + """ + status = runner.join(outputs, result) base_runner.Runner.release(runner) return status diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 7898ae2bc..3ff064ae1 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -138,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if errors and sla_action is None: break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class ArithmeticRunner(base.Runner): diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a69811f8a..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -116,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -205,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -217,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index c2c6a8f19..75942766d 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -54,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sla_action = scenario_cfg["sla"].get("action", "assert") start = time.time() + timeout = start + duration while True: LOG.debug("runner=%(runner)s seq=%(sequence)s START", @@ -71,9 +72,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg, elif sla_action == "monitor": LOG.warning("SLA validation failed: %s", assertion.args) errors = assertion.args - except Exception as e: + # catch all exceptions because with multiprocessing we can have un-picklable exception + # problems https://bugs.python.org/issue9400 + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: output_queue.put(result) @@ -94,12 +97,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence += 1 - if (errors and sla_action is None) or \ - (time.time() - start > duration or aborted.is_set()): + if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): LOG.info("Worker END") break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class DurationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index afff27d75..01e76c6f4 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -141,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.debug("iterator: %s iterations: %s", iterator, iterations) if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 50fe106bd..4a7439588 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -90,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception(e) else: if result: - output_queue.put(result) + LOG.debug("output_queue.put %s", result) + output_queue.put(result, True, 1) time.sleep(interval) @@ -101,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - queue.put(benchmark_output) + LOG.debug("queue.put, %s", benchmark_output) + queue.put(benchmark_output, True, 1) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], @@ -114,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.info("worker END") break if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 68e272c57..f08ca5dde 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if (errors and sla_action is None) or aborted.is_set(): break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class SequenceRunner(base.Runner): diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py index f2c9d40a7..e0027bbcb 100644 --- a/yardstick/network_services/nfvi/collectd.py +++ b/yardstick/network_services/nfvi/collectd.py @@ -35,6 +35,7 @@ class AmqpConsumer(object): self._consumer_tag = None self._url = amqp_url self._queue = queue + self._queue.cancel_join_thread() def connect(self): """ connect to amqp url """ diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py index 896384d5e..170dfd96f 100644 --- a/yardstick/network_services/traffic_profile/prox_profile.py +++ b/yardstick/network_services/traffic_profile/prox_profile.py @@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile): def init(self, queue): self.pkt_size_iterator = iter(self.pkt_sizes) self.queue = queue + self.queue.cancel_join_thread() def bounds_iterator(self, logger=None): if logger: diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 02ae06170..ba4d44c41 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper): return self._test_type def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 91530860e..5cf234514 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper): self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: -- cgit 1.2.3-korg