diff options
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 37 |
1 files changed, 28 insertions, 9 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index 57903ebb9..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -47,7 +50,6 @@ def _execute_shell_command(command): def _single_action(seconds, command, queue): """entrypoint for the single action process""" - queue.cancel_join_thread() log.debug("single action, fires after %d seconds (from now)", seconds) time.sleep(seconds) log.debug("single action: executing command: '%s'", command) @@ -62,7 +64,6 @@ def _single_action(seconds, command, queue): def _periodic_action(interval, command, queue): """entrypoint for the periodic action process""" - queue.cancel_join_thread() log.debug("periodic action, fires every: %d seconds", interval) time_spent = 0 while True: @@ -118,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -139,9 +140,7 @@ class Runner(object): self.config = config self.periodic_action_process = None self.output_queue = multiprocessing.Queue() - self.output_queue.cancel_join_thread() self.result_queue = multiprocessing.Queue() - self.result_queue.cancel_join_thread() self.process = None self.aborted = multiprocessing.Event() Runner.runners.append(self) @@ -209,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -221,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result |