diff options
Diffstat (limited to 'yardstick')
10 files changed, 794 insertions, 89 deletions
diff --git a/yardstick/benchmark/runners/proxduration.py b/yardstick/benchmark/runners/proxduration.py new file mode 100644 index 000000000..61a468fd3 --- /dev/null +++ b/yardstick/benchmark/runners/proxduration.py @@ -0,0 +1,165 @@ +# Copyright 2014: Mirantis Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# yardstick comment: this is a modified copy of +# rally/rally/benchmark/runners/constant.py + +"""A runner that runs a specific time before it returns +""" + +from __future__ import absolute_import + +import os +import multiprocessing +import logging +import traceback +import time + +from yardstick.benchmark.runners import base +from yardstick.common import exceptions as y_exc +from yardstick.common import constants + +LOG = logging.getLogger(__name__) + +def _worker_process(queue, cls, method_name, scenario_cfg, + context_cfg, aborted, output_queue): + + sequence = 1 + + runner_cfg = scenario_cfg['runner'] + + requested_interval = interval = runner_cfg.get("interval", 1) + duration = runner_cfg.get("duration", 60) + sampled = runner_cfg.get("sampled", False) + + LOG.info("Worker START, duration is %ds", duration) + LOG.debug("class is %s", cls) + + runner_cfg['runner_id'] = os.getpid() + + benchmark = cls(scenario_cfg, context_cfg) + benchmark.setup() + method = getattr(benchmark, method_name) + + sla_action = None + if "sla" in scenario_cfg: + sla_action = scenario_cfg["sla"].get("action", "assert") + + + start = time.time() + timeout = start + duration + while True: + + LOG.debug("runner=%(runner)s seq=%(sequence)s START", + {"runner": runner_cfg["runner_id"], "sequence": sequence}) + + data = {} + errors = "" + + benchmark.pre_run_wait_time(interval) + + if sampled: + try: + pre_adjustment = time.time() + result = method(data) + post_adjustment = time.time() + if requested_interval > post_adjustment - pre_adjustment: + interval = requested_interval - (post_adjustment - pre_adjustment) + else: + interval = 0 + + except y_exc.SLAValidationError as error: + # SLA validation failed in scenario, determine what to do now + if sla_action == "assert": + raise + elif sla_action == "monitor": + LOG.warning("SLA validation failed: %s", error.args) + errors = error.args + # catch all exceptions because with multiprocessing we can have un-picklable exception + # problems https://bugs.python.org/issue9400 + except Exception: # pylint: disable=broad-except + errors = traceback.format_exc() + LOG.exception("") + else: + if result: + # add timeout for put so we don't block test + # if we do timeout we don't care about dropping individual KPIs + output_queue.put(result, True, constants.QUEUE_PUT_TIMEOUT) + + benchmark_output = { + 'timestamp': time.time(), + 'sequence': sequence, + 'data': data, + 'errors': errors + } + + queue.put(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT) + else: + LOG.debug("No sample collected ...Sequence %s", sequence) + + + sequence += 1 + + if (errors and sla_action is None) or time.time() > timeout or aborted.is_set(): + LOG.info("Worker END") + break + + try: + benchmark.teardown() + except Exception: + # catch any exception in teardown and convert to simple exception + # never pass exceptions back to multiprocessing, because some exceptions can + # be unpicklable + # https://bugs.python.org/issue9400 + LOG.exception("") + raise SystemExit(1) + + LOG.debug("queue.qsize() = %s", queue.qsize()) + LOG.debug("output_queue.qsize() = %s", output_queue.qsize()) + LOG.info("Exiting ProxDuration Runner...") + +class ProxDurationRunner(base.Runner): + """Run a scenario for a certain amount of time + +If the scenario ends before the time has elapsed, it will be started again. + + Parameters + duration - amount of time the scenario will be run for + type: int + unit: seconds + default: 60 sec + interval - time to wait between each scenario invocation + type: int + unit: seconds + default: 1 sec + sampled - Sample data is required yes/no + type: boolean + unit: True/False + default: False + confirmation - Number of confirmation retries + type: int + unit: retry attempts + default: 0 + """ + __execution_type__ = 'ProxDuration' + + def _run_benchmark(self, cls, method, scenario_cfg, context_cfg): + name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid()) + self.process = multiprocessing.Process( + name=name, + target=_worker_process, + args=(self.result_queue, cls, method, scenario_cfg, + context_cfg, self.aborted, self.output_queue)) + self.process.start() diff --git a/yardstick/common/constants.py b/yardstick/common/constants.py index f6e4ab7e9..1ebd32509 100644 --- a/yardstick/common/constants.py +++ b/yardstick/common/constants.py @@ -119,6 +119,7 @@ INFLUXDB_DB_NAME = get_param('influxdb.db_name', 'yardstick') INFLUXDB_IMAGE = get_param('influxdb.image', 'tutum/influxdb') INFLUXDB_TAG = get_param('influxdb.tag', '0.13') INFLUXDB_DASHBOARD_PORT = 8083 +QUEUE_PUT_TIMEOUT = 10 # grafana GRAFANA_IP = get_param('grafana.ip', SERVER_IP) diff --git a/yardstick/network_services/traffic_profile/prox_binsearch.py b/yardstick/network_services/traffic_profile/prox_binsearch.py index c3277fb12..23d71936b 100644 --- a/yardstick/network_services/traffic_profile/prox_binsearch.py +++ b/yardstick/network_services/traffic_profile/prox_binsearch.py @@ -21,6 +21,7 @@ import time from yardstick.network_services.traffic_profile.prox_profile import ProxProfile from yardstick.network_services import constants +from yardstick.common import constants as overall_constants LOG = logging.getLogger(__name__) @@ -84,9 +85,14 @@ class ProxBinSearchProfile(ProxProfile): # success, the binary search will complete on an integer multiple # of the precision, rather than on a fraction of it. - theor_max_thruput = 0 + theor_max_thruput = actual_max_thruput = 0 result_samples = {} + rate_samples = {} + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + ok_retry = 0 # Store one time only value in influxdb single_samples = { @@ -102,52 +108,89 @@ class ProxBinSearchProfile(ProxProfile): successful_pkt_loss = 0.0 line_speed = traffic_gen.scenario_helper.all_options.get( "interface_speed_gbps", constants.NIC_GBPS_DEFAULT) * constants.ONE_GIGABIT_IN_BITS - for test_value in self.bounds_iterator(LOG): - result, port_samples = self._profile_helper.run_test(pkt_size, duration, - test_value, - self.tolerated_loss, - line_speed) - self.curr_time = time.time() - diff_time = self.curr_time - self.prev_time - self.prev_time = self.curr_time - - if result.success: - LOG.debug("Success! Increasing lower bound") - self.current_lower = test_value - successful_pkt_loss = result.pkt_loss - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - samples["TxThroughput"] = samples["TxThroughput"] * 1000 * 1000 - - # store results with success tag in influxdb - success_samples = {'Success_' + key: value for key, value in samples.items()} - - success_samples["Success_rx_total"] = int(result.rx_total / diff_time) - success_samples["Success_tx_total"] = int(result.tx_total / diff_time) - success_samples["Success_can_be_lost"] = int(result.can_be_lost / diff_time) - success_samples["Success_drop_total"] = int(result.drop_total / diff_time) - self.queue.put(success_samples) - - # Store Actual throughput for result samples - result_samples["Result_Actual_throughput"] = \ - success_samples["Success_RxThroughput"] - else: - LOG.debug("Failure... Decreasing upper bound") - self.current_upper = test_value - samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) - - for k in samples: - tmp = samples[k] - if isinstance(tmp, dict): - for k2 in tmp: - samples[k][k2] = int(samples[k][k2] / diff_time) - - if theor_max_thruput < samples["TxThroughput"]: - theor_max_thruput = samples['TxThroughput'] - self.queue.put({'theor_max_throughput': theor_max_thruput}) - - LOG.debug("Collect TG KPIs %s %s", datetime.datetime.now(), samples) - self.queue.put(samples) + ok_retry = traffic_gen.scenario_helper.scenario_cfg["runner"].get("confirmation", 0) + for test_value in self.bounds_iterator(LOG): + pos_retry = 0 + neg_retry = 0 + total_retry = 0 + + rate_samples["MAX_Rate"] = self.current_upper + rate_samples["MIN_Rate"] = self.current_lower + rate_samples["Test_Rate"] = test_value + self.queue.put(rate_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + LOG.info("Checking MAX %s MIN %s TEST %s", + self.current_upper, self.lower_bound, test_value) + while (pos_retry <= ok_retry) and (neg_retry <= ok_retry): + + total_retry = total_retry + 1 + result, port_samples = self._profile_helper.run_test(pkt_size, duration, + test_value, + self.tolerated_loss, + line_speed) + if (total_retry > (ok_retry * 3)) and (ok_retry is not 0): + LOG.info("Failure.!! .. RETRY EXCEEDED ... decrease lower bound") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + self.current_upper = test_value + neg_retry = total_retry + elif result.success: + if (pos_retry < ok_retry) and (ok_retry is not 0): + neg_retry = 0 + LOG.info("Success! ... confirm retry") + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + else: + LOG.info("Success! Increasing lower bound") + self.current_lower = test_value + + successful_pkt_loss = result.pkt_loss + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + # store results with success tag in influxdb + success_samples = \ + {'Success_' + key: value for key, value in samples.items()} + + success_samples["Success_rx_total"] = int(result.rx_total) + success_samples["Success_tx_total"] = int(result.tx_total) + success_samples["Success_can_be_lost"] = int(result.can_be_lost) + success_samples["Success_drop_total"] = int(result.drop_total) + success_samples["Success_RxThroughput"] = samples["RxThroughput"] + LOG.info(">>>##>>Collect SUCCESS TG KPIs %s %s", + datetime.datetime.now(), success_samples) + self.queue.put(success_samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + + # Store Actual throughput for result samples + actual_max_thruput = success_samples["Success_RxThroughput"] + + pos_retry = pos_retry + 1 + + else: + if (neg_retry < ok_retry) and (ok_retry is not 0): + + pos_retry = 0 + LOG.info("failure! ... confirm retry") + else: + LOG.info("Failure... Decreasing upper bound") + self.current_upper = test_value + + neg_retry = neg_retry + 1 + samples = result.get_samples(pkt_size, successful_pkt_loss, port_samples) + + if theor_max_thruput < samples["TxThroughput"]: + theor_max_thruput = samples['TxThroughput'] + self.queue.put({'theor_max_throughput': theor_max_thruput}) + + LOG.info(">>>##>>Collect TG KPIs %s %s", datetime.datetime.now(), samples) + self.queue.put(samples, True, overall_constants.QUEUE_PUT_TIMEOUT) + + LOG.info(">>>##>> Result Reached PktSize %s Theor_Max_Thruput %s Actual_throughput %s", + pkt_size, theor_max_thruput, actual_max_thruput) result_samples["Result_pktSize"] = pkt_size - result_samples["Result_theor_max_throughput"] = theor_max_thruput/ (1000 * 1000) + result_samples["Result_theor_max_throughput"] = theor_max_thruput + result_samples["Result_Actual_throughput"] = actual_max_thruput self.queue.put(result_samples) diff --git a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py index 31ed30140..12ec1c8c5 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_helpers.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_helpers.py @@ -315,7 +315,7 @@ class ProxSocketHelper(object): return ret_str - def get_data(self, pkt_dump_only=False, timeout=1): + def get_data(self, pkt_dump_only=False, timeout=0.01): """ read data from the socket """ # This method behaves slightly differently depending on whether it is @@ -520,6 +520,51 @@ class ProxSocketHelper(object): tsc = int(ret[3]) return rx, tx, drop, tsc + def multi_port_stats(self, ports): + """get counter values from all ports port""" + + ports_str = "" + for port in ports: + ports_str = ports_str + str(port) + "," + ports_str = ports_str[:-1] + + ports_all_data = [] + tot_result = [0] * len(ports) + + retry_counter = 0 + port_index = 0 + while (len(ports) is not len(ports_all_data)) and (retry_counter < 10): + self.put_command("multi port stats {}\n".format(ports_str)) + ports_all_data = self.get_data().split(";") + + if len(ports) is len(ports_all_data): + for port_data_str in ports_all_data: + + try: + tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")] + except (IndexError, TypeError): + LOG.error("Port Index error %d %s - retrying ", port_index, port_data_str) + + if (len(tot_result[port_index]) is not 6) or \ + tot_result[port_index][0] is not ports[port_index]: + ports_all_data = [] + tot_result = [0] * len(ports) + port_index = 0 + time.sleep(0.1) + LOG.error("Corrupted PACKET %s - retrying", port_data_str) + break + else: + port_index = port_index + 1 + else: + LOG.error("Empty / too much data - retry -%s-", ports_all_data) + ports_all_data = [] + tot_result = [0] * len(ports) + port_index = 0 + time.sleep(0.1) + + retry_counter = retry_counter + 1 + return tot_result + def port_stats(self, ports): """get counter values from a specific port""" tot_result = [0] * 12 @@ -1000,9 +1045,13 @@ class ProxDataHelper(object): @property def totals_and_pps(self): if self._totals_and_pps is None: - rx_total, tx_total = self.sut.port_stats(range(self.port_count))[6:8] - pps = self.value / 100.0 * self.line_rate_to_pps() - self._totals_and_pps = rx_total, tx_total, pps + rx_total = tx_total = 0 + all_ports = self.sut.multi_port_stats(range(self.port_count)) + for port in all_ports: + rx_total = rx_total + port[1] + tx_total = tx_total + port[2] + requested_pps = self.value / 100.0 * self.line_rate_to_pps() + self._totals_and_pps = rx_total, tx_total, requested_pps return self._totals_and_pps @property @@ -1020,19 +1069,18 @@ class ProxDataHelper(object): @property def samples(self): samples = {} + ports = [] + port_names = [] for port_name, port_num in self.vnfd_helper.ports_iter(): - try: - port_rx_total, port_tx_total = self.sut.port_stats([port_num])[6:8] - samples[port_name] = { - "in_packets": port_rx_total, - "out_packets": port_tx_total, - } - except (KeyError, TypeError, NameError, MemoryError, ValueError, - SystemError, BufferError): - samples[port_name] = { - "in_packets": 0, - "out_packets": 0, - } + ports.append(port_num) + port_names.append(port_name) + + results = self.sut.multi_port_stats(ports) + for result in results: + port_num = result[0] + samples[port_names[port_num]] = { + "in_packets": result[1], + "out_packets": result[2]} return samples def __enter__(self): diff --git a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py index 285e08659..cb97f7711 100644 --- a/yardstick/network_services/vnf_generic/vnf/prox_vnf.py +++ b/yardstick/network_services/vnf_generic/vnf/prox_vnf.py @@ -85,14 +85,15 @@ class ProxApproxVnf(SampleVNF): raise RuntimeError("Failed ..Invalid no of ports .. " "1, 2 or 4 ports only supported at this time") - self.port_stats = self.vnf_execute('port_stats', range(port_count)) + all_port_stats = self.vnf_execute('multi_port_stats', range(port_count)) curr_time = time.time() + rx_total = tx_total = 0 try: - rx_total = self.port_stats[6] - tx_total = self.port_stats[7] - except IndexError: - LOG.debug("port_stats parse fail ") - # return empty dict so we don't mess up existing KPIs + for single_port_stats in all_port_stats: + rx_total = rx_total + single_port_stats[1] + tx_total = tx_total + single_port_stats[2] + except (TypeError, IndexError): + LOG.error("Invalid data ...") return {} result = { @@ -103,8 +104,19 @@ class ProxApproxVnf(SampleVNF): # collectd KPIs here and not TG KPIs, so use a different method name "collect_stats": self.resource_helper.collect_collectd_kpi(), } - curr_packets_in = int((rx_total - self.prev_packets_in) / (curr_time - self.prev_time)) - curr_packets_fwd = int((tx_total - self.prev_packets_sent) / (curr_time - self.prev_time)) + try: + curr_packets_in = int((rx_total - self.prev_packets_in) + / (curr_time - self.prev_time)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_in = 0 + + try: + curr_packets_fwd = int((tx_total - self.prev_packets_sent) + / (curr_time - self.prev_time)) + except ZeroDivisionError: + LOG.error("Error.... Divide by Zero") + curr_packets_fwd = 0 result["curr_packets_in"] = curr_packets_in result["curr_packets_fwd"] = curr_packets_fwd diff --git a/yardstick/tests/unit/benchmark/core/test_task.py b/yardstick/tests/unit/benchmark/core/test_task.py index 7468368df..0424c77a3 100644 --- a/yardstick/tests/unit/benchmark/core/test_task.py +++ b/yardstick/tests/unit/benchmark/core/test_task.py @@ -156,6 +156,31 @@ class TaskTestCase(unittest.TestCase): t._run([scenario], False, "yardstick.out") runner.run.assert_called_once() + @mock.patch.object(task, 'Context') + @mock.patch.object(task, 'base_runner') + def test_run_ProxDuration(self, mock_base_runner, *args): + scenario = { + 'host': 'athena.demo', + 'target': 'ares.demo', + 'runner': { + 'duration': 60, + 'interval': 1, + 'sampled': 'yes', + 'confirmation': 1, + 'type': 'ProxDuration' + }, + 'type': 'Ping' + } + + t = task.Task() + runner = mock.Mock() + runner.join.return_value = 0 + runner.get_output.return_value = {} + runner.get_result.return_value = [] + mock_base_runner.Runner.get.return_value = runner + t._run([scenario], False, "yardstick.out") + runner.run.assert_called_once() + @mock.patch.object(os, 'environ') def test_check_precondition(self, mock_os_environ): cfg = { diff --git a/yardstick/tests/unit/benchmark/runner/test_proxduration.py b/yardstick/tests/unit/benchmark/runner/test_proxduration.py new file mode 100644 index 000000000..be1715aad --- /dev/null +++ b/yardstick/tests/unit/benchmark/runner/test_proxduration.py @@ -0,0 +1,295 @@ +############################################################################## +# Copyright (c) 2018 Nokia and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## + +import mock +import unittest +import multiprocessing +import os +import time + +from yardstick.benchmark.runners import proxduration +from yardstick.common import exceptions as y_exc + + +class ProxDurationRunnerTest(unittest.TestCase): + class MyMethod(object): + SLA_VALIDATION_ERROR_SIDE_EFFECT = 1 + BROAD_EXCEPTION_SIDE_EFFECT = 2 + + def __init__(self, side_effect=0): + self.count = 101 + self.side_effect = side_effect + + def __call__(self, data): + self.count += 1 + data['my_key'] = self.count + if self.side_effect == self.SLA_VALIDATION_ERROR_SIDE_EFFECT: + raise y_exc.SLAValidationError(case_name='My Case', + error_msg='my error message') + elif self.side_effect == self.BROAD_EXCEPTION_SIDE_EFFECT: + raise y_exc.YardstickException + return self.count + + def setUp(self): + self.scenario_cfg = { + 'runner': {'interval': 0, "duration": 0}, + 'type': 'some_type' + } + + self.benchmark = mock.Mock() + self.benchmark_cls = mock.Mock(return_value=self.benchmark) + + def _assert_defaults__worker_run_setup_and_teardown(self): + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.teardown.assert_called_once() + + @mock.patch.object(os, 'getpid') + @mock.patch.object(multiprocessing, 'Process') + def test__run_benchmark_called_with(self, mock_multiprocessing_process, + mock_os_getpid): + mock_os_getpid.return_value = 101 + + runner = proxduration.ProxDurationRunner({}) + benchmark_cls = mock.Mock() + runner._run_benchmark(benchmark_cls, 'my_method', self.scenario_cfg, + {}) + mock_multiprocessing_process.assert_called_once_with( + name='ProxDuration-some_type-101', + target=proxduration._worker_process, + args=(runner.result_queue, benchmark_cls, 'my_method', + self.scenario_cfg, {}, runner.aborted, runner.output_queue)) + + @mock.patch.object(os, 'getpid') + def test__worker_process_runner_id(self, mock_os_getpid): + mock_os_getpid.return_value = 101 + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101) + + def test__worker_process_called_with_cfg(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_called_with_cfg_loop(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.call_count, 2) + + def test__worker_process_called_without_cfg(self): + scenario_cfg = {'runner': {}} + + aborted = multiprocessing.Event() + aborted.set() + + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + scenario_cfg, {}, aborted, mock.Mock()) + + self.benchmark_cls.assert_called_once_with(scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.teardown.assert_called_once() + + def test__worker_process_output_queue(self): + self.benchmark.my_method = mock.Mock(return_value='my_result') + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + output_queue = multiprocessing.Queue() + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), output_queue) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertEquals(output_queue.get(), 'my_result') + + def test__worker_process_output_queue_multiple_iterations(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod() + + output_queue = multiprocessing.Queue() + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), output_queue) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.count, 103) + + count = 101 + while not output_queue.empty(): + count += 1 + self.assertEquals(output_queue.get(), count) + + def test__worker_process_queue(self): + self.benchmark.my_method = self.MyMethod() + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_queue_multiple_iterations(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod() + + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + self.assertGreater(self.benchmark.my_method.count, 103) + + count = 0 + while not queue.empty(): + count += 1 + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': count + 101}) + self.assertEqual(result['sequence'], count) + + def test__worker_process_except_sla_validation_error_no_sla_cfg(self): + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self): + self.scenario_cfg['sla'] = {'action': 'monitor'} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_raise_sla_validation_error_sla_cfg_default(self): + self.scenario_cfg['sla'] = {} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + with self.assertRaises(y_exc.SLAValidationError): + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.my_method.assert_called_once_with({}) + + def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self): + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.scenario_cfg['sla'] = {'action': 'assert'} + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.SLAValidationError) + + with self.assertRaises(y_exc.SLAValidationError): + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {}) + self.benchmark.setup.assert_called_once() + self.benchmark.my_method.assert_called_once_with({}) + + def test__worker_process_queue_on_sla_validation_error_monitor(self): + self.scenario_cfg['sla'] = {'action': 'monitor'} + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + self.benchmark.my_method = self.MyMethod( + side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT) + + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertEqual(result['errors'], ('My Case SLA validation failed. ' + 'Error: my error message',)) + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_broad_exception(self): + self.benchmark.my_method = mock.Mock( + side_effect=y_exc.YardstickException) + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + + self._assert_defaults__worker_run_setup_and_teardown() + + def test__worker_process_queue_on_broad_exception(self): + self.benchmark.my_method = self.MyMethod( + side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT) + + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + queue = multiprocessing.Queue() + timestamp = time.time() + proxduration._worker_process(queue, self.benchmark_cls, 'my_method', + self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + time.sleep(0.1) + + self._assert_defaults__worker_run_setup_and_teardown() + + result = queue.get() + self.assertGreater(result['timestamp'], timestamp) + self.assertNotEqual(result['errors'], '') + self.assertEqual(result['data'], {'my_key': 102}) + self.assertEqual(result['sequence'], 1) + + def test__worker_process_benchmark_teardown_on_broad_exception(self): + self.benchmark.teardown = mock.Mock( + side_effect=y_exc.YardstickException) + + self.scenario_cfg["runner"] = {"sampled": True, "duration": 1} + + with self.assertRaises(SystemExit) as raised: + proxduration._worker_process(mock.Mock(), self.benchmark_cls, + 'my_method', self.scenario_cfg, {}, + multiprocessing.Event(), mock.Mock()) + self.assertEqual(raised.exception.code, 1) + self._assert_defaults__worker_run_setup_and_teardown() diff --git a/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py b/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py index da550ade9..988ebe3de 100644 --- a/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py +++ b/yardstick/tests/unit/network_services/traffic_profile/test_prox_binsearch.py @@ -51,6 +51,11 @@ class TestProxBinSearchProfile(unittest.TestCase): fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value' : 10} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value' : 10, 'get.return_value': 10} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) profile_helper = mock.MagicMock() profile_helper.run_test = target @@ -60,9 +65,10 @@ class TestProxBinSearchProfile(unittest.TestCase): profile._profile_helper = profile_helper profile.execute_traffic(traffic_generator) + self.assertEqual(round(profile.current_lower, 2), 74.69) self.assertEqual(round(profile.current_upper, 2), 76.09) - self.assertEqual(len(runs), 7) + self.assertEqual(len(runs), 77) # Result Samples inc theor_max result_tuple = {"Result_Actual_throughput": 7.5e-07, @@ -122,6 +128,11 @@ class TestProxBinSearchProfile(unittest.TestCase): fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value': 10} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value': 0, 'get.return_value': 0} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) profile_helper = mock.MagicMock() profile_helper.run_test = target @@ -171,7 +182,8 @@ class TestProxBinSearchProfile(unittest.TestCase): # Result Samples - result_tuple = {"Result_theor_max_throughput": 0, "Result_pktSize": 200} + result_tuple = {'Result_Actual_throughput': 0, "Result_theor_max_throughput": 0, + "Result_pktSize": 200} profile.queue.put.assert_called_with(result_tuple) # Check for success_ tuple (None expected) @@ -181,3 +193,81 @@ class TestProxBinSearchProfile(unittest.TestCase): for k in call_detail: if "Success_" in k: self.assertRaises(AttributeError) + + def test_execute_4(self): + + def target(*args, **_): + runs.append(args[2]) + if args[2] < 0 or args[2] > 100: + raise RuntimeError(' '.join([str(args), str(runs)])) + if args[2] > 75.0: + return fail_tuple, {} + return success_tuple, {} + + tp_config = { + 'traffic_profile': { + 'packet_sizes': [200], + 'test_precision': 2.0, + 'tolerated_loss': 0.001, + }, + } + + runs = [] + success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) + fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) + + traffic_generator = mock.MagicMock() + attrs1 = {'get.return_value': 100000} + traffic_generator.scenario_helper.all_options.configure_mock(**attrs1) + + attrs2 = {'__getitem__.return_value': 0, 'get.return_value': 0} + traffic_generator.scenario_helper.scenario_cfg["runner"].configure_mock(**attrs2) + + profile_helper = mock.MagicMock() + profile_helper.run_test = target + + profile = ProxBinSearchProfile(tp_config) + profile.init(mock.MagicMock()) + profile._profile_helper = profile_helper + + profile.execute_traffic(traffic_generator) + self.assertEqual(round(profile.current_lower, 2), 74.69) + self.assertEqual(round(profile.current_upper, 2), 76.09) + self.assertEqual(len(runs), 7) + + # Result Samples inc theor_max + result_tuple = {'Result_Actual_throughput': 7.5e-07, + 'Result_theor_max_throughput': 0.00012340000000000002, + 'Result_pktSize': 200} + + profile.queue.put.assert_called_with(result_tuple) + + success_result_tuple = {"Success_CurrentDropPackets": 0.5, + "Success_DropPackets": 0.5, + "Success_LatencyAvg": 5.3, + "Success_LatencyMax": 5.2, + "Success_LatencyMin": 5.1, + "Success_PktSize": 200, + "Success_RxThroughput": 7.5e-07, + "Success_Throughput": 7.5e-07, + "Success_TxThroughput": 0.00012340000000000002} + + calls = profile.queue.put(success_result_tuple) + profile.queue.put.assert_has_calls(calls) + + success_result_tuple2 = {"Success_CurrentDropPackets": 0.5, + "Success_DropPackets": 0.5, + "Success_LatencyAvg": 5.3, + "Success_LatencyMax": 5.2, + "Success_LatencyMin": 5.1, + "Success_PktSize": 200, + "Success_RxThroughput": 7.5e-07, + "Success_Throughput": 7.5e-07, + "Success_TxThroughput": 123.4, + "Success_can_be_lost": 409600, + "Success_drop_total": 20480, + "Success_rx_total": 4075520, + "Success_tx_total": 4096000} + + calls = profile.queue.put(success_result_tuple2) + profile.queue.put.assert_has_calls(calls) diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py index 0cb11933f..b1853f12f 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py @@ -556,6 +556,31 @@ class TestProxSocketHelper(unittest.TestCase): result = prox.core_stats([3, 4, 5], 16) self.assertEqual(result, expected) + def test_multi_port_stats(self): + + mock_socket = mock.MagicMock() + prox = ProxSocketHelper(mock_socket) + prox.get_data = mock.MagicMock(return_value='0,1,2,3,4,5;1,1,2,3,4,5') + expected = [[0, 1, 2, 3, 4, 5], [1, 1, 2, 3, 4, 5]] + result = prox.multi_port_stats([0, 1]) + self.assertEqual(result, expected) + + prox.get_data = mock.MagicMock(return_value='0,1,2,3,4,5;1,1,2,3,4,5') + result = prox.multi_port_stats([0]) + expected = [0] + self.assertEqual(result, expected) + + prox.get_data = mock.MagicMock(return_value='0,1,2,3;1,1,2,3,4,5') + result = prox.multi_port_stats([0, 1]) + expected = [0] * 2 + self.assertEqual(result, expected) + + prox.get_data = mock.MagicMock(return_value='99,1,2,3,4,5;1,1,2,3,4,5') + expected = [0] * 2 + result = prox.multi_port_stats([0, 1]) + self.assertEqual(result, expected) + + def test_port_stats(self): port_stats = [ ','.join(str(n) for n in range(3, 15)), @@ -1542,34 +1567,34 @@ class TestProxDataHelper(unittest.TestCase): vnfd_helper.port_pairs.all_ports = list(range(4)) sut = mock.MagicMock() - sut.port_stats.return_value = list(range(10)) + sut.multi_port_stats.return_value = [[0, 1, 2, 3, 4, 5], [1, 1, 2, 3, 4, 5], + [2, 1, 2, 3, 4, 5], [3, 1, 2, 3, 4, 5]] data_helper = ProxDataHelper( vnfd_helper, sut, pkt_size, 25, None, constants.NIC_GBPS_DEFAULT * constants.ONE_GIGABIT_IN_BITS) - self.assertEqual(data_helper.rx_total, 6) - self.assertEqual(data_helper.tx_total, 7) + self.assertEqual(data_helper.rx_total, 4) + self.assertEqual(data_helper.tx_total, 8) self.assertEqual(data_helper.pps, 6.25e6) def test_samples(self): vnfd_helper = mock.MagicMock() - vnfd_helper.port_pairs.all_ports = list(range(4)) - vnfd_helper.ports_iter.return_value = [('xe1', 3), ('xe2', 7)] + vnfd_helper.ports_iter.return_value = [('xe0', 0), ('xe1', 1)] sut = mock.MagicMock() - sut.port_stats.return_value = list(range(10)) + sut.multi_port_stats.return_value = [[0, 1, 2, 3, 4, 5], [1, 11, 12, 3, 4, 5]] data_helper = ProxDataHelper(vnfd_helper, sut, None, None, None, None) expected = { - 'xe1': { - 'in_packets': 6, - 'out_packets': 7, + 'xe0': { + 'in_packets': 1, + 'out_packets': 2, }, - 'xe2': { - 'in_packets': 6, - 'out_packets': 7, + 'xe1': { + 'in_packets': 11, + 'out_packets': 12, }, } result = data_helper.samples diff --git a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py index fa2d462ab..351607fb5 100644 --- a/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py +++ b/yardstick/tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py @@ -339,16 +339,17 @@ class TestProxApproxVnf(unittest.TestCase): mock_ssh(ssh) resource_helper = mock.MagicMock() - resource_helper.execute.return_value = list(range(12)) + resource_helper.execute.return_value = [[0, 1, 2, 3, 4, 5], [1, 1, 2, 3, 4, 5], + [2, 1, 2, 3, 4, 5], [3, 1, 2, 3, 4, 5]] resource_helper.collect_collectd_kpi.return_value = {'core': {'result': 234}} prox_approx_vnf = ProxApproxVnf(NAME, self.VNFD0) prox_approx_vnf.resource_helper = resource_helper expected = { - 'packets_in': 6, - 'packets_dropped': 1, - 'packets_fwd': 7, + 'packets_in': 4, + 'packets_dropped': 4, + 'packets_fwd': 8, 'collect_stats': {'core': {'result': 234}}, } result = prox_approx_vnf.collect_kpi() |