aboutsummaryrefslogtreecommitdiffstats
path: root/yardstick/benchmark/runners/base.py
diff options
context:
space:
mode:
Diffstat (limited to 'yardstick/benchmark/runners/base.py')
-rwxr-xr-xyardstick/benchmark/runners/base.py39
1 files changed, 33 insertions, 6 deletions
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..a887fa5b3 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -17,13 +17,17 @@
# rally/rally/benchmark/runners/base.py
from __future__ import absolute_import
-import importlib
+
import logging
import multiprocessing
import subprocess
import time
import traceback
+import importlib
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
@@ -33,7 +37,6 @@ log = logging.getLogger(__name__)
def _execute_shell_command(command):
"""execute shell script with error handling"""
exitcode = 0
- output = []
try:
output = subprocess.check_output(command, shell=True)
except Exception:
@@ -116,7 +119,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -205,9 +208,25 @@ class Runner(object):
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
+ QUEUE_JOIN_INTERVAL = 5
+
+ def poll(self, timeout=QUEUE_JOIN_INTERVAL):
self.process.join(timeout)
+ return self.process.exitcode
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
@@ -217,11 +236,19 @@ class Runner(object):
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result