aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-xyardstick/benchmark/runners/base.py33
1 files changed, 28 insertions, 5 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..3ecf67736 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -24,6 +24,9 @@ import subprocess
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
@@ -116,7 +119,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -205,9 +208,21 @@ class Runner(object):
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
@@ -217,11 +232,19 @@ class Runner(object):
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result