diff options
author | Ross Brattain <ross.b.brattain@intel.com> | 2017-10-01 13:16:58 -0700 |
---|---|---|
committer | Ross Brattain <ross.b.brattain@intel.com> | 2017-10-01 13:18:32 -0700 |
commit | 875906201593d19f78bc1108330b42121d095185 (patch) | |
tree | 2a605ed868c44082496afb31de18e11b4a8b6f11 | |
parent | 2cc3ef430319e1b3b4984737abc656349ae1bc5c (diff) |
NSB: cancel all queue join threads
In some cases we are blocking in base.Runner join() because the
queues are not empty
call cancel_join_thread to prevent the Queue from blocking the
Process exit
https://docs.python.org/3.3/library/multiprocessing.html#all-platforms
Joining processes that use queues
Bear in mind that a process that has put items in a queue will wait
before terminating until all the buffered items are fed by the
"feeder" thread to the underlying pipe. (The child process can call
the cancel_join_thread() method of the queue to avoid this behaviour.)
This means that whenever you use a queue you need to make sure that
all items which have been put on the queue will eventually be removed
before the process is joined. Otherwise you cannot be sure that
processes which have put items on the queue will terminate. Remember
also that non-daemonic processes will be joined automatically.
Warning
As mentioned above, if a child process has put items on a queue (and
it has not used JoinableQueue.cancel_join_thread), then that process
will not terminate until all buffered items have been flushed to the
pipe.
This means that if you try joining that process you may get a deadlock
unless you are sure that all items which have been put on the queue
have been consumed. Similarly, if the child process is non-daemonic
then the parent process may hang on exit when it tries to join all its
non-daemonic children.
cancel_join_thread()
Prevent join_thread() from blocking. In particular, this prevents the
background thread from being joined automatically when the process
exits – see join_thread().
A better name for this method might be allow_exit_without_flush(). It
is likely to cause enqueued data to lost, and you almost certainly
will not need to use it. It is really only there if you need the
current process to exit immediately without waiting to flush enqueued
data to the underlying pipe, and you don’t care about lost data.
Change-Id: I61f11a3b01109d96b7a5445c60f1e171401157fc
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
5 files changed, 9 insertions, 2 deletions
diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py index c32cc2878..078e72b8e 100644 --- a/tests/unit/network_services/traffic_profile/test_prox_profile.py +++ b/tests/unit/network_services/traffic_profile/test_prox_profile.py @@ -63,8 +63,9 @@ class TestProxProfile(unittest.TestCase): } profile = ProxProfile(tp_config) - profile.init(234) - self.assertEqual(profile.queue, 234) + queue = mock.Mock() + profile.init(queue) + self.assertIs(profile.queue, queue) def test_execute_traffic(self): packet_sizes = [ diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py index f2c9d40a7..e0027bbcb 100644 --- a/yardstick/network_services/nfvi/collectd.py +++ b/yardstick/network_services/nfvi/collectd.py @@ -35,6 +35,7 @@ class AmqpConsumer(object): self._consumer_tag = None self._url = amqp_url self._queue = queue + self._queue.cancel_join_thread() def connect(self): """ connect to amqp url """ diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py index 896384d5e..170dfd96f 100644 --- a/yardstick/network_services/traffic_profile/prox_profile.py +++ b/yardstick/network_services/traffic_profile/prox_profile.py @@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile): def init(self, queue): self.pkt_size_iterator = iter(self.pkt_sizes) self.queue = queue + self.queue.cancel_join_thread() def bounds_iterator(self, logger=None): if logger: diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 02ae06170..ba4d44c41 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper): return self._test_type def run_traffic(self, traffic_profile): + self._queue.cancel_join_thread() self.lower = 0.0 self.upper = 100.0 diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py index 91530860e..5cf234514 100644 --- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py @@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper): self._queue.put(samples) def run_traffic(self, traffic_profile): + # if we don't do this we can hang waiting for the queue to drain + # have to do this in the subprocess + self._queue.cancel_join_thread() # fixme: fix passing correct trex config file, # instead of searching the default path try: |