From 05c1840c9c4dda154c9c5d00ff3cd23ba202330b Mon Sep 17 00:00:00 2001 From: Jo¶rgen Karlsson Date: Fri, 11 Dec 2015 15:50:22 +0100 Subject: Add run_in_background attribute to scenarios MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change adds the possibility to run scenarios as "background tasks". Background scenarios/tasks: - are started before all "normal scenarios" - runs in parallel with "normal scenarios" - terminates when all "normal scenarios" have completed their tasks They are intended as a way to perform background tasks, e.g. collect data such as cpuload etc, in parallel with the execution of normal benchmarking scenarios. Note that we already have the 'run_in_parallel' attribute but this attribute has a couple of issues and do not solve all the uses cases. Change-Id: I9c5230bfdbbb66030f57b658ce1db87ff2c2d62b Signed-off-by: Jo¶rgen Karlsson --- samples/background-task.yaml | 60 +++++++++++++++++++++++++++++++ tests/unit/cmd/commands/test_task.py | 20 +++++++++++ yardstick/benchmark/runners/arithmetic.py | 9 +++-- yardstick/benchmark/runners/base.py | 19 ++++++++-- yardstick/benchmark/runners/duration.py | 9 +++-- yardstick/benchmark/runners/iteration.py | 9 +++-- yardstick/benchmark/runners/sequence.py | 8 +++-- yardstick/cmd/commands/task.py | 41 ++++++++++++++++++--- 8 files changed, 157 insertions(+), 18 deletions(-) create mode 100644 samples/background-task.yaml diff --git a/samples/background-task.yaml b/samples/background-task.yaml new file mode 100644 index 000000000..f81867b33 --- /dev/null +++ b/samples/background-task.yaml @@ -0,0 +1,60 @@ +--- +# Sample benchmark task config file +# Demonstrate use of background scenarios + +schema: "yardstick:task:0.1" + +scenarios: +- + type: CPUload + options: + interval: 1 + + host: zeus.demo + + # This scenario is run as a background scenario and runs + # in parallel with other scenarios. + # + # Background scenarios are started before normal scenarios + # and are terminated when all normal scenarios have ended. + # + # A background scenario does not need a runner section as it + # will always use an infinite duration runner that are terminated + # when all normal scenarios have completed. + # + run_in_background: true # default: false +- + type: Iperf3 + options: + host: zeus.demo + target: hera.demo + + runner: + type: Duration + duration: 60 + + sla: + bytes_per_second: 2900000000 + action: monitor + +context: + name: demo + image: yardstick-trusty-server + flavor: yardstick-flavor + user: ec2-user + + placement_groups: + pgrp1: + policy: "availability" + + servers: + zeus: + floating_ip: true + placement: "pgrp1" + hera: + floating_ip: true + placement: "pgrp1" + + networks: + test: + cidr: '10.0.1.0/24' diff --git a/tests/unit/cmd/commands/test_task.py b/tests/unit/cmd/commands/test_task.py index 89813cb98..e785e99a9 100644 --- a/tests/unit/cmd/commands/test_task.py +++ b/tests/unit/cmd/commands/test_task.py @@ -36,3 +36,23 @@ class TaskCommandsTestCase(unittest.TestCase): self.assertEqual(context_cfg["host"], server_info) self.assertEqual(context_cfg["target"], server_info) + + @mock.patch('yardstick.cmd.commands.task.Context') + @mock.patch('yardstick.cmd.commands.task.base_runner') + def test_run(self, mock_base_runner, mock_ctx): + scenario = \ + {'host': 'athena.demo', + 'target': 'ares.demo', + 'runner': + {'duration': 60, + 'interval': 1, + 'type': 'Duration' + }, + 'type': 'Ping'} + + t = task.TaskCommands() + runner = mock.Mock() + runner.join.return_value = 0 + mock_base_runner.Runner.get.return_value = runner + t._run([scenario], False, "yardstick.out") + self.assertTrue(runner.run.called) diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index af2303479..4eab6643e 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -55,6 +56,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): for value in range(start, stop+margin, step): + if aborted.is_set(): + break + options[arg_name] = value LOG.debug("runner=%(runner)s seq=%(sequence)s START" % @@ -133,5 +137,6 @@ class ArithmeticRunner(base.Runner): def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index d443806a7..9925ace2f 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -141,12 +141,19 @@ class Runner(object): @staticmethod def release(runner): '''Release the runner''' - Runner.runners.remove(runner) + if runner in Runner.runners: + Runner.runners.remove(runner) # if this was the last runner, stop the output serializer subprocess if len(Runner.runners) == 0: Runner.release_dump_process() + @staticmethod + def terminate(runner): + '''Terminate the runner''' + if runner.process and runner.process.is_alive(): + runner.process.terminate() + @staticmethod def terminate_all(): '''Terminate all runners (subprocesses)''' @@ -173,6 +180,7 @@ class Runner(object): self.periodic_action_process = None self.result_queue = queue self.process = None + self.aborted = multiprocessing.Event() Runner.runners.append(self) def run_post_stop_action(self): @@ -197,6 +205,7 @@ class Runner(object): cls = getattr(module, path_split[-1]) self.config['object'] = class_name + self.aborted.clear() # run a potentially configured pre-start action if "pre-start-action" in self.config: @@ -230,8 +239,12 @@ class Runner(object): self._run_benchmark(cls, "run", scenario_cfg, context_cfg) - def join(self): - self.process.join() + def abort(self): + '''Abort the execution of a scenario''' + self.aborted.set() + + def join(self, timeout=None): + self.process.join(timeout) if self.periodic_action_process: self.periodic_action_process.terminate() self.periodic_action_process = None diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 40e0aa708..e2a21c23c 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -86,7 +87,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if (errors and sla_action is None) or (time.time() - start > duration): + if (errors and sla_action is None) or \ + (time.time() - start > duration or aborted.is_set()): LOG.info("worker END") break @@ -113,5 +115,6 @@ If the scenario ends before the time has elapsed, it will be started again. def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 077e0e813..e38ed3749 100755 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -85,7 +86,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if (errors and sla_action is None) or (sequence > iterations): + if (errors and sla_action is None) or \ + (sequence > iterations or aborted.is_set()): LOG.info("worker END") break @@ -112,5 +114,6 @@ If the scenario ends before the time has elapsed, it will be started again. def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index a410eea0e..47708fc5e 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base LOG = logging.getLogger(__name__) -def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): sequence = 1 @@ -95,7 +96,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg): sequence += 1 - if errors: + if errors or aborted.is_set(): break benchmark.teardown() @@ -125,5 +126,6 @@ class SequenceRunner(base.Runner): def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): self.process = multiprocessing.Process( target=_worker_process, - args=(self.result_queue, cls, method, scenario_cfg, context_cfg)) + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) self.process.start() diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py index d6cd6984c..a56824aac 100755 --- a/yardstick/cmd/commands/task.py +++ b/yardstick/cmd/commands/task.py @@ -16,6 +16,7 @@ import atexit import ipaddress import time import logging +from itertools import ifilter from yardstick.benchmark.contexts.base import Context from yardstick.benchmark.runners import base as base_runner from yardstick.common.task_template import TaskTemplate @@ -108,11 +109,20 @@ class TaskCommands(object): for context in Context.list: context.deploy() + background_runners = [] + + # Start all background scenarios + for scenario in ifilter(_is_background_scenario, scenarios): + scenario["runner"] = dict(type="Duration", duration=1000000000) + runner = run_one_scenario(scenario, output_file) + background_runners.append(runner) + runners = [] if run_in_parallel: for scenario in scenarios: - runner = run_one_scenario(scenario, output_file) - runners.append(runner) + if not _is_background_scenario(scenario): + runner = run_one_scenario(scenario, output_file) + runners.append(runner) # Wait for runners to finish for runner in runners: @@ -121,9 +131,25 @@ class TaskCommands(object): else: # run serially for scenario in scenarios: - runner = run_one_scenario(scenario, output_file) + if not _is_background_scenario(scenario): + runner = run_one_scenario(scenario, output_file) + runner_join(runner) + print "Runner ended, output in", output_file + + # Abort background runners + for runner in background_runners: + runner.abort() + + # Wait for background runners to finish + for runner in background_runners: + if runner.join(timeout=60) is None: + # Nuke if it did not stop nicely + base_runner.Runner.terminate(runner) runner_join(runner) - print "Runner ended, output in", output_file + else: + base_runner.Runner.release(runner) + print "Background task ended" + # TODO: Move stuff below into TaskCommands class !? @@ -280,6 +306,13 @@ def _is_same_heat_context(host_attr, target_attr): return False +def _is_background_scenario(scenario): + if "run_in_background" in scenario: + return scenario["run_in_background"] + else: + return False + + def run_one_scenario(scenario_cfg, output_file): '''run one scenario using context''' runner_cfg = scenario_cfg["runner"] -- cgit 1.2.3-korg