diff options
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 33 |
1 files changed, 28 insertions, 5 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a69811f8a..3ecf67736 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -24,6 +24,9 @@ import subprocess import time import traceback + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -116,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -205,9 +208,21 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): - self.process.join(timeout) + QUEUE_JOIN_INTERVAL = 5 + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -217,11 +232,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result |