From 05c1840c9c4dda154c9c5d00ff3cd23ba202330b Mon Sep 17 00:00:00 2001 From: Jo¶rgen Karlsson Date: Fri, 11 Dec 2015 15:50:22 +0100 Subject: Add run_in_background attribute to scenarios MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds the possibility to run scenarios as "background tasks". Background scenarios/tasks: - are started before all "normal scenarios" - runs in parallel with "normal scenarios" - terminates when all "normal scenarios" have completed their tasks They are intended as a way to perform background tasks, e.g. collect data such as cpuload etc, in parallel with the execution of normal benchmarking scenarios. Note that we already have the 'run_in_parallel' attribute but this attribute has a couple of issues and do not solve all the uses cases. Change-Id: I9c5230bfdbbb66030f57b658ce1db87ff2c2d62b Signed-off-by: Jo¶rgen Karlsson --- yardstick/benchmark/runners/arithmetic.py | 9 +++++++-- yardstick/benchmark/runners/base.py | 19 ++++++++++++++++--- yardstick/benchmark/runners/duration.py | 9 ++++++--- yardstick/benchmark/runners/iteration.py | 9 ++++++--- yardstick/benchmark/runners/sequence.py | 8 +++++--- 5 files changed, 40 insertions(+), 14 deletions(-) (limited to 'yardstick/benchmark') diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index af2303479..4eab6643e 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -55,6 +56,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): for value in range(start, stop+margin, step): + if aborted.is_set(): + break + options[arg_name] = value LOG.debug("runner=%(runner)s seq=%(sequence)s START" % @@ -133,5 +137,6 @@ class ArithmeticRunner(base.Runner): def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index d443806a7..9925ace2f 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -141,12 +141,19 @@ class Runner(object): @staticmethod def release(runner): '''Release the runner''' - Runner.runners.remove(runner) + if runner in Runner.runners: + Runner.runners.remove(runner) # if this was the last runner, stop the output serializer subprocess if len(Runner.runners) == 0: Runner.release_dump_process() + @staticmethod + def terminate(runner): + '''Terminate the runner''' + if runner.process and runner.process.is_alive(): + runner.process.terminate() + @staticmethod def terminate_all(): '''Terminate all runners (subprocesses)''' @@ -173,6 +180,7 @@ class Runner(object): self.periodic_action_process = None self.result_queue = queue self.process = None + self.aborted = multiprocessing.Event() Runner.runners.append(self) def run_post_stop_action(self): @@ -197,6 +205,7 @@ class Runner(object): cls = getattr(module, path_split[-1]) self.config['object'] = class_name + self.aborted.clear() # run a potentially configured pre-start action if "pre-start-action" in self.config: @@ -230,8 +239,12 @@ class Runner(object): self._run_benchmark(cls, "run", scenario_cfg, context_cfg) - def join(self): - self.process.join() + def abort(self): + '''Abort the execution of a scenario''' + self.aborted.set() + + def join(self, timeout=None): + self.process.join(timeout) if self.periodic_action_process: self.periodic_action_process.terminate() self.periodic_action_process = None diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 40e0aa708..e2a21c23c 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -86,7 +87,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if (errors and sla_action is None) or (time.time() - start > duration): + if (errors and sla_action is None) or \ + (time.time() - start > duration or aborted.is_set()): LOG.info("worker END") break @@ -113,5 +115,6 @@ If the scenario ends before the time has elapsed, it will be started again. def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 077e0e813..e38ed3749 100755 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -85,7 +86,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if (errors and sla_action is None) or (sequence > iterations): + if (errors and sla_action is None) or \ + (sequence > iterations or aborted.is_set()): LOG.info("worker END") break @@ -112,5 +114,6 @@ If the scenario ends before the time has elapsed, it will be started again. def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index a410eea0e..47708fc5e 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -95,7 +96,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if errors: + if errors or aborted.is_set(): break benchmark.teardown() @@ -125,5 +126,6 @@ class SequenceRunner(base.Runner): def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() -- cgit 1.2.3-korg