diff options
Diffstat (limited to 'snaps/openstack/utils/tests/neutron_utils_tests.py')
-rw-r--r-- | snaps/openstack/utils/tests/neutron_utils_tests.py | 686 |
1 files changed, 464 insertions, 222 deletions
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py index e90b94c..1e89dda 100644 --- a/snaps/openstack/utils/tests/neutron_utils_tests.py +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -14,14 +14,16 @@ # limitations under the License. import uuid -from snaps.openstack.utils import keystone_utils -from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction -from snaps.openstack.tests import openstack_tests -from snaps.openstack.utils import neutron_utils -from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings from snaps.openstack import create_router -from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \ + PortSettings +from snaps.openstack.create_security_group import SecurityGroupSettings, \ + SecurityGroupRuleSettings, Direction +from snaps.openstack.tests import openstack_tests from snaps.openstack.tests import validation_utils +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.utils import keystone_utils +from snaps.openstack.utils import neutron_utils __author__ = 'spisarski' @@ -57,13 +59,14 @@ class NeutronSmokeTests(OSComponentTestCase): with self.assertRaises(Exception): neutron = neutron_utils.neutron_client( - OSCreds(username='user', password='pass', auth_url='url', project_name='project')) + OSCreds(username='user', password='pass', auth_url='url', + project_name='project')) neutron.list_networks() def test_retrieve_ext_network_name(self): """ - Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained - within the returned list + Tests the neutron_utils.get_external_network_names to ensure the + configured self.ext_net_name is contained within the returned list :return: """ neutron = neutron_utils.neutron_client(self.os_creds) @@ -86,7 +89,8 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): self.port_name = str(guid) + '-port' self.neutron = neutron_utils.neutron_client(self.os_creds) self.network = None - self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net') + self.net_config = openstack_tests.get_pub_net_config( + net_name=guid + '-pub-net') def tearDown(self): """ @@ -94,31 +98,40 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): """ if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_network(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) def test_create_network_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty network name + Tests the neutron_utils.create_neutron_net() function with an empty + network name """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings(name='')) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings(name='')) def test_create_network_null_name(self): """ - Tests the neutron_utils.create_neutron_net() function when the network name is None + Tests the neutron_utils.create_neutron_net() function when the network + name is None """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings()) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings()) class NeutronUtilsSubnetTests(OSComponentTestCase): @@ -133,7 +146,8 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): self.network = None self.subnet = None self.net_config = openstack_tests.get_pub_net_config( - net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name) + net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', + external_net=self.ext_net_name) def tearDown(self): """ @@ -142,71 +156,108 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_subnet(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) def test_create_subnet_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet name is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): SubnetSettings(cidr=self.net_config.subnet_cidr) def test_create_subnet_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty name + Tests the neutron_utils.create_neutron_net() function with an empty + name """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet(self.neutron, '', + subnet_setting.cidr, True) def test_create_subnet_null_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr=None, + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) def test_create_subnet_empty_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is empty """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr='', + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) class NeutronUtilsRouterTests(OSComponentTestCase): @@ -232,7 +283,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase): Cleans the remote OpenStack objects """ if self.interface_router: - neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet) + neutron_utils.remove_interface_router(self.neutron, self.router, + self.subnet) if self.router: neutron_utils.delete_router(self.neutron, self.router) @@ -244,30 +296,40 @@ class NeutronUtilsRouterTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_router_simple(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) def test_create_router_with_public_interface(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ + subnet_setting = self.net_config.network_settings.subnet_settings[0] self.net_config = openstack_tests.OSNetworkConfig( self.net_config.network_settings.name, - self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name, + subnet_setting.name, + subnet_setting.cidr, + self.net_config.router_settings.name, self.ext_net_name) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) # TODO - Add validation that the router gatway has been set def test_create_router_empty_name(self): @@ -276,83 +338,125 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings(name='') - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) def test_create_router_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function when the + subnet CIDR value is None """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings() - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) validate_router(self.neutron, None, True) def test_add_interface_router(self): """ Tests the neutron_utils.add_interface_router() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) - - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) - validate_interface_router(self.interface_router, self.router, self.subnet) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) + + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) + + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) + validate_interface_router(self.interface_router, self.router, + self.subnet) def test_add_interface_router_null_router(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.add_interface_router() function for an + Exception when the router value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_add_interface_router_null_subnet(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None + Tests the neutron_utils.add_interface_router() function for an + Exception when the subnet value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_create_port(self): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, + subnet_setting.cidr, True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( name=self.port_name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}], + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}], network_name=self.net_config.network_settings.name)) validate_port(self.neutron, self.port, self.port_name) @@ -360,111 +464,172 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr, + True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}])) + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) validate_port(self.neutron, self.port, self.port_name) def test_create_port_null_name(self): """ - Tests the neutron_utils.create_port() function for an Exception when the port name value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the port name value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) def test_create_port_null_network_object(self): """ - Tests the neutron_utils.create_port() function for an Exception when the network object is None + Tests the neutron_utils.create_port() function for an Exception when + the network object is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - self.neutron, self.port_name, self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': + self.net_config.network_settings.subnet_settings[ + 0].name, + 'ip': ip_1}])) def test_create_port_null_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': None}])) def test_create_port_invalid_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': 'foo'}])) def test_create_port_invalid_ip_to_subnet(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, - 'ip': '10.197.123.100'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': '10.197.123.100'}])) class NeutronUtilsSecurityGroupTests(OSComponentTestCase): @@ -490,7 +655,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): for security_group in self.security_groups: try: - neutron_utils.delete_security_group(self.neutron, security_group) + neutron_utils.delete_security_group(self.neutron, + security_group) except: pass @@ -499,72 +665,102 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): Tests the neutron_utils.create_security_group() function """ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name) - security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + security_group = neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) neutron_utils.delete_security_group(self.neutron, security_group) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNone(sec_grp_get) def test_create_sec_grp_no_name(self): """ - Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure - that attempting to create a security group without a name will raise an exception + Tests the SecurityGroupSettings constructor and + neutron_utils.create_security_group() function to ensure that + attempting to create a security group without a name will raise an + exception """ with self.assertRaises(Exception): sec_grp_settings = SecurityGroupSettings() self.security_groups.append( - neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings)) def test_create_sec_grp_no_rules(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, + description='hello group') + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) - self.assertTrue(sec_grp_settings.name, self.security_groups[0]['security_group']['name']) - self.assertTrue(sec_grp_settings.description, self.security_groups[0]['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + self.security_groups[0]['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + self.security_groups[0]['security_group'][ + 'description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( - self.security_groups[0]['security_group'], sec_grp_get['security_group'])) + self.security_groups[0]['security_group'], + sec_grp_get['security_group'])) def test_create_sec_grp_one_rule(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=[sec_grp_rule_settings]) + sec_grp_rule_settings = SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=[sec_grp_rule_settings]) - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) - free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0]) + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) + free_rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.security_groups[0]) for free_rule in free_rules: self.security_group_rules.append(free_rule) self.security_group_rules.append( - neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0])) + neutron_utils.create_security_group_rule( + self.neutron, sec_grp_settings.rule_settings[0])) # Refresh object so it is populated with the newly added rule - security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + security_group = neutron_utils.get_security_group( + self.neutron, sec_grp_settings.name) - rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + security_group) - self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules)) + self.assertTrue( + validation_utils.objects_equivalent(self.security_group_rules, + rules)) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) - self.assertTrue(sec_grp_settings.description, security_group['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + security_group['security_group']['description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) @@ -576,18 +772,59 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-1', + description='hello group'))) self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-2', + description='hello group'))) sec_grp_1b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[0]['security_group']['id']) sec_grp_2b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[1]['security_group']['id']) - self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id']) - self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id']) + self.assertEqual(self.security_groups[0]['security_group']['id'], + sec_grp_1b['security_group']['id']) + self.assertEqual(self.security_groups[1]['security_group']['id'], + sec_grp_2b['security_group']['id']) + + +class NeutronUtilsFloatingIpTests(OSComponentTestCase): + """ + Test basic nova keypair functionality + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack + """ + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.floating_ip = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.floating_ip: + neutron_utils.delete_floating_ip(self.neutron, self.floating_ip) + + def test_floating_ips(self): + """ + Tests the creation of a floating IP + :return: + """ + initial_fips = neutron_utils.get_floating_ips(self.neutron) + + self.floating_ip = neutron_utils.create_floating_ip(self.neutron, + self.ext_net_name) + all_fips = neutron_utils.get_floating_ips(self.neutron) + self.assertEqual(len(initial_fips) + 1, len(all_fips)) + returned = neutron_utils.get_floating_ip(self.neutron, + self.floating_ip) + self.assertEqual(self.floating_ip.id, returned.id) + self.assertEqual(self.floating_ip.ip, returned.ip) """ @@ -597,8 +834,9 @@ Validation routines def validate_network(neutron, name, exists): """ - Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a network for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a network for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a network for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected network name :param exists: Whether or not the network name should exist or not @@ -614,8 +852,9 @@ def validate_network(neutron, name, exists): def validate_subnet(neutron, name, cidr, exists): """ - Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a subnet for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a subnet for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected subnet name :param cidr: The expected CIDR value @@ -632,8 +871,9 @@ def validate_subnet(neutron, name, cidr, exists): def validate_router(neutron, name, exists): """ - Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a router for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a router for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a router for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected router name :param exists: Whether or not the network name should exist or not @@ -647,7 +887,8 @@ def validate_router(neutron, name, exists): def validate_interface_router(interface_router, router, subnet): """ - Returns true if the router ID & subnet ID have been properly included into the interface router object + Returns true if the router ID & subnet ID have been properly included into + the interface router object :param interface_router: the object to validate :param router: to validate against the interface_router :param subnet: to validate against the interface_router @@ -661,8 +902,9 @@ def validate_interface_router(interface_router, router, subnet): def validate_port(neutron, port_obj, this_port_name): """ - Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a port for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a port for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a port for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param port_obj: The port object to lookup :param this_port_name: The expected router name |