From ef3f82fc25f2ef8a77e1e4e518b01b80efcd73fd Mon Sep 17 00:00:00 2001 From: DanielMartinBuckley Date: Wed, 6 Jun 2018 15:06:01 +0100 Subject: Addition of Configurable Sampling and Configurable Confirmation Retry JIRA: YARDSTICK-1221 In order to increase accuracy of result the following are done :- - Improve Measurement Accuracy (YARDSTICK-1212) - Improve Sampling Interval (YARDSTICK-1219) - Allow 4 PROX ports to be read simultaneously (YARDSTICK-1220) This change does the following :- - Stores LINE Rate statistics of Sample - Requires a confirmation retry before deciding to increase or decrease this is configurable. - Allows the user to disable Sampling or specify a sample interval - Added Code Coverage of ProxDurationRunner based on YARDSTICK-1199 Change-Id: I27242ac1849c9a2712866385b5fbc05977c71516 Signed-off-by: Daniel Martin Buckley (cherry picked from commit 2e447af9a5a54355aa20028813660d07d1bd2e18) --- .../nsut/prox/tc_prox_baremetal_l2fwd-4.yaml | 10 +- yardstick/benchmark/runners/proxduration.py | 165 ++++++++++++ yardstick/common/constants.py | 1 + .../traffic_profile/prox_binsearch.py | 131 +++++---- yardstick/tests/unit/benchmark/core/test_task.py | 25 ++ .../unit/benchmark/runner/test_proxduration.py | 295 +++++++++++++++++++++ .../traffic_profile/test_prox_binsearch.py | 94 ++++++- 7 files changed, 671 insertions(+), 50 deletions(-) create mode 100644 yardstick/benchmark/runners/proxduration.py create mode 100644 yardstick/tests/unit/benchmark/runner/test_proxduration.py diff --git a/samples/vnf_samples/nsut/prox/tc_prox_baremetal_l2fwd-4.yaml b/samples/vnf_samples/nsut/prox/tc_prox_baremetal_l2fwd-4.yaml index ab067a836..84edcd47d 100644 --- a/samples/vnf_samples/nsut/prox/tc_prox_baremetal_l2fwd-4.yaml +++ b/samples/vnf_samples/nsut/prox/tc_prox_baremetal_l2fwd-4.yaml @@ -42,9 +42,15 @@ scenarios: "-t": "" runner: - type: Duration + type: ProxDuration + # sampling interval + interval: 1 + # sampled : yes OR sampled: no (DEFAULT yes) + sampled: yes # we kill after duration, independent of test duration, so set this high - duration: 300 + duration: 3100 + # Confirmation attempts + confirmation: 1 context: type: Node diff --git a/yardstick/benchmark/runners/proxduration.py b/yardstick/benchmark/runners/proxduration.py new file mode 100644 index 000000000..61a468fd3 --- /dev/null +++ b/yardstick/benchmark/runners/proxduration.py @@ -0,0 +1,165 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# yardstick comment: this is a modified copy of +# rally/rally/benchmark/runners/constant.py + +"""A runner that runs a specific time before it returns +""" + +from __future__ import absolute_import + +import os +import multiprocessing +import logging +import traceback +import time + +from yardstick.benchmark.runners import base +from yardstick.common import exceptions as y_exc +from yardstick.common import constants + +LOG = logging.getLogger(__name__) + +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): + + sequence = 1 + + runner_cfg = scenario_cfg['runner'] + + requested_interval = interval = runner_cfg.get("interval", 1) + duration = runner_cfg.get("duration", 60) + sampled = runner_cfg.get("sampled", False) + + LOG.info("Worker START, duration is %ds", duration) + LOG.debug("class is %s", cls) + + runner_cfg['runner_id'] = os.getpid() + + benchmark = cls(scenario_cfg, context_cfg) + benchmark.setup() + method = getattr(benchmark, method_name) + + sla_action = None + if "sla" in scenario_cfg: + sla_action = scenario_cfg["sla"].get("action", "assert") + + + start = time.time() + timeout = start + duration + while True: + + LOG.debug("runner=%(runner)s seq=%(sequence)s START", + {"runner": runner_cfg["runner_id"], "sequence": sequence}) + + data = {} + errors = "" + + benchmark.pre_run_wait_time(interval) + + if sampled: + try: + pre_adjustment = time.time() + result = method(data) + post_adjustment = time.time() + if requested_interval > post_adjustment - pre_adjustment: + interval = requested_interval - (post_adjustment - pre_adjustment) + else: + interval = 0 + + except y_exc.SLAValidationError as error: + # SLA validation failed in scenario, determine what to do now + if sla_action == "assert": + raise + elif sla_action == "monitor": + LOG.warning("SLA validation failed: %s", error.args) + errors = error.args + # catch all exceptions because with multiprocessing we can have un-picklable exception + # problems https://bugs.python.org/issue9400 + except Exception: # pylint: disable=broad-except + errors = traceback.format_exc() + LOG.exception("") + else: + if result: + # add timeout for put so we don't block test + # if we do timeout we don't care about dropping individual KPIs + output_queue.put(result, True, constants.QUEUE_PUT_TIMEOUT) + + benchmark_output = { + 'timestamp': time.time(), + 'sequence': sequence, + 'data': data, + 'errors': errors + } + + queue.put(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT) + else: + LOG.debug("No sample collected ...Sequence %s", sequence) + + + sequence += 1 + + if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): + LOG.info("Worker END") + break + + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) + LOG.info("Exiting ProxDuration Runner...") + +class ProxDurationRunner(base.Runner): + """Run a scenario for a certain amount of time + +If the scenario ends before the time has elapsed, it will be started again. + + Parameters + duration - amount of time the scenario will be run for + type: int + unit: seconds + default: 60 sec + interval - time to wait between each scenario invocation + type: int + unit: seconds + default: 1 sec + sampled - Sample data is required yes/no + type: boolean + unit: True/False + default: False + confirmation - Number of confirmation retries + type: int + unit: retry attempts + default: 0 + """ + __execution_type__ = 'ProxDuration' + + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) + self.process = multiprocessing.Process( + name=name, + target=_worker_process, + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) + self.process.start() diff --git a/yardstick/common/constants.py b/yardstick/common/constants.py index 3f5c37456..5af33dd69 100644 --- a/yardstick/common/constants.py +++ b/yardstick/common/constants.py @@ -119,6 +119,7 @@ INFLUXDB_DB_NAME = get_param('influxdb.db_name', 'yardstick') INFLUXDB_IMAGE = get_param('influxdb.image', 'tutum/influxdb') INFLUXDB_TAG = get_param('influxdb.tag', '0.13') INFLUXDB_DASHBOARD_PORT = 8083 +QUEUE_PUT_TIMEOUT = 10 # grafana GRAFANA_IP = get_param('grafana.ip', SERVER_IP) diff --git a/yardstick/network_services/traffic_profile/prox_binsearch.py b/yardstick/network_services/traffic_profile/prox_binsearch.py index c3277fb12..a5249c70e 100644 --- a/yardstick/network_services/traffic_profile/prox_binsearch.py +++ b/yardstick/network_services/traffic_profile/prox_binsearch.py @@ -21,6 +21,7 @@ import time from yardstick.network_services.traffic_profile.prox_profile import ProxProfile from yardstick.network_services import constants +from yardstick.common import constants as overall_constants LOG = logging.getLogger(__name__) @@ -84,9 +85,14 @@ class ProxBinSearchProfile(ProxProfile): # success, the binary search will complete on an integer multiple # of the precision, rather than on a fraction of it. - theor_max_thruput = 0 + theor_max_thruput = actual_max_thruput = 0 result_samples = {} + rate_samples = {} + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + ok_retry = 0 # Store one time only value in influxdb single_samples = { @@ -102,52 +108,85 @@ class ProxBinSearchProfile(ProxProfile): successful_pkt_loss = 0.0 line_speed = traffic_gen.scenario_helper.all_options.get( "interface_speed_gbps", constants.NIC_GBPS_DEFAULT) * constants.ONE_GIGABIT_IN_BITS + + ok_retry = traffic_gen.scenario_helper.scenario_cfg["runner"].get("confirmation", 0) for test_value in self.bounds_iterator(LOG): - result, port_samples = self._profile_helper.run_test(pkt_size, duration, - test_value, - self.tolerated_loss, - line_speed) - self.curr_time = time.time() - diff_time = self.curr_time - self.prev_time - self.prev_time = self.curr_time - - if result.success: - LOG.debug("Success! Increasing lower bound") - self.current_lower = test_value - successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - samples["TxThroughput"] = samples["TxThroughput"] * 1000 * 1000 - - # store results with success tag in influxdb - success_samples = {'Success_' + key: value for key, value in samples.items()} - - success_samples["Success_rx_total"] = int(result.rx_total / diff_time) - success_samples["Success_tx_total"] = int(result.tx_total / diff_time) - success_samples["Success_can_be_lost"] = int(result.can_be_lost / diff_time) - success_samples["Success_drop_total"] = int(result.drop_total / diff_time) - self.queue.put(success_samples) - - # Store Actual throughput for result samples - result_samples["Result_Actual_throughput"] = \ - success_samples["Success_RxThroughput"] - else: - LOG.debug("Failure... Decreasing upper bound") - self.current_upper = test_value - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - - for k in samples: - tmp = samples[k] - if isinstance(tmp, dict): - for k2 in tmp: - samples[k][k2] = int(samples[k][k2] / diff_time) - - if theor_max_thruput < samples["TxThroughput"]: - theor_max_thruput = samples['TxThroughput'] - self.queue.put({'theor_max_throughput': theor_max_thruput}) - - LOG.debug("Collect TG KPIs %s %s", datetime.datetime.now(), samples) - self.queue.put(samples) + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + + rate_samples["MAX_Rate"] = self.current_upper + rate_samples["MIN_Rate"] = self.current_lower + rate_samples["Test_Rate"] = test_value + self.queue.put(rate_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + while (pos_retry <= ok_retry) and (neg_retry <= ok_retry): + + total_retry = total_retry + 1 + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, + self.tolerated_loss, + line_speed) + if (total_retry > (ok_retry * 3)) and (ok_retry is not 0): + LOG.info("Failure.!! .. RETRY EXCEEDED ... decrease lower bound") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + self.current_upper = test_value + neg_retry = total_retry + elif result.success: + if (pos_retry < ok_retry) and (ok_retry is not 0): + neg_retry = 0 + LOG.info("Success! ... confirm retry") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + else: + LOG.info("Success! Increasing lower bound") + self.current_lower = test_value + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + # store results with success tag in influxdb + success_samples = \ + {'Success_' + key: value for key, value in samples.items()} + + success_samples["Success_rx_total"] = int(result.rx_total) + success_samples["Success_tx_total"] = int(result.tx_total) + success_samples["Success_can_be_lost"] = int(result.can_be_lost) + success_samples["Success_drop_total"] = int(result.drop_total) + success_samples["Success_RxThroughput"] = samples["RxThroughput"] + LOG.info(">>>##>>Collect SUCCESS TG KPIs %s %s", + datetime.datetime.now(), success_samples) + self.queue.put(success_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + + # Store Actual throughput for result samples + actual_max_thruput = success_samples["Success_RxThroughput"] + + pos_retry = pos_retry + 1 + + else: + if (neg_retry < ok_retry) and (ok_retry is not 0): + + pos_retry = 0 + LOG.info("failure! ... confirm retry") + else: + LOG.info("Failure... Decreasing upper bound") + self.current_upper = test_value + + neg_retry = neg_retry + 1 + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + if theor_max_thruput < samples["TxThroughput"]: + theor_max_thruput = samples['TxThroughput'] + self.queue.put({'theor_max_throughput': theor_max_thruput}) + + LOG.info(">>>##>>Collect TG KPIs %s %s", datetime.datetime.now(), samples) + self.queue.put(samples, True, overall_constants.QUEUE_PUT_TIMEOUT) result_samples["Result_pktSize"] = pkt_size - result_samples["Result_theor_max_throughput"] = theor_max_thruput/ (1000 * 1000) + result_samples["Result_theor_max_throughput"] = theor_max_thruput + result_samples["Result_Actual_throughput"] = actual_max_thruput self.queue.put(result_samples) diff --git a/yardstick/tests/unit/benchmark/core/test_task.py b/yardstick/tests/unit/benchmark/core/test_task.py index 7468368df..0424c77a3 100644 --- a/yardstick/tests/unit/benchmark/core/test_task.py +++ b/yardstick/tests/unit/benchmark/core/test_task.py @@ -156,6 +156,31 @@ class TaskTestCase(unittest.TestCase): t._run([scenario], False, "yardstick.out") runner.run.assert_called_once() + @mock.patch.object(task, 'Context') + @mock.patch.object(task, 'base_runner') + def test_run_ProxDuration(self, mock_base_runner, *args): + scenario = { + 'host': 'athena.demo', + 'target': 'ares.demo', + 'runner': { + 'duration': 60, + 'interval': 1, + 'sampled': 'yes', + 'confirmation': 1, + 'type': 'ProxDuration' + }, + 'type': 'Ping' + } + + t = task.Task() + runner = mock.Mock() + runner.join.return_value = 0 + runner.get_output.return_value = {} + runner.get_result.return_value = [] + mock_base_runner.Runner.get.return_value = runner + t._run([scenario], False, "yardstick.out") + runner.run.assert_called_once() + @mock.patch.object(os, 'environ') def test_check_precondition(self, mock_os_environ): cfg = { diff --git a/yardstick/tests/unit/benchmark/runner/test_proxduration.py b/yardstick/tests/unit/benchmark/runner/test_proxduration.py new file mode 100644 index 000000000..be1715aad --- /dev/null +++ b/yardstick/tests/unit/benchmark/runner/test_proxduration.py @@ -0,0 +1,295 @@ +############################################################################## +# Copyright (c) 2018 Nokia and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import mock +import unittest +import multiprocessing +import os +import time + +from yardstick.benchmark.runners import proxduration +from yardstick.common import exceptions as y_exc + + +class ProxDurationRunnerTest(unittest.TestCase): + class MyMethod(object): + SLA_VALIDATION_ERROR_SIDE_EFFECT = 1 + BROAD_EXCEPTION_SIDE_EFFECT = 2 + + def __init__(self, side_effect=0): + self.count = 101 + self.side_effect = side_effect + + def __call__(self, data): + self.count += 1 + data['my_key'] = self.count + if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT: + raise y_exc.SLAValidationError(case_name='My Case', + error_msg='my error message') + elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT: + raise y_exc.YardstickException + return self.count + + def setUp(self): + self.scenario_cfg = { + 'runner': {'interval': 0, "duration": 0}, + 'type': 'some_type' + } + + self.benchmark = mock.Mock() + self.benchmark_cls = mock.Mock(return_value=self.benchmark) + + def _assert_defaults__worker_run_setup_and_teardown(self): + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.teardown.assert_called_once() + + @mock.patch.object(os, 'getpid') + @mock.patch.object(multiprocessing, 'Process') + def test__run_benchmark_called_with(self, mock_multiprocessing_process, + mock_os_getpid): + mock_os_getpid.return_value = 101 + + runner = proxduration.ProxDurationRunner({}) + benchmark_cls = mock.Mock() + runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg, + {}) + mock_multiprocessing_process.assert_called_once_with( + name='ProxDuration-some_type-101', + target=proxduration._worker_process, + args=(runner.result_queue, benchmark_cls, 'my_method', + self.scenario_cfg, {}, runner.aborted, runner.output_queue)) + + @mock.patch.object(os, 'getpid') + def test__worker_process_runner_id(self, mock_os_getpid): + mock_os_getpid.return_value = 101 + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101) + + def test__worker_process_called_with_cfg(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_called_with_cfg_loop(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.call_count, 2) + + def test__worker_process_called_without_cfg(self): + scenario_cfg = {'runner': {}} + + aborted = multiprocessing.Event() + aborted.set() + + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + scenario_cfg, {}, aborted, mock.Mock()) + + self.benchmark_cls.assert_called_once_with(scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.teardown.assert_called_once() + + def test__worker_process_output_queue(self): + self.benchmark.my_method = mock.Mock(return_value='my_result') + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + output_queue = multiprocessing.Queue() + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), output_queue) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertEquals(output_queue.get(), 'my_result') + + def test__worker_process_output_queue_multiple_iterations(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod() + + output_queue = multiprocessing.Queue() + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), output_queue) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.count, 103) + + count = 101 + while not output_queue.empty(): + count += 1 + self.assertEquals(output_queue.get(), count) + + def test__worker_process_queue(self): + self.benchmark.my_method = self.MyMethod() + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_queue_multiple_iterations(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod() + + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.count, 103) + + count = 0 + while not queue.empty(): + count += 1 + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': count + 101}) + self.assertEqual(result['sequence'], count) + + def test__worker_process_except_sla_validation_error_no_sla_cfg(self): + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self): + self.scenario_cfg['sla'] = {'action': 'monitor'} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_raise_sla_validation_error_sla_cfg_default(self): + self.scenario_cfg['sla'] = {} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + with self.assertRaises(y_exc.SLAValidationError): + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.my_method.assert_called_once_with({}) + + def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg['sla'] = {'action': 'assert'} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + with self.assertRaises(y_exc.SLAValidationError): + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.my_method.assert_called_once_with({}) + + def test__worker_process_queue_on_sla_validation_error_monitor(self): + self.scenario_cfg['sla'] = {'action': 'monitor'} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod( + side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT) + + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], ('My Case SLA validation failed. ' + 'Error: my error message',)) + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_broad_exception(self): + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.YardstickException) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_queue_on_broad_exception(self): + self.benchmark.my_method = self.MyMethod( + side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT) + + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertNotEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_benchmark_teardown_on_broad_exception(self): + self.benchmark.teardown = mock.Mock( + side_effect=y_exc.YardstickException) + + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + + with self.assertRaises(SystemExit) as raised: + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + self.assertEqual(raised.exception.code, 1) + self._assert_defaults__worker_run_setup_and_teardown() diff --git a/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py b/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py index da550ade9..988ebe3de 100644 --- a/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py +++ b/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py @@ -51,6 +51,11 @@ class TestProxBinSearchProfile(unittest.TestCase): fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value' : 10} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value' : 10, 'get.return_value': 10} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) profile_helper = mock.MagicMock() profile_helper.run_test = target @@ -60,9 +65,10 @@ class TestProxBinSearchProfile(unittest.TestCase): profile._profile_helper = profile_helper profile.execute_traffic(traffic_generator) + self.assertEqual(round(profile.current_lower, 2), 74.69) self.assertEqual(round(profile.current_upper, 2), 76.09) - self.assertEqual(len(runs), 7) + self.assertEqual(len(runs), 77) # Result Samples inc theor_max result_tuple = {"Result_Actual_throughput": 7.5e-07, @@ -122,6 +128,11 @@ class TestProxBinSearchProfile(unittest.TestCase): fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value': 10} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value': 0, 'get.return_value': 0} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) profile_helper = mock.MagicMock() profile_helper.run_test = target @@ -171,7 +182,8 @@ class TestProxBinSearchProfile(unittest.TestCase): # Result Samples - result_tuple = {"Result_theor_max_throughput": 0, "Result_pktSize": 200} + result_tuple = {'Result_Actual_throughput': 0, "Result_theor_max_throughput": 0, + "Result_pktSize": 200} profile.queue.put.assert_called_with(result_tuple) # Check for success_ tuple (None expected) @@ -181,3 +193,81 @@ class TestProxBinSearchProfile(unittest.TestCase): for k in call_detail: if "Success_" in k: self.assertRaises(AttributeError) + + def test_execute_4(self): + + def target(*args, **_): + runs.append(args[2]) + if args[2] < 0 or args[2] > 100: + raise RuntimeError(' '.join([str(args), str(runs)])) + if args[2] > 75.0: + return fail_tuple, {} + return success_tuple, {} + + tp_config = { + 'traffic_profile': { + 'packet_sizes': [200], + 'test_precision': 2.0, + 'tolerated_loss': 0.001, + }, + } + + runs = [] + success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) + fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) + + traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value': 100000} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value': 0, 'get.return_value': 0} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) + + profile_helper = mock.MagicMock() + profile_helper.run_test = target + + profile = ProxBinSearchProfile(tp_config) + profile.init(mock.MagicMock()) + profile._profile_helper = profile_helper + + profile.execute_traffic(traffic_generator) + self.assertEqual(round(profile.current_lower, 2), 74.69) + self.assertEqual(round(profile.current_upper, 2), 76.09) + self.assertEqual(len(runs), 7) + + # Result Samples inc theor_max + result_tuple = {'Result_Actual_throughput': 7.5e-07, + 'Result_theor_max_throughput': 0.00012340000000000002, + 'Result_pktSize': 200} + + profile.queue.put.assert_called_with(result_tuple) + + success_result_tuple = {"Success_CurrentDropPackets": 0.5, + "Success_DropPackets": 0.5, + "Success_LatencyAvg": 5.3, + "Success_LatencyMax": 5.2, + "Success_LatencyMin": 5.1, + "Success_PktSize": 200, + "Success_RxThroughput": 7.5e-07, + "Success_Throughput": 7.5e-07, + "Success_TxThroughput": 0.00012340000000000002} + + calls = profile.queue.put(success_result_tuple) + profile.queue.put.assert_has_calls(calls) + + success_result_tuple2 = {"Success_CurrentDropPackets": 0.5, + "Success_DropPackets": 0.5, + "Success_LatencyAvg": 5.3, + "Success_LatencyMax": 5.2, + "Success_LatencyMin": 5.1, + "Success_PktSize": 200, + "Success_RxThroughput": 7.5e-07, + "Success_Throughput": 7.5e-07, + "Success_TxThroughput": 123.4, + "Success_can_be_lost": 409600, + "Success_drop_total": 20480, + "Success_rx_total": 4075520, + "Success_tx_total": 4096000} + + calls = profile.queue.put(success_result_tuple2) + profile.queue.put.assert_has_calls(calls) -- cgit 1.2.3-korg