diff options
24 files changed, 353 insertions, 49 deletions
@@ -2,8 +2,8 @@ Project: Test framework for verifying infrastructure compliance (yardstick) Project Creation Date: April 28th, 2015 Project Category: Integration & Testing Lifecycle State: Incubation -Primary Contact: jean.gaoliang@huawei.com -Project Lead: jean.gaoliang@huawei.com +Primary Contact: ross.b.brattain@intel.com +Project Lead: ross.b.brattain@intel.com Jira Project Name: Infrastructure Verification Jira Project Prefix: Yardstick Mailing list tag: [Yardstick] diff --git a/tests/unit/benchmark/core/test_task.py b/tests/unit/benchmark/core/test_task.py index cd7ffdebb..8034392f4 100644 --- a/tests/unit/benchmark/core/test_task.py +++ b/tests/unit/benchmark/core/test_task.py @@ -64,6 +64,7 @@ class TaskTestCase(unittest.TestCase): t = task.Task() runner = mock.Mock() runner.join.return_value = 0 + runner.get_output.return_value = {} mock_base_runner.Runner.get.return_value = runner t._run([scenario], False, "yardstick.out") self.assertTrue(runner.run.called) @@ -155,6 +156,33 @@ class TaskTestCase(unittest.TestCase): self.assertEqual(task_args_fnames[0], None) self.assertEqual(task_args_fnames[1], None) + def test_parse_options(self): + options = { + 'openstack': { + 'EXTERNAL_NETWORK': '$network' + }, + 'ndoes': ['node1', '$node'], + 'host': '$host' + } + + t = task.Task() + t.outputs = { + 'network': 'ext-net', + 'node': 'node2', + 'host': 'server.yardstick' + } + + idle_result = { + 'openstack': { + 'EXTERNAL_NETWORK': 'ext-net' + }, + 'ndoes': ['node1', 'node2'], + 'host': 'server.yardstick' + } + + actual_result = t._parse_options(options) + self.assertEqual(idle_result, actual_result) + def test_change_server_name_host_str(self): scenario = {'host': 'demo'} suffix = '-8' diff --git a/tests/unit/benchmark/runner/__init__.py b/tests/unit/benchmark/runner/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/tests/unit/benchmark/runner/__init__.py diff --git a/tests/unit/benchmark/runner/test_base.py b/tests/unit/benchmark/runner/test_base.py new file mode 100644 index 000000000..7880fe5a5 --- /dev/null +++ b/tests/unit/benchmark/runner/test_base.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python + +############################################################################## +# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +from __future__ import print_function +from __future__ import absolute_import + +import unittest +import multiprocessing +import time + +from yardstick.benchmark.runners.iteration import IterationRunner + + +class RunnerTestCase(unittest.TestCase): + + def test_get_output(self): + queue = multiprocessing.Queue() + runner = IterationRunner({}, queue) + runner.output_queue.put({'case': 'opnfv_yardstick_tc002'}) + runner.output_queue.put({'criteria': 'PASS'}) + + idle_result = { + 'case': 'opnfv_yardstick_tc002', + 'criteria': 'PASS' + } + + time.sleep(1) + actual_result = runner.get_output() + self.assertEqual(idle_result, actual_result) + + +def main(): + unittest.main() + + +if __name__ == '__main__': + main() diff --git a/tests/unit/benchmark/scenarios/availability/test_attacker_process.py b/tests/unit/benchmark/scenarios/availability/test_attacker_process.py index eec512a58..0a8e8322a 100644 --- a/tests/unit/benchmark/scenarios/availability/test_attacker_process.py +++ b/tests/unit/benchmark/scenarios/availability/test_attacker_process.py @@ -41,7 +41,7 @@ class AttackerServiceTestCase(unittest.TestCase): cls = baseattacker.BaseAttacker.get_attacker_cls(self.attacker_cfg) ins = cls(self.attacker_cfg, self.context) - mock_ssh.SSH.from_node().execute.return_value = (0, "running", '') + mock_ssh.SSH.from_node().execute.return_value = (0, "10", '') ins.setup() ins.inject_fault() ins.recover() @@ -51,5 +51,5 @@ class AttackerServiceTestCase(unittest.TestCase): cls = baseattacker.BaseAttacker.get_attacker_cls(self.attacker_cfg) ins = cls(self.attacker_cfg, self.context) - mock_ssh.SSH.from_node().execute.return_value = (0, "error check", '') + mock_ssh.SSH.from_node().execute.return_value = (0, None, '') ins.setup() diff --git a/tests/unit/benchmark/scenarios/availability/test_basemonitor.py b/tests/unit/benchmark/scenarios/availability/test_basemonitor.py index 7030c7849..3b7e07376 100644 --- a/tests/unit/benchmark/scenarios/availability/test_basemonitor.py +++ b/tests/unit/benchmark/scenarios/availability/test_basemonitor.py @@ -34,7 +34,7 @@ class MonitorMgrTestCase(unittest.TestCase): self.monitor_configs.append(config) def test__MonitorMgr_setup_successful(self, mock_monitor): - instance = basemonitor.MonitorMgr() + instance = basemonitor.MonitorMgr({"nova-api": 10}) instance.init_monitors(self.monitor_configs, None) instance.start_monitors() instance.wait_monitors() @@ -42,7 +42,7 @@ class MonitorMgrTestCase(unittest.TestCase): ret = instance.verify_SLA() def test_MonitorMgr_getitem(self, mock_monitor): - monitorMgr = basemonitor.MonitorMgr() + monitorMgr = basemonitor.MonitorMgr({"nova-api": 10}) monitorMgr.init_monitors(self.monitor_configs, None) monitorIns = monitorMgr['service-status'] @@ -67,12 +67,12 @@ class BaseMonitorTestCase(unittest.TestCase): } def test__basemonitor_start_wait_successful(self): - ins = basemonitor.BaseMonitor(self.monitor_cfg, None) + ins = basemonitor.BaseMonitor(self.monitor_cfg, None, {"nova-api": 10}) ins.start_monitor() ins.wait_monitor() def test__basemonitor_all_successful(self): - ins = self.MonitorSimple(self.monitor_cfg, None) + ins = self.MonitorSimple(self.monitor_cfg, None, {"nova-api": 10}) ins.setup() ins.run() ins.verify_SLA() @@ -81,7 +81,7 @@ class BaseMonitorTestCase(unittest.TestCase): 'yardstick.benchmark.scenarios.availability.monitor.basemonitor' '.multiprocessing') def test__basemonitor_func_false(self, mock_multiprocess): - ins = self.MonitorSimple(self.monitor_cfg, None) + ins = self.MonitorSimple(self.monitor_cfg, None, {"nova-api": 10}) ins.setup() mock_multiprocess.Event().is_set.return_value = False ins.run() diff --git a/tests/unit/benchmark/scenarios/availability/test_monitor_command.py b/tests/unit/benchmark/scenarios/availability/test_monitor_command.py index c179bbfaf..2ed4be731 100644 --- a/tests/unit/benchmark/scenarios/availability/test_monitor_command.py +++ b/tests/unit/benchmark/scenarios/availability/test_monitor_command.py @@ -59,7 +59,7 @@ class MonitorOpenstackCmdTestCase(unittest.TestCase): def test__monitor_command_monitor_func_successful(self, mock_subprocess): - instance = monitor_command.MonitorOpenstackCmd(self.config, None) + instance = monitor_command.MonitorOpenstackCmd(self.config, None, {"nova-api": 10}) instance.setup() mock_subprocess.check_output.return_value = (0, 'unittest') ret = instance.monitor_func() @@ -69,7 +69,7 @@ class MonitorOpenstackCmdTestCase(unittest.TestCase): def test__monitor_command_monitor_func_failure(self, mock_subprocess): mock_subprocess.check_output.return_value = (1, 'unittest') - instance = monitor_command.MonitorOpenstackCmd(self.config, None) + instance = monitor_command.MonitorOpenstackCmd(self.config, None, {"nova-api": 10}) instance.setup() mock_subprocess.check_output.side_effect = RuntimeError ret = instance.monitor_func() @@ -85,7 +85,7 @@ class MonitorOpenstackCmdTestCase(unittest.TestCase): self.config["host"] = "node1" instance = monitor_command.MonitorOpenstackCmd( - self.config, self.context) + self.config, self.context, {"nova-api": 10}) instance.setup() mock_ssh.SSH.from_node().execute.return_value = (0, "0", '') ret = instance.monitor_func() diff --git a/tests/unit/benchmark/scenarios/availability/test_monitor_general.py b/tests/unit/benchmark/scenarios/availability/test_monitor_general.py index 169b630bf..c14f073ec 100644 --- a/tests/unit/benchmark/scenarios/availability/test_monitor_general.py +++ b/tests/unit/benchmark/scenarios/availability/test_monitor_general.py @@ -50,7 +50,7 @@ class GeneralMonitorServiceTestCase(unittest.TestCase): } def test__monitor_general_all_successful(self, mock_open, mock_ssh): - ins = monitor_general.GeneralMonitor(self.monitor_cfg, self.context) + ins = monitor_general.GeneralMonitor(self.monitor_cfg, self.context, {"nova-api": 10}) ins.setup() mock_ssh.SSH.from_node().execute.return_value = (0, "running", '') @@ -61,7 +61,7 @@ class GeneralMonitorServiceTestCase(unittest.TestCase): def test__monitor_general_all_successful_noparam(self, mock_open, mock_ssh): ins = monitor_general.GeneralMonitor( - self.monitor_cfg_noparam, self.context) + self.monitor_cfg_noparam, self.context, {"nova-api": 10}) ins.setup() mock_ssh.SSH.from_node().execute.return_value = (0, "running", '') @@ -71,7 +71,7 @@ class GeneralMonitorServiceTestCase(unittest.TestCase): def test__monitor_general_failure(self, mock_open, mock_ssh): ins = monitor_general.GeneralMonitor( - self.monitor_cfg_noparam, self.context) + self.monitor_cfg_noparam, self.context, {"nova-api": 10}) ins.setup() mock_ssh.SSH.from_node().execute.return_value = (1, "error", 'error') diff --git a/tests/unit/benchmark/scenarios/availability/test_monitor_multi.py b/tests/unit/benchmark/scenarios/availability/test_monitor_multi.py index 5719f286a..f8d12bd29 100644 --- a/tests/unit/benchmark/scenarios/availability/test_monitor_multi.py +++ b/tests/unit/benchmark/scenarios/availability/test_monitor_multi.py @@ -42,7 +42,7 @@ class MultiMonitorServiceTestCase(unittest.TestCase): } def test__monitor_multi_all_successful(self, mock_open, mock_ssh): - ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context) + ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context, {"nova-api": 10}) mock_ssh.SSH.from_node().execute.return_value = (0, "running", '') @@ -51,7 +51,7 @@ class MultiMonitorServiceTestCase(unittest.TestCase): ins.verify_SLA() def test__monitor_multi_all_fail(self, mock_open, mock_ssh): - ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context) + ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context, {"nova-api": 10}) mock_ssh.SSH.from_node().execute.return_value = (0, "running", '') diff --git a/tests/unit/benchmark/scenarios/availability/test_monitor_process.py b/tests/unit/benchmark/scenarios/availability/test_monitor_process.py index 8c267e413..41ce5445e 100644 --- a/tests/unit/benchmark/scenarios/availability/test_monitor_process.py +++ b/tests/unit/benchmark/scenarios/availability/test_monitor_process.py @@ -40,7 +40,7 @@ class MonitorProcessTestCase(unittest.TestCase): def test__monitor_process_all_successful(self, mock_ssh): - ins = monitor_process.MonitorProcess(self.monitor_cfg, self.context) + ins = monitor_process.MonitorProcess(self.monitor_cfg, self.context, {"nova-api": 10}) mock_ssh.SSH.from_node().execute.return_value = (0, "1", '') ins.setup() @@ -50,7 +50,7 @@ class MonitorProcessTestCase(unittest.TestCase): def test__monitor_process_down_failuer(self, mock_ssh): - ins = monitor_process.MonitorProcess(self.monitor_cfg, self.context) + ins = monitor_process.MonitorProcess(self.monitor_cfg, self.context, {"nova-api": 10}) mock_ssh.SSH.from_node().execute.return_value = (0, "0", '') ins.setup() diff --git a/yardstick/benchmark/core/task.py b/yardstick/benchmark/core/task.py index 3a151dbba..c44081b73 100644 --- a/yardstick/benchmark/core/task.py +++ b/yardstick/benchmark/core/task.py @@ -44,6 +44,7 @@ class Task(object): # pragma: no cover def __init__(self): self.config = {} self.contexts = [] + self.outputs = {} def start(self, args, **kwargs): """Start a benchmark scenario.""" @@ -136,6 +137,7 @@ class Task(object): # pragma: no cover # Wait for runners to finish for runner in runners: runner_join(runner) + self.outputs.update(runner.get_output()) print("Runner ended, output in", output_file) else: # run serially @@ -143,6 +145,7 @@ class Task(object): # pragma: no cover if not _is_background_scenario(scenario): runner = self.run_one_scenario(scenario, output_file) runner_join(runner) + self.outputs.update(runner.get_output()) print("Runner ended, output in", output_file) # Abort background runners @@ -155,6 +158,7 @@ class Task(object): # pragma: no cover # Nuke if it did not stop nicely base_runner.Runner.terminate(runner) runner_join(runner) + self.outputs.update(runner.get_output()) else: base_runner.Runner.release(runner) print("Background task ended") @@ -168,11 +172,24 @@ class Task(object): # pragma: no cover for context in self.contexts[::-1]: context.undeploy() + def _parse_options(self, op): + if isinstance(op, dict): + return {k: self._parse_options(v) for k, v in op.items()} + elif isinstance(op, list): + return [self._parse_options(v) for v in op] + elif isinstance(op, str): + return self.outputs.get(op[1:]) if op.startswith('$') else op + else: + return op + def run_one_scenario(self, scenario_cfg, output_file): """run one scenario using context""" runner_cfg = scenario_cfg["runner"] runner_cfg['output_filename'] = output_file + options = scenario_cfg.get('options', {}) + scenario_cfg['options'] = self._parse_options(options) + # TODO support get multi hosts/vms info context_cfg = {} if "host" in scenario_cfg: diff --git a/yardstick/benchmark/runners/arithmetic.py b/yardstick/benchmark/runners/arithmetic.py index 65fdb9d66..7ec593396 100755 --- a/yardstick/benchmark/runners/arithmetic.py +++ b/yardstick/benchmark/runners/arithmetic.py @@ -42,7 +42,7 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, - context_cfg, aborted): + context_cfg, aborted, output_queue): sequence = 1 @@ -108,7 +108,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = "" try: - method(data) + result = method(data) except AssertionError as assertion: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": @@ -119,6 +119,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, except Exception as e: errors = traceback.format_exc() LOG.exception(e) + else: + if result: + output_queue.put(result) time.sleep(interval) @@ -188,5 +191,5 @@ class ArithmeticRunner(base.Runner): self.process = multiprocessing.Process( target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, - context_cfg, self.aborted)) + context_cfg, self.aborted, self.output_queue)) self.process.start() diff --git a/yardstick/benchmark/runners/base.py b/yardstick/benchmark/runners/base.py index b48ed973a..ebb9a91b5 100755 --- a/yardstick/benchmark/runners/base.py +++ b/yardstick/benchmark/runners/base.py @@ -197,6 +197,7 @@ class Runner(object): self.config = config self.periodic_action_process = None self.result_queue = queue + self.output_queue = multiprocessing.Queue() self.process = None self.aborted = multiprocessing.Event() Runner.runners.append(self) @@ -269,3 +270,9 @@ class Runner(object): self.run_post_stop_action() return self.process.exitcode + + def get_output(self): + result = {} + while not self.output_queue.empty(): + result.update(self.output_queue.get()) + return result diff --git a/yardstick/benchmark/runners/duration.py b/yardstick/benchmark/runners/duration.py index 772983cfd..2bf2cd2fe 100644 --- a/yardstick/benchmark/runners/duration.py +++ b/yardstick/benchmark/runners/duration.py @@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, - context_cfg, aborted): + context_cfg, aborted, output_queue): sequence = 1 @@ -66,7 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = "" try: - method(data) + result = method(data) except AssertionError as assertion: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": @@ -77,6 +77,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, except Exception as e: errors = traceback.format_exc() LOG.exception(e) + else: + if result: + output_queue.put(result) time.sleep(interval) @@ -126,5 +129,5 @@ If the scenario ends before the time has elapsed, it will be started again. self.process = multiprocessing.Process( target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, - context_cfg, self.aborted)) + context_cfg, self.aborted, self.output_queue)) self.process.start() diff --git a/yardstick/benchmark/runners/dynamictp.py b/yardstick/benchmark/runners/dynamictp.py new file mode 100755 index 000000000..106595dbd --- /dev/null +++ b/yardstick/benchmark/runners/dynamictp.py @@ -0,0 +1,169 @@ +# Copyright 2016: Nokia +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# yardstick comment: this is a modified copy of +# rally/rally/benchmark/runners/constant.py + +"""A runner that searches for the max throughput with binary search +""" + +import os +import multiprocessing +import logging +import traceback +import time + +from yardstick.benchmark.runners import base + +LOG = logging.getLogger(__name__) + + +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted): # pragma: no cover + + runner_cfg = scenario_cfg['runner'] + iterations = runner_cfg.get("iterations", 1) + interval = runner_cfg.get("interval", 1) + run_step = runner_cfg.get("run_step", "setup,run,teardown") + delta = runner_cfg.get("delta", 1000) + options_cfg = scenario_cfg['options'] + initial_rate = options_cfg.get("pps", 1000000) + LOG.info("worker START, class %s", cls) + + runner_cfg['runner_id'] = os.getpid() + + benchmark = cls(scenario_cfg, context_cfg) + if "setup" in run_step: + benchmark.setup() + + method = getattr(benchmark, method_name) + + queue.put({'runner_id': runner_cfg['runner_id'], + 'scenario_cfg': scenario_cfg, + 'context_cfg': context_cfg}) + + if "run" in run_step: + iterator = 0 + search_max = initial_rate + search_min = 0 + while iterator < iterations: + search_min = int(search_min / 2) + scenario_cfg['options']['pps'] = search_max + search_max_found = False + max_throuput_found = False + sequence = 0 + + last_min_data = {} + last_min_data['packets_per_second'] = 0 + + while True: + sequence += 1 + + data = {} + errors = "" + too_high = False + + LOG.debug("sequence: %s search_min: %s search_max: %s", + sequence, search_min, search_max) + + try: + method(data) + except AssertionError as assertion: + LOG.warning("SLA validation failed: %s" % assertion.args) + too_high = True + except Exception as e: + errors = traceback.format_exc() + LOG.exception(e) + + actual_pps = data['packets_per_second'] + + if too_high: + search_max = actual_pps + + if not search_max_found: + search_max_found = True + else: + last_min_data = data + search_min = actual_pps + + # Check if the actual rate is well below the asked rate + if scenario_cfg['options']['pps'] > actual_pps * 1.5: + search_max = actual_pps + LOG.debug("Sender reached max tput: %s", search_max) + elif not search_max_found: + search_max = int(actual_pps * 1.5) + + if ((search_max - search_min) < delta) or \ + (search_max <= search_min) or (10 <= sequence): + if last_min_data['packets_per_second'] > 0: + data = last_min_data + + benchmark_output = { + 'timestamp': time.time(), + 'sequence': sequence, + 'data': data, + 'errors': errors + } + + record = { + 'runner_id': runner_cfg['runner_id'], + 'benchmark': benchmark_output + } + + queue.put(record) + max_throuput_found = True + + if (errors) or aborted.is_set() or max_throuput_found: + LOG.info("worker END") + break + + if not search_max_found: + scenario_cfg['options']['pps'] = search_max + else: + scenario_cfg['options']['pps'] = \ + (search_max - search_min) / 2 + search_min + + time.sleep(interval) + + iterator += 1 + LOG.debug("iterator: %s iterations: %s", iterator, iterations) + + if "teardown" in run_step: + benchmark.teardown() + + +class IterationRunner(base.Runner): + '''Run a scenario to find the max throughput + +If the scenario ends before the time has elapsed, it will be started again. + + Parameters + interval - time to wait between each scenario invocation + type: int + unit: seconds + default: 1 sec + delta - stop condition for the search. + type: int + unit: pps + default: 1000 pps + ''' + __execution_type__ = 'Dynamictp' + + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + self.process = multiprocessing.Process( + target=_worker_process, + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted)) + self.process.start() diff --git a/yardstick/benchmark/runners/iteration.py b/yardstick/benchmark/runners/iteration.py index 29daa0d42..973bb9ac4 100644 --- a/yardstick/benchmark/runners/iteration.py +++ b/yardstick/benchmark/runners/iteration.py @@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, - context_cfg, aborted): + context_cfg, aborted, output_queue): sequence = 1 @@ -41,10 +41,8 @@ def _worker_process(queue, cls, method_name, scenario_cfg, interval = runner_cfg.get("interval", 1) iterations = runner_cfg.get("iterations", 1) run_step = runner_cfg.get("run_step", "setup,run,teardown") + delta = runner_cfg.get("delta", 2) - options_cfg = scenario_cfg['options'] - initial_rate = options_cfg.get("rate", 100) - scenario_cfg['options']['rate'] = initial_rate LOG.info("worker START, iterations %d times, class %s", iterations, cls) runner_cfg['runner_id'] = os.getpid() @@ -73,7 +71,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = "" try: - method(data) + result = method(data) except AssertionError as assertion: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": @@ -82,12 +80,21 @@ def _worker_process(queue, cls, method_name, scenario_cfg, LOG.warning("SLA validation failed: %s", assertion.args) errors = assertion.args elif sla_action == "rate-control": + try: + scenario_cfg['options']['rate'] + except KeyError: + scenario_cfg.setdefault('options', {}) + scenario_cfg['options']['rate'] = 100 + scenario_cfg['options']['rate'] -= delta sequence = 1 continue except Exception as e: errors = traceback.format_exc() LOG.exception(e) + else: + if result: + output_queue.put(result) time.sleep(interval) @@ -138,5 +145,5 @@ If the scenario ends before the time has elapsed, it will be started again. self.process = multiprocessing.Process( target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, - context_cfg, self.aborted)) + context_cfg, self.aborted, self.output_queue)) self.process.start() diff --git a/yardstick/benchmark/runners/sequence.py b/yardstick/benchmark/runners/sequence.py index af87c006e..74ff82204 100644 --- a/yardstick/benchmark/runners/sequence.py +++ b/yardstick/benchmark/runners/sequence.py @@ -33,7 +33,7 @@ LOG = logging.getLogger(__name__) def _worker_process(queue, cls, method_name, scenario_cfg, - context_cfg, aborted): + context_cfg, aborted, output_queue): sequence = 1 @@ -75,7 +75,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg, errors = "" try: - method(data) + result = method(data) except AssertionError as assertion: # SLA validation failed in scenario, determine what to do now if sla_action == "assert": @@ -86,6 +86,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg, except Exception as e: errors = traceback.format_exc() LOG.exception(e) + else: + if result: + output_queue.put(result) time.sleep(interval) @@ -137,5 +140,5 @@ class SequenceRunner(base.Runner): self.process = multiprocessing.Process( target=_worker_process, args=(self.result_queue, cls, method, scenario_cfg, - context_cfg, self.aborted)) + context_cfg, self.aborted, self.output_queue)) self.process.start() diff --git a/yardstick/benchmark/scenarios/availability/attacker/attacker_process.py b/yardstick/benchmark/scenarios/availability/attacker/attacker_process.py index bff4a6dc3..e0e6cf3bf 100644 --- a/yardstick/benchmark/scenarios/availability/attacker/attacker_process.py +++ b/yardstick/benchmark/scenarios/availability/attacker/attacker_process.py @@ -38,8 +38,7 @@ class ProcessAttacker(BaseAttacker): self.recovery_script = self.get_script_fullpath( self.fault_cfg['recovery_script']) - if self.check(): - self.setup_done = True + self.data[self.service_name] = self.check() def check(self): with open(self.check_script, "r") as stdin_file: @@ -49,7 +48,7 @@ class ProcessAttacker(BaseAttacker): if stdout: LOG.info("check the envrioment success!") - return True + return int(stdout.strip('\n')) else: LOG.error( "the host envrioment is error, stdout:%s, stderr:%s", diff --git a/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py b/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py index ca2324055..7b3d8b0be 100644 --- a/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py +++ b/yardstick/benchmark/scenarios/availability/attacker/baseattacker.py @@ -25,6 +25,7 @@ class AttackerMgr(object): def __init__(self): self._attacker_list = [] + self.data = {} def init_attackers(self, attacker_cfgs, context): LOG.debug("attackerMgr confg: %s", attacker_cfgs) @@ -35,6 +36,8 @@ class AttackerMgr(object): attacker_ins.key = cfg['key'] attacker_ins.setup() self._attacker_list.append(attacker_ins) + self.data = dict(self.data.items() + attacker_ins.data.items()) + return self.data def __getitem__(self, item): for obj in self._attacker_list: @@ -57,6 +60,7 @@ class BaseAttacker(object): self._config = config self._context = context + self.data = {} self.setup_done = False @staticmethod diff --git a/yardstick/benchmark/scenarios/availability/director.py b/yardstick/benchmark/scenarios/availability/director.py index 76fcc0e7f..e0d05ebf5 100644 --- a/yardstick/benchmark/scenarios/availability/director.py +++ b/yardstick/benchmark/scenarios/availability/director.py @@ -24,7 +24,7 @@ LOG = logging.getLogger(__name__) class Director(object): """ Director is used to direct a test scenaio - including the creation of action players, test result verification + including the creation of action players, test result verification and rollback of actions. """ @@ -33,6 +33,7 @@ class Director(object): # A stack store Rollbacker that will be called after # all actionplayers finish. self.executionSteps = [] + self.data = {} self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg @@ -42,12 +43,14 @@ class Director(object): LOG.debug("start init attackers...") attacker_cfgs = self.scenario_cfg["options"]["attackers"] self.attackerMgr = baseattacker.AttackerMgr() - self.attackerMgr.init_attackers(attacker_cfgs, nodes) + self.data = self.attackerMgr.init_attackers(attacker_cfgs, + nodes) + # setup monitors if "monitors" in self.scenario_cfg["options"]: LOG.debug("start init monitors...") monitor_cfgs = self.scenario_cfg["options"]["monitors"] - self.monitorMgr = basemonitor.MonitorMgr() + self.monitorMgr = basemonitor.MonitorMgr(self.data) self.monitorMgr.init_monitors(monitor_cfgs, nodes) # setup operations if "operations" in self.scenario_cfg["options"]: diff --git a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py index a0fc5965b..ba3370003 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py +++ b/yardstick/benchmark/scenarios/availability/monitor/basemonitor.py @@ -25,8 +25,9 @@ monitor_conf_path = pkg_resources.resource_filename( class MonitorMgr(object): """docstring for MonitorMgr""" - def __init__(self): + def __init__(self, data): self._monitor_list = [] + self.monitor_mgr_data = data def init_monitors(self, monitor_cfgs, context): LOG.debug("monitorMgr config: %s", monitor_cfgs) @@ -39,7 +40,8 @@ class MonitorMgr(object): if monitor_number > 1: monitor_cls = BaseMonitor.get_monitor_cls("multi-monitor") - monitor_ins = monitor_cls(monitor_cfg, context) + monitor_ins = monitor_cls(monitor_cfg, context, + self.monitor_mgr_data) if "key" in monitor_cfg: monitor_ins.key = monitor_cfg["key"] self._monitor_list.append(monitor_ins) @@ -69,7 +71,7 @@ class BaseMonitor(multiprocessing.Process): """docstring for BaseMonitor""" monitor_cfgs = {} - def __init__(self, config, context): + def __init__(self, config, context, data): if not BaseMonitor.monitor_cfgs: with open(monitor_conf_path) as stream: BaseMonitor.monitor_cfgs = yaml.load(stream) @@ -78,6 +80,7 @@ class BaseMonitor(multiprocessing.Process): self._context = context self._queue = multiprocessing.Queue() self._event = multiprocessing.Event() + self.monitor_data = data self.setup_done = False @staticmethod diff --git a/yardstick/benchmark/scenarios/availability/monitor/monitor_multi.py b/yardstick/benchmark/scenarios/availability/monitor/monitor_multi.py index 0bd8e6d37..d7d1545da 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/monitor_multi.py +++ b/yardstick/benchmark/scenarios/availability/monitor/monitor_multi.py @@ -20,16 +20,18 @@ class MultiMonitor(basemonitor.BaseMonitor): __monitor_type__ = "multi-monitor" - def __init__(self, config, context): - super(MultiMonitor, self).__init__(config, context) + def __init__(self, config, context, data): + super(MultiMonitor, self).__init__(config, context, data) self.monitors = [] + self.monitor_data = data monitor_type = self._config["monitor_type"] monitor_cls = basemonitor.BaseMonitor.get_monitor_cls(monitor_type) monitor_number = self._config.get("monitor_number", 1) for i in range(monitor_number): - monitor_ins = monitor_cls(self._config, self._context) + monitor_ins = monitor_cls(self._config, self._context, + self.monitor_data) self.monitors.append(monitor_ins) def start_monitor(self): diff --git a/yardstick/benchmark/scenarios/availability/monitor/monitor_process.py b/yardstick/benchmark/scenarios/availability/monitor/monitor_process.py index 31526b011..b0f6f8e9d 100644 --- a/yardstick/benchmark/scenarios/availability/monitor/monitor_process.py +++ b/yardstick/benchmark/scenarios/availability/monitor/monitor_process.py @@ -35,10 +35,13 @@ class MonitorProcess(basemonitor.BaseMonitor): exit_status, stdout, stderr = self.connection.execute( "sudo /bin/sh -s {0}".format(self.process_name), stdin=stdin_file) - if not stdout or int(stdout) <= 0: - LOG.info("the process (%s) is not running!", self.process_name) + + if not stdout or int(stdout) < self.monitor_data[self.process_name]: + LOG.info("the (%s) processes are in recovery!", self.process_name) return False + LOG.info("the (%s) processes have been fully recovered!", + self.process_name) return True def verify_SLA(self): diff --git a/yardstick/benchmark/scenarios/availability/serviceha.py b/yardstick/benchmark/scenarios/availability/serviceha.py index 69727de2b..2e829714d 100755 --- a/yardstick/benchmark/scenarios/availability/serviceha.py +++ b/yardstick/benchmark/scenarios/availability/serviceha.py @@ -28,6 +28,7 @@ class ServiceHA(base.Scenario): self.scenario_cfg = scenario_cfg self.context_cfg = context_cfg self.setup_done = False + self.data = {} def setup(self): """scenario setup""" @@ -44,10 +45,11 @@ class ServiceHA(base.Scenario): attacker_ins = attacker_cls(attacker_cfg, nodes) attacker_ins.setup() self.attackers.append(attacker_ins) + self.data = dict(self.data.items() + attacker_ins.data.items()) monitor_cfgs = self.scenario_cfg["options"]["monitors"] - self.monitorMgr = basemonitor.MonitorMgr() + self.monitorMgr = basemonitor.MonitorMgr(self.data) self.monitorMgr.init_monitors(monitor_cfgs, nodes) self.setup_done = True @@ -68,6 +70,12 @@ class ServiceHA(base.Scenario): LOG.info("HA monitor stop!") sla_pass = self.monitorMgr.verify_SLA() + for k, v in self.data.items(): + if self.data[k] == 0: + result['sla_pass'] = 0 + LOG.info("The service process not found in the host envrioment, \ +the HA test case NOT pass") + return if sla_pass: result['sla_pass'] = 1 LOG.info("The HA test case PASS the SLA") |