aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--tests/unit/benchmark/runner/test_base.py43
-rw-r--r--tests/unit/network_services/traffic_profile/test_prox_profile.py5
-rw-r--r--yardstick/benchmark/core/task.py31
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py12
-rwxr-xr-xyardstick/benchmark/runners/base.py33
-rw-r--r--yardstick/benchmark/runners/duration.py23
-rwxr-xr-xyardstick/benchmark/runners/dynamictp.py12
-rw-r--r--yardstick/benchmark/runners/iteration.py19
-rw-r--r--yardstick/benchmark/runners/sequence.py12
-rw-r--r--yardstick/network_services/nfvi/collectd.py1
-rw-r--r--yardstick/network_services/traffic_profile/prox_profile.py1
-rw-r--r--yardstick/network_services/vnf_generic/vnf/prox_helpers.py1
-rw-r--r--yardstick/network_services/vnf_generic/vnf/sample_vnf.py3
13 files changed, 161 insertions, 35 deletions
diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py
index 956762c40..f47b88e95 100644
--- a/tests/unit/benchmark/runner/test_base.py
+++ b/tests/unit/benchmark/runner/test_base.py
@@ -17,10 +17,31 @@ import time
from mock import mock
-from yardstick.benchmark.runners.base import Runner
+from yardstick.benchmark.runners import base
from yardstick.benchmark.runners.iteration import IterationRunner
+class ActionTestCase(unittest.TestCase):
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__execute_shell_command(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ self.assertEqual(base._execute_shell_command("")[0], -1)
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__single_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._single_action(0, "echo", mock.MagicMock())
+
+ @mock.patch("yardstick.benchmark.runners.base.subprocess")
+ def test__periodic_action(self, mock_subprocess):
+ mock_subprocess.check_output.side_effect = Exception()
+
+ base._periodic_action(0, "echo", mock.MagicMock())
+
+
class RunnerTestCase(unittest.TestCase):
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
@@ -41,8 +62,26 @@ class RunnerTestCase(unittest.TestCase):
actual_result = runner.get_output()
self.assertEqual(idle_result, actual_result)
+ @mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
+ def test_get_result(self, mock_process):
+ runner = IterationRunner({})
+ runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+ runner.result_queue.put({'criteria': 'PASS'})
+
+ idle_result = [
+ {'case': 'opnfv_yardstick_tc002'},
+ {'criteria': 'PASS'}
+ ]
+
+ for retries in range(1000):
+ time.sleep(0.01)
+ if not runner.result_queue.empty():
+ break
+ actual_result = runner.get_result()
+ self.assertEqual(idle_result, actual_result)
+
def test__run_benchmark(self):
- runner = Runner(mock.Mock())
+ runner = base.Runner(mock.Mock())
with self.assertRaises(NotImplementedError):
runner._run_benchmark(mock.Mock(), mock.Mock(), mock.Mock(), mock.Mock())
diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py
index c32cc2878..078e72b8e 100644
--- a/tests/unit/network_services/traffic_profile/test_prox_profile.py
+++ b/tests/unit/network_services/traffic_profile/test_prox_profile.py
@@ -63,8 +63,9 @@ class TestProxProfile(unittest.TestCase):
}
profile = ProxProfile(tp_config)
- profile.init(234)
- self.assertEqual(profile.queue, 234)
+ queue = mock.Mock()
+ profile.init(queue)
+ self.assertIs(profile.queue, queue)
def test_execute_traffic(self):
packet_sizes = [
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index c175a950b..53298d8d3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -260,25 +260,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
scenarios.index(scenario) + 1,
scenario.get('type'))
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
@@ -287,15 +285,13 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- status = runner.join(JOIN_TIMEOUT)
+ status = runner.join(self.outputs, result, JOIN_TIMEOUT)
if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner.join(JOIN_TIMEOUT)
+ runner.join(self.outputs, result, JOIN_TIMEOUT)
base_runner.Runner.release(runner)
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
print("Background task ended")
return result
@@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes):
return networks
-def runner_join(runner):
- """join (wait for) a runner, exit process at runner failure"""
- status = runner.join()
+def runner_join(runner, outputs, result):
+ """join (wait for) a runner, exit process at runner failure
+ :param outputs:
+ :type outputs: dict
+ :param result:
+ :type result: list
+ """
+ status = runner.join(outputs, result)
base_runner.Runner.release(runner)
return status
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 7898ae2bc..3ff064ae1 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -138,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if errors and sla_action is None:
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class ArithmeticRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..3ecf67736 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -24,6 +24,9 @@ import subprocess
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
@@ -116,7 +119,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -205,9 +208,21 @@ class Runner(object):
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
@@ -217,11 +232,19 @@ class Runner(object):
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index c2c6a8f19..75942766d 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -54,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sla_action = scenario_cfg["sla"].get("action", "assert")
start = time.time()
+ timeout = start + duration
while True:
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -71,9 +72,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", assertion.args)
errors = assertion.args
- except Exception as e:
+ # catch all exceptions because with multiprocessing we can have un-picklable exception
+ # problems https://bugs.python.org/issue9400
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
output_queue.put(result)
@@ -94,12 +97,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence += 1
- if (errors and sla_action is None) or \
- (time.time() - start > duration or aborted.is_set()):
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
LOG.info("Worker END")
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class DurationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index afff27d75..01e76c6f4 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -141,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
class IterationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 50fe106bd..4a7439588 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -90,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.exception(e)
else:
if result:
- output_queue.put(result)
+ LOG.debug("output_queue.put %s", result)
+ output_queue.put(result, True, 1)
time.sleep(interval)
@@ -101,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- queue.put(benchmark_output)
+ LOG.debug("queue.put, %s", benchmark_output)
+ queue.put(benchmark_output, True, 1)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
@@ -114,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.info("worker END")
break
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class IterationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index 68e272c57..f08ca5dde 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if (errors and sla_action is None) or aborted.is_set():
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class SequenceRunner(base.Runner):
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
index f2c9d40a7..e0027bbcb 100644
--- a/yardstick/network_services/nfvi/collectd.py
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -35,6 +35,7 @@ class AmqpConsumer(object):
self._consumer_tag = None
self._url = amqp_url
self._queue = queue
+ self._queue.cancel_join_thread()
def connect(self):
""" connect to amqp url """
diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py
index 896384d5e..170dfd96f 100644
--- a/yardstick/network_services/traffic_profile/prox_profile.py
+++ b/yardstick/network_services/traffic_profile/prox_profile.py
@@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile):
def init(self, queue):
self.pkt_size_iterator = iter(self.pkt_sizes)
self.queue = queue
+ self.queue.cancel_join_thread()
def bounds_iterator(self, logger=None):
if logger:
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 02ae06170..ba4d44c41 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper):
return self._test_type
def run_traffic(self, traffic_profile):
+ self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index 91530860e..5cf234514 100644
--- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper):
self._queue.put(samples)
def run_traffic(self, traffic_profile):
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
try: