diff options
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-x | yardstick/benchmark/runners/base.py | 39 |
1 files changed, 33 insertions, 6 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index a69811f8a..a887fa5b3 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -17,13 +17,17 @@ # rally/rally/benchmark/runners/base.py from __future__ import absolute_import -import importlib + import logging import multiprocessing import subprocess import time import traceback +import importlib + +from six.moves.queue import Empty + import yardstick.common.utils as utils from yardstick.benchmark.scenarios import base as base_scenario @@ -33,7 +37,6 @@ log = logging.getLogger(__name__) def _execute_shell_command(command): """execute shell script with error handling""" exitcode = 0 - output = [] try: output = subprocess.check_output(command, shell=True) except Exception: @@ -116,7 +119,7 @@ class Runner(object): @staticmethod def terminate_all(): """Terminate all runners (subprocesses)""" - log.debug("Terminating all runners") + log.debug("Terminating all runners", exc_info=True) # release dumper process as some errors before any runner is created if not Runner.runners: @@ -205,9 +208,25 @@ class Runner(object): """Abort the execution of a scenario""" self.aborted.set() - def join(self, timeout=None): + QUEUE_JOIN_INTERVAL = 5 + + def poll(self, timeout=QUEUE_JOIN_INTERVAL): self.process.join(timeout) + return self.process.exitcode + + def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL): + while self.process.exitcode is None: + # drain the queue while we are running otherwise we won't terminate + outputs.update(self.get_output()) + result.extend(self.get_result()) + self.process.join(interval) + # drain after the process has exited + outputs.update(self.get_output()) + result.extend(self.get_result()) + + self.process.terminate() if self.periodic_action_process: + self.periodic_action_process.join(1) self.periodic_action_process.terminate() self.periodic_action_process = None @@ -217,11 +236,19 @@ class Runner(object): def get_output(self): result = {} while not self.output_queue.empty(): - result.update(self.output_queue.get()) + log.debug("output_queue size %s", self.output_queue.qsize()) + try: + result.update(self.output_queue.get(True, 1)) + except Empty: + pass return result def get_result(self): result = [] while not self.result_queue.empty(): - result.append(self.result_queue.get()) + log.debug("result_queue size %s", self.result_queue.qsize()) + try: + result.append(self.result_queue.get(True, 1)) + except Empty: + pass return result |