aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoss Brattain <ross.b.brattain@intel.com>2017-10-02 15:16:54 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-10-02 15:16:54 +0000
commit0cfce062a9c19751243ef7590ea9a11050bc7a6c (patch)
treebbe1770ceca73d7a45e4a541d60843fa7dbb6262
parentdef87da57962d932086840fc20950dc90de5c567 (diff)
parent7826320fa41ad213ac36d369b1e4d71e4cabf176 (diff)
Merge "drain runner queue and undo cancel_join_thread"
-rw-r--r--tests/unit/benchmark/runner/test_base.py43
-rw-r--r--yardstick/benchmark/core/task.py31
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py17
-rwxr-xr-xyardstick/benchmark/runners/base.py37
-rw-r--r--yardstick/benchmark/runners/duration.py8
-rwxr-xr-xyardstick/benchmark/runners/dynamictp.py13
-rw-r--r--yardstick/benchmark/runners/iteration.py24
-rw-r--r--yardstick/benchmark/runners/sequence.py12
-rw-r--r--yardstick/benchmark/scenarios/availability/monitor/basemonitor.py1
9 files changed, 137 insertions, 49 deletions
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py
index 956762c40..f47b88e95 100644
--- a/tests/unit/benchmark/runner/test_base.py
+++ b/tests/unit/benchmark/runner/test_base.py
@@ -17,10 +17,31 @@ import time
from mock import mock
-from yardstick.benchmark.runners.base import Runner
+from yardstick.benchmark.runners import base
from yardstick.benchmark.runners.iteration import IterationRunner
+class ActionTestCase(unittest.TestCase):
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__execute_shell_command(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ self.assertEqual(base._execute_shell_command("")[0], -1)
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__single_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._single_action(0, "echo", mock.MagicMock())
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__periodic_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._periodic_action(0, "echo", mock.MagicMock())
+
+
class RunnerTestCase(unittest.TestCase):
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
@@ -41,8 +62,26 @@ class RunnerTestCase(unittest.TestCase):
actual_result = runner.get_output()
self.assertEqual(idle_result, actual_result)
+ @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
+ def test_get_result(self, mock_process):
+ runner = IterationRunner({})
+ runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+ runner.result_queue.put({'criteria': 'PASS'})
+
+ idle_result = [
+ {'case': 'opnfv_yardstick_tc002'},
+ {'criteria': 'PASS'}
+ ]
+
+ for retries in range(1000):
+ time.sleep(0.01)
+ if not runner.result_queue.empty():
+ break
+ actual_result = runner.get_result()
+ self.assertEqual(idle_result, actual_result)
+
def test__run_benchmark(self):
- runner = Runner(mock.Mock())
+ runner = base.Runner(mock.Mock())
with self.assertRaises(NotImplementedError):
runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index c175a950b..53298d8d3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -260,25 +260,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
scenarios.index(scenario) + 1,
scenario.get('type'))
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
@@ -287,15 +285,13 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- status = runner.join(JOIN_TIMEOUT)
+ status = runner.join(self.outputs, result, JOIN_TIMEOUT)
if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner.join(JOIN_TIMEOUT)
+ runner.join(self.outputs, result, JOIN_TIMEOUT)
base_runner.Runner.release(runner)
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
print("Background task ended")
return result
@@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes):
return networks
-def runner_join(runner):
- """join (wait for) a runner, exit process at runner failure"""
- status = runner.join()
+def runner_join(runner, outputs, result):
+ """join (wait for) a runner, exit process at runner failure
+ :param outputs:
+ :type outputs: dict
+ :param result:
+ :type result: list
+ """
+ status = runner.join(outputs, result)
base_runner.Runner.release(runner)
return status
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 974fb21b3..3ff064ae1 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -46,11 +46,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -143,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if errors and sla_action is None:
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class ArithmeticRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index 57903ebb9..3ecf67736 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -24,6 +24,9 @@ import subprocess
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
@@ -47,7 +50,6 @@ def _execute_shell_command(command):
def _single_action(seconds, command, queue):
"""entrypoint for the single action process"""
- queue.cancel_join_thread()
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
@@ -62,7 +64,6 @@ def _single_action(seconds, command, queue):
def _periodic_action(interval, command, queue):
"""entrypoint for the periodic action process"""
- queue.cancel_join_thread()
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
@@ -118,7 +119,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -139,9 +140,7 @@ class Runner(object):
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
- self.output_queue.cancel_join_thread()
self.result_queue = multiprocessing.Queue()
- self.result_queue.cancel_join_thread()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
@@ -209,9 +208,21 @@ class Runner(object):
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
@@ -221,11 +232,19 @@ class Runner(object):
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index 6a09131e1..75942766d 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -36,11 +36,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -116,6 +111,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.exception("")
raise SystemExit(1)
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
+
class DurationRunner(base.Runner):
"""Run a scenario for a certain amount of time
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index 2f5f7e4f4..01e76c6f4 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -33,7 +33,6 @@ LOG = logging.getLogger(__name__)
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted): # pragma: no cover
- queue.cancel_join_thread()
runner_cfg = scenario_cfg['runner']
iterations = runner_cfg.get("iterations", 1)
interval = runner_cfg.get("interval", 1)
@@ -142,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
class IterationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 822e67723..4a7439588 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -36,11 +36,6 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
- # if we don't do this we can hang waiting for the queue to drain
- # have to do this in the subprocess
- queue.cancel_join_thread()
- output_queue.cancel_join_thread()
-
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -95,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.exception(e)
else:
if result:
- output_queue.put(result)
+ LOG.debug("output_queue.put %s", result)
+ output_queue.put(result, True, 1)
time.sleep(interval)
@@ -106,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- queue.put(benchmark_output)
+ LOG.debug("queue.put, %s", benchmark_output)
+ queue.put(benchmark_output, True, 1)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
@@ -119,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.info("worker END")
break
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class IterationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index 68e272c57..f08ca5dde 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if (errors and sla_action is None) or aborted.is_set():
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class SequenceRunner(base.Runner):
diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
index 871f13f88..a6c1a28bd 100644
--- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
+++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
@@ -90,7 +90,6 @@ class BaseMonitor(multiprocessing.Process):
self._config = config
self._context = context
self._queue = multiprocessing.Queue()
- self._queue.cancel_join_thread()
self._event = multiprocessing.Event()
self.monitor_data = data
self.setup_done = False