From cfc40ad1bb36889e54bc99cb779cfcfa1f3b04dd Mon Sep 17 00:00:00 2001 From: Emma Foley Date: Fri, 27 Apr 2018 10:37:37 +0000 Subject: Move tests: unit/network_services/traffic_profile * Fix pylint errors * Add TODOs Some errors are ignored locally, as they were a symptom of other problems. These issues have been flagged with a TODO, and should be fixed later. JIRA: YARDSTICK-837 Signed-off-by: Emma Foley Change-Id: Id983a2e415d66633331e7fea96a377e2a7589980 --- .../network_services/traffic_profile/__init__.py | 0 .../network_services/traffic_profile/test_base.py | 73 --- .../network_services/traffic_profile/test_fixed.py | 120 ---- .../network_services/traffic_profile/test_http.py | 41 -- .../traffic_profile/test_http_ixload.py | 272 --------- .../traffic_profile/test_ixia_rfc2544.py | 611 --------------------- .../traffic_profile/test_prox_acl.py | 78 --- .../traffic_profile/test_prox_binsearch.py | 185 ------- .../traffic_profile/test_prox_profile.py | 130 ----- .../traffic_profile/test_prox_ramp.py | 97 ---- .../traffic_profile/test_rfc2544.py | 276 ---------- .../traffic_profile/test_trex_traffic_profile.py | 309 ----------- 12 files changed, 2192 deletions(-) delete mode 100644 tests/unit/network_services/traffic_profile/__init__.py delete mode 100644 tests/unit/network_services/traffic_profile/test_base.py delete mode 100644 tests/unit/network_services/traffic_profile/test_fixed.py delete mode 100644 tests/unit/network_services/traffic_profile/test_http.py delete mode 100644 tests/unit/network_services/traffic_profile/test_http_ixload.py delete mode 100644 tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py delete mode 100644 tests/unit/network_services/traffic_profile/test_prox_acl.py delete mode 100644 tests/unit/network_services/traffic_profile/test_prox_binsearch.py delete mode 100644 tests/unit/network_services/traffic_profile/test_prox_profile.py delete mode 100644 tests/unit/network_services/traffic_profile/test_prox_ramp.py delete mode 100644 tests/unit/network_services/traffic_profile/test_rfc2544.py delete mode 100644 tests/unit/network_services/traffic_profile/test_trex_traffic_profile.py (limited to 'tests') diff --git a/tests/unit/network_services/traffic_profile/__init__.py b/tests/unit/network_services/traffic_profile/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/unit/network_services/traffic_profile/test_base.py b/tests/unit/network_services/traffic_profile/test_base.py deleted file mode 100644 index 3b8804976..000000000 --- a/tests/unit/network_services/traffic_profile/test_base.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -import mock -import unittest - -from yardstick.common import exceptions -from yardstick.network_services import traffic_profile as tprofile_package -from yardstick.network_services.traffic_profile import base -from yardstick import tests as y_tests - - -class TestTrafficProfile(unittest.TestCase): - TRAFFIC_PROFILE = { - "schema": "isb:traffic_profile:0.1", - "name": "fixed", - "description": "Fixed traffic profile to run UDP traffic", - "traffic_profile": { - "traffic_type": "FixedTraffic", - "frame_rate": 100, # pps - "flow_number": 10, - "frame_size": 64}} - - def _get_res_mock(self, **kw): - _mock = mock.MagicMock() - for k, v in kw.items(): - setattr(_mock, k, v) - return _mock - - def test___init__(self): - traffic_profile = base.TrafficProfile(self.TRAFFIC_PROFILE) - self.assertEqual(self.TRAFFIC_PROFILE, traffic_profile.params) - - def test_execute(self): - traffic_profile = base.TrafficProfile(self.TRAFFIC_PROFILE) - self.assertRaises(NotImplementedError, - traffic_profile.execute_traffic, {}) - - def test_get_existing_traffic_profile(self): - traffic_profile_list = [ - 'RFC2544Profile', 'FixedProfile', 'TrafficProfileGenericHTTP', - 'IXIARFC2544Profile', 'ProxACLProfile', 'ProxBinSearchProfile', - 'ProxProfile', 'ProxRampProfile'] - with mock.patch.dict(sys.modules, y_tests.STL_MOCKS): - tprofile_package.register_modules() - - for tp in traffic_profile_list: - traffic_profile = base.TrafficProfile.get( - {'traffic_profile': {'traffic_type': tp}}) - self.assertEqual(tp, traffic_profile.__class__.__name__) - - def test_get_non_existing_traffic_profile(self): - self.assertRaises(exceptions.TrafficProfileNotImplemented, - base.TrafficProfile.get, self.TRAFFIC_PROFILE) - - -class TestDummyProfile(unittest.TestCase): - def test_execute(self): - dummy_profile = base.DummyProfile(base.TrafficProfile) - self.assertIsNone(dummy_profile.execute({})) diff --git a/tests/unit/network_services/traffic_profile/test_fixed.py b/tests/unit/network_services/traffic_profile/test_fixed.py deleted file mode 100644 index dec94964b..000000000 --- a/tests/unit/network_services/traffic_profile/test_fixed.py +++ /dev/null @@ -1,120 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import - -import unittest -import mock - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.base import TrafficProfile - from yardstick.network_services.traffic_profile.fixed import FixedProfile - - -class TestFixedProfile(unittest.TestCase): - TRAFFIC_PROFILE = { - "schema": "isb:traffic_profile:0.1", - "name": "fixed", - "description": "Fixed traffic profile to run UDP traffic", - "traffic_profile": { - "traffic_type": "FixedTraffic", - "frame_rate": 100, # pps - "flow_number": 10, - "frame_size": 64}} - - VNFD = {'vnfd:vnfd-catalog': - {'vnfd': - [{'short-name': 'VpeVnf', - 'vdu': - [{'routing_table': - [{'network': '152.16.100.20', - 'netmask': '255.255.255.0', - 'gateway': '152.16.100.20', - 'if': 'xe0'}, - {'network': '152.16.40.20', - 'netmask': '255.255.255.0', - 'gateway': '152.16.40.20', - 'if': 'xe1'}], - 'description': 'VPE approximation using DPDK', - 'name': 'vpevnf-baremetal', - 'nd_route_tbl': - [{'network': '0064:ff9b:0:0:0:0:9810:6414', - 'netmask': '112', - 'gateway': '0064:ff9b:0:0:0:0:9810:6414', - 'if': 'xe0'}, - {'network': '0064:ff9b:0:0:0:0:9810:2814', - 'netmask': '112', - 'gateway': '0064:ff9b:0:0:0:0:9810:2814', - 'if': 'xe1'}], - 'id': 'vpevnf-baremetal', - 'external-interface': - [{'virtual-interface': - {'dst_mac': '00:00:00:00:00:04', - 'vpci': '0000:05:00.0', - 'local_ip': '152.16.100.19', - 'type': 'PCI-PASSTHROUGH', - 'netmask': '255.255.255.0', - 'dpdk_port_num': 0, - 'bandwidth': '10 Gbps', - 'dst_ip': '152.16.100.20', - 'local_mac': '00:00:00:00:00:01'}, - 'vnfd-connection-point-ref': 'xe0', - 'name': 'xe0'}, - {'virtual-interface': - {'dst_mac': '00:00:00:00:00:03', - 'vpci': '0000:05:00.1', - 'local_ip': '152.16.40.19', - 'type': 'PCI-PASSTHROUGH', - 'netmask': '255.255.255.0', - 'dpdk_port_num': 1, - 'bandwidth': '10 Gbps', - 'dst_ip': '152.16.40.20', - 'local_mac': '00:00:00:00:00:02'}, - 'vnfd-connection-point-ref': 'xe1', - 'name': 'xe1'}]}], - 'description': 'Vpe approximation using DPDK', - 'mgmt-interface': - {'vdu-id': 'vpevnf-baremetal', - 'host': '1.1.1.1', - 'password': 'r00t', - 'user': 'root', - 'ip': '1.1.1.1'}, - 'benchmark': - {'kpi': ['packets_in', 'packets_fwd', 'packets_dropped']}, - 'connection-point': [{'type': 'VPORT', 'name': 'xe0'}, - {'type': 'VPORT', 'name': 'xe1'}], - 'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'}]}} - - def test___init__(self): - fixed_profile = \ - FixedProfile(TrafficProfile) - self.assertIsNotNone(fixed_profile) - - def test_execute(self): - traffic_generator = mock.Mock(autospec=TrafficProfile) - traffic_generator.my_ports = [0, 1] - traffic_generator.vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0] - traffic_generator.client = \ - mock.Mock(return_value=True) - fixed_profile = FixedProfile(self.TRAFFIC_PROFILE) - fixed_profile.params = self.TRAFFIC_PROFILE - fixed_profile.first_run = True - self.assertIsNone(fixed_profile.execute(traffic_generator)) diff --git a/tests/unit/network_services/traffic_profile/test_http.py b/tests/unit/network_services/traffic_profile/test_http.py deleted file mode 100644 index 5d8029ea0..000000000 --- a/tests/unit/network_services/traffic_profile/test_http.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import -import unittest - -from yardstick.network_services.traffic_profile.base import TrafficProfile -from yardstick.network_services.traffic_profile.http import \ - TrafficProfileGenericHTTP - - -class TestTrafficProfileGenericHTTP(unittest.TestCase): - def test___init__(self): - traffic_profile_generic_htt_p = \ - TrafficProfileGenericHTTP(TrafficProfile) - self.assertIsNotNone(traffic_profile_generic_htt_p) - - def test_execute(self): - traffic_profile_generic_htt_p = \ - TrafficProfileGenericHTTP(TrafficProfile) - traffic_generator = {} - self.assertIsNone( - traffic_profile_generic_htt_p.execute(traffic_generator)) - - def test__send_http_request(self): - traffic_profile_generic_htt_p = \ - TrafficProfileGenericHTTP(TrafficProfile) - self.assertIsNone(traffic_profile_generic_htt_p._send_http_request( - "10.1.1.1", "250", "/req")) diff --git a/tests/unit/network_services/traffic_profile/test_http_ixload.py b/tests/unit/network_services/traffic_profile/test_http_ixload.py deleted file mode 100644 index 5110439fd..000000000 --- a/tests/unit/network_services/traffic_profile/test_http_ixload.py +++ /dev/null @@ -1,272 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from __future__ import absolute_import -import unittest -import mock - -from oslo_serialization import jsonutils - -from yardstick.network_services.traffic_profile import http_ixload -from yardstick.network_services.traffic_profile.http_ixload import \ - join_non_strings, validate_non_string_sequence - - -class TestJoinNonStrings(unittest.TestCase): - - def test_validate_non_string_sequence(self): - self.assertEqual(validate_non_string_sequence([1, 2, 3]), [1, 2, 3]) - self.assertIsNone(validate_non_string_sequence('123')) - self.assertIsNone(validate_non_string_sequence(1)) - - self.assertEqual(validate_non_string_sequence(1, 2), 2) - self.assertEqual(validate_non_string_sequence(1, default=2), 2) - - with self.assertRaises(RuntimeError): - validate_non_string_sequence(1, raise_exc=RuntimeError) - - def test_join_non_strings(self): - self.assertEqual(join_non_strings(':'), '') - self.assertEqual(join_non_strings(':', 'a'), 'a') - self.assertEqual(join_non_strings(':', 'a', 2, 'c'), 'a:2:c') - self.assertEqual(join_non_strings(':', ['a', 2, 'c']), 'a:2:c') - self.assertEqual(join_non_strings(':', 'abc'), 'abc') - - -class TestIxLoadTrafficGen(unittest.TestCase): - - def test_parse_run_test(self): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - j = jsonutils.dump_as_bytes(test_input) - ixload = http_ixload.IXLOADHttpTest(j) - self.assertDictEqual(ixload.test_input, test_input) - self.assertIsNone(ixload.parse_run_test()) - self.assertEqual(ixload.ports_to_reassign, [ - ["IXIA_CHASSIS", "CARD", 1], - ["IXIA_CHASSIS", "CARD", 2], - ["IXIA_CHASSIS", "CARD", 3], - ]) - - def test_format_ports_for_reassignment(self): - ports = [ - ["IXIA_CHASSIS", "CARD", 1], - ["IXIA_CHASSIS", "CARD", 2], - ["IXIA_CHASSIS", "CARD", 3], - ] - formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports) - self.assertEqual(formatted, [ - "IXIA_CHASSIS;CARD;1", - "IXIA_CHASSIS;CARD;2", - "IXIA_CHASSIS;CARD;3", - ]) - - def test_reassign_ports(self): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - j = jsonutils.dump_as_bytes(test_input) - ixload = http_ixload.IXLOADHttpTest(j) - repository = mock.Mock() - test = mock.MagicMock() - test.setPorts = mock.Mock() - ports_to_reassign = [(1, 2, 3), (1, 2, 4)] - ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"]) - self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign)) - - def test_reassign_ports_error(self): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - j = jsonutils.dump_as_bytes(test_input) - ixload = http_ixload.IXLOADHttpTest(j) - repository = mock.Mock() - test = "test" - ports_to_reassign = [(1, 2, 3), (1, 2, 4)] - ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"]) - ixload.ix_load = mock.MagicMock() - ixload.ix_load.delete = mock.Mock() - ixload.ix_load.disconnect = mock.Mock() - with self.assertRaises(Exception): - ixload.reassign_ports(test, repository, ports_to_reassign) - - def test_stat_collector(self): - args = [0, 1] - self.assertIsNone( - http_ixload.IXLOADHttpTest.stat_collector(*args)) - - def test_IxL_StatCollectorCommand(self): - args = [[0, 1, 2, 3], [0, 1, 2, 3]] - self.assertIsNone( - http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args)) - - def test_set_results_dir(self): - test_stat_collector = mock.MagicMock() - test_stat_collector.setResultDir = mock.Mock() - results_on_windows = "c:/Results" - self.assertIsNone( - http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, - results_on_windows)) - - def test_set_results_dir_error(self): - test_stat_collector = "" - results_on_windows = "c:/Results" - with self.assertRaises(Exception): - http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, results_on_windows) - - def test_load_config_file(self): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - j = jsonutils.dump_as_bytes(test_input) - ixload = http_ixload.IXLOADHttpTest(j) - ixload.ix_load = mock.MagicMock() - ixload.ix_load.new = mock.Mock(return_value="") - self.assertIsNotNone(ixload.load_config_file("ixload.cfg")) - - def test_load_config_file_error(self): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - j = jsonutils.dump_as_bytes(test_input) - ixload = http_ixload.IXLOADHttpTest(j) - ixload.ix_load = "test" - with self.assertRaises(Exception): - ixload.load_config_file("ixload.cfg") - - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad') - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils') - def test_start_http_test_connect_error(self, mock_collector_type, mock_ixload_type): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - - j = jsonutils.dump_as_bytes(test_input) - - mock_ixload = mock_ixload_type() - mock_ixload.connect.side_effect = RuntimeError - - ixload = http_ixload.IXLOADHttpTest(j) - ixload.results_on_windows = 'windows_result_dir' - ixload.result_dir = 'my_result_dir' - - with self.assertRaises(RuntimeError): - ixload.start_http_test() - - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad') - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils') - def test_start_http_test(self, mock_collector_type, mock_ixload_type): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - - j = jsonutils.dump_as_bytes(test_input) - - ixload = http_ixload.IXLOADHttpTest(j) - ixload.results_on_windows = 'windows_result_dir' - ixload.result_dir = 'my_result_dir' - ixload.load_config_file = mock.MagicMock() - - self.assertIsNone(ixload.start_http_test()) - - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad') - @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils') - def test_start_http_test_reassign_error(self, mock_collector_type, mock_ixload_type): - ports = [1, 2, 3] - test_input = { - "remote_server": "REMOTE_SERVER", - "ixload_cfg": "IXLOAD_CFG", - "result_dir": "RESULT_DIR", - "ixia_chassis": "IXIA_CHASSIS", - "IXIA": { - "card": "CARD", - "ports": ports, - }, - } - - j = jsonutils.dump_as_bytes(test_input) - - ixload = http_ixload.IXLOADHttpTest(j) - ixload.load_config_file = mock.MagicMock() - - reassign_ports = mock.Mock(side_effect=RuntimeError) - ixload.reassign_ports = reassign_ports - ixload.results_on_windows = 'windows_result_dir' - ixload.result_dir = 'my_result_dir' - - ixload.start_http_test() - self.assertEqual(reassign_ports.call_count, 1) - - @mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest") - def test_main(self, IXLOADHttpTest): - args = ["1", "2", "3"] - http_ixload.main(args) diff --git a/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py b/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py deleted file mode 100644 index e8910d62b..000000000 --- a/tests/unit/network_services/traffic_profile/test_ixia_rfc2544.py +++ /dev/null @@ -1,611 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import -from __future__ import division -import unittest -import mock - -from copy import deepcopy - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.trex_traffic_profile \ - import TrexProfile - from yardstick.network_services.traffic_profile.ixia_rfc2544 import \ - IXIARFC2544Profile - from yardstick.network_services.traffic_profile import ixia_rfc2544 - - -class TestIXIARFC2544Profile(unittest.TestCase): - - TRAFFIC_PROFILE = { - "schema": "isb:traffic_profile:0.1", - "name": "fixed", - "description": "Fixed traffic profile to run UDP traffic", - "traffic_profile": { - "traffic_type": "FixedTraffic", - "frame_rate": 100, # pps - "flow_number": 10, - "frame_size": 64, - }, - } - - PROFILE = {'description': 'Traffic profile to run RFC2544 latency', - 'name': 'rfc2544', - 'traffic_profile': {'traffic_type': 'IXIARFC2544Profile', - 'frame_rate': 100}, - IXIARFC2544Profile.DOWNLINK: {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': {'dstip4': '1.1.1.1-1.15.255.255', - 'proto': 'udp', 'count': '1', - 'srcip4': '90.90.1.1-90.105.255.255', - 'dscp': 0, 'ttl': 32}, - 'outer_l4': {'srcport': '2001', - 'dsrport': '1234'}}}, - IXIARFC2544Profile.UPLINK: {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': {'dstip4': '9.9.1.1-90.105.255.255', - 'proto': 'udp', 'count': '1', - 'srcip4': '1.1.1.1-1.15.255.255', - 'dscp': 0, 'ttl': 32}, - 'outer_l4': {'dstport': '2001', - 'srcport': '1234'}}}, - 'schema': 'isb:traffic_profile:0.1'} - - def test_get_ixia_traffic_profile_error(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.my_ports = [0, 1] - traffic_generator.uplink_ports = [-1] - traffic_generator.downlink_ports = [1] - traffic_generator.client = \ - mock.Mock(return_value=True) - STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - IXIARFC2544Profile.DOWNLINK: { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.rate = 100 - mac = {"src_mac_0": "00:00:00:00:00:01", - "src_mac_1": "00:00:00:00:00:02", - "src_mac_2": "00:00:00:00:00:02", - "dst_mac_0": "00:00:00:00:00:03", - "dst_mac_1": "00:00:00:00:00:04", - "dst_mac_2": "00:00:00:00:00:04"} - result = r_f_c2544_profile._get_ixia_traffic_profile(self.PROFILE, mac) - self.assertIsNotNone(result) - - def test_get_ixia_traffic_profile(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.my_ports = [0, 1] - traffic_generator.uplink_ports = [-1] - traffic_generator.downlink_ports = [1] - traffic_generator.client = \ - mock.Mock(return_value=True) - STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32, - "count": "1" - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32, - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234", - "count": "1" - }, - "traffic_type": "continuous" - }, - IXIARFC2544Profile.DOWNLINK: { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32, - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32, - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001", - "count": "1" - }, - "traffic_type": "continuous" - } - } - ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.rate = 100 - mac = {"src_mac_0": "00:00:00:00:00:01", - "src_mac_1": "00:00:00:00:00:02", - "src_mac_2": "00:00:00:00:00:02", - "dst_mac_0": "00:00:00:00:00:03", - "dst_mac_1": "00:00:00:00:00:04", - "dst_mac_2": "00:00:00:00:00:04"} - result = r_f_c2544_profile._get_ixia_traffic_profile(self.PROFILE, mac) - self.assertIsNotNone(result) - - @mock.patch("yardstick.network_services.traffic_profile.ixia_rfc2544.open") - def test_get_ixia_traffic_profile_v6(self, *args): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.my_ports = [0, 1] - traffic_generator.uplink_ports = [-1] - traffic_generator.downlink_ports = [1] - traffic_generator.client = \ - mock.Mock(return_value=True) - STATIC_TRAFFIC = { - IXIARFC2544Profile.UPLINK: { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - IXIARFC2544Profile.DOWNLINK: { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - ixia_rfc2544.STATIC_TRAFFIC = STATIC_TRAFFIC - - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.rate = 100 - mac = {"src_mac_0": "00:00:00:00:00:01", - "src_mac_1": "00:00:00:00:00:02", - "src_mac_2": "00:00:00:00:00:02", - "dst_mac_0": "00:00:00:00:00:03", - "dst_mac_1": "00:00:00:00:00:04", - "dst_mac_2": "00:00:00:00:00:04"} - profile_data = {'description': 'Traffic profile to run RFC2544', - 'name': 'rfc2544', - 'traffic_profile': - {'traffic_type': 'IXIARFC2544Profile', - 'frame_rate': 100}, - IXIARFC2544Profile.DOWNLINK: - {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': {'dstip4': '1.1.1.1-1.15.255.255', - 'proto': 'udp', 'count': '1', - 'srcip4': '90.90.1.1-90.105.255.255', - 'dscp': 0, 'ttl': 32}, - 'outer_l3v6': {'dstip6': '1.1.1.1-1.15.255.255', - 'proto': 'udp', 'count': '1', - 'srcip6': '90.90.1.1-90.105.255.255', - 'dscp': 0, 'ttl': 32}, - 'outer_l4': {'srcport': '2001', - 'dsrport': '1234'}}}, - IXIARFC2544Profile.UPLINK: {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': - {'dstip4': '9.9.1.1-90.105.255.255', - 'proto': 'udp', 'count': '1', - 'srcip4': '1.1.1.1-1.15.255.255', - 'dscp': 0, 'ttl': 32}, - 'outer_l3v6': - {'dstip6': '9.9.1.1-90.105.255.255', - 'proto': 'udp', 'count': '1', - 'srcip6': '1.1.1.1-1.15.255.255', - 'dscp': 0, 'ttl': 32}, - - 'outer_l4': {'dstport': '2001', - 'srcport': '1234'}}}, - 'schema': 'isb:traffic_profile:0.1'} - result = r_f_c2544_profile._get_ixia_traffic_profile(profile_data, mac) - self.assertIsNotNone(result) - - def test__get_ixia_traffic_profile_default_args(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - - expected = {} - result = r_f_c2544_profile._get_ixia_traffic_profile({}) - self.assertDictEqual(result, expected) - - def test__ixia_traffic_generate(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - traffic = {IXIARFC2544Profile.DOWNLINK: {'iload': 10}, - IXIARFC2544Profile.UPLINK: {'iload': 10}} - ixia_obj = mock.MagicMock() - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.rate = 100 - result = r_f_c2544_profile._ixia_traffic_generate(traffic, ixia_obj) - self.assertIsNone(result) - - def test_execute(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.first_run = True - r_f_c2544_profile.params = {IXIARFC2544Profile.DOWNLINK: {'iload': 10}, - IXIARFC2544Profile.UPLINK: {'iload': 10}} - - r_f_c2544_profile.get_streams = mock.Mock() - r_f_c2544_profile.full_profile = {} - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - ixia_obj = mock.MagicMock() - self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj)) - - def test_update_traffic_profile(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], # private, one value for intfs - "downlink_0": ["xe1", "xe2"], # public, two values for intfs - "downlink_1": ["xe3"], # not in TRAFFIC PROFILE - "tenant_0": ["xe4"], # not public or private - } - - ports_expected = [8, 3, 5] - traffic_generator.vnfd_helper.port_num.side_effect = ports_expected - traffic_generator.client.return_value = True - - traffic_profile = deepcopy(self.TRAFFIC_PROFILE) - traffic_profile.update({ - "uplink_0": ["xe0"], - "downlink_0": ["xe1", "xe2"], - }) - - r_f_c2544_profile = IXIARFC2544Profile(traffic_profile) - r_f_c2544_profile.full_profile = {} - r_f_c2544_profile.get_streams = mock.Mock() - - self.assertIsNone(r_f_c2544_profile.update_traffic_profile(traffic_generator)) - self.assertEqual(r_f_c2544_profile.ports, ports_expected) - - def test_get_drop_percentage(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 1000} - tol_min = 100.0 - tolerance = 0.0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_drop_percentage_update(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 1002} - tol_min = 0.0 - tolerance = 1.0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_drop_percentage_div_zero(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - ixia_obj = mock.MagicMock() - r_f_c2544_profile.execute = mock.Mock() - r_f_c2544_profile._get_ixia_traffic_profile = mock.Mock() - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - r_f_c2544_profile.get_multiplier = mock.Mock() - r_f_c2544_profile.tmp_throughput = 0 - r_f_c2544_profile.tmp_drop = 0 - r_f_c2544_profile.full_profile = {} - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "RxThroughput": 10, - "TxThroughput": 10, - "in_packets": 1000, - "out_packets": 0} - tol_min = 0.0 - tolerance = 0.0 - r_f_c2544_profile.tmp_throughput = 0 - self.assertIsNotNone( - r_f_c2544_profile.get_drop_percentage(samples, tol_min, tolerance, - ixia_obj)) - - def test_get_multiplier(self): - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.max_rate = 100 - r_f_c2544_profile.min_rate = 100 - self.assertEqual("1.0", r_f_c2544_profile.get_multiplier()) - - def test_start_ixia_latency(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = IXIARFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.max_rate = 100 - r_f_c2544_profile.min_rate = 100 - ixia_obj = mock.MagicMock() - r_f_c2544_profile._get_ixia_traffic_profile = \ - mock.Mock(return_value={}) - r_f_c2544_profile.full_profile = {} - r_f_c2544_profile._ixia_traffic_generate = mock.Mock() - self.assertIsNone( - r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj)) diff --git a/tests/unit/network_services/traffic_profile/test_prox_acl.py b/tests/unit/network_services/traffic_profile/test_prox_acl.py deleted file mode 100644 index ef5bac0d5..000000000 --- a/tests/unit/network_services/traffic_profile/test_prox_acl.py +++ /dev/null @@ -1,78 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import - -import unittest -import mock - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.prox_ACL import ProxACLProfile - from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxTestDataTuple - - -class TestProxACLProfile(unittest.TestCase): - - def test_run_test_with_pkt_size(self): - def target(*args, **kwargs): - runs.append(args[2]) - if args[2] < 0 or args[2] > 100: - raise RuntimeError(' '.join([str(args), str(runs)])) - if args[2] > 75.0: - return fail_tuple, {} - return success_tuple, {} - - def get_mock_samples(*args, **kwargs): - if args[2] < 0: - raise RuntimeError(' '.join([str(args), str(runs)])) - return success_tuple - - tp_config = { - 'traffic_profile': { - 'upper_bound': 100.0, - 'lower_bound': 0.0, - 'tolerated_loss': 50.0, - 'attempts': 20 - }, - } - - runs = [] - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) - - traffic_gen = mock.MagicMock() - - profile_helper = mock.MagicMock() - profile_helper.run_test = target - - profile = ProxACLProfile(tp_config) - profile.init(mock.MagicMock()) - - profile.prox_config["attempts"] = 20 - profile.queue = mock.MagicMock() - profile.tolerated_loss = 50.0 - profile.pkt_size = 128 - profile.duration = 30 - profile.test_value = 100.0 - profile.tolerated_loss = 100.0 - profile._profile_helper = profile_helper - - profile.run_test_with_pkt_size(traffic_gen, profile.pkt_size, profile.duration) diff --git a/tests/unit/network_services/traffic_profile/test_prox_binsearch.py b/tests/unit/network_services/traffic_profile/test_prox_binsearch.py deleted file mode 100644 index 28840ef98..000000000 --- a/tests/unit/network_services/traffic_profile/test_prox_binsearch.py +++ /dev/null @@ -1,185 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import - -import unittest -import mock - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxTestDataTuple - from yardstick.network_services.traffic_profile.prox_binsearch import ProxBinSearchProfile - - -class TestProxBinSearchProfile(unittest.TestCase): - - def test_execute_1(self): - def target(*args, **_): - runs.append(args[2]) - if args[2] < 0 or args[2] > 100: - raise RuntimeError(' '.join([str(args), str(runs)])) - if args[2] > 75.0: - return fail_tuple, {} - return success_tuple, {} - - tp_config = { - 'traffic_profile': { - 'packet_sizes': [200], - 'test_precision': 2.0, - 'tolerated_loss': 0.001, - }, - } - - runs = [] - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) - - traffic_generator = mock.MagicMock() - - profile_helper = mock.MagicMock() - profile_helper.run_test = target - - profile = ProxBinSearchProfile(tp_config) - profile.init(mock.MagicMock()) - profile._profile_helper = profile_helper - - profile.execute_traffic(traffic_generator) - self.assertEqual(round(profile.current_lower, 2), 74.69) - self.assertEqual(round(profile.current_upper, 2), 76.09) - self.assertEqual(len(runs), 7) - - # Result Samples inc theor_max - result_tuple = {"Result_Actual_throughput": 7.5e-07, - "Result_theor_max_throughput": 1.234e-10, - "Result_pktSize": 200} - profile.queue.put.assert_called_with(result_tuple) - - success_result_tuple = {"Success_CurrentDropPackets": 0.5, - "Success_DropPackets": 0.5, - "Success_LatencyAvg": 5.3, - "Success_LatencyMax": 5.2, - "Success_LatencyMin": 5.1, - "Success_PktSize": 200, - "Success_RxThroughput": 7.5e-07, - "Success_Throughput": 7.5e-07, - "Success_TxThroughput": 0.00012340000000000002} - - calls = profile.queue.put(success_result_tuple) - profile.queue.put.assert_has_calls(calls) - - success_result_tuple2 = {"Success_CurrentDropPackets": 0.5, - "Success_DropPackets": 0.5, - "Success_LatencyAvg": 5.3, - "Success_LatencyMax": 5.2, - "Success_LatencyMin": 5.1, - "Success_PktSize": 200, - "Success_RxThroughput": 7.5e-07, - "Success_Throughput": 7.5e-07, - "Success_TxThroughput": 123.4, - "Success_can_be_lost": 409600, - "Success_drop_total": 20480, - "Success_rx_total": 4075520, - "Success_tx_total": 4096000} - - calls = profile.queue.put(success_result_tuple2) - profile.queue.put.assert_has_calls(calls) - - def test_execute_2(self): - def target(*args, **_): - runs.append(args[2]) - if args[2] < 0 or args[2] > 100: - raise RuntimeError(' '.join([str(args), str(runs)])) - if args[2] > 25.0: - return fail_tuple, {} - return success_tuple, {} - - tp_config = { - 'traffic_profile': { - 'packet_sizes': [200], - 'test_precision': 2.0, - 'tolerated_loss': 0.001, - }, - } - - runs = [] - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) - - traffic_generator = mock.MagicMock() - - profile_helper = mock.MagicMock() - profile_helper.run_test = target - - profile = ProxBinSearchProfile(tp_config) - profile.init(mock.MagicMock()) - profile._profile_helper = profile_helper - - profile.execute_traffic(traffic_generator) - self.assertEqual(round(profile.current_lower, 2), 24.06) - self.assertEqual(round(profile.current_upper, 2), 25.47) - self.assertEqual(len(runs), 7) - - def test_execute_3(self): - def target(*args, **_): - runs.append(args[2]) - if args[2] < 0 or args[2] > 100: - raise RuntimeError(' '.join([str(args), str(runs)])) - if args[2] > 75.0: - return fail_tuple, {} - return success_tuple, {} - - tp_config = { - 'traffic_profile': { - 'packet_sizes': [200], - 'test_precision': 2.0, - 'tolerated_loss': 0.001, - }, - } - - runs = [] - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) - - traffic_generator = mock.MagicMock() - - profile_helper = mock.MagicMock() - profile_helper.run_test = target - - profile = ProxBinSearchProfile(tp_config) - profile.init(mock.MagicMock()) - profile._profile_helper = profile_helper - - profile.upper_bound = 100.0 - profile.lower_bound = 99.0 - profile.execute_traffic(traffic_generator) - - - # Result Samples - result_tuple = {"Result_theor_max_throughput": 0, "Result_pktSize": 200} - profile.queue.put.assert_called_with(result_tuple) - - # Check for success_ tuple (None expected) - calls = profile.queue.put.mock_calls - for call in calls: - for call_detail in call[1]: - for k in call_detail: - if "Success_" in k: - self.assertRaises(AttributeError) diff --git a/tests/unit/network_services/traffic_profile/test_prox_profile.py b/tests/unit/network_services/traffic_profile/test_prox_profile.py deleted file mode 100644 index e5b36096f..000000000 --- a/tests/unit/network_services/traffic_profile/test_prox_profile.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import - -import unittest -import mock - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.prox_profile import ProxProfile - from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper - - -class TestProxProfile(unittest.TestCase): - - def test_sort_vpci(self): - traffic_generator = mock.Mock() - interface_1 = {'virtual-interface': {'vpci': 'id1'}, 'name': 'name1'} - interface_2 = {'virtual-interface': {'vpci': 'id2'}, 'name': 'name2'} - interface_3 = {'virtual-interface': {'vpci': 'id3'}, 'name': 'name3'} - interfaces = [interface_2, interface_3, interface_1] - traffic_generator.vnfd_helper = { - 'vdu': [{'external-interface': interfaces}]} - output = ProxProfile.sort_vpci(traffic_generator) - self.assertEqual([interface_1, interface_2, interface_3], output) - - def test_fill_samples(self): - samples = {} - - traffic_generator = mock.MagicMock() - interfaces = [ - ['id1', 'name1'], - ['id2', 'name2'] - ] - traffic_generator.resource_helper.sut.port_stats.side_effect = [ - list(range(12)), - list(range(10, 22)), - ] - - expected = { - 'name1': { - 'in_packets': 6, - 'out_packets': 7, - }, - 'name2': { - 'in_packets': 16, - 'out_packets': 17, - }, - } - with mock.patch.object(ProxProfile, 'sort_vpci', return_value=interfaces): - ProxProfile.fill_samples(samples, traffic_generator) - - self.assertDictEqual(samples, expected) - - def test_init(self): - tp_config = { - 'traffic_profile': {}, - } - - profile = ProxProfile(tp_config) - queue = mock.Mock() - profile.init(queue) - self.assertIs(profile.queue, queue) - - def test_execute_traffic(self): - packet_sizes = [ - 10, - 100, - 1000, - ] - tp_config = { - 'traffic_profile': { - 'packet_sizes': packet_sizes, - }, - } - - traffic_generator = mock.MagicMock() - - setup_helper = traffic_generator.setup_helper - setup_helper.find_in_section.return_value = None - - prox_resource_helper = ProxResourceHelper(setup_helper) - traffic_generator.resource_helper = prox_resource_helper - - profile = ProxProfile(tp_config) - - self.assertFalse(profile.done) - for _ in packet_sizes: - with self.assertRaises(NotImplementedError): - profile.execute_traffic(traffic_generator) - - self.assertIsNone(profile.execute_traffic(traffic_generator)) - self.assertTrue(profile.done) - - def test_bounds_iterator(self): - tp_config = { - 'traffic_profile': {}, - } - - profile = ProxProfile(tp_config) - value = 0.0 - for value in profile.bounds_iterator(): - pass - - self.assertEqual(value, 100.0) - - mock_logger = mock.MagicMock() - for _ in profile.bounds_iterator(mock_logger): - pass - - self.assertEqual(mock_logger.debug.call_count, 1) - self.assertEqual(mock_logger.info.call_count, 10) diff --git a/tests/unit/network_services/traffic_profile/test_prox_ramp.py b/tests/unit/network_services/traffic_profile/test_prox_ramp.py deleted file mode 100644 index 1acec2f68..000000000 --- a/tests/unit/network_services/traffic_profile/test_prox_ramp.py +++ /dev/null @@ -1,97 +0,0 @@ -# Copyright (c) 2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -from __future__ import absolute_import - -import unittest -import mock - -from tests.unit import STL_MOCKS - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.prox_ramp import ProxRampProfile - from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxProfileHelper - from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxTestDataTuple - - -class TestProxRampProfile(unittest.TestCase): - - def test_run_test_with_pkt_size(self): - tp_config = { - 'traffic_profile': { - 'lower_bound': 10.0, - 'upper_bound': 100.0, - 'step_value': 10.0, - }, - } - - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - - traffic_gen = mock.MagicMock() - traffic_gen._test_type = 'Generic' - - profile_helper = ProxProfileHelper(traffic_gen.resource_helper) - profile_helper.run_test = run_test = mock.MagicMock(return_value=success_tuple) - - profile = ProxRampProfile(tp_config) - profile.fill_samples = fill_samples = mock.MagicMock() - profile.queue = mock.MagicMock() - profile._profile_helper = profile_helper - - profile.run_test_with_pkt_size(traffic_gen, 128, 30) - self.assertEqual(run_test.call_count, 10) - self.assertEqual(fill_samples.call_count, 10) - - def test_run_test_with_pkt_size_with_fail(self): - tp_config = { - 'traffic_profile': { - 'lower_bound': 10.0, - 'upper_bound': 100.0, - 'step_value': 10.0, - }, - } - - success_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.1, 5.2, 5.3], 995, 1000, 123.4) - fail_tuple = ProxTestDataTuple(10.0, 1, 2, 3, 4, [5.6, 5.7, 5.8], 850, 1000, 123.4) - - result_list = [ - success_tuple, - success_tuple, - success_tuple, - fail_tuple, - success_tuple, - fail_tuple, - fail_tuple, - fail_tuple, - ] - - traffic_gen = mock.MagicMock() - traffic_gen._test_type = 'Generic' - - profile_helper = ProxProfileHelper(traffic_gen.resource_helper) - profile_helper.run_test = run_test = mock.MagicMock(side_effect=result_list) - - profile = ProxRampProfile(tp_config) - profile.fill_samples = fill_samples = mock.MagicMock() - profile.queue = mock.MagicMock() - profile._profile_helper = profile_helper - - profile.run_test_with_pkt_size(traffic_gen, 128, 30) - self.assertEqual(run_test.call_count, 4) - self.assertEqual(fill_samples.call_count, 3) diff --git a/tests/unit/network_services/traffic_profile/test_rfc2544.py b/tests/unit/network_services/traffic_profile/test_rfc2544.py deleted file mode 100644 index cb3a547ee..000000000 --- a/tests/unit/network_services/traffic_profile/test_rfc2544.py +++ /dev/null @@ -1,276 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import unittest -import mock - -from tests.unit import STL_MOCKS - - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.trex_traffic_profile \ - import TrexProfile - from yardstick.network_services.traffic_profile.rfc2544 import \ - RFC2544Profile - - -class TestRFC2544Profile(unittest.TestCase): - TRAFFIC_PROFILE = { - "schema": "isb:traffic_profile:0.1", - "name": "fixed", - "description": "Fixed traffic profile to run UDP traffic", - "traffic_profile": { - "traffic_type": "FixedTraffic", - "frame_rate": 100, # pps - "flow_number": 10, - "frame_size": 64}} - - PROFILE = {'description': 'Traffic profile to run RFC2544 latency', - 'name': 'rfc2544', - 'traffic_profile': {'traffic_type': 'RFC2544Profile', - 'frame_rate': 100}, - 'downlink_0': {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': {'dstip4': '1.1.1.1-1.15.255.255', - 'proto': 'udp', - 'srcip4': '90.90.1.1-90.105.255.255', - 'dscp': 0, 'ttl': 32, 'count': 1}, - 'outer_l4': {'srcport': '2001', - 'dsrport': '1234', 'count': 1}}}, - 'uplink_0': {'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}}, - 'outer_l3v4': {'dstip4': '9.9.1.1-90.105.255.255', - 'proto': 'udp', - 'srcip4': '1.1.1.1-1.15.255.255', - 'dscp': 0, 'ttl': 32, 'count': 1}, - 'outer_l4': {'dstport': '2001', - 'srcport': '1234', 'count': 1}}}, - 'schema': 'isb:traffic_profile:0.1'} - - def test___init__(self): - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - self.assertIsNotNone(r_f_c2544_profile.rate) - - def test_execute(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client.return_value = True - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - r_f_c2544_profile.first_run = True - self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator)) - - def test_get_drop_percentage(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client.return_value = True - - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - r_f_c2544_profile.register_generator(traffic_generator) - self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator)) - - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = { - "rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "in_packets": 1000, - "out_packets": 1000, - } - - expected = { - 'DropPercentage': 0.0, - 'RxThroughput': 100 / 3.0, - 'TxThroughput': 100 / 3.0, - 'CurrentDropPercentage': 0.0, - 'Throughput': 66.66666666666667, - 'xe0': { - 'tx_throughput_fps': 20, - 'in_packets': 1000, - 'out_packets': 1000, - 'rx_throughput_mbps': 10, - 'tx_throughput_mbps': 10, - 'rx_throughput_fps': 20, - }, - } - traffic_generator.generate_samples.return_value = samples - traffic_generator.RUN_DURATION = 30 - traffic_generator.rfc2544_helper.tolerance_low = 0.0001 - traffic_generator.rfc2544_helper.tolerance_high = 0.0001 - result = r_f_c2544_profile.get_drop_percentage(traffic_generator) - self.assertDictEqual(result, expected) - - def test_get_drop_percentage_update(self): - traffic_generator = mock.Mock(autospec=RFC2544Profile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = mock.Mock(return_value=True) - - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - r_f_c2544_profile.register_generator(traffic_generator) - self.assertIsNone(r_f_c2544_profile.execute_traffic()) - - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = { - "rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "in_packets": 1000, - "out_packets": 1002, - } - expected = { - 'DropPercentage': 0.1996, - 'RxThroughput': 33.333333333333336, - 'TxThroughput': 33.4, - 'CurrentDropPercentage': 0.1996, - 'Throughput': 66.66666666666667, - 'xe0': { - 'tx_throughput_fps': 20, - 'in_packets': 1000, - 'out_packets': 1002, - 'rx_throughput_mbps': 10, - 'tx_throughput_mbps': 10, - 'rx_throughput_fps': 20, - }, - } - traffic_generator.generate_samples = mock.MagicMock( - return_value=samples) - traffic_generator.RUN_DURATION = 30 - traffic_generator.rfc2544_helper.tolerance_low = 0.0001 - traffic_generator.rfc2544_helper.tolerance_high = 0.0001 - result = r_f_c2544_profile.get_drop_percentage(traffic_generator) - self.assertDictEqual(expected, result) - - def test_get_drop_percentage_div_zero(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "uplink_0": ["xe0"], - "downlink_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator)) - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "in_packets": 1000, - "out_packets": 0} - r_f_c2544_profile.throughput_max = 0 - expected = { - 'DropPercentage': 100.0, 'RxThroughput': 100 / 3.0, - 'TxThroughput': 0.0, 'CurrentDropPercentage': 100.0, - 'Throughput': 66.66666666666667, - 'xe0': { - 'tx_throughput_fps': 20, 'in_packets': 1000, - 'out_packets': 0, 'rx_throughput_mbps': 10, - 'tx_throughput_mbps': 10, 'rx_throughput_fps': 20 - } - } - traffic_generator.generate_samples = mock.Mock(return_value=samples) - traffic_generator.RUN_DURATION = 30 - traffic_generator.rfc2544_helper.tolerance_low = 0.0001 - traffic_generator.rfc2544_helper.tolerance_high = 0.0001 - self.assertDictEqual(expected, - r_f_c2544_profile.get_drop_percentage(traffic_generator)) - - def test_get_multiplier(self): - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.max_rate = 100 - r_f_c2544_profile.min_rate = 100 - self.assertEqual("1.0", r_f_c2544_profile.get_multiplier()) - - def test_calculate_pps(self): - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.rate = 100 - r_f_c2544_profile.pps = 100 - samples = {'Throughput': 4549093.33} - self.assertEqual((2274546.67, 1.0), - r_f_c2544_profile.calculate_pps(samples)) - - def test_create_single_stream(self): - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile._create_single_packet = mock.MagicMock() - r_f_c2544_profile.pg_id = 1 - self.assertIsNotNone( - r_f_c2544_profile.create_single_stream(64, 2274546.67)) - - def test_create_single_stream_no_pg_id(self): - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile._create_single_packet = mock.MagicMock() - r_f_c2544_profile.pg_id = 0 - self.assertIsNotNone( - r_f_c2544_profile.create_single_stream(64, 2274546.67)) - - def test_execute_latency(self): - traffic_generator = mock.Mock(autospec=TrexProfile) - traffic_generator.networks = { - "private_0": ["xe0"], - "public_0": ["xe1"], - } - traffic_generator.client = \ - mock.Mock(return_value=True) - r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE) - r_f_c2544_profile.params = self.PROFILE - r_f_c2544_profile.first_run = True - samples = {} - for ifname in range(1): - name = "xe{}".format(ifname) - samples[name] = {"rx_throughput_fps": 20, - "tx_throughput_fps": 20, - "rx_throughput_mbps": 10, - "tx_throughput_mbps": 10, - "in_packets": 1000, - "out_packets": 0} - - samples['Throughput'] = 4549093.33 - r_f_c2544_profile.calculate_pps = mock.Mock(return_value=[2274546.67, - 1.0]) - - self.assertIsNone(r_f_c2544_profile.execute_latency(traffic_generator, - samples)) diff --git a/tests/unit/network_services/traffic_profile/test_trex_traffic_profile.py b/tests/unit/network_services/traffic_profile/test_trex_traffic_profile.py deleted file mode 100644 index d1009a5e8..000000000 --- a/tests/unit/network_services/traffic_profile/test_trex_traffic_profile.py +++ /dev/null @@ -1,309 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import ipaddress - -import mock -import six -import unittest - -from tests.unit import STL_MOCKS -from yardstick.common import exceptions as y_exc - -STLClient = mock.MagicMock() -stl_patch = mock.patch.dict("sys.modules", STL_MOCKS) -stl_patch.start() - -if stl_patch: - from yardstick.network_services.traffic_profile.base import TrafficProfile - from yardstick.network_services.traffic_profile.trex_traffic_profile import TrexProfile - from yardstick.network_services.traffic_profile.trex_traffic_profile import SRC - from yardstick.network_services.traffic_profile.trex_traffic_profile import DST - from yardstick.network_services.traffic_profile.trex_traffic_profile import ETHERNET - from yardstick.network_services.traffic_profile.trex_traffic_profile import IP - from yardstick.network_services.traffic_profile.trex_traffic_profile import IPv6 - from yardstick.network_services.traffic_profile.trex_traffic_profile import UDP - from yardstick.network_services.traffic_profile.trex_traffic_profile import SRC_PORT - from yardstick.network_services.traffic_profile.trex_traffic_profile import DST_PORT - from yardstick.network_services.traffic_profile.trex_traffic_profile import TYPE_OF_SERVICE - - -class TestTrexProfile(unittest.TestCase): - TRAFFIC_PROFILE = { - "schema": "isb:traffic_profile:0.1", - "name": "fixed", - "description": "Fixed traffic profile to run UDP traffic", - "traffic_profile": { - "traffic_type": "FixedTraffic", - "frame_rate": 100, # pps - "flow_number": 10, - "frame_size": 64}} - - EXAMPLE_ETHERNET_ADDR = "00:00:00:00:00:01" - EXAMPLE_IP_ADDR = "10.0.0.1" - EXAMPLE_IPv6_ADDR = "0064:ff9b:0:0:0:0:9810:6414" - - PROFILE = { - 'description': 'Traffic profile to run RFC2544 latency', - 'name': 'rfc2544', - 'traffic_profile': {'traffic_type': 'RFC2544Profile', - 'frame_rate': 100}, - TrafficProfile.DOWNLINK: { - 'ipv4': {'outer_l2': {'framesize': {'64B': '100', - '1518B': '0', - '128B': '0', - '1400B': '0', - '256B': '0', - '373b': '0', - '570B': '0'}, - "srcmac": "00:00:00:00:00:02", - "dstmac": "00:00:00:00:00:01"}, - 'outer_l3v4': {'dstip4': '1.1.1.1-1.1.2.2', - 'proto': 'udp', - 'srcip4': '9.9.1.1-90.1.2.2', - 'dscp': 0, 'ttl': 32, - 'count': 1}, - 'outer_l4': {'srcport': '2001', - 'dsrport': '1234', - 'count': 1}}}, - TrafficProfile.UPLINK: { - 'ipv4': - {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}, - "srcmac": "00:00:00:00:00:01", - "dstmac": "00:00:00:00:00:02"}, - 'outer_l3v4': {'dstip4': '9.9.1.1-90.105.255.255', - 'proto': 'udp', - 'srcip4': '1.1.1.1-1.15.255.255', - 'dscp': 0, 'ttl': 32, 'count': 1}, - 'outer_l4': {'dstport': '2001', - 'srcport': '1234', - 'count': 1}}}, - 'schema': 'isb:traffic_profile:0.1'} - PROFILE_v6 = { - 'description': 'Traffic profile to run RFC2544 latency', - 'name': 'rfc2544', - 'traffic_profile': {'traffic_type': 'RFC2544Profile', - 'frame_rate': 100}, - TrafficProfile.DOWNLINK: { - 'ipv6': {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}, - "srcmac": "00:00:00:00:00:02", - "dstmac": "00:00:00:00:00:01"}, - 'outer_l3v4': { - 'dstip6': - '0064:ff9b:0:0:0:0:9810:6414-0064:ff9b:0:0:0:0:9810:6420', - 'proto': 'udp', - 'srcip6': - '0064:ff9b:0:0:0:0:9810:2814-0064:ff9b:0:0:0:0:9810:2820', - 'dscp': 0, 'ttl': 32, - 'count': 1}, - 'outer_l4': {'srcport': '2001', - 'dsrport': '1234', - 'count': 1}}}, - TrafficProfile.UPLINK: { - 'ipv6': {'outer_l2': {'framesize': - {'64B': '100', '1518B': '0', - '128B': '0', '1400B': '0', - '256B': '0', '373b': '0', - '570B': '0'}, - "srcmac": "00:00:00:00:00:01", - "dstmac": "00:00:00:00:00:02"}, - 'outer_l3v4': { - 'dstip6': - '0064:ff9b:0:0:0:0:9810:2814-0064:ff9b:0:0:0:0:9810:2820', - 'proto': 'udp', - 'srcip6': - '0064:ff9b:0:0:0:0:9810:6414-0064:ff9b:0:0:0:0:9810:6420', - 'dscp': 0, 'ttl': 32, - 'count': 1}, - 'outer_l4': {'dstport': '2001', - 'srcport': '1234', - 'count': 1}}}, - 'schema': 'isb:traffic_profile:0.1'} - - def test___init__(self): - TrafficProfile.params = self.PROFILE - trex_profile = \ - TrexProfile(TrafficProfile) - self.assertEqual(trex_profile.pps, 100) - - def test_qinq(self): - qinq = {"S-VLAN": {"id": 128, "priority": 0, "cfi": 0}, - "C-VLAN": {"id": 512, "priority": 0, "cfi": 0}} - - trex_profile = \ - TrexProfile(TrafficProfile) - self.assertIsNone(trex_profile.set_qinq(qinq)) - - qinq = {"S-VLAN": {"id": "128-130", "priority": 0, "cfi": 0}, - "C-VLAN": {"id": "512-515", "priority": 0, "cfi": 0}} - self.assertIsNone(trex_profile.set_qinq(qinq)) - - def test__set_outer_l2_fields(self): - trex_profile = \ - TrexProfile(TrafficProfile) - qinq = {"S-VLAN": {"id": 128, "priority": 0, "cfi": 0}, - "C-VLAN": {"id": 512, "priority": 0, "cfi": 0}} - outer_l2 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l2'] - outer_l2['QinQ'] = qinq - self.assertIsNone(trex_profile._set_outer_l2_fields(outer_l2)) - - def test__set_outer_l3v4_fields(self): - trex_profile = \ - TrexProfile(TrafficProfile) - outer_l3v4 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l3v4'] - outer_l3v4['proto'] = 'tcp' - self.assertIsNone(trex_profile._set_outer_l3v4_fields(outer_l3v4)) - - def test__set_outer_l3v6_fields(self): - trex_profile = \ - TrexProfile(TrafficProfile) - outer_l3v6 = self.PROFILE_v6[TrafficProfile.UPLINK]['ipv6']['outer_l3v4'] - outer_l3v6['proto'] = 'tcp' - outer_l3v6['tc'] = 1 - outer_l3v6['hlim'] = 10 - self.assertIsNone(trex_profile._set_outer_l3v6_fields(outer_l3v6)) - - def test__set_outer_l4_fields(self): - trex_profile = \ - TrexProfile(TrafficProfile) - outer_l4 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l4'] - self.assertIsNone(trex_profile._set_outer_l4_fields(outer_l4)) - - def test_get_streams(self): - trex_profile = \ - TrexProfile(TrafficProfile) - trex_profile.params = self.PROFILE - profile_data = self.PROFILE[TrafficProfile.UPLINK] - self.assertIsNotNone(trex_profile.get_streams(profile_data)) - trex_profile.pg_id = 1 - self.assertIsNotNone(trex_profile.get_streams(profile_data)) - trex_profile.params = self.PROFILE_v6 - trex_profile.profile_data = self.PROFILE_v6[TrafficProfile.UPLINK] - self.assertIsNotNone(trex_profile.get_streams(profile_data)) - trex_profile.pg_id = 1 - self.assertIsNotNone(trex_profile.get_streams(profile_data)) - - def test_generate_packets(self): - trex_profile = \ - TrexProfile(TrafficProfile) - trex_profile.fsize = 10 - trex_profile.base_pkt = [10] - self.assertIsNone(trex_profile.generate_packets()) - - def test_generate_imix_data_error(self): - trex_profile = \ - TrexProfile(TrafficProfile) - self.assertEqual({}, trex_profile.generate_imix_data(False)) - - def test__count_ip_ipv4(self): - start, end, count = TrexProfile._count_ip('1.1.1.1', '1.2.3.4') - self.assertEqual('1.1.1.1', str(start)) - self.assertEqual('1.2.3.4', str(end)) - diff = (int(ipaddress.IPv4Address(six.u('1.2.3.4'))) - - int(ipaddress.IPv4Address(six.u('1.1.1.1')))) - self.assertEqual(diff, count) - - def test__count_ip_ipv6(self): - start_ip = '0064:ff9b:0:0:0:0:9810:6414' - end_ip = '0064:ff9b:0:0:0:0:9810:6420' - start, end, count = TrexProfile._count_ip(start_ip, end_ip) - self.assertEqual(0x98106414, start) - self.assertEqual(0x98106420, end) - self.assertEqual(0x98106420 - 0x98106414, count) - - def test__count_ip_ipv6_exception(self): - start_ip = '0064:ff9b:0:0:0:0:9810:6420' - end_ip = '0064:ff9b:0:0:0:0:9810:6414' - with self.assertRaises(y_exc.IPv6RangeError): - TrexProfile._count_ip(start_ip, end_ip) - - def test__dscp_range_action_partial_actual_count_zero(self): - traffic_profile = TrexProfile(TrafficProfile) - dscp_partial = traffic_profile._dscp_range_action_partial() - - flow_vars_initial_length = len(traffic_profile.vm_flow_vars) - dscp_partial('1', '1', 'unneeded') - self.assertEqual(len(traffic_profile.vm_flow_vars), flow_vars_initial_length + 2) - - def test__dscp_range_action_partial_count_greater_than_actual(self): - traffic_profile = TrexProfile(TrafficProfile) - dscp_partial = traffic_profile._dscp_range_action_partial() - - flow_vars_initial_length = len(traffic_profile.vm_flow_vars) - dscp_partial('1', '10', '100') - self.assertEqual(len(traffic_profile.vm_flow_vars), flow_vars_initial_length + 2) - - def test__udp_range_action_partial_actual_count_zero(self): - traffic_profile = TrexProfile(TrafficProfile) - traffic_profile.udp['field1'] = 'value1' - udp_partial = traffic_profile._udp_range_action_partial('field1') - - flow_vars_initial_length = len(traffic_profile.vm_flow_vars) - udp_partial('1', '1', 'unneeded') - self.assertEqual(len(traffic_profile.vm_flow_vars), flow_vars_initial_length + 2) - - def test__udp_range_action_partial_count_greater_than_actual(self): - traffic_profile = TrexProfile(TrafficProfile) - traffic_profile.udp['field1'] = 'value1' - udp_partial = traffic_profile._udp_range_action_partial('field1', 'not_used_count') - - flow_vars_initial_length = len(traffic_profile.vm_flow_vars) - udp_partial('1', '10', '100') - self.assertEqual(len(traffic_profile.vm_flow_vars), flow_vars_initial_length + 2) - - def test__general_single_action_partial(self): - trex_profile = TrexProfile(TrafficProfile) - - trex_profile._general_single_action_partial(ETHERNET)(SRC)( - self.EXAMPLE_ETHERNET_ADDR) - self.assertEqual(self.EXAMPLE_ETHERNET_ADDR, - trex_profile.ether_packet.src) - - trex_profile._general_single_action_partial(IP)(DST)( - self.EXAMPLE_IP_ADDR) - self.assertEqual(self.EXAMPLE_IP_ADDR, trex_profile.ip_packet.dst) - - trex_profile._general_single_action_partial(IPv6)(DST)( - self.EXAMPLE_IPv6_ADDR) - self.assertEqual(self.EXAMPLE_IPv6_ADDR, trex_profile.ip6_packet.dst) - - trex_profile._general_single_action_partial(UDP)(SRC_PORT)(5060) - self.assertEqual(5060, trex_profile.udp_packet.sport) - - trex_profile._general_single_action_partial(IP)(TYPE_OF_SERVICE)(0) - self.assertEqual(0, trex_profile.ip_packet.tos) - - def test__set_proto_addr(self): - trex_profile = TrexProfile(TrafficProfile) - - ether_range = "00:00:00:00:00:01-00:00:00:00:00:02" - ip_range = "1.1.1.2-1.1.1.10" - ipv6_range = '0064:ff9b:0:0:0:0:9810:6414-0064:ff9b:0:0:0:0:9810:6420' - - trex_profile._set_proto_addr(ETHERNET, SRC, ether_range) - trex_profile._set_proto_addr(ETHERNET, DST, ether_range) - trex_profile._set_proto_addr(IP, SRC, ip_range) - trex_profile._set_proto_addr(IP, DST, ip_range) - trex_profile._set_proto_addr(IPv6, SRC, ipv6_range) - trex_profile._set_proto_addr(IPv6, DST, ipv6_range) - trex_profile._set_proto_addr(UDP, SRC_PORT, "5060-5090") - trex_profile._set_proto_addr(UDP, DST_PORT, "5060") -- cgit 1.2.3-korg