diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/unit/network_services/collector/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/network_services/collector/test_publisher.py | 39 | ||||
-rw-r--r-- | tests/unit/network_services/collector/test_subscriber.py | 96 | ||||
-rw-r--r-- | tests/unit/network_services/libs/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/network_services/libs/ixia_libs/__init__.py | 0 | ||||
-rw-r--r-- | tests/unit/network_services/libs/ixia_libs/test_IxNet.py | 874 | ||||
-rw-r--r-- | tests/unit/network_services/test_utils.py | 143 | ||||
-rw-r--r-- | tests/unit/network_services/test_yang_model.py | 135 |
8 files changed, 0 insertions, 1287 deletions
diff --git a/tests/unit/network_services/collector/__init__.py b/tests/unit/network_services/collector/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/unit/network_services/collector/__init__.py +++ /dev/null diff --git a/tests/unit/network_services/collector/test_publisher.py b/tests/unit/network_services/collector/test_publisher.py deleted file mode 100644 index 4a175841d..000000000 --- a/tests/unit/network_services/collector/test_publisher.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Unittest for yardstick.network_services.collector.publisher - -from __future__ import absolute_import -import unittest - -from yardstick.network_services.collector import publisher - - -class PublisherTestCase(unittest.TestCase): - - def setUp(self): - self.test_publisher = publisher.Publisher() - - def test_successful_init(self): - pass - - def test_unsuccessful_init(self): - pass - - def test_start(self): - self.assertIsNone(self.test_publisher.start()) - - def test_stop(self): - self.assertIsNone(self.test_publisher.stop()) diff --git a/tests/unit/network_services/collector/test_subscriber.py b/tests/unit/network_services/collector/test_subscriber.py deleted file mode 100644 index d4b4ecf7a..000000000 --- a/tests/unit/network_services/collector/test_subscriber.py +++ /dev/null @@ -1,96 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Unittest for yardstick.network_services.collector.subscriber - -from __future__ import absolute_import -import unittest -import mock - -from yardstick.network_services.collector import subscriber - - -class MockVnfAprrox(object): - - def __init__(self): - self.result = {} - self.name = "vnf__1" - - def collect_kpi(self): - self.result = { - 'pkt_in_up_stream': 100, - 'pkt_drop_up_stream': 5, - 'pkt_in_down_stream': 50, - 'pkt_drop_down_stream': 40 - } - return self.result - - -class CollectorTestCase(unittest.TestCase): - - NODES = { - 'node1': {}, - 'node2': { - 'ip': '1.2.3.4', - 'collectd': { - 'plugins': {'abc': 12, 'def': 34}, - 'interval': 987, - }, - }, - } - TRAFFIC_PROFILE = { - 'key1': 'value1', - } - - def setUp(self): - vnf = MockVnfAprrox() - self.ssh_patch = mock.patch('yardstick.network_services.nfvi.resource.ssh', autospec=True) - mock_ssh = self.ssh_patch.start() - mock_instance = mock.Mock() - mock_instance.execute.return_value = 0, '', '' - mock_ssh.AutoConnectSSH.from_node.return_value = mock_instance - self.collector = subscriber.Collector([vnf], self.NODES, self.TRAFFIC_PROFILE, 1800) - - def tearDown(self): - self.ssh_patch.stop() - - def test___init__(self, *_): - vnf = MockVnfAprrox() - collector = subscriber.Collector([vnf], {}, {}) - self.assertEqual(len(collector.vnfs), 1) - self.assertEqual(collector.traffic_profile, {}) - - def test___init___with_data(self, *_): - self.assertEqual(len(self.collector.vnfs), 1) - self.assertDictEqual(self.collector.traffic_profile, self.TRAFFIC_PROFILE) - self.assertEqual(len(self.collector.resource_profiles), 1) - - def test___init___negative(self, *_): - pass - - def test_start(self, *_): - self.assertIsNone(self.collector.start()) - - def test_stop(self, *_): - self.assertIsNone(self.collector.stop()) - - def test_get_kpi(self, *_): - result = self.collector.get_kpi() - - self.assertEqual(result["vnf__1"]["pkt_in_up_stream"], 100) - self.assertEqual(result["vnf__1"]["pkt_drop_up_stream"], 5) - self.assertEqual(result["vnf__1"]["pkt_in_down_stream"], 50) - self.assertEqual(result["vnf__1"]["pkt_drop_down_stream"], 40) - self.assertIn('node2', result) diff --git a/tests/unit/network_services/libs/__init__.py b/tests/unit/network_services/libs/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/unit/network_services/libs/__init__.py +++ /dev/null diff --git a/tests/unit/network_services/libs/ixia_libs/__init__.py b/tests/unit/network_services/libs/ixia_libs/__init__.py deleted file mode 100644 index e69de29bb..000000000 --- a/tests/unit/network_services/libs/ixia_libs/__init__.py +++ /dev/null diff --git a/tests/unit/network_services/libs/ixia_libs/test_IxNet.py b/tests/unit/network_services/libs/ixia_libs/test_IxNet.py deleted file mode 100644 index 2a97048aa..000000000 --- a/tests/unit/network_services/libs/ixia_libs/test_IxNet.py +++ /dev/null @@ -1,874 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Unittest for yardstick.network_services.libs.ixia_libs.IxNet - -from __future__ import absolute_import -import unittest -import mock - -from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IxNextgen -from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IP_VERSION_4 -from yardstick.network_services.libs.ixia_libs.IxNet.IxNet import IP_VERSION_6 - - -UPLINK = "uplink" -DOWNLINK = "downlink" - -class TestIxNextgen(unittest.TestCase): - - def test___init__(self): - ixnet_gen = IxNextgen() - self.assertIsNone(ixnet_gen._bidir) - - @mock.patch("yardstick.network_services.libs.ixia_libs.IxNet.IxNet.sys") - def test_connect(self, *args): - - ixnet_gen = IxNextgen() - ixnet_gen.get_config = mock.MagicMock() - ixnet_gen.get_ixnet = mock.MagicMock() - - self.assertRaises(ImportError, ixnet_gen._connect, {"py_lib_path": "/tmp"}) - - def test_clear_ixia_config(self): - ixnet = mock.MagicMock() - ixnet.execute = mock.Mock() - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.clear_ixia_config() - self.assertIsNone(result) - self.assertEqual(ixnet.execute.call_count, 1) - - def test_load_ixia_profile(self): - ixnet = mock.MagicMock() - ixnet.execute = mock.Mock() - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.load_ixia_profile({}) - self.assertIsNone(result) - self.assertEqual(ixnet.execute.call_count, 1) - - def test_load_ixia_config(self): - ixnet = mock.MagicMock() - ixnet.execute = mock.Mock() - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_load_config({}) - self.assertIsNone(result) - self.assertEqual(ixnet.execute.call_count, 2) - - @mock.patch('yardstick.network_services.libs.ixia_libs.IxNet.IxNet.log') - def test_ix_assign_ports(self, mock_logger): - ixnet = mock.MagicMock() - ixnet.getList.return_value = [0, 1] - ixnet.getAttribute.side_effect = ['up', 'down'] - - config = { - 'chassis': '1.1.1.1', - 'cards': ['1', '2'], - 'ports': ['2', '2'], - } - - ixnet_gen = IxNextgen(ixnet) - ixnet_gen._cfg = config - - result = ixnet_gen.ix_assign_ports() - self.assertIsNone(result) - self.assertEqual(ixnet.execute.call_count, 1) - self.assertEqual(ixnet.commit.call_count, 1) - self.assertEqual(ixnet.getAttribute.call_count, 2) - self.assertEqual(mock_logger.error.call_count, 1) - - def test_ix_update_frame(self): - static_traffic_params = { - UPLINK: { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": { - "64B": "100", - "1KB": "0", - }, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - DOWNLINK: { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": False, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.remapIds.return_value = ["0"] - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - ixnet.getList.side_effect = [ - [1], - [1], - [1], - [ - "ethernet.header.destinationAddress", - "ethernet.header.sourceAddress", - ], - ] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_update_frame(static_traffic_params) - self.assertIsNone(result) - self.assertEqual(ixnet.setMultiAttribute.call_count, 7) - self.assertEqual(ixnet.commit.call_count, 2) - - def test_ix_update_udp(self): - ixnet = mock.MagicMock() - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_update_udp({}) - self.assertIsNone(result) - - def test_ix_update_tcp(self): - ixnet = mock.MagicMock() - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_update_tcp({}) - self.assertIsNone(result) - - def test_ix_start_traffic(self): - ixnet = mock.MagicMock() - ixnet.getList.return_value = [0] - ixnet.getAttribute.return_value = 'down' - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_start_traffic() - self.assertIsNone(result) - self.assertEqual(ixnet.getList.call_count, 1) - self.assertEqual(ixnet.execute.call_count, 3) - - def test_ix_stop_traffic(self): - ixnet = mock.MagicMock() - ixnet.getList.return_value = [0] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_stop_traffic() - self.assertIsNone(result) - self.assertEqual(ixnet.getList.call_count, 1) - self.assertEqual(ixnet.execute.call_count, 1) - - def test_ix_get_statistics(self): - ixnet = mock.MagicMock() - ixnet.execute.return_value = "" - ixnet.getList.side_effect = [ - [ - '::ixNet::OBJ-/statistics/view:"Traffic Item Statistics"', - '::ixNet::OBJ-/statistics/view:"Port Statistics"', - ], - [ - '::ixNet::OBJ-/statistics/view:"Flow Statistics"', - ], - ] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_get_statistics() - self.assertIsNotNone(result) - self.assertEqual(ixnet.getList.call_count, 1) - self.assertEqual(ixnet.execute.call_count, 20) - - def test_find_view_obj_no_where(self): - views = ['here', 'there', 'everywhere'] - result = IxNextgen.find_view_obj('no_where', views) - self.assertEqual(result, '') - - def test_add_ip_header_v4(self): - static_traffic_params = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "count": 1024, - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.remapIds.return_value = ["0"] - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - ixnet.getList.side_effect = [[1], [0], [0], ["srcIp", "dstIp"]] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_4) - self.assertIsNone(result) - self.assertGreater(ixnet.setMultiAttribute.call_count, 0) - self.assertEqual(ixnet.commit.call_count, 1) - - def test_add_ip_header_v4_nothing_to_do(self): - static_traffic_params = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "count": 1024, - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.remapIds.return_value = ["0"] - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - ixnet.getList.side_effect = [[1], [0, 1], [0], ["srcIp", "dstIp"]] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_4) - self.assertIsNone(result) - self.assertGreater(ixnet.setMultiAttribute.call_count, 0) - self.assertEqual(ixnet.commit.call_count, 1) - - def test_add_ip_header_v6(self): - static_traffic_profile = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.getList.side_effect = [[1], [1], [1], ["srcIp", "dstIp"]] - ixnet.remapIds.return_value = ["0"] - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.add_ip_header(static_traffic_profile, IP_VERSION_6) - self.assertIsNone(result) - self.assertGreater(ixnet.setMultiAttribute.call_count, 0) - self.assertEqual(ixnet.commit.call_count, 1) - - def test_add_ip_header_v6_nothing_to_do(self): - static_traffic_params = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "count": 1024, - "ttl": 32 - }, - "outer_l3v6": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": {"64B": "100"}, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.getList.side_effect = [[1], [0, 1], [1], ["srcIP", "dstIP"]] - ixnet.remapIds.return_value = ["0"] - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.add_ip_header(static_traffic_params, IP_VERSION_6) - self.assertIsNone(result) - self.assertEqual(ixnet.setMultiAttribute.call_count, 0) - - def test_set_random_ip_multi_attributes_bad_ip_version(self): - bad_ip_version = object() - ixnet_gen = IxNextgen(mock.Mock()) - mock1 = mock.Mock() - mock2 = mock.Mock() - mock3 = mock.Mock() - with self.assertRaises(ValueError): - ixnet_gen.set_random_ip_multi_attributes(mock1, bad_ip_version, mock2, mock3) - - def test_get_config(self): - tg_cfg = { - "vdu": [ - { - "external-interface": [ - { - "virtual-interface": { - "vpci": "0000:07:00.1", - }, - }, - { - "virtual-interface": { - "vpci": "0001:08:01.2", - }, - }, - ], - }, - ], - "mgmt-interface": { - "ip": "test1", - "tg-config": { - "dut_result_dir": "test2", - "version": "test3", - "ixchassis": "test4", - "tcl_port": "test5", - "py_lib_path": "test6", - }, - } - } - - expected = { - 'py_lib_path': 'test6', - 'machine': 'test1', - 'port': 'test5', - 'chassis': 'test4', - 'cards': ['0000', '0001'], - 'ports': ['07', '08'], - 'output_dir': 'test2', - 'version': 'test3', - 'bidir': True, - } - - result = IxNextgen.get_config(tg_cfg) - self.assertDictEqual(result, expected) - - def test_ix_update_ether(self): - static_traffic_params = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:03", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l2": { - "dstmac": "00:00:00:00:00:04", - "framesPerSecond": True, - "framesize": 64, - "srcmac": "00:00:00:00:00:01" - }, - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - ixnet.getList.side_effect = [ - [1], - [1], - [1], - [ - "ethernet.header.destinationAddress", - "ethernet.header.sourceAddress", - ], - ] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_update_ether(static_traffic_params) - self.assertIsNone(result) - self.assertGreater(ixnet.setMultiAttribute.call_count, 0) - - def test_ix_update_ether_nothing_to_do(self): - static_traffic_params = { - "uplink_0": { - "id": 1, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l3": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v4": { - "dscp": 0, - "dstip4": "152.16.40.20", - "proto": "udp", - "srcip4": "152.16.100.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "2001", - "srcport": "1234" - }, - "traffic_type": "continuous" - }, - "downlink_0": { - "id": 2, - "bidir": "False", - "duration": 60, - "iload": "100", - "outer_l3": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v4": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l3v6": { - "count": 1024, - "dscp": 0, - "dstip4": "152.16.100.20", - "proto": "udp", - "srcip4": "152.16.40.20", - "ttl": 32 - }, - "outer_l4": { - "dstport": "1234", - "srcport": "2001" - }, - "traffic_type": "continuous" - } - } - - ixnet = mock.MagicMock() - ixnet.setMultiAttribute.return_value = [1] - ixnet.commit.return_value = [1] - ixnet.getList.side_effect = [ - [1], - [1], - [1], - [ - "ethernet.header.destinationAddress", - "ethernet.header.sourceAddress", - ], - ] - - ixnet_gen = IxNextgen(ixnet) - - result = ixnet_gen.ix_update_ether(static_traffic_params) - self.assertIsNone(result) - self.assertEqual(ixnet.setMultiAttribute.call_count, 0) diff --git a/tests/unit/network_services/test_utils.py b/tests/unit/network_services/test_utils.py deleted file mode 100644 index bf98a4474..000000000 --- a/tests/unit/network_services/test_utils.py +++ /dev/null @@ -1,143 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Unittest for yardstick.network_services.utils - -import os -import unittest -import mock - -from yardstick.network_services import utils - - -class UtilsTestCase(unittest.TestCase): - """Test all VNF helper methods.""" - - DPDK_PATH = os.path.join(utils.NSB_ROOT, "dpdk-devbind.py") - - def setUp(self): - super(UtilsTestCase, self).setUp() - - def test_get_nsb_options(self): - result = utils.get_nsb_option("bin_path", None) - self.assertEqual(result, utils.NSB_ROOT) - - def test_get_nsb_option_is_invalid_key(self): - result = utils.get_nsb_option("bin", None) - self.assertEqual(result, None) - - def test_get_nsb_option_default(self): - default = object() - result = utils.get_nsb_option("nosuch", default) - self.assertIs(result, default) - - def test_provision_tool(self): - with mock.patch("yardstick.ssh.SSH") as ssh: - ssh_mock = mock.Mock(autospec=ssh.SSH) - ssh_mock.execute = \ - mock.Mock(return_value=(0, self.DPDK_PATH, "")) - ssh.return_value = ssh_mock - tool_path = utils.provision_tool(ssh_mock, self.DPDK_PATH) - self.assertEqual(tool_path, self.DPDK_PATH) - - def test_provision_tool_no_path(self): - with mock.patch("yardstick.ssh.SSH") as ssh: - ssh_mock = mock.Mock(autospec=ssh.SSH) - ssh_mock.execute = \ - mock.Mock(return_value=(1, self.DPDK_PATH, "")) - ssh.return_value = ssh_mock - tool_path = utils.provision_tool(ssh_mock, self.DPDK_PATH) - self.assertEqual(tool_path, self.DPDK_PATH) - - -class PciAddressTestCase(unittest.TestCase): - - PCI_ADDRESS_DBSF = '000A:07:03.2' - PCI_ADDRESS_BSF = '06:02.1' - PCI_ADDRESS_DBSF_MULTILINE_1 = '0001:08:04.3\nother text\n' - PCI_ADDRESS_DBSF_MULTILINE_2 = 'first line\n 0001:08:04.3 \nother text\n' - # Will match and return the first address found. - PCI_ADDRESS_DBSF_MULTILINE_3 = ' 0001:08:04.1 \n 05:03.1 \nother\n' - PCI_ADDRESS_BSF_MULTILINE_1 = 'first line\n 08:04.3 \n 0002:05:03.1\n' - BAD_INPUT_1 = 'no address found' - BAD_INPUT_2 = '001:08:04.1' - BAD_INPUT_3 = '08:4.1' - - def test_pciaddress_dbsf(self): - pci_address = utils.PciAddress(PciAddressTestCase.PCI_ADDRESS_DBSF) - self.assertEqual('000a', pci_address.domain) - self.assertEqual('07', pci_address.bus) - self.assertEqual('03', pci_address.slot) - self.assertEqual('2', pci_address.function) - - def test_pciaddress_bsf(self): - pci_address = utils.PciAddress(PciAddressTestCase.PCI_ADDRESS_BSF) - self.assertEqual('0000', pci_address.domain) - self.assertEqual('06', pci_address.bus) - self.assertEqual('02', pci_address.slot) - self.assertEqual('1', pci_address.function) - - def test_pciaddress_dbsf_multiline_1(self): - pci_address = utils.PciAddress( - PciAddressTestCase.PCI_ADDRESS_DBSF_MULTILINE_1) - self.assertEqual('0001', pci_address.domain) - self.assertEqual('08', pci_address.bus) - self.assertEqual('04', pci_address.slot) - self.assertEqual('3', pci_address.function) - - def test_pciaddress_dbsf_multiline_2(self): - pci_address = utils.PciAddress( - PciAddressTestCase.PCI_ADDRESS_DBSF_MULTILINE_2) - self.assertEqual('0001', pci_address.domain) - self.assertEqual('08', pci_address.bus) - self.assertEqual('04', pci_address.slot) - self.assertEqual('3', pci_address.function) - - def test_pciaddress_dbsf_multiline_3(self): - pci_address = utils.PciAddress( - PciAddressTestCase.PCI_ADDRESS_DBSF_MULTILINE_3) - self.assertEqual('0001', pci_address.domain) - self.assertEqual('08', pci_address.bus) - self.assertEqual('04', pci_address.slot) - self.assertEqual('1', pci_address.function) - - def test_pciaddress_bsf_multiline_1(self): - pci_address = utils.PciAddress( - PciAddressTestCase.PCI_ADDRESS_BSF_MULTILINE_1) - self.assertEqual('0000', pci_address.domain) - self.assertEqual('08', pci_address.bus) - self.assertEqual('04', pci_address.slot) - self.assertEqual('3', pci_address.function) - - def test_pciaddress_bad_input_no_address(self): - with self.assertRaises(ValueError) as exception: - utils.PciAddress(PciAddressTestCase.BAD_INPUT_1) - self.assertEqual('Invalid PCI address: {}'.format( - PciAddressTestCase.BAD_INPUT_1), str(exception.exception)) - - def test_pciaddress_bad_input_dbsf_bad_formatted(self): - # In this test case, the domain has only 3 characters instead of 4. - pci_address = utils.PciAddress( - PciAddressTestCase.BAD_INPUT_2) - self.assertEqual('0000', pci_address.domain) - self.assertEqual('08', pci_address.bus) - self.assertEqual('04', pci_address.slot) - self.assertEqual('1', pci_address.function) - - def test_pciaddress_bad_input_bsf_bad_formatted(self): - with self.assertRaises(ValueError) as exception: - utils.PciAddress(PciAddressTestCase.BAD_INPUT_3) - self.assertEqual('Invalid PCI address: {}'.format( - PciAddressTestCase.BAD_INPUT_3), str(exception.exception)) diff --git a/tests/unit/network_services/test_yang_model.py b/tests/unit/network_services/test_yang_model.py deleted file mode 100644 index 0b29da701..000000000 --- a/tests/unit/network_services/test_yang_model.py +++ /dev/null @@ -1,135 +0,0 @@ -# Copyright (c) 2016-2017 Intel Corporation -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -# Unittest for yardstick.network_services.utils - -from __future__ import absolute_import - -import unittest -import mock - -import yaml - -from yardstick.network_services.yang_model import YangModel - - -class YangModelTestCase(unittest.TestCase): - """Test all Yang Model methods.""" - - ENTRIES = { - 'access-list1': { - 'acl': { - 'access-list-entries': [{ - 'ace': { - 'ace-oper-data': { - 'match-counter': 0}, - 'actions': 'drop,count', - 'matches': { - 'destination-ipv4-network': - '152.16.40.20/24', - 'destination-port-range': { - 'lower-port': 0, - 'upper-port': 65535}, - 'source-ipv4-network': '0.0.0.0/0', - 'source-port-range': { - 'lower-port': 0, - 'upper-port': 65535}}, - 'rule-name': 'rule1588'}}, - { - 'ace': { - 'ace-oper-data': { - 'match-counter': 0}, - 'actions': 'drop,count', - 'matches': { - 'destination-ipv4-network': - '0.0.0.0/0', - 'destination-port-range': { - 'lower-port': 0, - 'upper-port': 65535}, - 'source-ipv4-network': - '152.16.100.20/24', - 'source-port-range': { - 'lower-port': 0, - 'upper-port': 65535}}, - 'rule-name': 'rule1589'}}], - 'acl-name': 'sample-ipv4-acl', - 'acl-type': 'ipv4-acl'} - } - } - - def test__init__(self): - cfg = "yang.yaml" - y = YangModel(cfg) - self.assertEqual(y.config_file, cfg) - - def test_config_file_setter(self): - cfg = "yang.yaml" - y = YangModel(cfg) - self.assertEqual(y.config_file, cfg) - cfg2 = "yang2.yaml" - y.config_file = cfg2 - self.assertEqual(y.config_file, cfg2) - - def test__get_entries(self): - cfg = "yang.yaml" - y = YangModel(cfg) - y._options = self.ENTRIES - y._get_entries() - self.assertIn("p acl add", y._rules) - - def test__get_entries_no_options(self): - cfg = "yang.yaml" - y = YangModel(cfg) - y._get_entries() - self.assertEqual(y._rules, '') - - @mock.patch('yardstick.network_services.yang_model.yaml_load') - @mock.patch('yardstick.network_services.yang_model.open') - def test__read_config(self, mock_open, mock_safe_load): - cfg = "yang.yaml" - y = YangModel(cfg) - mock_safe_load.return_value = expected = {'key1': 'value1', 'key2': 'value2'} - y._read_config() - self.assertDictEqual(y._options, expected) - - @mock.patch('yardstick.network_services.yang_model.open') - def test__read_config_open_error(self, mock_open): - cfg = "yang.yaml" - y = YangModel(cfg) - mock_open.side_effect = IOError('my error') - - self.assertEqual(y._options, {}) - with self.assertRaises(IOError) as raised: - y._read_config() - - self.assertIn('my error', str(raised.exception)) - self.assertEqual(y._options, {}) - - def test_get_rules(self): - cfg = "yang.yaml" - y = YangModel(cfg) - y._read_config = read_mock = mock.Mock() - y._get_entries = get_mock = mock.Mock() - - y._rules = None - self.assertIsNone(y.get_rules()) - self.assertEqual(read_mock.call_count, 1) - self.assertEqual(get_mock.call_count, 1) - - # True value should prevent calling read and get - y._rules = 999 - self.assertEqual(y.get_rules(), 999) - self.assertEqual(read_mock.call_count, 1) - self.assertEqual(get_mock.call_count, 1) |