diff options
-rw-r--r-- | tests/unit/benchmark/runner/test_base.py | 43 | ||||
-rw-r--r-- | tests/unit/network_services/traffic_profile/test_prox_profile.py | 5 | ||||
-rw-r--r-- | yardstick/benchmark/core/task.py | 31 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/arithmetic.py | 12 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 33 | ||||
-rw-r--r-- | yardstick/benchmark/runners/duration.py | 23 | ||||
-rwxr-xr-x | yardstick/benchmark/runners/dynamictp.py | 12 | ||||
-rw-r--r-- | yardstick/benchmark/runners/iteration.py | 19 | ||||
-rw-r--r-- | yardstick/benchmark/runners/sequence.py | 12 | ||||
-rw-r--r-- | yardstick/network_services/nfvi/collectd.py | 1 | ||||
-rw-r--r-- | yardstick/network_services/traffic_profile/prox_profile.py | 1 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/prox_helpers.py | 1 | ||||
-rw-r--r-- | yardstick/network_services/vnf_generic/vnf/sample_vnf.py | 3 |
13 files changed, 161 insertions, 35 deletions
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py index 956762c40..f47b88e95 100644 --- a/tests/unit/benchmark/runner/test_base.py +++ b/tests/unit/benchmark/runner/test_base.py @@ -17,10 +17,31 @@ import time from mock import mock -from yardstick.benchmark.runners.base import Runner +from yardstick.benchmark.runners import base from yardstick.benchmark.runners.iteration import IterationRunner +class ActionTestCase(unittest.TestCase): + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__execute_shell_command(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + self.assertEqual(base._execute_shell_command("")[0], -1) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__single_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._single_action(0, "echo", mock.MagicMock()) + + @mock.patch("yardstick.benchmark.runners.base.subprocess") + def test__periodic_action(self, mock_subprocess): + mock_subprocess.check_output.side_effect = Exception() + + base._periodic_action(0, "echo", mock.MagicMock()) + + class RunnerTestCase(unittest.TestCase): @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") @@ -41,8 +62,26 @@ class RunnerTestCase(unittest.TestCase): actual_result = runner.get_output() self.assertEqual(idle_result, actual_result) + @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing") + def test_get_result(self, mock_process): + runner = IterationRunner({}) + runner.result_queue.put({'case': 'opnfv_yardstick_tc002'}) + runner.result_queue.put({'criteria': 'PASS'}) + + idle_result = [ + {'case': 'opnfv_yardstick_tc002'}, + {'criteria': 'PASS'} + ] + + for retries in range(1000): + time.sleep(0.01) + if not runner.result_queue.empty(): + break + actual_result = runner.get_result() + self.assertEqual(idle_result, actual_result) + def test__run_benchmark(self): - runner = Runner(mock.Mock()) + runner = base.Runner(mock.Mock()) with self.assertRaises(NotImplementedError): runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock()) diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py index c32cc2878..078e72b8e 100644 --- a/tests/unit/network_services/traffic_profile/test_prox_profile.py +++ b/tests/unit/network_services/traffic_profile/test_prox_profile.py @@ -63,8 +63,9 @@ class TestProxProfile(unittest.TestCase): } profile = ProxProfile(tp_config) - profile.init(234) - self.assertEqual(profile.queue, 234) + queue = mock.Mock() + profile.init(queue) + self.assertIs(profile.queue, queue) def test_execute_traffic(self): packet_sizes = [ diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index c175a950b..53298d8d3 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -260,25 +260,23 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) else: # run serially for scenario in scenarios: if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) - status = runner_join(runner) + status = runner_join(runner, self.outputs, result) if status != 0: LOG.error('Scenario NO.%s: "%s" ERROR!', scenarios.index(scenario) + 1, scenario.get('type')) - raise RuntimeError - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) + raise RuntimeError( + "{0} runner status {1}".format(runner.__execution_type__, status)) LOG.info("Runner ended, output in %s", output_file) # Abort background runners @@ -287,15 +285,13 @@ class Task(object): # pragma: no cover # Wait for background runners to finish for runner in background_runners: - status = runner.join(JOIN_TIMEOUT) + status = runner.join(self.outputs, result, JOIN_TIMEOUT) if status is None: # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) - runner.join(JOIN_TIMEOUT) + runner.join(self.outputs, result, JOIN_TIMEOUT) base_runner.Runner.release(runner) - self.outputs.update(runner.get_output()) - result.extend(runner.get_result()) print("Background task ended") return result @@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes): return networks -def runner_join(runner): - """join (wait for) a runner, exit process at runner failure""" - status = runner.join() +def runner_join(runner, outputs, result): + """join (wait for) a runner, exit process at runner failure + :param outputs: + :type outputs: dict + :param result: + :type result: list + """ + status = runner.join(outputs, result) base_runner.Runner.release(runner) return status diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 7898ae2bc..3ff064ae1 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -138,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if errors and sla_action is None: break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class ArithmeticRunner(base.Runner): diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a69811f8a..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -116,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -205,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -217,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index c2c6a8f19..75942766d 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -54,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sla_action = scenario_cfg["sla"].get("action", "assert") start = time.time() + timeout = start + duration while True: LOG.debug("runner=%(runner)s seq=%(sequence)s START", @@ -71,9 +72,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg, elif sla_action == "monitor": LOG.warning("SLA validation failed: %s", assertion.args) errors = assertion.args - except Exception as e: + # catch all exceptions because with multiprocessing we can have un-picklable exception + # problems https://bugs.python.org/issue9400 + except Exception: errors = traceback.format_exc() - LOG.exception(e) + LOG.exception("") else: if result: output_queue.put(result) @@ -94,12 +97,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg, sequence += 1 - if (errors and sla_action is None) or \ - (time.time() - start > duration or aborted.is_set()): + if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): LOG.info("Worker END") break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class DurationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py index afff27d75..01e76c6f4 100755 --- a/yardstick/benchmark/runners/dynamictp.py +++ b/yardstick/benchmark/runners/dynamictp.py @@ -141,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.debug("iterator: %s iterations: %s", iterator, iterations) if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 50fe106bd..4a7439588 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -90,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.exception(e) else: if result: - output_queue.put(result) + LOG.debug("output_queue.put %s", result) + output_queue.put(result, True, 1) time.sleep(interval) @@ -101,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, 'errors': errors } - queue.put(benchmark_output) + LOG.debug("queue.put, %s", benchmark_output) + queue.put(benchmark_output, True, 1) LOG.debug("runner=%(runner)s seq=%(sequence)s END", {"runner": runner_cfg["runner_id"], @@ -114,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.info("worker END") break if "teardown" in run_step: - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class IterationRunner(base.Runner): diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index 68e272c57..f08ca5dde 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg, if (errors and sla_action is None) or aborted.is_set(): break - benchmark.teardown() + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) LOG.info("worker END") + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) class SequenceRunner(base.Runner): diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py index f2c9d40a7..e0027bbcb 100644 --- a/yardstick/network_services/nfvi/collectd.py +++ b/yardstick/network_services/nfvi/collectd.py @@ -35,6 +35,7 @@ class AmqpConsumer(object): self._consumer_tag = None self._url = amqp_url self._queue = queue + self._queue.cancel_join_thread() def connect(self): """ connect to amqp url """ diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py index 896384d5e..170dfd96f 100644 --- a/yardstick/network_services/traffic_profile/prox_profile.py +++ b/yardstick/network_services/traffic_profile/prox_profile.py @@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile): def init(self, queue): self.pkt_size_iterator = iter(self.pkt_sizes) self.queue = queue + self.queue.cancel_join_thread() def bounds_iterator(self, logger=None): if logger: diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 02ae06170..ba4d44c41 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper): return self._test_type def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 91530860e..5cf234514 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper): self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: |