diff options
author | spisarski <s.pisarski@cablelabs.com> | 2017-06-22 12:43:09 -0600 |
---|---|---|
committer | spisarski <s.pisarski@cablelabs.com> | 2017-06-22 14:28:08 -0600 |
commit | c7ba89444d160cb81656a49cb93416ee5013aa8f (patch) | |
tree | 0f46d74f98e17e256c3e1eb9a592c3bcc29044e8 | |
parent | dbfb9c4e94e500592a8b93f42b7b87230d0af311 (diff) |
Use neutron to create floating IPs.
This patch moves the floating IP creation out of nova and into neutron.
Other changes include the use of domain objects for VM and Floating IP
instances, addition of new nova_utils tests to exercise the create
server functionality, and more PEP8 compliance.
JIRA: SNAPS-92
Change-Id: I16c12b26b56008901633e90ae307586ad2045f9b
Signed-off-by: spisarski <s.pisarski@cablelabs.com>
-rw-r--r-- | requirements.txt | 2 | ||||
-rw-r--r-- | snaps/domain/test/vm_inst_tests.py | 49 | ||||
-rw-r--r-- | snaps/domain/vm_inst.py | 44 | ||||
-rw-r--r-- | snaps/openstack/create_instance.py | 84 | ||||
-rw-r--r-- | snaps/openstack/tests/create_instance_tests.py | 69 | ||||
-rw-r--r-- | snaps/openstack/tests/openstack_tests.py | 2 | ||||
-rw-r--r-- | snaps/openstack/utils/neutron_utils.py | 181 | ||||
-rw-r--r-- | snaps/openstack/utils/nova_utils.py | 192 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/neutron_utils_tests.py | 686 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/nova_utils_tests.py | 170 | ||||
-rw-r--r-- | snaps/provisioning/tests/ansible_utils_tests.py | 128 | ||||
-rw-r--r-- | snaps/test_suite_builder.py | 396 |
12 files changed, 1376 insertions, 627 deletions
diff --git a/requirements.txt b/requirements.txt index c3e748f..c708103 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -python-novaclient!=7.0.0,<8.0.0,>=6.0.0 # Apache-2.0 +python-novaclient>=6.0.0 # Apache-2.0 python-neutronclient>=5.1.0 # Apache-2.0 python-keystoneclient>=3.8.0 # Apache-2.0 python-glanceclient>=2.5.0 # Apache-2.0 diff --git a/snaps/domain/test/vm_inst_tests.py b/snaps/domain/test/vm_inst_tests.py new file mode 100644 index 0000000..e722a06 --- /dev/null +++ b/snaps/domain/test/vm_inst_tests.py @@ -0,0 +1,49 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from snaps.domain.vm_inst import VmInst, FloatingIp + + +class VmInstDomainObjectTests(unittest.TestCase): + """ + Tests the construction of the snaps.domain.test.Image class + """ + + def test_construction_positional(self): + vm_inst = VmInst('name', 'id') + self.assertEqual('name', vm_inst.name) + self.assertEqual('id', vm_inst.id) + + def test_construction_named(self): + vm_inst = VmInst(inst_id='id', name='name') + self.assertEqual('name', vm_inst.name) + self.assertEqual('id', vm_inst.id) + + +class FloatingIpDomainObjectTests(unittest.TestCase): + """ + Tests the construction of the snaps.domain.test.Image class + """ + + def test_construction_positional(self): + vm_inst = FloatingIp('id-123', '10.0.0.1') + self.assertEqual('id-123', vm_inst.id) + self.assertEqual('10.0.0.1', vm_inst.ip) + + def test_construction_named(self): + vm_inst = FloatingIp(ip='10.0.0.1', inst_id='id-123') + self.assertEqual('id-123', vm_inst.id) + self.assertEqual('10.0.0.1', vm_inst.ip) diff --git a/snaps/domain/vm_inst.py b/snaps/domain/vm_inst.py new file mode 100644 index 0000000..0e12d14 --- /dev/null +++ b/snaps/domain/vm_inst.py @@ -0,0 +1,44 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class VmInst: + """ + SNAPS domain object for Images. Should contain attributes that + are shared amongst cloud providers + """ + def __init__(self, name, inst_id): + """ + Constructor + :param name: the image's name + :param inst_id: the instance's id + """ + self.name = name + self.id = inst_id + + +class FloatingIp: + """ + SNAPS domain object for Images. Should contain attributes that + are shared amongst cloud providers + """ + def __init__(self, inst_id, ip): + """ + Constructor + :param inst_id: the floating ip's id + :param ip: the IP address + """ + self.id = inst_id + self.ip = ip diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py index 991aca5..68c421d 100644 --- a/snaps/openstack/create_instance.py +++ b/snaps/openstack/create_instance.py @@ -102,7 +102,7 @@ class OpenStackVmInstance: logger.info( 'Found existing machine with name - %s', self.instance_settings.name) - fips = self.__nova.floating_ips.list() + fips = neutron_utils.get_floating_ips(self.__nova) for fip in fips: if fip.instance_id == server.id: self.__floating_ips.append(fip) @@ -116,45 +116,12 @@ class OpenStackVmInstance: active, error, or timeout waiting. Floating IPs will be assigned after active when block=True """ - nics = [] - for key, port in self.__ports: - kv = dict() - kv['port-id'] = port['port']['id'] - nics.append(kv) - - logger.info('Creating VM with name - ' + self.instance_settings.name) - keypair_name = None - if self.keypair_settings: - keypair_name = self.keypair_settings.name - - flavor = nova_utils.get_flavor_by_name(self.__nova, - self.instance_settings.flavor) - if not flavor: - raise Exception( - 'Flavor not found with name - %s', - self.instance_settings.flavor) - - image = glance_utils.get_image( - glance_utils.glance_client(self.__os_creds), - self.image_settings.name) - if image: - self.__vm = self.__nova.servers.create( - name=self.instance_settings.name, - flavor=flavor, - image=image, - nics=nics, - key_name=keypair_name, - security_groups=self.instance_settings.security_group_names, - userdata=self.instance_settings.userdata, - availability_zone=self.instance_settings.availability_zone) - - else: - raise Exception( - 'Cannot create instance, image cannot be located with name %s', - self.image_settings.name) - - logger.info( - 'Created instance with name - %s', self.instance_settings.name) + glance = glance_utils.glance_client(self.__os_creds) + self.__vm = nova_utils.create_server( + self.__nova, self.__neutron, glance, self.instance_settings, + self.image_settings, self.keypair_settings) + logger.info('Created instance with name - %s', + self.instance_settings.name) if block: if not self.vm_active(block=True): @@ -162,9 +129,7 @@ class OpenStackVmInstance: 'Fatal error, VM did not become ACTIVE within the alloted ' 'time') - # TODO - the call above should add security groups. The return object - # shows they exist but the association had never been made by - # OpenStack. This call is here to ensure they have been added + # Create server should do this but found it needed to occur here for sec_grp_name in self.instance_settings.security_group_names: if self.vm_active(block=True): nova_utils.add_security_group(self.__nova, self.__vm, @@ -202,8 +167,8 @@ class OpenStackVmInstance: if ext_gateway: subnet = neutron_utils.get_subnet_by_name( self.__neutron, floating_ip_setting.subnet_name) - floating_ip = nova_utils.create_floating_ip(self.__nova, - ext_gateway) + floating_ip = neutron_utils.create_floating_ip( + self.__neutron, ext_gateway) self.__floating_ips.append(floating_ip) self.__floating_ip_dict[floating_ip_setting.name] = floating_ip @@ -241,7 +206,7 @@ class OpenStackVmInstance: for floating_ip in self.__floating_ips: try: logger.info('Deleting Floating IP - ' + floating_ip.ip) - nova_utils.delete_floating_ip(self.__nova, floating_ip) + neutron_utils.delete_floating_ip(self.__neutron, floating_ip) except Exception as e: logger.error('Error deleting Floating IP - ' + str(e)) self.__floating_ips = list() @@ -345,7 +310,8 @@ class OpenStackVmInstance: while count > 0: logger.debug('Attempting to add floating IP to instance') try: - self.__vm.add_floating_ip(floating_ip, ip) + nova_utils.add_floating_ip_to_server( + self.__nova, self.__vm, floating_ip, ip) logger.info( 'Added floating IP %s to port IP %s on instance %s', floating_ip.ip, ip, self.instance_settings.name) @@ -376,7 +342,14 @@ class OpenStackVmInstance: Returns the latest version of this server object from OpenStack :return: Server object """ - return nova_utils.get_latest_server_object(self.__nova, self.__vm) + return self.__vm + + def get_os_vm_server_obj(self): + """ + Returns the OpenStack server object + :return: the server object + """ + return nova_utils.get_latest_server_os_object(self.__nova, self.__vm) def get_port_ip(self, port_name, subnet_name=None): """ @@ -466,21 +439,21 @@ class OpenStackVmInstance: elif len(self.__floating_ips) > 0: return self.__floating_ips[0] - def __config_nic(self, nic_name, port, floating_ip): + def __config_nic(self, nic_name, port, ip): """ Although ports/NICs can contain multiple IPs, this code currently only supports the first. :param nic_name: Name of the interface :param port: The port information containing the expected IP values. - :param floating_ip: The floating IP on which to apply the playbook. + :param ip: The IP on which to apply the playbook. :return: the return value from ansible """ - ip = port['port']['fixed_ips'][0]['ip_address'] + port_ip = port['port']['fixed_ips'][0]['ip_address'] variables = { - 'floating_ip': floating_ip, + 'floating_ip': ip, 'nic_name': nic_name, - 'nic_ip': ip + 'nic_ip': port_ip } if self.image_settings.nic_config_pb_loc and self.keypair_settings: @@ -594,7 +567,8 @@ class OpenStackVmInstance: if not self.__vm: return False - instance = self.__nova.servers.get(self.__vm.id) + instance = nova_utils.get_latest_server_os_object( + self.__nova, self.__vm) if not instance: logger.warning('Cannot find instance with id - ' + self.__vm.id) return False @@ -770,7 +744,7 @@ class VmInstanceSettings: if kwargs.get('security_group_names'): if isinstance(kwargs['security_group_names'], list): - self.security_group_names = kwargs['security_group_names'] + self.security_group_names = kwargs['security_group_names'] elif isinstance(kwargs['security_group_names'], set): self.security_group_names = kwargs['security_group_names'] elif isinstance(kwargs['security_group_names'], str): diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py index 998fe88..dc8d79b 100644 --- a/snaps/openstack/tests/create_instance_tests.py +++ b/snaps/openstack/tests/create_instance_tests.py @@ -261,6 +261,7 @@ class SimpleHealthCheck(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.vm_inst_name = guid + '-inst' self.port_1_name = guid + 'port-1' @@ -361,7 +362,7 @@ class SimpleHealthCheck(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) - self.assertTrue(check_dhcp_lease(vm, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm, ip)) class CreateInstanceSimpleTests(OSIntegrationTestCase): @@ -498,6 +499,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.keypair_priv_filepath = 'tmp/' + guid self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub' @@ -696,7 +698,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertTrue(inst_creator.vm_active(block=True)) ip = inst_creator.get_port_ip(port_settings.name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) inst_creator.add_security_group( self.sec_grp_creator.get_security_group()) @@ -734,7 +736,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertTrue(inst_creator.vm_active(block=True)) ip = inst_creator.get_port_ip(port_settings.name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) inst_creator.add_security_group( self.sec_grp_creator.get_security_group()) @@ -1165,7 +1167,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase): for zone in zones: creator = self.inst_creators[index] self.assertTrue(creator.vm_active(block=True)) - vm = creator.get_vm_inst() + vm = creator.get_os_vm_server_obj() deployed_zone = vm._info['OS-EXT-AZ:availability_zone'] deployed_host = vm._info['OS-EXT-SRV-ATTR:host'] self.assertEqual(zone, deployed_zone + ':' + deployed_host) @@ -1186,6 +1188,8 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) + # Initialize for tearDown() self.image_creator = None self.network_creators = list() @@ -1387,7 +1391,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) ip = self.inst_creator.get_port_ip(ports_settings[0].name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) # Add security group to VM self.inst_creator.add_security_group( @@ -1539,15 +1543,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_creators.append(sec_grp_creator) # Check that group has not been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.inst_creator.add_security_group(sec_grp) # Validate that security group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_add_invalid_security_group(self): """ @@ -1574,15 +1578,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_creators.append(sec_grp_creator) # Check that group has not been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertFalse(self.inst_creator.add_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_remove_security_group(self): """ @@ -1610,14 +1614,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertTrue(inst_has_sec_grp(vm_inst, sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, vm_inst, sec_grp_settings.name)) # Add security group to instance after activated self.assertTrue(self.inst_creator.remove_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_remove_security_group_never_added(self): """ @@ -1644,15 +1649,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertFalse(self.inst_creator.remove_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_add_same_security_group(self): """ @@ -1680,27 +1685,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertTrue(self.inst_creator.add_security_group(sec_grp)) # Validate that security group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) -def inst_has_sec_grp(vm_inst, sec_grp_name): +def inst_has_sec_grp(nova, vm_inst, sec_grp_name): """ Returns true if instance has a security group of a given name + :param nova: the nova client + :param vm_inst: the VmInst domain object + :param sec_grp_name: the name of the security group to validate :return: """ - if not hasattr(vm_inst, 'security_groups'): + vm = nova_utils.get_latest_server_os_object(nova, vm_inst) + if not hasattr(vm, 'security_groups'): return False found = False - for sec_grp_dict in vm_inst.security_groups: + for sec_grp_dict in vm.security_groups: if sec_grp_name in sec_grp_dict['name']: found = True break @@ -2357,10 +2366,11 @@ class CreateInstanceMockOfflineTests(OSComponentTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) -def check_dhcp_lease(vm, ip, timeout=160): +def check_dhcp_lease(nova_client, vm_domain, ip, timeout=160): """ Returns true if the expected DHCP lease has been acquired - :param vm: the OpenStack VM instance object + :param nova_client: the nova client + :param vm_domain: the SNAPS VM instance domain object :param ip: the IP address to look for :param timeout: how long to query for IP address :return: @@ -2371,6 +2381,7 @@ def check_dhcp_lease(vm, ip, timeout=160): logger.info("Looking for IP %s in the console log" % ip) full_log = '' while timeout > time.time() - start_time: + vm = nova_utils.get_latest_server_os_object(nova_client, vm_domain) output = vm.get_console_output() full_log = full_log + output if re.search(ip, output): diff --git a/snaps/openstack/tests/openstack_tests.py b/snaps/openstack/tests/openstack_tests.py index 109d2ce..c0b3a4a 100644 --- a/snaps/openstack/tests/openstack_tests.py +++ b/snaps/openstack/tests/openstack_tests.py @@ -158,7 +158,7 @@ def create_image_settings(image_name, image_user, image_format, metadata, logger.debug('Image metadata - ' + str(metadata)) if metadata and 'config' in metadata: - return ImageSettings(config=metadata['config']) + return ImageSettings(**metadata['config']) disk_file = None if metadata: diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py index 3063176..1889748 100644 --- a/snaps/openstack/utils/neutron_utils.py +++ b/snaps/openstack/utils/neutron_utils.py @@ -16,6 +16,7 @@ import logging from neutronclient.common.exceptions import NotFound from neutronclient.neutron.client import Client +from snaps.domain.vm_inst import FloatingIp from snaps.openstack.utils import keystone_utils __author__ = 'spisarski' @@ -29,11 +30,13 @@ Utilities for basic neutron API calls def neutron_client(os_creds): """ - Instantiates and returns a client for communications with OpenStack's Neutron server + Instantiates and returns a client for communications with OpenStack's + Neutron server :param os_creds: the credentials for connecting to the OpenStack remote API :return: the client object """ - return Client(api_version=os_creds.network_api_version, session=keystone_utils.keystone_session(os_creds)) + return Client(api_version=os_creds.network_api_version, + session=keystone_utils.keystone_session(os_creds)) def create_network(neutron, os_creds, network_settings): @@ -41,8 +44,9 @@ def create_network(neutron, os_creds, network_settings): Creates a network for OpenStack :param neutron: the client :param os_creds: the OpenStack credentials - :param network_settings: A dictionary containing the network configuration and is responsible for creating the - network request JSON body + :param network_settings: A dictionary containing the network configuration + and is responsible for creating the network + request JSON body :return: the network object """ if neutron and network_settings: @@ -67,7 +71,8 @@ def delete_network(neutron, network): def get_network(neutron, network_name, project_id=None): """ - Returns an object (dictionary) of the first network found with a given name and project_id (if included) + Returns an object (dictionary) of the first network found with a given name + and project_id (if included) :param neutron: the client :param network_name: the name of the network to retrieve :param project_id: the id of the network's project @@ -110,13 +115,15 @@ def create_subnet(neutron, subnet_settings, os_creds, network=None): Creates a network subnet for OpenStack :param neutron: the client :param network: the network object - :param subnet_settings: A dictionary containing the subnet configuration and is responsible for creating the subnet - request JSON body + :param subnet_settings: A dictionary containing the subnet configuration + and is responsible for creating the subnet request + JSON body :param os_creds: the OpenStack credentials :return: the subnet object """ if neutron and network and subnet_settings: - json_body = {'subnets': [subnet_settings.dict_for_neutron(os_creds, network=network)]} + json_body = {'subnets': [subnet_settings.dict_for_neutron( + os_creds, network=network)]} logger.info('Creating subnet with name ' + subnet_settings.name) subnets = neutron.create_subnet(body=json_body) return {'subnet': subnets['subnets'][0]} @@ -156,8 +163,9 @@ def create_router(neutron, os_creds, router_settings): Creates a router for OpenStack :param neutron: the client :param os_creds: the OpenStack credentials - :param router_settings: A dictionary containing the router configuration and is responsible for creating the subnet - request JSON body + :param router_settings: A dictionary containing the router configuration + and is responsible for creating the subnet request + JSON body :return: the router object """ if neutron: @@ -198,7 +206,8 @@ def get_router_by_name(neutron, router_name): def add_interface_router(neutron, router, subnet=None, port=None): """ - Adds an interface router for OpenStack for either a subnet or port. Exception will be raised if requesting for both. + Adds an interface router for OpenStack for either a subnet or port. + Exception will be raised if requesting for both. :param neutron: the client :param router: the router object :param subnet: the subnet object @@ -206,13 +215,18 @@ def add_interface_router(neutron, router, subnet=None, port=None): :return: the interface router object """ if subnet and port: - raise Exception('Cannot add interface to the router. Both subnet and port were sent in. Either or please.') + raise Exception('Cannot add interface to the router. Both subnet and ' + 'port were sent in. Either or please.') if neutron and router and (router or subnet): - logger.info('Adding interface to router with name ' + router['router']['name']) - return neutron.add_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + logger.info('Adding interface to router with name ' + + router['router']['name']) + return neutron.add_interface_router( + router=router['router']['id'], + body=__create_port_json_body(subnet, port)) else: - raise Exception("Unable to create interface router as neutron client, router or subnet were not created") + raise Exception('Unable to create interface router as neutron client,' + ' router or subnet were not created') def remove_interface_router(neutron, router, subnet=None, port=None): @@ -225,10 +239,14 @@ def remove_interface_router(neutron, router, subnet=None, port=None): """ if router: try: - logger.info('Removing router interface from router named ' + router['router']['name']) - neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + logger.info('Removing router interface from router named ' + + router['router']['name']) + neutron.remove_interface_router( + router=router['router']['id'], + body=__create_port_json_body(subnet, port)) except NotFound as e: - logger.warning('Could not remove router interface. NotFound - ' + str(e)) + logger.warning('Could not remove router interface. NotFound - %s', + e) pass else: logger.warning('Could not remove router interface, No router object') @@ -236,8 +254,9 @@ def remove_interface_router(neutron, router, subnet=None, port=None): def __create_port_json_body(subnet=None, port=None): """ - Returns the dictionary required for creating and deleting router interfaces. Will only work on a subnet or port - object. Will throw and exception if parameters contain both or neither + Returns the dictionary required for creating and deleting router + interfaces. Will only work on a subnet or port object. Will throw and + exception if parameters contain both or neither :param subnet: the subnet object :param port: the port object :return: the dict @@ -262,7 +281,8 @@ def create_port(neutron, os_creds, port_settings): :return: the port object """ json_body = port_settings.dict_for_neutron(neutron, os_creds) - logger.info('Creating port for network with name - ' + port_settings.network_name) + logger.info('Creating port for network with name - %s', + port_settings.network_name) return neutron.create_port(body=json_body) @@ -299,8 +319,10 @@ def create_security_group(neutron, keystone, sec_grp_settings): :param sec_grp_settings: the security group settings :return: the security group object """ - logger.info('Creating security group with name - ' + sec_grp_settings.name) - return neutron.create_security_group(sec_grp_settings.dict_for_neutron(keystone)) + logger.info('Creating security group with name - %s', + sec_grp_settings.name) + return neutron.create_security_group( + sec_grp_settings.dict_for_neutron(keystone)) def delete_security_group(neutron, sec_grp): @@ -309,7 +331,8 @@ def delete_security_group(neutron, sec_grp): :param neutron: the client :param sec_grp: the security group object to delete """ - logger.info('Deleting security group with name - ' + sec_grp['security_group']['name']) + logger.info('Deleting security group with name - %s', + sec_grp['security_group']['name']) return neutron.delete_security_group(sec_grp['security_group']['id']) @@ -350,8 +373,10 @@ def create_security_group_rule(neutron, sec_grp_rule_settings): :param sec_grp_rule_settings: the security group rule settings :return: the security group object """ - logger.info('Creating security group to security group - ' + sec_grp_rule_settings.sec_grp_name) - return neutron.create_security_group_rule(sec_grp_rule_settings.dict_for_neutron(neutron)) + logger.info('Creating security group to security group - %s', + sec_grp_rule_settings.sec_grp_name) + return neutron.create_security_group_rule( + sec_grp_rule_settings.dict_for_neutron(neutron)) def delete_security_group_rule(neutron, sec_grp_rule): @@ -360,8 +385,10 @@ def delete_security_group_rule(neutron, sec_grp_rule): :param neutron: the client :param sec_grp_rule: the security group rule object to delete """ - logger.info('Deleting security group rule with ID - ' + sec_grp_rule['security_group_rule']['id']) - neutron.delete_security_group_rule(sec_grp_rule['security_group_rule']['id']) + logger.info('Deleting security group rule with ID - %s', + sec_grp_rule['security_group_rule']['id']) + neutron.delete_security_group_rule( + sec_grp_rule['security_group_rule']['id']) def get_rules_by_security_group(neutron, sec_grp): @@ -370,10 +397,11 @@ def get_rules_by_security_group(neutron, sec_grp): :param neutron: the client :param sec_grp: the security group object """ - logger.info('Retrieving security group rules associate with the security group - ' + - sec_grp['security_group']['name']) + logger.info('Retrieving security group rules associate with the ' + 'security group - %s', sec_grp['security_group']['name']) out = list() - rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + rules = neutron.list_security_group_rules( + **{'security_group_id': sec_grp['security_group']['id']}) for rule in rules['security_group_rules']: if rule['security_group_id'] == sec_grp['security_group']['id']: out.append({'security_group_rule': rule}) @@ -387,7 +415,8 @@ def get_rule_by_id(neutron, sec_grp, rule_id): :param sec_grp: the security group object :param rule_id: the rule's ID """ - rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + rules = neutron.list_security_group_rules( + **{'security_group_id': sec_grp['security_group']['id']}) for rule in rules['security_group_rules']: if rule['id'] == rule_id: return {'security_group_rule': rule} @@ -396,11 +425,95 @@ def get_rule_by_id(neutron, sec_grp, rule_id): def get_external_networks(neutron): """ - Returns a list of external OpenStack network object/dict for all external networks + Returns a list of external OpenStack network object/dict for all external + networks :param neutron: the client :return: a list of external networks (empty list if none configured) """ out = list() - for network in neutron.list_networks(**{'router:external': True})['networks']: + for network in neutron.list_networks( + **{'router:external': True})['networks']: out.append({'network': network}) return out + + +def get_floating_ips(neutron): + """ + Returns all of the floating IPs + :param neutron: the Neutron client + :return: a list of SNAPS FloatingIp objects + """ + out = list() + fips = neutron.list_floatingips() + for fip in fips['floatingips']: + out.append(FloatingIp(inst_id=fip['id'], + ip=fip['floating_ip_address'])) + + return out + + +def create_floating_ip(neutron, ext_net_name): + """ + Returns the floating IP object that was created with this call + :param neutron: the Neutron client + :param ext_net_name: the name of the external network on which to apply the + floating IP address + :return: the SNAPS FloatingIp object + """ + logger.info('Creating floating ip to external network - ' + ext_net_name) + ext_net = get_network(neutron, ext_net_name) + if ext_net: + fip = neutron.create_floatingip( + body={'floatingip': + {'floating_network_id': ext_net['network']['id']}}) + + return FloatingIp(inst_id=fip['floatingip']['id'], + ip=fip['floatingip']['floating_ip_address']) + else: + raise Exception('Cannot create floating IP, ' + 'external network not found') + + +def get_floating_ip(neutron, floating_ip): + """ + Returns a floating IP object that should be identical to the floating_ip + parameter + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: hopefully the same floating IP object input + """ + logger.debug('Attempting to retrieve existing floating ip with IP - %s', + floating_ip.ip) + os_fip = get_os_floating_ip(neutron, floating_ip) + if os_fip: + return FloatingIp( + inst_id=os_fip['id'], ip=os_fip['floating_ip_address']) + + +def get_os_floating_ip(neutron, floating_ip): + """ + Returns an OpenStack floating IP object + parameter + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: hopefully the same floating IP object input + """ + logger.debug('Attempting to retrieve existing floating ip with IP - %s', + floating_ip.ip) + fips = neutron.list_floatingips(ip=floating_ip.id) + + for fip in fips['floatingips']: + if fip['id'] == floating_ip.id: + return fip + + +def delete_floating_ip(neutron, floating_ip): + """ + Responsible for deleting a floating IP + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: + """ + logger.debug('Attempting to delete existing floating ip with IP - %s', + floating_ip.ip) + return neutron.delete_floatingip(floating_ip.id) diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py index a1b959a..90eec13 100644 --- a/snaps/openstack/utils/nova_utils.py +++ b/snaps/openstack/utils/nova_utils.py @@ -13,16 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.backends import default_backend - -import os import logging -from snaps.openstack.utils import keystone_utils +import os +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa from novaclient.client import Client from novaclient.exceptions import NotFound +from snaps.domain.vm_inst import VmInst +from snaps.openstack.utils import keystone_utils, glance_utils, neutron_utils __author__ = 'spisarski' @@ -35,12 +35,69 @@ Utilities for basic OpenStack Nova API calls def nova_client(os_creds): """ - Instantiates and returns a client for communications with OpenStack's Nova server + Instantiates and returns a client for communications with OpenStack's Nova + server :param os_creds: The connection credentials to the OpenStack API :return: the client object """ logger.debug('Retrieving Nova Client') - return Client(os_creds.compute_api_version, session=keystone_utils.keystone_session(os_creds)) + return Client(os_creds.compute_api_version, + session=keystone_utils.keystone_session(os_creds)) + + +def create_server(nova, neutron, glance, instance_settings, image_settings, + keypair_settings=None): + """ + Creates a VM instance + :param nova: the nova client (required) + :param neutron: the neutron client for retrieving ports (required) + :param glance: the glance client (required) + :param instance_settings: the VM instance settings object (required) + :param image_settings: the VM's image settings object (required) + :param keypair_settings: the VM's keypair settings object (optional) + :return: a snaps.domain.VmInst object + """ + + ports = list() + + for port_setting in instance_settings.port_settings: + ports.append(neutron_utils.get_port_by_name( + neutron, port_setting.name)) + nics = [] + for port in ports: + kv = dict() + kv['port-id'] = port['port']['id'] + nics.append(kv) + + logger.info('Creating VM with name - ' + instance_settings.name) + keypair_name = None + if keypair_settings: + keypair_name = keypair_settings.name + + flavor = get_flavor_by_name(nova, instance_settings.flavor) + if not flavor: + raise Exception( + 'Flavor not found with name - %s', + instance_settings.flavor) + + image = glance_utils.get_image(glance, image_settings.name) + if image: + args = {'name': instance_settings.name, + 'flavor': flavor, + 'image': image, + 'nics': nics, + 'key_name': keypair_name, + 'security_groups': + instance_settings.security_group_names, + 'userdata': instance_settings.userdata, + 'availability_zone': + instance_settings.availability_zone} + server = nova.servers.create(**args) + return VmInst(name=server.name, inst_id=server.id) + else: + raise Exception( + 'Cannot create instance, image cannot be located with name %s', + image_settings.name) def get_servers_by_name(nova, name): @@ -50,7 +107,21 @@ def get_servers_by_name(nova, name): :param name: the server name :return: the list of servers """ - return nova.servers.list(search_opts={'name': name}) + out = list() + servers = nova.servers.list(search_opts={'name': name}) + for server in servers: + out.append(VmInst(name=server.name, inst_id=server.id)) + return out + + +def get_latest_server_os_object(nova, server): + """ + Returns a server with a given id + :param nova: the Nova client + :param server: the domain VmInst object + :return: the list of servers or None if not found + """ + return nova.servers.get(server.id) def get_latest_server_object(nova, server): @@ -60,7 +131,8 @@ def get_latest_server_object(nova, server): :param server: the old server object :return: the list of servers or None if not found """ - return nova.servers.get(server) + server = get_latest_server_os_object(nova, server) + return VmInst(name=server.name, inst_id=server.id) def create_keys(key_size=2048): @@ -69,7 +141,8 @@ def create_keys(key_size=2048): :param key_size: the number of bytes for the key size :return: the cryptography keys """ - return rsa.generate_private_key(backend=default_backend(), public_exponent=65537, + return rsa.generate_private_key(backend=default_backend(), + public_exponent=65537, key_size=key_size) @@ -79,7 +152,8 @@ def public_key_openssh(keys): :param keys: the keys generated by create_keys() from cryptography :return: the OpenSSH public key """ - return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH) def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): @@ -97,7 +171,8 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): os.mkdir(pub_dir) public_handle = open(pub_file_path, 'wb') public_bytes = keys.public_key().public_bytes( - serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH) public_handle.write(public_bytes) public_handle.close() os.chmod(pub_file_path, 0o400) @@ -107,9 +182,11 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): if not os.path.isdir(priv_dir): os.mkdir(priv_dir) private_handle = open(priv_file_path, 'wb') - private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption())) + private_handle.write( + keys.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption())) private_handle.close() os.chmod(priv_file_path, 0o400) logger.info("Saved private key to - " + priv_file_path) @@ -179,57 +256,6 @@ def delete_keypair(nova, key): nova.keypairs.delete(key) -def get_floating_ip_pools(nova): - """ - Returns all of the available floating IP pools - :param nova: the Nova client - :return: a list of pools - """ - return nova.floating_ip_pools.list() - - -def get_floating_ips(nova): - """ - Returns all of the floating IPs - :param nova: the Nova client - :return: a list of floating IPs - """ - return nova.floating_ips.list() - - -def create_floating_ip(nova, ext_net_name): - """ - Returns the floating IP object that was created with this call - :param nova: the Nova client - :param ext_net_name: the name of the external network on which to apply the floating IP address - :return: the floating IP object - """ - logger.info('Creating floating ip to external network - ' + ext_net_name) - return nova.floating_ips.create(ext_net_name) - - -def get_floating_ip(nova, floating_ip): - """ - Returns a floating IP object that should be identical to the floating_ip parameter - :param nova: the Nova client - :param floating_ip: the floating IP object to lookup - :return: hopefully the same floating IP object input - """ - logger.debug('Attempting to retrieve existing floating ip with IP - ' + floating_ip.ip) - return nova.floating_ips.get(floating_ip) - - -def delete_floating_ip(nova, floating_ip): - """ - Responsible for deleting a floating IP - :param nova: the Nova client - :param floating_ip: the floating IP object to delete - :return: - """ - logger.debug('Attempting to delete existing floating ip with IP - ' + floating_ip.ip) - return nova.floating_ips.delete(floating_ip) - - def get_nova_availability_zones(nova): """ Returns the names of all nova active compute servers @@ -251,9 +277,9 @@ def delete_vm_instance(nova, vm_inst): """ Deletes a VM instance :param nova: the nova client - :param vm_inst: the OpenStack instance object to delete + :param vm_inst: the snaps.domain.VmInst object """ - nova.servers.delete(vm_inst) + nova.servers.delete(vm_inst.id) def get_flavor_by_name(nova, name): @@ -276,10 +302,15 @@ def create_flavor(nova, flavor_settings): :param flavor_settings: the flavor settings :return: the Flavor """ - return nova.flavors.create(name=flavor_settings.name, flavorid=flavor_settings.flavor_id, ram=flavor_settings.ram, - vcpus=flavor_settings.vcpus, disk=flavor_settings.disk, - ephemeral=flavor_settings.ephemeral, swap=flavor_settings.swap, - rxtx_factor=flavor_settings.rxtx_factor, is_public=flavor_settings.is_public) + return nova.flavors.create(name=flavor_settings.name, + flavorid=flavor_settings.flavor_id, + ram=flavor_settings.ram, + vcpus=flavor_settings.vcpus, + disk=flavor_settings.disk, + ephemeral=flavor_settings.ephemeral, + swap=flavor_settings.swap, + rxtx_factor=flavor_settings.rxtx_factor, + is_public=flavor_settings.is_public) def delete_flavor(nova, flavor): @@ -308,4 +339,17 @@ def remove_security_group(nova, vm, security_group): :param vm: the OpenStack server object (VM) to alter :param security_group: the OpenStack security group object to add """ - nova.servers.remove_security_group(str(vm.id), security_group['security_group']['name']) + nova.servers.remove_security_group( + str(vm.id), security_group['security_group']['name']) + + +def add_floating_ip_to_server(nova, vm, floating_ip, ip_addr): + """ + Adds a floating IP to a server instance + :param nova: the nova client + :param vm: VmInst domain object + :param floating_ip: FloatingIp domain object + :param ip_addr: the IP to which to bind the floating IP to + """ + vm = get_latest_server_os_object(nova, vm) + vm.add_floating_ip(floating_ip.ip, ip_addr) diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py index e90b94c..1e89dda 100644 --- a/snaps/openstack/utils/tests/neutron_utils_tests.py +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -14,14 +14,16 @@ # limitations under the License. import uuid -from snaps.openstack.utils import keystone_utils -from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction -from snaps.openstack.tests import openstack_tests -from snaps.openstack.utils import neutron_utils -from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings from snaps.openstack import create_router -from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \ + PortSettings +from snaps.openstack.create_security_group import SecurityGroupSettings, \ + SecurityGroupRuleSettings, Direction +from snaps.openstack.tests import openstack_tests from snaps.openstack.tests import validation_utils +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.utils import keystone_utils +from snaps.openstack.utils import neutron_utils __author__ = 'spisarski' @@ -57,13 +59,14 @@ class NeutronSmokeTests(OSComponentTestCase): with self.assertRaises(Exception): neutron = neutron_utils.neutron_client( - OSCreds(username='user', password='pass', auth_url='url', project_name='project')) + OSCreds(username='user', password='pass', auth_url='url', + project_name='project')) neutron.list_networks() def test_retrieve_ext_network_name(self): """ - Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained - within the returned list + Tests the neutron_utils.get_external_network_names to ensure the + configured self.ext_net_name is contained within the returned list :return: """ neutron = neutron_utils.neutron_client(self.os_creds) @@ -86,7 +89,8 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): self.port_name = str(guid) + '-port' self.neutron = neutron_utils.neutron_client(self.os_creds) self.network = None - self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net') + self.net_config = openstack_tests.get_pub_net_config( + net_name=guid + '-pub-net') def tearDown(self): """ @@ -94,31 +98,40 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): """ if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_network(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) def test_create_network_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty network name + Tests the neutron_utils.create_neutron_net() function with an empty + network name """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings(name='')) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings(name='')) def test_create_network_null_name(self): """ - Tests the neutron_utils.create_neutron_net() function when the network name is None + Tests the neutron_utils.create_neutron_net() function when the network + name is None """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings()) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings()) class NeutronUtilsSubnetTests(OSComponentTestCase): @@ -133,7 +146,8 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): self.network = None self.subnet = None self.net_config = openstack_tests.get_pub_net_config( - net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name) + net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', + external_net=self.ext_net_name) def tearDown(self): """ @@ -142,71 +156,108 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_subnet(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) def test_create_subnet_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet name is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): SubnetSettings(cidr=self.net_config.subnet_cidr) def test_create_subnet_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty name + Tests the neutron_utils.create_neutron_net() function with an empty + name """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet(self.neutron, '', + subnet_setting.cidr, True) def test_create_subnet_null_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr=None, + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) def test_create_subnet_empty_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is empty """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr='', + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) class NeutronUtilsRouterTests(OSComponentTestCase): @@ -232,7 +283,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase): Cleans the remote OpenStack objects """ if self.interface_router: - neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet) + neutron_utils.remove_interface_router(self.neutron, self.router, + self.subnet) if self.router: neutron_utils.delete_router(self.neutron, self.router) @@ -244,30 +296,40 @@ class NeutronUtilsRouterTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_router_simple(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) def test_create_router_with_public_interface(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ + subnet_setting = self.net_config.network_settings.subnet_settings[0] self.net_config = openstack_tests.OSNetworkConfig( self.net_config.network_settings.name, - self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name, + subnet_setting.name, + subnet_setting.cidr, + self.net_config.router_settings.name, self.ext_net_name) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) # TODO - Add validation that the router gatway has been set def test_create_router_empty_name(self): @@ -276,83 +338,125 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings(name='') - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) def test_create_router_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function when the + subnet CIDR value is None """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings() - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) validate_router(self.neutron, None, True) def test_add_interface_router(self): """ Tests the neutron_utils.add_interface_router() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) - - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) - validate_interface_router(self.interface_router, self.router, self.subnet) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) + + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) + + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) + validate_interface_router(self.interface_router, self.router, + self.subnet) def test_add_interface_router_null_router(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.add_interface_router() function for an + Exception when the router value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_add_interface_router_null_subnet(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None + Tests the neutron_utils.add_interface_router() function for an + Exception when the subnet value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_create_port(self): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, + subnet_setting.cidr, True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( name=self.port_name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}], + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}], network_name=self.net_config.network_settings.name)) validate_port(self.neutron, self.port, self.port_name) @@ -360,111 +464,172 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr, + True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}])) + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) validate_port(self.neutron, self.port, self.port_name) def test_create_port_null_name(self): """ - Tests the neutron_utils.create_port() function for an Exception when the port name value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the port name value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) def test_create_port_null_network_object(self): """ - Tests the neutron_utils.create_port() function for an Exception when the network object is None + Tests the neutron_utils.create_port() function for an Exception when + the network object is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - self.neutron, self.port_name, self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': + self.net_config.network_settings.subnet_settings[ + 0].name, + 'ip': ip_1}])) def test_create_port_null_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': None}])) def test_create_port_invalid_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': 'foo'}])) def test_create_port_invalid_ip_to_subnet(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, - 'ip': '10.197.123.100'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': '10.197.123.100'}])) class NeutronUtilsSecurityGroupTests(OSComponentTestCase): @@ -490,7 +655,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): for security_group in self.security_groups: try: - neutron_utils.delete_security_group(self.neutron, security_group) + neutron_utils.delete_security_group(self.neutron, + security_group) except: pass @@ -499,72 +665,102 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): Tests the neutron_utils.create_security_group() function """ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name) - security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + security_group = neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) neutron_utils.delete_security_group(self.neutron, security_group) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNone(sec_grp_get) def test_create_sec_grp_no_name(self): """ - Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure - that attempting to create a security group without a name will raise an exception + Tests the SecurityGroupSettings constructor and + neutron_utils.create_security_group() function to ensure that + attempting to create a security group without a name will raise an + exception """ with self.assertRaises(Exception): sec_grp_settings = SecurityGroupSettings() self.security_groups.append( - neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings)) def test_create_sec_grp_no_rules(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, + description='hello group') + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) - self.assertTrue(sec_grp_settings.name, self.security_groups[0]['security_group']['name']) - self.assertTrue(sec_grp_settings.description, self.security_groups[0]['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + self.security_groups[0]['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + self.security_groups[0]['security_group'][ + 'description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( - self.security_groups[0]['security_group'], sec_grp_get['security_group'])) + self.security_groups[0]['security_group'], + sec_grp_get['security_group'])) def test_create_sec_grp_one_rule(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=[sec_grp_rule_settings]) + sec_grp_rule_settings = SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=[sec_grp_rule_settings]) - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) - free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0]) + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) + free_rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.security_groups[0]) for free_rule in free_rules: self.security_group_rules.append(free_rule) self.security_group_rules.append( - neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0])) + neutron_utils.create_security_group_rule( + self.neutron, sec_grp_settings.rule_settings[0])) # Refresh object so it is populated with the newly added rule - security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + security_group = neutron_utils.get_security_group( + self.neutron, sec_grp_settings.name) - rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + security_group) - self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules)) + self.assertTrue( + validation_utils.objects_equivalent(self.security_group_rules, + rules)) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) - self.assertTrue(sec_grp_settings.description, security_group['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + security_group['security_group']['description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) @@ -576,18 +772,59 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-1', + description='hello group'))) self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-2', + description='hello group'))) sec_grp_1b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[0]['security_group']['id']) sec_grp_2b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[1]['security_group']['id']) - self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id']) - self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id']) + self.assertEqual(self.security_groups[0]['security_group']['id'], + sec_grp_1b['security_group']['id']) + self.assertEqual(self.security_groups[1]['security_group']['id'], + sec_grp_2b['security_group']['id']) + + +class NeutronUtilsFloatingIpTests(OSComponentTestCase): + """ + Test basic nova keypair functionality + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack + """ + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.floating_ip = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.floating_ip: + neutron_utils.delete_floating_ip(self.neutron, self.floating_ip) + + def test_floating_ips(self): + """ + Tests the creation of a floating IP + :return: + """ + initial_fips = neutron_utils.get_floating_ips(self.neutron) + + self.floating_ip = neutron_utils.create_floating_ip(self.neutron, + self.ext_net_name) + all_fips = neutron_utils.get_floating_ips(self.neutron) + self.assertEqual(len(initial_fips) + 1, len(all_fips)) + returned = neutron_utils.get_floating_ip(self.neutron, + self.floating_ip) + self.assertEqual(self.floating_ip.id, returned.id) + self.assertEqual(self.floating_ip.ip, returned.ip) """ @@ -597,8 +834,9 @@ Validation routines def validate_network(neutron, name, exists): """ - Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a network for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a network for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a network for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected network name :param exists: Whether or not the network name should exist or not @@ -614,8 +852,9 @@ def validate_network(neutron, name, exists): def validate_subnet(neutron, name, cidr, exists): """ - Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a subnet for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a subnet for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected subnet name :param cidr: The expected CIDR value @@ -632,8 +871,9 @@ def validate_subnet(neutron, name, cidr, exists): def validate_router(neutron, name, exists): """ - Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a router for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a router for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a router for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected router name :param exists: Whether or not the network name should exist or not @@ -647,7 +887,8 @@ def validate_router(neutron, name, exists): def validate_interface_router(interface_router, router, subnet): """ - Returns true if the router ID & subnet ID have been properly included into the interface router object + Returns true if the router ID & subnet ID have been properly included into + the interface router object :param interface_router: the object to validate :param router: to validate against the interface_router :param subnet: to validate against the interface_router @@ -661,8 +902,9 @@ def validate_interface_router(interface_router, router, subnet): def validate_port(neutron, port_obj, this_port_name): """ - Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a port for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a port for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a port for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param port_obj: The port object to lookup :param this_port_name: The expected router name diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py index 98aa889..552ffc7 100644 --- a/snaps/openstack/utils/tests/nova_utils_tests.py +++ b/snaps/openstack/utils/tests/nova_utils_tests.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import os import uuid -from snaps.openstack.utils import nova_utils +import os +from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor +from snaps.openstack.create_image import OpenStackImage +from snaps.openstack.create_instance import VmInstanceSettings +from snaps.openstack.create_network import OpenStackNetwork, PortSettings +from snaps.openstack.tests import openstack_tests from snaps.openstack.tests.os_source_file_test import OSComponentTestCase -from snaps.openstack.create_flavor import FlavorSettings +from snaps.openstack.utils import nova_utils, neutron_utils, glance_utils __author__ = 'spisarski' @@ -46,8 +50,10 @@ class NovaSmokeTests(OSComponentTestCase): from snaps.openstack.os_credentials import OSCreds nova = nova_utils.nova_client( - OSCreds(username='user', password='pass', auth_url=self.os_creds.auth_url, - project_name=self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings)) + OSCreds(username='user', password='pass', + auth_url=self.os_creds.auth_url, + project_name=self.os_creds.project_name, + proxy_settings=self.os_creds.proxy_settings)) # This should throw an exception with self.assertRaises(Exception): @@ -61,8 +67,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase): def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.priv_key_file_path = 'tmp/' + guid @@ -73,7 +79,6 @@ class NovaUtilsKeypairTests(OSComponentTestCase): self.public_key = nova_utils.public_key_openssh(self.keys) self.keypair_name = guid self.keypair = None - self.floating_ip = None def tearDown(self): """ @@ -97,14 +102,12 @@ class NovaUtilsKeypairTests(OSComponentTestCase): except: pass - if self.floating_ip: - nova_utils.delete_floating_ip(self.nova, self.floating_ip) - def test_create_keypair(self): """ Tests the creation of an OpenStack keypair that does not exist. """ - self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, + self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) self.assertEqual(self.keypair, result) keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name) @@ -114,7 +117,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase): """ Tests the creation of an OpenStack keypair that does not exist. """ - self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, + self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) self.assertEqual(self.keypair, result) nova_utils.delete_keypair(self.nova, self.keypair) @@ -126,25 +130,16 @@ class NovaUtilsKeypairTests(OSComponentTestCase): Tests that the generated RSA keys are properly saved to files :return: """ - nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path) - self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path) + nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, + self.priv_key_file_path) + self.keypair = nova_utils.upload_keypair_file(self.nova, + self.keypair_name, + self.pub_key_file_path) pub_key_file = open(os.path.expanduser(self.pub_key_file_path)) pub_key = pub_key_file.read() pub_key_file.close() self.assertEqual(self.keypair.public_key, pub_key) - def test_floating_ips(self): - """ - Tests the creation of a floating IP - :return: - """ - ips = nova_utils.get_floating_ips(self.nova) - self.assertIsNotNone(ips) - - self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name) - returned = nova_utils.get_floating_ip(self.nova, self.floating_ip) - self.assertEqual(self.floating_ip, returned) - class NovaUtilsFlavorTests(OSComponentTestCase): """ @@ -153,12 +148,15 @@ class NovaUtilsFlavorTests(OSComponentTestCase): def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) - self.flavor_settings = FlavorSettings(name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1, vcpus=1, - ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False) + self.flavor_settings = FlavorSettings(name=guid + '-name', + flavor_id=guid + '-id', ram=1, + disk=1, vcpus=1, + ephemeral=1, swap=2, + rxtx_factor=3.0, is_public=False) self.nova = nova_utils.nova_client(self.os_creds) self.flavor = None @@ -186,7 +184,8 @@ class NovaUtilsFlavorTests(OSComponentTestCase): self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings) self.validate_flavor() nova_utils.delete_flavor(self.nova, self.flavor) - flavor = nova_utils.get_flavor_by_name(self.nova, self.flavor_settings.name) + flavor = nova_utils.get_flavor_by_name(self.nova, + self.flavor_settings.name) self.assertIsNone(flavor) def validate_flavor(self): @@ -206,5 +205,110 @@ class NovaUtilsFlavorTests(OSComponentTestCase): else: self.assertEqual(self.flavor_settings.swap, self.flavor.swap) - self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor) + self.assertEqual(self.flavor_settings.rxtx_factor, + self.flavor.rxtx_factor) self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public) + + +class NovaUtilsInstanceTests(OSComponentTestCase): + """ + Tests the creation of VM instances via nova_utils.py + """ + + def setUp(self): + """ + Setup objects required by VM instances + :return: + """ + + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + + self.nova = nova_utils.nova_client(self.os_creds) + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.glance = glance_utils.glance_client(self.os_creds) + + self.image_creator = None + self.network_creator = None + self.flavor_creator = None + self.port = None + self.vm_inst = None + + try: + image_settings = openstack_tests.cirros_image_settings( + name=guid + '-image', image_metadata=self.image_metadata) + self.image_creator = OpenStackImage( + self.os_creds, image_settings=image_settings) + self.image_creator.create() + + network_settings = openstack_tests.get_priv_net_config( + guid + '-net', guid + '-subnet').network_settings + self.network_creator = OpenStackNetwork( + self.os_creds, network_settings) + self.network_creator.create() + + self.flavor_creator = OpenStackFlavor( + self.os_creds, + FlavorSettings( + name=guid + '-flavor-name', ram=128, disk=10, vcpus=1)) + self.flavor_creator.create() + + port_settings = PortSettings(name=guid + '-port', + network_name=network_settings.name) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, port_settings) + + self.instance_settings = VmInstanceSettings( + name=guid + '-vm_inst', + flavor=self.flavor_creator.flavor_settings.name, + port_settings=[port_settings]) + except: + self.tearDown() + raise + + def tearDown(self): + """ + Cleanup deployed resources + :return: + """ + if self.vm_inst: + try: + nova_utils.delete_vm_instance(self.nova, self.vm_inst) + except: + pass + if self.port: + try: + neutron_utils.delete_port(self.neutron, self.port) + except: + pass + if self.flavor_creator: + try: + self.flavor_creator.clean() + except: + pass + if self.network_creator: + try: + self.network_creator.clean() + except: + pass + if self.image_creator: + try: + self.image_creator.clean() + except: + pass + + def test_create_instance(self): + """ + Tests the nova_utils.create_server() method + :return: + """ + + self.vm_inst = nova_utils.create_server( + self.nova, self.neutron, self.glance, self.instance_settings, + self.image_creator.image_settings) + + self.assertIsNotNone(self.vm_inst) + + vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst) + + self.assertEqual(self.vm_inst.name, vm_inst.name) + self.assertEqual(self.vm_inst.id, vm_inst.id) diff --git a/snaps/provisioning/tests/ansible_utils_tests.py b/snaps/provisioning/tests/ansible_utils_tests.py index cddedcd..733068f 100644 --- a/snaps/provisioning/tests/ansible_utils_tests.py +++ b/snaps/provisioning/tests/ansible_utils_tests.py @@ -13,21 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid + import os import pkg_resources -import uuid from scp import SCPClient -from snaps.openstack.create_security_group import SecurityGroupRuleSettings, Direction, Protocol, \ - OpenStackSecurityGroup, SecurityGroupSettings - from snaps.openstack import create_flavor -from snaps.openstack import create_instance from snaps.openstack import create_image +from snaps.openstack import create_instance from snaps.openstack import create_keypairs from snaps.openstack import create_network from snaps.openstack import create_router +from snaps.openstack.create_security_group import ( + SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup, + SecurityGroupSettings) from snaps.openstack.tests import openstack_tests, create_instance_tests from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase +from snaps.openstack.utils import nova_utils from snaps.provisioning import ansible_utils VM_BOOT_TIMEOUT = 600 @@ -38,16 +40,19 @@ ip_2 = '10.0.1.200' class AnsibleProvisioningTests(OSIntegrationTestCase): """ - Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP and eth1 w/o + Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP + and eth1 w/o """ def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.keypair_priv_filepath = 'tmp/' + guid self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub' @@ -69,62 +74,78 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): try: # Create Image - os_image_settings = openstack_tests.ubuntu_image_settings(name=guid + '-' + '-image', - image_metadata=self.image_metadata) - self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings) + os_image_settings = openstack_tests.ubuntu_image_settings( + name=guid + '-' + '-image', + image_metadata=self.image_metadata) + self.image_creator = create_image.OpenStackImage(self.os_creds, + os_image_settings) self.image_creator.create() # First network is public self.pub_net_config = openstack_tests.get_pub_net_config( net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', - router_name=guid + '-pub-router', external_net=self.ext_net_name) + router_name=guid + '-pub-router', + external_net=self.ext_net_name) - self.network_creator = create_network.OpenStackNetwork(self.os_creds, self.pub_net_config.network_settings) + self.network_creator = create_network.OpenStackNetwork( + self.os_creds, self.pub_net_config.network_settings) self.network_creator.create() # Create routers - self.router_creator = create_router.OpenStackRouter(self.os_creds, self.pub_net_config.router_settings) + self.router_creator = create_router.OpenStackRouter( + self.os_creds, self.pub_net_config.router_settings) self.router_creator.create() # Create Flavor self.flavor_creator = create_flavor.OpenStackFlavor( self.admin_os_creds, - create_flavor.FlavorSettings(name=guid + '-flavor-name', ram=2048, disk=10, vcpus=2, + create_flavor.FlavorSettings(name=guid + '-flavor-name', + ram=2048, disk=10, vcpus=2, metadata=self.flavor_metadata)) self.flavor_creator.create() # Create Key/Pair self.keypair_creator = create_keypairs.OpenStackKeypair( self.os_creds, create_keypairs.KeypairSettings( - name=self.keypair_name, public_filepath=self.keypair_pub_filepath, + name=self.keypair_name, + public_filepath=self.keypair_pub_filepath, private_filepath=self.keypair_priv_filepath)) self.keypair_creator.create() # Create Security Group sec_grp_name = guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress, + rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, + direction=Direction.ingress, protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress, - protocol=Protocol.tcp, port_range_min=22, port_range_max=22) + rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, + direction=Direction.ingress, + protocol=Protocol.tcp, + port_range_min=22, + port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2])) + SecurityGroupSettings(name=sec_grp_name, + rule_settings=[rule1, rule2])) self.sec_grp_creator.create() # Create instance ports_settings = list() ports_settings.append( - create_network.PortSettings(name=self.port_1_name, - network_name=self.pub_net_config.network_settings.name)) + create_network.PortSettings( + name=self.port_1_name, + network_name=self.pub_net_config.network_settings.name)) instance_settings = create_instance.VmInstanceSettings( - name=self.vm_inst_name, flavor=self.flavor_creator.flavor_settings.name, port_settings=ports_settings, + name=self.vm_inst_name, + flavor=self.flavor_creator.flavor_settings.name, + port_settings=ports_settings, floating_ip_settings=[create_instance.FloatingIpSettings( name=self.floating_ip_name, port_name=self.port_1_name, router_name=self.pub_net_config.router_settings.name)]) self.inst_creator = create_instance.OpenStackVmInstance( - self.os_creds, instance_settings, self.image_creator.image_settings, + self.os_creds, instance_settings, + self.image_creator.image_settings, keypair_settings=self.keypair_creator.keypair_settings) except: self.tearDown() @@ -165,10 +186,13 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): def test_apply_simple_playbook(self): """ - Tests application of an Ansible playbook that simply copies over a file: - 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False - 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False - Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail. + Tests application of an Ansible playbook that simply copies over a file + 1. Have a ~/.ansible.cfg (or alternate means) to + set host_key_checking = False + 2. Set the following environment variable in your executing shell: + ANSIBLE_HOST_KEY_CHECKING=False + Should this not be performed, the creation of the host ssh key will + cause your ansible calls to fail. """ vm = self.inst_creator.create(block=True) @@ -176,10 +200,12 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_ssh_active(block=True)) priv_ip = self.inst_creator.get_port_ip(self.port_1_name) - self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip)) + self.assertTrue(create_instance_tests.check_dhcp_lease( + self.nova, vm, priv_ip)) # Apply Security Group - self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group()) + self.inst_creator.add_security_group( + self.sec_grp_creator.get_security_group()) ssh_client = self.inst_creator.ssh_client() self.assertIsNotNone(ssh_client) @@ -187,16 +213,19 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertIsNotNone(out) self.assertGreater(len(out), 1) - # Need to use the first floating IP as subsequent ones are currently broken with Apex CO + # Need to use the first floating IP as subsequent ones are currently + # broken with Apex CO ip = self.inst_creator.get_floating_ip().ip user = self.inst_creator.get_image_user() priv_key = self.inst_creator.keypair_settings.private_filepath - relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks', 'simple_playbook.yml') + relative_pb_path = pkg_resources.resource_filename( + 'snaps.provisioning.tests.playbooks', 'simple_playbook.yml') retval = self.inst_creator.apply_ansible_playbook(relative_pb_path) self.assertEqual(0, retval) - ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings) + ssh = ansible_utils.ssh_client(ip, user, priv_key, + self.os_creds.proxy_settings) self.assertIsNotNone(ssh) scp = SCPClient(ssh.get_transport()) scp.get('~/hello.txt', self.test_file_local_path) @@ -209,10 +238,14 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): def test_apply_template_playbook(self): """ - Tests application of an Ansible playbook that applies a template to a file: - 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False - 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False - Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail. + Tests application of an Ansible playbook that applies a template to a + file: + 1. Have a ~/.ansible.cfg (or alternate means) to set + host_key_checking = False + 2. Set the following environment variable in your executing shell: + ANSIBLE_HOST_KEY_CHECKING=False + Should this not be performed, the creation of the host ssh key will + cause your ansible calls to fail. """ vm = self.inst_creator.create(block=True) @@ -220,22 +253,29 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_ssh_active(block=True)) priv_ip = self.inst_creator.get_port_ip(self.port_1_name) - self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip)) + self.assertTrue(create_instance_tests.check_dhcp_lease( + self.nova, vm, priv_ip)) # Apply Security Group - self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group()) + self.inst_creator.add_security_group( + self.sec_grp_creator.get_security_group()) - # Need to use the first floating IP as subsequent ones are currently broken with Apex CO + # Need to use the first floating IP as subsequent ones are currently + # broken with Apex CO ip = self.inst_creator.get_floating_ip().ip user = self.inst_creator.get_image_user() priv_key = self.inst_creator.keypair_settings.private_filepath - relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks', - 'template_playbook.yml') - retval = self.inst_creator.apply_ansible_playbook(relative_pb_path, variables={'name': 'Foo'}) + relative_pb_path = pkg_resources.resource_filename( + 'snaps.provisioning.tests.playbooks', + 'template_playbook.yml') + retval = self.inst_creator.apply_ansible_playbook(relative_pb_path, + variables={ + 'name': 'Foo'}) self.assertEqual(0, retval) - ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings) + ssh = ansible_utils.ssh_client(ip, user, priv_key, + self.os_creds.proxy_settings) self.assertIsNotNone(ssh) scp = SCPClient(ssh.get_transport()) scp.get('/tmp/hello.txt', self.test_file_local_path) diff --git a/snaps/test_suite_builder.py b/snaps/test_suite_builder.py index e4f5685..d87ae5a 100644 --- a/snaps/test_suite_builder.py +++ b/snaps/test_suite_builder.py @@ -18,34 +18,54 @@ import unittest from snaps.domain.test.image_tests import ImageDomainObjectTests from snaps.domain.test.stack_tests import StackDomainObjectTests -from snaps.openstack.tests.create_stack_tests import StackSettingsUnitTests, CreateStackSuccessTests, \ - CreateStackNegativeTests -from snaps.openstack.utils.tests.glance_utils_tests import GlanceSmokeTests, GlanceUtilsTests +from snaps.domain.test.vm_inst_tests import (VmInstDomainObjectTests, + FloatingIpDomainObjectTests) from snaps.openstack.tests.create_flavor_tests import CreateFlavorTests -from snaps.openstack.utils.tests.heat_utils_tests import HeatUtilsCreateStackTests, HeatSmokeTests +from snaps.openstack.tests.create_image_tests import ( + CreateImageSuccessTests, CreateImageNegativeTests, ImageSettingsUnitTests, + CreateMultiPartImageTests) +from snaps.openstack.tests.create_instance_tests import ( + CreateInstanceSingleNetworkTests, CreateInstancePubPrivNetTests, + CreateInstanceOnComputeHost, CreateInstanceSimpleTests, + FloatingIpSettingsUnitTests, InstanceSecurityGroupTests, + VmInstanceSettingsUnitTests, CreateInstancePortManipulationTests, + SimpleHealthCheck, CreateInstanceFromThreePartImage, + CreateInstanceMockOfflineTests) +from snaps.openstack.tests.create_keypairs_tests import ( + CreateKeypairsTests, KeypairSettingsUnitTests) +from snaps.openstack.tests.create_network_tests import ( + CreateNetworkSuccessTests, NetworkSettingsUnitTests, PortSettingsUnitTests, + SubnetSettingsUnitTests, CreateNetworkTypeTests) +from snaps.openstack.tests.create_project_tests import ( + CreateProjectSuccessTests, ProjectSettingsUnitTests, + CreateProjectUserTests) +from snaps.openstack.tests.create_router_tests import ( + CreateRouterSuccessTests, CreateRouterNegativeTests) +from snaps.openstack.tests.create_security_group_tests import ( + CreateSecurityGroupTests, SecurityGroupRuleSettingsUnitTests, + SecurityGroupSettingsUnitTests) +from snaps.openstack.tests.create_stack_tests import ( + StackSettingsUnitTests, CreateStackSuccessTests, CreateStackNegativeTests) +from snaps.openstack.tests.create_user_tests import ( + UserSettingsUnitTests, CreateUserSuccessTests) +from snaps.openstack.tests.os_source_file_test import ( + OSComponentTestCase, OSIntegrationTestCase) +from snaps.openstack.utils.tests.glance_utils_tests import ( + GlanceSmokeTests, GlanceUtilsTests) +from snaps.openstack.utils.tests.heat_utils_tests import ( + HeatUtilsCreateStackTests, HeatSmokeTests) +from snaps.openstack.utils.tests.keystone_utils_tests import ( + KeystoneSmokeTests, KeystoneUtilsTests) +from snaps.openstack.utils.tests.neutron_utils_tests import ( + NeutronSmokeTests, NeutronUtilsNetworkTests, NeutronUtilsSubnetTests, + NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests, + NeutronUtilsFloatingIpTests) +from snaps.openstack.utils.tests.nova_utils_tests import ( + NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests, + NovaUtilsInstanceTests) +from snaps.provisioning.tests.ansible_utils_tests import ( + AnsibleProvisioningTests) from snaps.tests.file_utils_tests import FileUtilsTests -from snaps.openstack.tests.create_security_group_tests import CreateSecurityGroupTests, \ - SecurityGroupRuleSettingsUnitTests, SecurityGroupSettingsUnitTests -from snaps.openstack.tests.create_project_tests import CreateProjectSuccessTests, ProjectSettingsUnitTests, \ - CreateProjectUserTests -from snaps.openstack.tests.create_user_tests import UserSettingsUnitTests, CreateUserSuccessTests -from snaps.openstack.utils.tests.keystone_utils_tests import KeystoneSmokeTests, KeystoneUtilsTests -from snaps.openstack.utils.tests.neutron_utils_tests import NeutronSmokeTests, NeutronUtilsNetworkTests, \ - NeutronUtilsSubnetTests, NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests -from snaps.openstack.tests.create_image_tests import CreateImageSuccessTests, CreateImageNegativeTests, \ - ImageSettingsUnitTests, CreateMultiPartImageTests -from snaps.openstack.tests.create_keypairs_tests import CreateKeypairsTests, KeypairSettingsUnitTests -from snaps.openstack.tests.create_network_tests import CreateNetworkSuccessTests, NetworkSettingsUnitTests, \ - PortSettingsUnitTests, SubnetSettingsUnitTests, CreateNetworkTypeTests -from snaps.openstack.tests.create_router_tests import CreateRouterSuccessTests, CreateRouterNegativeTests -from snaps.openstack.tests.create_instance_tests import CreateInstanceSingleNetworkTests, \ - CreateInstancePubPrivNetTests, CreateInstanceOnComputeHost, CreateInstanceSimpleTests, \ - FloatingIpSettingsUnitTests, InstanceSecurityGroupTests, VmInstanceSettingsUnitTests, \ - CreateInstancePortManipulationTests, SimpleHealthCheck, CreateInstanceFromThreePartImage, \ - CreateInstanceMockOfflineTests -from snaps.provisioning.tests.ansible_utils_tests import AnsibleProvisioningTests -from snaps.openstack.tests.os_source_file_test import OSComponentTestCase, OSIntegrationTestCase -from snaps.openstack.utils.tests.nova_utils_tests import NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests __author__ = 'spisarski' @@ -57,59 +77,95 @@ def add_unit_tests(suite): :return: None as the tests will be adding to the 'suite' parameter object """ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FileUtilsTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupRuleSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FloatingIpSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(VmInstanceSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests)) - - -def add_openstack_client_tests(suite, os_creds, ext_net_name, use_keystone=True, log_level=logging.INFO): + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + SecurityGroupRuleSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + SecurityGroupSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + FloatingIpSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + VmInstanceSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(VmInstDomainObjectTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + FloatingIpDomainObjectTests)) + + +def add_openstack_client_tests(suite, os_creds, ext_net_name, + use_keystone=True, log_level=logging.INFO): """ Adds tests written to exercise OpenStack client retrieval :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network + :param os_creds: and instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ # Basic connection tests - suite.addTest(OSComponentTestCase.parameterize(GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) if use_keystone: - suite.addTest(OSComponentTestCase.parameterize(KeystoneSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + KeystoneSmokeTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) -def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, image_metadata=None, - log_level=logging.INFO): +def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, + image_metadata=None, log_level=logging.INFO): """ Adds tests written to exercise all existing OpenStack APIs :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network - :param image_metadata: dict() object containing metadata for creating an image with custom config + :param os_creds: Instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network + :param image_metadata: dict() object containing metadata for creating an + image with custom config (see YAML files in examples/image-metadata) :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object @@ -117,131 +173,203 @@ def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, im # Tests the OpenStack API calls if use_keystone: suite.addTest(OSComponentTestCase.parameterize( - KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateUserSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateUserSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateProjectSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateProjectSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateProjectUserTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateProjectUserTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, image_metadata=image_metadata, + GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, + image_metadata=image_metadata, + log_level=log_level)) + suite.addTest(OSComponentTestCase.parameterize( + NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest(OSComponentTestCase.parameterize( + NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsSecurityGroupTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsFloatingIpTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsInstanceTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - HeatUtilsCreateStackTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level, + HeatUtilsCreateStackTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level, image_metadata=image_metadata)) -def add_openstack_integration_tests(suite, os_creds, ext_net_name, use_keystone=True, flavor_metadata=None, - image_metadata=None, use_floating_ips=True, log_level=logging.INFO): +def add_openstack_integration_tests(suite, os_creds, ext_net_name, + use_keystone=True, flavor_metadata=None, + image_metadata=None, use_floating_ips=True, + log_level=logging.INFO): """ - Adds tests written to exercise all long-running OpenStack integration tests meaning they will be creating VM - instances and potentially performing some SSH functions through floating IPs + Adds tests written to exercise all long-running OpenStack integration tests + meaning they will be creating VM instances and potentially performing some + SSH functions through floatingIPs :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network - :param image_metadata: dict() object containing metadata for creating an image with custom config + :param os_creds: and instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network + :param image_metadata: dict() object containing metadata for creating an + image with custom config (see YAML files in examples/image-metadata) - :param flavor_metadata: dict() object containing the metadata required by your flavor based on your configuration: + :param flavor_metadata: dict() object containing the metadata required by + your flavor based on your configuration: (i.e. {'hw:mem_page_size': 'large'}) - :param use_floating_ips: when true, all tests requiring Floating IPs will be added to the suite + :param use_floating_ips: when true, all tests requiring Floating IPs will + be added to the suite :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ - # Tests the OpenStack API calls via a creator. If use_keystone, objects will be created with a custom user - # and project + # Tests the OpenStack API calls via a creator. If use_keystone, objects + # will be created with a custom user and project # Creator Object tests suite.addTest(OSIntegrationTestCase.parameterize( - CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateMultiPartImageTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateMultiPartImageTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateNetworkSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateNetworkSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateRouterNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateRouterNegativeTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) # VM Instances suite.addTest(OSIntegrationTestCase.parameterize( - SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceSimpleTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceSimpleTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstancePortManipulationTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstancePortManipulationTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - InstanceSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + InstanceSecurityGroupTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceOnComputeHost, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceOnComputeHost, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceFromThreePartImage, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceFromThreePartImage, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) if use_floating_ips: suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceSingleNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceSingleNetworkTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstancePubPrivNetTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstancePubPrivNetTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - AnsibleProvisioningTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + AnsibleProvisioningTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) -def add_openstack_staging_tests(suite, os_creds, ext_net_name, log_level=logging.INFO): +def add_openstack_staging_tests(suite, os_creds, ext_net_name, + log_level=logging.INFO): """ - Adds tests that are still in development have not been designed to run successfully against all OpenStack pods + Adds tests that are still in development have not been designed to run + successfully against all OpenStack pods :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test + :param os_creds: Instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ suite.addTest(OSComponentTestCase.parameterize( - CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateInstanceMockOfflineTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateInstanceMockOfflineTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) |