summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py5
-rwxr-xr-xyardstick/benchmark/runners/base.py4
-rw-r--r--yardstick/benchmark/runners/duration.py25
-rwxr-xr-xyardstick/benchmark/runners/dynamictp.py1
-rw-r--r--yardstick/benchmark/runners/iteration.py5
-rw-r--r--yardstick/benchmark/scenarios/availability/monitor/basemonitor.py1
6 files changed, 36 insertions, 5 deletions
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 7898ae2bc..974fb21b3 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -46,6 +46,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..57903ebb9 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -47,6 +47,7 @@ def _execute_shell_command(command):
def _single_action(seconds, command, queue):
"""entrypoint for the single action process"""
+ queue.cancel_join_thread()
log.debug("single action, fires after %d seconds (from now)", seconds)
time.sleep(seconds)
log.debug("single action: executing command: '%s'", command)
@@ -61,6 +62,7 @@ def _single_action(seconds, command, queue):
def _periodic_action(interval, command, queue):
"""entrypoint for the periodic action process"""
+ queue.cancel_join_thread()
log.debug("periodic action, fires every: %d seconds", interval)
time_spent = 0
while True:
@@ -137,7 +139,9 @@ class Runner(object):
self.config = config
self.periodic_action_process = None
self.output_queue = multiprocessing.Queue()
+ self.output_queue.cancel_join_thread()
self.result_queue = multiprocessing.Queue()
+ self.result_queue.cancel_join_thread()
self.process = None
self.aborted = multiprocessing.Event()
Runner.runners.append(self)
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index c2c6a8f19..6a09131e1 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
@@ -54,6 +59,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sla_action = scenario_cfg["sla"].get("action", "assert")
start = time.time()
+ timeout = start + duration
while True:
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -71,9 +77,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", assertion.args)
errors = assertion.args
- except Exception as e:
+ # catch all exceptions because with multiprocessing we can have un-picklable exception
+ # problems https://bugs.python.org/issue9400
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
output_queue.put(result)
@@ -94,12 +102,19 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence += 1
- if (errors and sla_action is None) or \
- (time.time() - start > duration or aborted.is_set()):
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
LOG.info("Worker END")
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
class DurationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index afff27d75..2f5f7e4f4 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -33,6 +33,7 @@ LOG = logging.getLogger(__name__)
def _worker_process(queue, cls, method_name, scenario_cfg,
context_cfg, aborted): # pragma: no cover
+ queue.cancel_join_thread()
runner_cfg = scenario_cfg['runner']
iterations = runner_cfg.get("iterations", 1)
interval = runner_cfg.get("interval", 1)
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 50fe106bd..822e67723 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -36,6 +36,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence = 1
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ queue.cancel_join_thread()
+ output_queue.cancel_join_thread()
+
runner_cfg = scenario_cfg['runner']
interval = runner_cfg.get("interval", 1)
diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
index a6c1a28bd..871f13f88 100644
--- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
+++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py
@@ -90,6 +90,7 @@ class BaseMonitor(multiprocessing.Process):
self._config = config
self._context = context
self._queue = multiprocessing.Queue()
+ self._queue.cancel_join_thread()
self._event = multiprocessing.Event()
self.monitor_data = data
self.setup_done = False