-rw-r--r--  tests/unit/benchmark/scenarios/networking/test_pktgen.py            14
-rw-r--r--  tests/unit/network_services/traffic_profile/test_prox_profile.py     5
-rwxr-xr-x  yardstick/benchmark/runners/arithmetic.py                            5
-rwxr-xr-x  yardstick/benchmark/runners/base.py                                  4
-rw-r--r--  yardstick/benchmark/runners/duration.py                             25
-rwxr-xr-x  yardstick/benchmark/runners/dynamictp.py                             1
-rw-r--r--  yardstick/benchmark/runners/iteration.py                             5
-rw-r--r--  yardstick/benchmark/scenarios/availability/monitor/basemonitor.py    1
-rw-r--r--  yardstick/network_services/nfvi/collectd.py                          1
-rw-r--r--  yardstick/network_services/traffic_profile/prox_profile.py           1
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/prox_helpers.py           1
-rw-r--r--  yardstick/network_services/vnf_generic/vnf/sample_vnf.py             3
12 files changed, 52 insertions, 14 deletions
diff --git a/tests/unit/benchmark/scenarios/networking/test_pktgen.py b/tests/unit/benchmark/scenarios/networking/test_pktgen.py
index 0ca31d484..3928aacde 100644
--- a/tests/unit/benchmark/scenarios/networking/test_pktgen.py
+++ b/tests/unit/benchmark/scenarios/networking/test_pktgen.py
@@ -264,7 +264,7 @@ class PktgenTestCase(unittest.TestCase):
p._get_available_queue_number()
mock_ssh.SSH.from_node().execute.assert_called_with(
- "sudo ethtool -l eth0 | grep Combined | head -1 |" \
+ "sudo ethtool -l eth0 | grep Combined | head -1 |"
"awk '{printf $2}'")
def test_pktgen_unsuccessful_get_available_queue_number(self, mock_ssh):
@@ -290,7 +290,7 @@ class PktgenTestCase(unittest.TestCase):
p._get_usable_queue_number()
mock_ssh.SSH.from_node().execute.assert_called_with(
- "sudo ethtool -l eth0 | grep Combined | tail -1 |" \
+ "sudo ethtool -l eth0 | grep Combined | tail -1 |"
"awk '{printf $2}'")
def test_pktgen_unsuccessful_get_usable_queue_number(self, mock_ssh):
@@ -541,7 +541,7 @@ class PktgenTestCase(unittest.TestCase):
p._is_irqbalance_disabled = mock_result1
mock_result2 = mock.Mock()
- mock_result2.return_value = "virtio_net"
+ mock_result2.return_value = "virtio_net"
p._get_vnic_driver_name = mock_result2
mock_result3 = mock.Mock()
@@ -571,7 +571,7 @@ class PktgenTestCase(unittest.TestCase):
p._is_irqbalance_disabled = mock_result1
mock_result2 = mock.Mock()
- mock_result2.return_value = "virtio_net"
+ mock_result2.return_value = "virtio_net"
p._get_vnic_driver_name = mock_result2
mock_result3 = mock.Mock()
@@ -601,7 +601,7 @@ class PktgenTestCase(unittest.TestCase):
p._is_irqbalance_disabled = mock_result1
mock_result2 = mock.Mock()
- mock_result2.return_value = "ixgbevf"
+ mock_result2.return_value = "ixgbevf"
p._get_vnic_driver_name = mock_result2
p.multiqueue_setup()
@@ -623,7 +623,7 @@ class PktgenTestCase(unittest.TestCase):
p._is_irqbalance_disabled = mock_result1
mock_result2 = mock.Mock()
- mock_result2.return_value = "ixgbevf"
+ mock_result2.return_value = "ixgbevf"
p._get_vnic_driver_name = mock_result2
p.multiqueue_setup()
@@ -670,7 +670,7 @@ class PktgenTestCase(unittest.TestCase):
p.client = mock_ssh.SSH.from_node()
mock_result = mock.Mock()
- mock_result.return_value = "virtio_net"
+ mock_result.return_value = "virtio_net"
p._get_vnic_driver_name = mock_result
mock_result1 = mock.Mock()
diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py
index c32cc2878..078e72b8e 100644
--- a/tests/unit/network_services/traffic_profile/test_prox_profile.py
+++ b/tests/unit/network_services/traffic_profile/test_prox_profile.py
@@ -63,8 +63,9 @@ class TestProxProfile(unittest.TestCase):
}
profile = ProxProfile(tp_config)
- profile.init(234)
- self.assertEqual(profile.queue, 234)
+ queue = mock.Mock()
+ profile.init(queue)
+ self.assertIs(profile.queue, queue)
def test_execute_traffic(self):
packet_sizes = [
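The swap from an integer to a mock above is forced by the prox_profile.py hunk later in this diff: ProxProfile.init() now calls cancel_join_thread() on whatever it is handed, so profile.init(234) would fail with an AttributeError. A Mock object synthesises the method on first access, and assertIs checks object identity rather than mere equality. Roughly (assuming the mock library these tests already import):

```python
import mock

queue = mock.Mock()
queue.cancel_join_thread()  # Mock auto-creates the method; an int cannot
queue.cancel_join_thread.assert_called_once_with()
```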
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 7898ae2bc..974fb21b3 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -46,6 +46,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # If we don't do this, we can hang waiting for the queue to drain;
+ # it has to be done here, in the subprocess.
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
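For context, this hunk targets a well-known multiprocessing pitfall: a multiprocessing.Queue owns a background feeder thread, and by default a process that has put items will block at exit until that thread has flushed everything into the underlying pipe. If the parent never drains the queue, the worker can never exit. cancel_join_thread() disables that wait, and it has to be called in the worker itself: in CPython the cancel flag is reset in the child after a fork, so cancelling only in the parent would not help. A minimal standalone sketch of the hang (not code from this patch):

```python
import multiprocessing


def worker(queue):
    # Without this call the worker blocks at exit, waiting for the
    # parent to drain the queue -- which the parent below never does.
    queue.cancel_join_thread()
    for i in range(100000):  # enough items to overflow the pipe buffer
        queue.put(i)


if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    p.join()  # deadlocks here if the worker did not cancel the join
```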
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..57903ebb9 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -47,6 +47,7 @@ def _execute_shell_command(command):
def _single_action(seconds, command, queue):
"""entrypoint for the single action process"""
+ queue.cancel_join_thread()
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
@@ -61,6 +62,7 @@ def _single_action(seconds, command, queue):
def _periodic_action(interval, command, queue):
"""entrypoint for the periodic action process"""
+ queue.cancel_join_thread()
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
@@ -137,7 +139,9 @@ class Runner(object):
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
+ self.output_queue.cancel_join_thread()
self.result_queue = multiprocessing.Queue()
+ self.result_queue.cancel_join_thread()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
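Setting the flag on the parent-side queues here is the same trade-off in the other direction: cancel_join_thread() favours clean shutdown over guaranteed delivery, so items still sitting in a feeder thread when a process exits can be dropped. That puts the burden on the reading side to drain eagerly rather than rely on queue joins. A sketch of the assumed consumer pattern (Python 3 spelling; not code from this patch):

```python
import queue  # stdlib module providing the Empty exception


def drain(mp_queue):
    """Empty a multiprocessing.Queue without ever blocking on it."""
    items = []
    while True:
        try:
            items.append(mp_queue.get(block=False))
        except queue.Empty:
            break
    return items
```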
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index c2c6a8f19..6a09131e1 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # If we don't do this, we can hang waiting for the queue to drain;
+ # it has to be done here, in the subprocess.
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -54,6 +59,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sla_action = scenario_cfg["sla"].get("action", "assert")
start = time.time()
+ timeout = start + duration
while True:
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -71,9 +77,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", assertion.args)
errors = assertion.args
- except Exception as e:
+ # catch all exceptions, because with multiprocessing we can hit
+ # un-picklable exceptions: https://bugs.python.org/issue9400
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
output_queue.put(result)
@@ -94,12 +102,19 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence += 1
- if (errors and sla_action is None) or \
- (time.time() - start > duration or aborted.is_set()):
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
LOG.info("Worker END")
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert it to a plain SystemExit;
+ # never pass exceptions back through multiprocessing, because some
+ # exceptions are unpicklable:
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
class DurationRunner(base.Runner):
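The two exception-handling hunks above share one motivation: anything raised inside a worker crosses the process boundary by being pickled, and exceptions frequently do not survive the round trip -- a constructor with extra required arguments is enough to break reconstruction (https://bugs.python.org/issue9400). Serialising the failure to a traceback string, and reducing teardown failures to SystemExit(1), keeps only trivially picklable objects in flight. A minimal reproduction of the underlying problem (hypothetical class, not from this patch):

```python
import pickle


class CustomError(Exception):
    def __init__(self, message, detail):  # two required arguments
        super(CustomError, self).__init__(message)
        self.detail = detail


# Pickling stores only self.args == ("boom",); unpickling then calls
# CustomError("boom") and dies with a TypeError for the missing 'detail'.
try:
    pickle.loads(pickle.dumps(CustomError("boom", detail="d")))
except TypeError as exc:
    print("round-trip failed:", exc)
```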
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index afff27d75..2f5f7e4f4 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -33,6 +33,7 @@ LOG = logging.getLogger(__name__)
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted): # pragma: no cover
+ queue.cancel_join_thread()
runner_cfg = scenario_cfg['runner']
iterations = runner_cfg.get("iterations", 1)
interval = runner_cfg.get("interval", 1)
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 50fe106bd..822e67723 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # If we don't do this, we can hang waiting for the queue to drain;
+ # it has to be done here, in the subprocess.
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
index a6c1a28bd..871f13f88 100644
--- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
+++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
@@ -90,6 +90,7 @@ class BaseMonitor(multiprocessing.Process):
self._config = config
self._context = context
self._queue = multiprocessing.Queue()
+ self._queue.cancel_join_thread()
self._event = multiprocessing.Event()
self.monitor_data = data
self.setup_done = False
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
index f2c9d40a7..e0027bbcb 100644
--- a/yardstick/network_services/nfvi/collectd.py
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -35,6 +35,7 @@ class AmqpConsumer(object):
self._consumer_tag = None
self._url = amqp_url
self._queue = queue
+ self._queue.cancel_join_thread()
def connect(self):
""" connect to amqp url """
diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py
index 896384d5e..170dfd96f 100644
--- a/yardstick/network_services/traffic_profile/prox_profile.py
+++ b/yardstick/network_services/traffic_profile/prox_profile.py
@@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile):
def init(self, queue):
self.pkt_size_iterator = iter(self.pkt_sizes)
self.queue = queue
+ self.queue.cancel_join_thread()
def bounds_iterator(self, logger=None):
if logger:
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 02ae06170..ba4d44c41 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper):
return self._test_type
def run_traffic(self, traffic_profile):
+ self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index 91530860e..5cf234514 100644
--- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper):
self._queue.put(samples)
def run_traffic(self, traffic_profile):
+ # If we don't do this, we can hang waiting for the queue to drain;
+ # it has to be done here, in the subprocess.
+ self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
try: