summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--samples/background-task.yaml60
-rw-r--r--tests/unit/cmd/commands/test_task.py20
-rwxr-xr-xyardstick/benchmark/runners/arithmetic.py9
-rwxr-xr-xyardstick/benchmark/runners/base.py19
-rw-r--r--yardstick/benchmark/runners/duration.py9
-rwxr-xr-xyardstick/benchmark/runners/iteration.py9
-rw-r--r--yardstick/benchmark/runners/sequence.py8
-rwxr-xr-xyardstick/cmd/commands/task.py41
8 files changed, 157 insertions, 18 deletions
diff --git a/samples/background-task.yaml b/samples/background-task.yaml
new file mode 100644
index 000000000..f81867b33
--- /dev/null
+++ b/samples/background-task.yaml
@@ -0,0 +1,60 @@
+---
+# Sample benchmark task config file
+# Demonstrate use of background scenarios
+
+schema: "yardstick:task:0.1"
+
+scenarios:
+-
+ type: CPUload
+ options:
+ interval: 1
+
+ host: zeus.demo
+
+ # This scenario is run as a background scenario and runs
+ # in parallel with other scenarios.
+ #
+ # Background scenarios are started before normal scenarios
+ # and are terminated when all normal scenarios have ended.
+ #
+ # A background scenario does not need a runner section as it
+ # will always use an infinite duration runner that are terminated
+ # when all normal scenarios have completed.
+ #
+ run_in_background: true # default: false
+-
+ type: Iperf3
+ options:
+ host: zeus.demo
+ target: hera.demo
+
+ runner:
+ type: Duration
+ duration: 60
+
+ sla:
+ bytes_per_second: 2900000000
+ action: monitor
+
+context:
+ name: demo
+ image: yardstick-trusty-server
+ flavor: yardstick-flavor
+ user: ec2-user
+
+ placement_groups:
+ pgrp1:
+ policy: "availability"
+
+ servers:
+ zeus:
+ floating_ip: true
+ placement: "pgrp1"
+ hera:
+ floating_ip: true
+ placement: "pgrp1"
+
+ networks:
+ test:
+ cidr: '10.0.1.0/24'
diff --git a/tests/unit/cmd/commands/test_task.py b/tests/unit/cmd/commands/test_task.py
index 89813cb98..e785e99a9 100644
--- a/tests/unit/cmd/commands/test_task.py
+++ b/tests/unit/cmd/commands/test_task.py
@@ -36,3 +36,23 @@ class TaskCommandsTestCase(unittest.TestCase):
self.assertEqual(context_cfg["host"], server_info)
self.assertEqual(context_cfg["target"], server_info)
+
+ @mock.patch('yardstick.cmd.commands.task.Context')
+ @mock.patch('yardstick.cmd.commands.task.base_runner')
+ def test_run(self, mock_base_runner, mock_ctx):
+ scenario = \
+ {'host': 'athena.demo',
+ 'target': 'ares.demo',
+ 'runner':
+ {'duration': 60,
+ 'interval': 1,
+ 'type': 'Duration'
+ },
+ 'type': 'Ping'}
+
+ t = task.TaskCommands()
+ runner = mock.Mock()
+ runner.join.return_value = 0
+ mock_base_runner.Runner.get.return_value = runner
+ t._run([scenario], False, "yardstick.out")
+ self.assertTrue(runner.run.called)
diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py
index af2303479..4eab6643e 100755
--- a/yardstick/benchmark/runners/arithmetic.py
+++ b/yardstick/benchmark/runners/arithmetic.py
@@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
@@ -55,6 +56,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
for value in range(start, stop+margin, step):
+ if aborted.is_set():
+ break
+
options[arg_name] = value
LOG.debug("runner=%(runner)s seq=%(sequence)s START" %
@@ -133,5 +137,6 @@ class ArithmeticRunner(base.Runner):
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py
index d443806a7..9925ace2f 100755
--- a/yardstick/benchmark/runners/base.py
+++ b/yardstick/benchmark/runners/base.py
@@ -141,13 +141,20 @@ class Runner(object):
@staticmethod
def release(runner):
'''Release the runner'''
- Runner.runners.remove(runner)
+ if runner in Runner.runners:
+ Runner.runners.remove(runner)
# if this was the last runner, stop the output serializer subprocess
if len(Runner.runners) == 0:
Runner.release_dump_process()
@staticmethod
+ def terminate(runner):
+ '''Terminate the runner'''
+ if runner.process and runner.process.is_alive():
+ runner.process.terminate()
+
+ @staticmethod
def terminate_all():
'''Terminate all runners (subprocesses)'''
log.debug("Terminating all runners")
@@ -173,6 +180,7 @@ class Runner(object):
self.periodic_action_process = None
self.result_queue = queue
self.process = None
+ self.aborted = multiprocessing.Event()
Runner.runners.append(self)
def run_post_stop_action(self):
@@ -197,6 +205,7 @@ class Runner(object):
cls = getattr(module, path_split[-1])
self.config['object'] = class_name
+ self.aborted.clear()
# run a potentially configured pre-start action
if "pre-start-action" in self.config:
@@ -230,8 +239,12 @@ class Runner(object):
self._run_benchmark(cls, "run", scenario_cfg, context_cfg)
- def join(self):
- self.process.join()
+ def abort(self):
+ '''Abort the execution of a scenario'''
+ self.aborted.set()
+
+ def join(self, timeout=None):
+ self.process.join(timeout)
if self.periodic_action_process:
self.periodic_action_process.terminate()
self.periodic_action_process = None
diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py
index 40e0aa708..e2a21c23c 100644
--- a/yardstick/benchmark/runners/duration.py
+++ b/yardstick/benchmark/runners/duration.py
@@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
@@ -86,7 +87,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
sequence += 1
- if (errors and sla_action is None) or (time.time() - start > duration):
+ if (errors and sla_action is None) or \
+ (time.time() - start > duration or aborted.is_set()):
LOG.info("worker END")
break
@@ -113,5 +115,6 @@ If the scenario ends before the time has elapsed, it will be started again.
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py
index 077e0e813..e38ed3749 100755
--- a/yardstick/benchmark/runners/iteration.py
+++ b/yardstick/benchmark/runners/iteration.py
@@ -21,7 +21,8 @@ from yardstick.benchmark.runners import base
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
@@ -85,7 +86,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
sequence += 1
- if (errors and sla_action is None) or (sequence > iterations):
+ if (errors and sla_action is None) or \
+ (sequence > iterations or aborted.is_set()):
LOG.info("worker END")
break
@@ -112,5 +114,6 @@ If the scenario ends before the time has elapsed, it will be started again.
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py
index a410eea0e..47708fc5e 100644
--- a/yardstick/benchmark/runners/sequence.py
+++ b/yardstick/benchmark/runners/sequence.py
@@ -22,7 +22,8 @@ from yardstick.benchmark.runners import base
LOG = logging.getLogger(__name__)
-def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
+def _worker_process(queue, cls, method_name, scenario_cfg,
+ context_cfg, aborted):
sequence = 1
@@ -95,7 +96,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, context_cfg):
sequence += 1
- if errors:
+ if errors or aborted.is_set():
break
benchmark.teardown()
@@ -125,5 +126,6 @@ class SequenceRunner(base.Runner):
def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
self.process = multiprocessing.Process(
target=_worker_process,
- args=(self.result_queue, cls, method, scenario_cfg, context_cfg))
+ args=(self.result_queue, cls, method, scenario_cfg,
+ context_cfg, self.aborted))
self.process.start()
diff --git a/yardstick/cmd/commands/task.py b/yardstick/cmd/commands/task.py
index d6cd6984c..a56824aac 100755
--- a/yardstick/cmd/commands/task.py
+++ b/yardstick/cmd/commands/task.py
@@ -16,6 +16,7 @@ import atexit
import ipaddress
import time
import logging
+from itertools import ifilter
from yardstick.benchmark.contexts.base import Context
from yardstick.benchmark.runners import base as base_runner
from yardstick.common.task_template import TaskTemplate
@@ -108,11 +109,20 @@ class TaskCommands(object):
for context in Context.list:
context.deploy()
+ background_runners = []
+
+ # Start all background scenarios
+ for scenario in ifilter(_is_background_scenario, scenarios):
+ scenario["runner"] = dict(type="Duration", duration=1000000000)
+ runner = run_one_scenario(scenario, output_file)
+ background_runners.append(runner)
+
runners = []
if run_in_parallel:
for scenario in scenarios:
- runner = run_one_scenario(scenario, output_file)
- runners.append(runner)
+ if not _is_background_scenario(scenario):
+ runner = run_one_scenario(scenario, output_file)
+ runners.append(runner)
# Wait for runners to finish
for runner in runners:
@@ -121,9 +131,25 @@ class TaskCommands(object):
else:
# run serially
for scenario in scenarios:
- runner = run_one_scenario(scenario, output_file)
+ if not _is_background_scenario(scenario):
+ runner = run_one_scenario(scenario, output_file)
+ runner_join(runner)
+ print "Runner ended, output in", output_file
+
+ # Abort background runners
+ for runner in background_runners:
+ runner.abort()
+
+ # Wait for background runners to finish
+ for runner in background_runners:
+ if runner.join(timeout=60) is None:
+ # Nuke if it did not stop nicely
+ base_runner.Runner.terminate(runner)
runner_join(runner)
- print "Runner ended, output in", output_file
+ else:
+ base_runner.Runner.release(runner)
+ print "Background task ended"
+
# TODO: Move stuff below into TaskCommands class !?
@@ -280,6 +306,13 @@ def _is_same_heat_context(host_attr, target_attr):
return False
+def _is_background_scenario(scenario):
+ if "run_in_background" in scenario:
+ return scenario["run_in_background"]
+ else:
+ return False
+
+
def run_one_scenario(scenario_cfg, output_file):
'''run one scenario using context'''
runner_cfg = scenario_cfg["runner"]