summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRoss Brattain <ross.b.brattain@intel.com>2017-10-01 13:14:33 -0700
committerRoss Brattain <ross.b.brattain@intel.com>2017-10-01 13:18:25 -0700
commit7eb6abb6931b24e085b139cc3500f4497cdde57d (patch)
treef510815d2cc03c2098b44dfdfffea64702dafbe5
parent2cc3ef430319e1b3b4984737abc656349ae1bc5c (diff)
duration runner: add teardown and cancel all queue join threads
calculate timeout once catch exceptions in benchmark.teardown() In some cases we are blocking in base.Runner join() because the queues are not empty call cancel_join_thread to prevent the Queue from blocking the Process exit https://docs.python.org/3.3/library/multiprocessing.html#all-platforms Joining processes that use queues Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the "feeder" thread to the underlying pipe. (The child process can call the cancel_join_thread() method of the queue to avoid this behaviour.) This means that whenever you use a queue you need to make sure that all items which have been put on the queue will eventually be removed before the process is joined. Otherwise you cannot be sure that processes which have put items on the queue will terminate. Remember also that non-daemonic processes will be joined automatically. Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children. cancel_join_thread() Prevent join_thread() from blocking. In particular, this prevents the background thread from being joined automatically when the process exits – see join_thread(). A better name for this method might be allow_exit_without_flush(). It is likely to cause enqueued data to lost, and you almost certainly will not need to use it. It is really only there if you need the current process to exit immediately without waiting to flush enqueued data to the underlying pipe, and you don’t care about lost data. Change-Id: If7b904a060b9ed68b7def78c851deefca4e0de5d Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
-rw-r--r--yardstick/benchmark/runners/duration.py25
1 files changed, 20 insertions, 5 deletions
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index c2c6a8f19..6a09131e1 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -54,6 +59,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sla_action = scenario_cfg["sla"].get("action", "assert")
start = time.time()
+ timeout = start + duration
while True:
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -71,9 +77,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", assertion.args)
errors = assertion.args
- except Exception as e:
+ # catch all exceptions because with multiprocessing we can have un-picklable exception
+ # problems https://bugs.python.org/issue9400
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
output_queue.put(result)
@@ -94,12 +102,19 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence += 1
- if (errors and sla_action is None) or \
- (time.time() - start > duration or aborted.is_set()):
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
LOG.info("Worker END")
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
class DurationRunner(base.Runner):