Diffstat (limited to 'yardstick')
 -rw-r--r--  yardstick/benchmark/core/task.py                             |  31
 -rwxr-xr-x  yardstick/benchmark/runners/arithmetic.py                    |  12
 -rwxr-xr-x  yardstick/benchmark/runners/base.py                          |  33
 -rw-r--r--  yardstick/benchmark/runners/duration.py                      |  23
 -rwxr-xr-x  yardstick/benchmark/runners/dynamictp.py                     |  12
 -rw-r--r--  yardstick/benchmark/runners/iteration.py                     |  19
 -rw-r--r--  yardstick/benchmark/runners/sequence.py                      |  12
 -rw-r--r--  yardstick/network_services/nfvi/collectd.py                  |   1
 -rw-r--r--  yardstick/network_services/traffic_profile/prox_profile.py   |   1
 -rw-r--r--  yardstick/network_services/vnf_generic/vnf/prox_helpers.py   |   1
 -rw-r--r--  yardstick/network_services/vnf_generic/vnf/sample_vnf.py     |   3
 11 files changed, 117 insertions(+), 31 deletions(-)
diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py
index c175a950b..53298d8d3 100644
--- a/yardstick/benchmark/core/task.py
+++ b/yardstick/benchmark/core/task.py
@@ -260,25 +260,23 @@ class Task(object): # pragma: no cover
# Wait for runners to finish
for runner in runners:
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
else:
# run serially
for scenario in scenarios:
if not _is_background_scenario(scenario):
runner = self.run_one_scenario(scenario, output_file)
- status = runner_join(runner)
+ status = runner_join(runner, self.outputs, result)
if status != 0:
LOG.error('Scenario NO.%s: "%s" ERROR!',
scenarios.index(scenario) + 1,
scenario.get('type'))
- raise RuntimeError
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
+ raise RuntimeError(
+ "{0} runner status {1}".format(runner.__execution_type__, status))
LOG.info("Runner ended, output in %s", output_file)
# Abort background runners
@@ -287,15 +285,13 @@ class Task(object): # pragma: no cover
# Wait for background runners to finish
for runner in background_runners:
- status = runner.join(JOIN_TIMEOUT)
+ status = runner.join(self.outputs, result, JOIN_TIMEOUT)
if status is None:
# Nuke if it did not stop nicely
base_runner.Runner.terminate(runner)
- runner.join(JOIN_TIMEOUT)
+ runner.join(self.outputs, result, JOIN_TIMEOUT)
base_runner.Runner.release(runner)
- self.outputs.update(runner.get_output())
- result.extend(runner.get_result())
print("Background task ended")
return result
@@ -645,9 +641,14 @@ def get_networks_from_nodes(nodes):
return networks
-def runner_join(runner):
- """join (wait for) a runner, exit process at runner failure"""
- status = runner.join()
+def runner_join(runner, outputs, result):
+ """join (wait for) a runner, exit process at runner failure
+ :param outputs:
+ :type outputs: dict
+ :param result:
+ :type result: list
+ """
+ status = runner.join(outputs, result)
base_runner.Runner.release(runner)
return status
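
Why outputs and result are now threaded into runner_join(): a runner subprocess may be unable to exit until the data it has queued has been flushed to the parent, so a parent that simply blocks in join() and collects results afterwards can deadlock against its own child. An illustrative standalone sketch of the drain-while-joining pattern, not part of this patch:

    import multiprocessing

    def worker(q):
        # a large payload cannot be flushed to the pipe until the parent
        # reads it, so the child may be unable to exit on its own
        q.put("x" * (1 << 22))

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=worker, args=(q,))
        p.start()
        results = []
        # a bare p.join() here can hang; draining while waiting, as the
        # reworked Runner.join()/runner_join() do, avoids the deadlock
        while p.exitcode is None:
            while not q.empty():
                results.append(q.get())
            p.join(5)
        while not q.empty():
            results.append(q.get())
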
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index 7898ae2bc..3ff064ae1 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -138,8 +138,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if errors and sla_action is None:
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class ArithmeticRunner(base.Runner):
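
The issue9400 comments refer to the fact that an exception must be pickled to cross a multiprocessing boundary, and exception classes with non-trivial constructors often cannot be rebuilt on the other side; converting teardown failures into SystemExit(1) sidesteps the problem entirely. A hypothetical example of the kind of exception that breaks the round trip (names invented for illustration, not from the patch):

    import pickle

    class DeviceError(Exception):
        # __init__ takes two arguments, but only the formatted message
        # lands in .args, which is all pickle uses to rebuild the instance
        def __init__(self, device, code):
            super(DeviceError, self).__init__("%s failed (%s)" % (device, code))

    pickle.loads(pickle.dumps(DeviceError("eth0", 42)))
    # TypeError: __init__() missing 1 required positional argument: 'code'
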
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index a69811f8a..3ecf67736 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -24,6 +24,9 @@ import subprocess
import time
import traceback
+
+from six.moves.queue import Empty
+
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios import base as base_scenario
@@ -116,7 +119,7 @@ class Runner(object):
@staticmethod
def terminate_all():
"""Terminate all runners (subprocesses)"""
- log.debug("Terminating all runners")
+ log.debug("Terminating all runners", exc_info=True)
# release dumper process as some errors before any runner is created
if not Runner.runners:
@@ -205,9 +208,21 @@ class Runner(object):
"""Abort the execution of a scenario"""
self.aborted.set()
- def join(self, timeout=None):
- self.process.join(timeout)
+ QUEUE_JOIN_INTERVAL = 5
+
+ def join(self, outputs, result, interval=QUEUE_JOIN_INTERVAL):
+ while self.process.exitcode is None:
+ # drain the queue while we are running otherwise we won't terminate
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+ self.process.join(interval)
+ # drain after the process has exited
+ outputs.update(self.get_output())
+ result.extend(self.get_result())
+
+ self.process.terminate()
if self.periodic_action_process:
+ self.periodic_action_process.join(1)
self.periodic_action_process.terminate()
self.periodic_action_process = None
@@ -217,11 +232,19 @@ class Runner(object):
def get_output(self):
result = {}
while not self.output_queue.empty():
- result.update(self.output_queue.get())
+ log.debug("output_queue size %s", self.output_queue.qsize())
+ try:
+ result.update(self.output_queue.get(True, 1))
+ except Empty:
+ pass
return result
def get_result(self):
result = []
while not self.result_queue.empty():
- result.append(self.result_queue.get())
+ log.debug("result_queue size %s", self.result_queue.qsize())
+ try:
+ result.append(self.result_queue.get(True, 1))
+ except Empty:
+ pass
return result
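
The Empty guards in get_output()/get_result() exist because Queue.empty() is only a snapshot across processes: data the worker has put may not be visible yet, and a racing reader can make a plain get() block. A small sketch of the same drain pattern as a standalone helper (the helper name is an assumption):

    from six.moves.queue import Empty

    def drain_queue(q):
        # pull whatever is currently visible, never blocking more than a
        # second per item; Empty simply means "nothing arrived yet"
        items = []
        while not q.empty():
            try:
                items.append(q.get(True, 1))
            except Empty:
                pass
        return items
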
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index c2c6a8f19..75942766d 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -54,6 +54,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sla_action = scenario_cfg["sla"].get("action", "assert")
start = time.time()
+ timeout = start + duration
while True:
LOG.debug("runner=%(runner)s seq=%(sequence)s START",
@@ -71,9 +72,11 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
elif sla_action == "monitor":
LOG.warning("SLA validation failed: %s", assertion.args)
errors = assertion.args
- except Exception as e:
+ # catch all exceptions because with multiprocessing we can have un-picklable exception
+ # problems https://bugs.python.org/issue9400
+ except Exception:
errors = traceback.format_exc()
- LOG.exception(e)
+ LOG.exception("")
else:
if result:
output_queue.put(result)
@@ -94,12 +97,22 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
sequence += 1
- if (errors and sla_action is None) or \
- (time.time() - start > duration or aborted.is_set()):
+ if (errors and sla_action is None) or time.time() > timeout or aborted.is_set():
LOG.info("Worker END")
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class DurationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py
index afff27d75..01e76c6f4 100755
--- a/yardstick/benchmark/runners/dynamictp.py
+++ b/yardstick/benchmark/runners/dynamictp.py
@@ -141,7 +141,17 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.debug("iterator: %s iterations: %s", iterator, iterations)
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
class IterationRunner(base.Runner):
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 50fe106bd..4a7439588 100644
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -90,7 +90,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.exception(e)
else:
if result:
- output_queue.put(result)
+ LOG.debug("output_queue.put %s", result)
+ output_queue.put(result, True, 1)
time.sleep(interval)
@@ -101,7 +102,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
'errors': errors
}
- queue.put(benchmark_output)
+ LOG.debug("queue.put, %s", benchmark_output)
+ queue.put(benchmark_output, True, 1)
LOG.debug("runner=%(runner)s seq=%(sequence)s END",
{"runner": runner_cfg["runner_id"],
@@ -114,7 +116,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
LOG.info("worker END")
break
if "teardown" in run_step:
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
+
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class IterationRunner(base.Runner):
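
The puts now pass block=True with a one-second timeout, so a worker never waits indefinitely for the parent to make room; if the timeout does expire, the standard queue.Full exception is raised. A hedged sketch of how such a bounded put could be wrapped (helper name and log message are assumptions, not from the patch):

    import logging
    from six.moves import queue  # six is already used by the runners

    LOG = logging.getLogger(__name__)

    def put_bounded(q, item, timeout=1):
        # mirrors q.put(item, True, 1): wait at most `timeout` seconds;
        # with a bounded queue and a stalled consumer, dropping the item
        # is preferable to hanging the worker process
        try:
            q.put(item, True, timeout)
        except queue.Full:
            LOG.warning("queue still full after %ss, dropping item", timeout)
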
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index 68e272c57..f08ca5dde 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -105,8 +105,18 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
if (errors and sla_action is None) or aborted.is_set():
break
- benchmark.teardown()
+ try:
+ benchmark.teardown()
+ except Exception:
+ # catch any exception in teardown and convert to simple exception
+ # never pass exceptions back to multiprocessing, because some exceptions can
+ # be unpicklable
+ # https://bugs.python.org/issue9400
+ LOG.exception("")
+ raise SystemExit(1)
LOG.info("worker END")
+ LOG.debug("queue.qsize() = %s", queue.qsize())
+ LOG.debug("output_queue.qsize() = %s", output_queue.qsize())
class SequenceRunner(base.Runner):
diff --git a/yardstick/network_services/nfvi/collectd.py b/yardstick/network_services/nfvi/collectd.py
index f2c9d40a7..e0027bbcb 100644
--- a/yardstick/network_services/nfvi/collectd.py
+++ b/yardstick/network_services/nfvi/collectd.py
@@ -35,6 +35,7 @@ class AmqpConsumer(object):
self._consumer_tag = None
self._url = amqp_url
self._queue = queue
+ self._queue.cancel_join_thread()
def connect(self):
""" connect to amqp url """
diff --git a/yardstick/network_services/traffic_profile/prox_profile.py b/yardstick/network_services/traffic_profile/prox_profile.py
index 896384d5e..170dfd96f 100644
--- a/yardstick/network_services/traffic_profile/prox_profile.py
+++ b/yardstick/network_services/traffic_profile/prox_profile.py
@@ -67,6 +67,7 @@ class ProxProfile(TrafficProfile):
def init(self, queue):
self.pkt_size_iterator = iter(self.pkt_sizes)
self.queue = queue
+ self.queue.cancel_join_thread()
def bounds_iterator(self, logger=None):
if logger:
diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
index 02ae06170..ba4d44c41 100644
--- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
+++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py
@@ -878,6 +878,7 @@ class ProxResourceHelper(ClientResourceHelper):
return self._test_type
def run_traffic(self, traffic_profile):
+ self._queue.cancel_join_thread()
self.lower = 0.0
self.upper = 100.0
diff --git a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
index 91530860e..5cf234514 100644
--- a/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
+++ b/yardstick/network_services/vnf_generic/vnf/sample_vnf.py
@@ -476,6 +476,9 @@ class ClientResourceHelper(ResourceHelper):
self._queue.put(samples)
def run_traffic(self, traffic_profile):
+ # if we don't do this we can hang waiting for the queue to drain
+ # have to do this in the subprocess
+ self._queue.cancel_join_thread()
# fixme: fix passing correct trex config file,
# instead of searching the default path
try:
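
The cancel_join_thread() calls added above attack the same shutdown hang from the producer side: by default, a process that has put data on a multiprocessing.Queue will not exit until the queue's feeder thread has flushed that data into the pipe, which never happens if the reader is gone or stuck. Cancelling the join lets the subprocess exit immediately, at the cost of possibly losing whatever is still buffered. A minimal illustrative sketch, not taken from the patch:

    import multiprocessing

    def traffic_worker(samples_queue):
        # must be called in the subprocess that owns the puts
        samples_queue.cancel_join_thread()
        for seq in range(1000):
            samples_queue.put({"seq": seq})
        # without cancel_join_thread() this process could block at exit
        # until every queued sample had been read on the other side

    if __name__ == "__main__":
        q = multiprocessing.Queue()
        p = multiprocessing.Process(target=traffic_worker, args=(q,))
        p.start()
        p.join()  # returns even if the parent never drains q
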