diff options
-rw-r--r-- | requirements.txt | 2 | ||||
-rw-r--r-- | snaps/domain/test/vm_inst_tests.py | 49 | ||||
-rw-r--r-- | snaps/domain/vm_inst.py | 44 | ||||
-rw-r--r-- | snaps/openstack/create_instance.py | 84 | ||||
-rw-r--r-- | snaps/openstack/tests/create_instance_tests.py | 69 | ||||
-rw-r--r-- | snaps/openstack/tests/openstack_tests.py | 2 | ||||
-rw-r--r-- | snaps/openstack/utils/neutron_utils.py | 181 | ||||
-rw-r--r-- | snaps/openstack/utils/nova_utils.py | 192 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/neutron_utils_tests.py | 686 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/nova_utils_tests.py | 170 | ||||
-rw-r--r-- | snaps/provisioning/tests/ansible_utils_tests.py | 128 | ||||
-rw-r--r-- | snaps/test_suite_builder.py | 396 |
12 files changed, 1376 insertions, 627 deletions
diff --git a/requirements.txt b/requirements.txt index c3e748f..c708103 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -python-novaclient!=7.0.0,<8.0.0,>=6.0.0 # Apache-2.0 +python-novaclient>=6.0.0 # Apache-2.0 python-neutronclient>=5.1.0 # Apache-2.0 python-keystoneclient>=3.8.0 # Apache-2.0 python-glanceclient>=2.5.0 # Apache-2.0 diff --git a/snaps/domain/test/vm_inst_tests.py b/snaps/domain/test/vm_inst_tests.py new file mode 100644 index 0000000..e722a06 --- /dev/null +++ b/snaps/domain/test/vm_inst_tests.py @@ -0,0 +1,49 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from snaps.domain.vm_inst import VmInst, FloatingIp + + +class VmInstDomainObjectTests(unittest.TestCase): + """ + Tests the construction of the snaps.domain.test.Image class + """ + + def test_construction_positional(self): + vm_inst = VmInst('name', 'id') + self.assertEqual('name', vm_inst.name) + self.assertEqual('id', vm_inst.id) + + def test_construction_named(self): + vm_inst = VmInst(inst_id='id', name='name') + self.assertEqual('name', vm_inst.name) + self.assertEqual('id', vm_inst.id) + + +class FloatingIpDomainObjectTests(unittest.TestCase): + """ + Tests the construction of the snaps.domain.test.Image class + """ + + def test_construction_positional(self): + vm_inst = FloatingIp('id-123', '10.0.0.1') + self.assertEqual('id-123', vm_inst.id) + self.assertEqual('10.0.0.1', vm_inst.ip) + + def test_construction_named(self): + vm_inst = FloatingIp(ip='10.0.0.1', inst_id='id-123') + self.assertEqual('id-123', vm_inst.id) + self.assertEqual('10.0.0.1', vm_inst.ip) diff --git a/snaps/domain/vm_inst.py b/snaps/domain/vm_inst.py new file mode 100644 index 0000000..0e12d14 --- /dev/null +++ b/snaps/domain/vm_inst.py @@ -0,0 +1,44 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class VmInst: + """ + SNAPS domain object for Images. Should contain attributes that + are shared amongst cloud providers + """ + def __init__(self, name, inst_id): + """ + Constructor + :param name: the image's name + :param inst_id: the instance's id + """ + self.name = name + self.id = inst_id + + +class FloatingIp: + """ + SNAPS domain object for Images. Should contain attributes that + are shared amongst cloud providers + """ + def __init__(self, inst_id, ip): + """ + Constructor + :param inst_id: the floating ip's id + :param ip: the IP address + """ + self.id = inst_id + self.ip = ip diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py index 991aca5..68c421d 100644 --- a/snaps/openstack/create_instance.py +++ b/snaps/openstack/create_instance.py @@ -102,7 +102,7 @@ class OpenStackVmInstance: logger.info( 'Found existing machine with name - %s', self.instance_settings.name) - fips = self.__nova.floating_ips.list() + fips = neutron_utils.get_floating_ips(self.__nova) for fip in fips: if fip.instance_id == server.id: self.__floating_ips.append(fip) @@ -116,45 +116,12 @@ class OpenStackVmInstance: active, error, or timeout waiting. Floating IPs will be assigned after active when block=True """ - nics = [] - for key, port in self.__ports: - kv = dict() - kv['port-id'] = port['port']['id'] - nics.append(kv) - - logger.info('Creating VM with name - ' + self.instance_settings.name) - keypair_name = None - if self.keypair_settings: - keypair_name = self.keypair_settings.name - - flavor = nova_utils.get_flavor_by_name(self.__nova, - self.instance_settings.flavor) - if not flavor: - raise Exception( - 'Flavor not found with name - %s', - self.instance_settings.flavor) - - image = glance_utils.get_image( - glance_utils.glance_client(self.__os_creds), - self.image_settings.name) - if image: - self.__vm = self.__nova.servers.create( - name=self.instance_settings.name, - flavor=flavor, - image=image, - nics=nics, - key_name=keypair_name, - security_groups=self.instance_settings.security_group_names, - userdata=self.instance_settings.userdata, - availability_zone=self.instance_settings.availability_zone) - - else: - raise Exception( - 'Cannot create instance, image cannot be located with name %s', - self.image_settings.name) - - logger.info( - 'Created instance with name - %s', self.instance_settings.name) + glance = glance_utils.glance_client(self.__os_creds) + self.__vm = nova_utils.create_server( + self.__nova, self.__neutron, glance, self.instance_settings, + self.image_settings, self.keypair_settings) + logger.info('Created instance with name - %s', + self.instance_settings.name) if block: if not self.vm_active(block=True): @@ -162,9 +129,7 @@ class OpenStackVmInstance: 'Fatal error, VM did not become ACTIVE within the alloted ' 'time') - # TODO - the call above should add security groups. The return object - # shows they exist but the association had never been made by - # OpenStack. This call is here to ensure they have been added + # Create server should do this but found it needed to occur here for sec_grp_name in self.instance_settings.security_group_names: if self.vm_active(block=True): nova_utils.add_security_group(self.__nova, self.__vm, @@ -202,8 +167,8 @@ class OpenStackVmInstance: if ext_gateway: subnet = neutron_utils.get_subnet_by_name( self.__neutron, floating_ip_setting.subnet_name) - floating_ip = nova_utils.create_floating_ip(self.__nova, - ext_gateway) + floating_ip = neutron_utils.create_floating_ip( + self.__neutron, ext_gateway) self.__floating_ips.append(floating_ip) self.__floating_ip_dict[floating_ip_setting.name] = floating_ip @@ -241,7 +206,7 @@ class OpenStackVmInstance: for floating_ip in self.__floating_ips: try: logger.info('Deleting Floating IP - ' + floating_ip.ip) - nova_utils.delete_floating_ip(self.__nova, floating_ip) + neutron_utils.delete_floating_ip(self.__neutron, floating_ip) except Exception as e: logger.error('Error deleting Floating IP - ' + str(e)) self.__floating_ips = list() @@ -345,7 +310,8 @@ class OpenStackVmInstance: while count > 0: logger.debug('Attempting to add floating IP to instance') try: - self.__vm.add_floating_ip(floating_ip, ip) + nova_utils.add_floating_ip_to_server( + self.__nova, self.__vm, floating_ip, ip) logger.info( 'Added floating IP %s to port IP %s on instance %s', floating_ip.ip, ip, self.instance_settings.name) @@ -376,7 +342,14 @@ class OpenStackVmInstance: Returns the latest version of this server object from OpenStack :return: Server object """ - return nova_utils.get_latest_server_object(self.__nova, self.__vm) + return self.__vm + + def get_os_vm_server_obj(self): + """ + Returns the OpenStack server object + :return: the server object + """ + return nova_utils.get_latest_server_os_object(self.__nova, self.__vm) def get_port_ip(self, port_name, subnet_name=None): """ @@ -466,21 +439,21 @@ class OpenStackVmInstance: elif len(self.__floating_ips) > 0: return self.__floating_ips[0] - def __config_nic(self, nic_name, port, floating_ip): + def __config_nic(self, nic_name, port, ip): """ Although ports/NICs can contain multiple IPs, this code currently only supports the first. :param nic_name: Name of the interface :param port: The port information containing the expected IP values. - :param floating_ip: The floating IP on which to apply the playbook. + :param ip: The IP on which to apply the playbook. :return: the return value from ansible """ - ip = port['port']['fixed_ips'][0]['ip_address'] + port_ip = port['port']['fixed_ips'][0]['ip_address'] variables = { - 'floating_ip': floating_ip, + 'floating_ip': ip, 'nic_name': nic_name, - 'nic_ip': ip + 'nic_ip': port_ip } if self.image_settings.nic_config_pb_loc and self.keypair_settings: @@ -594,7 +567,8 @@ class OpenStackVmInstance: if not self.__vm: return False - instance = self.__nova.servers.get(self.__vm.id) + instance = nova_utils.get_latest_server_os_object( + self.__nova, self.__vm) if not instance: logger.warning('Cannot find instance with id - ' + self.__vm.id) return False @@ -770,7 +744,7 @@ class VmInstanceSettings: if kwargs.get('security_group_names'): if isinstance(kwargs['security_group_names'], list): - self.security_group_names = kwargs['security_group_names'] + self.security_group_names = kwargs['security_group_names'] elif isinstance(kwargs['security_group_names'], set): self.security_group_names = kwargs['security_group_names'] elif isinstance(kwargs['security_group_names'], str): diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py index 998fe88..dc8d79b 100644 --- a/snaps/openstack/tests/create_instance_tests.py +++ b/snaps/openstack/tests/create_instance_tests.py @@ -261,6 +261,7 @@ class SimpleHealthCheck(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.vm_inst_name = guid + '-inst' self.port_1_name = guid + 'port-1' @@ -361,7 +362,7 @@ class SimpleHealthCheck(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) - self.assertTrue(check_dhcp_lease(vm, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm, ip)) class CreateInstanceSimpleTests(OSIntegrationTestCase): @@ -498,6 +499,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.keypair_priv_filepath = 'tmp/' + guid self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub' @@ -696,7 +698,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertTrue(inst_creator.vm_active(block=True)) ip = inst_creator.get_port_ip(port_settings.name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) inst_creator.add_security_group( self.sec_grp_creator.get_security_group()) @@ -734,7 +736,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertTrue(inst_creator.vm_active(block=True)) ip = inst_creator.get_port_ip(port_settings.name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) inst_creator.add_security_group( self.sec_grp_creator.get_security_group()) @@ -1165,7 +1167,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase): for zone in zones: creator = self.inst_creators[index] self.assertTrue(creator.vm_active(block=True)) - vm = creator.get_vm_inst() + vm = creator.get_os_vm_server_obj() deployed_zone = vm._info['OS-EXT-AZ:availability_zone'] deployed_host = vm._info['OS-EXT-SRV-ATTR:host'] self.assertEqual(zone, deployed_zone + ':' + deployed_host) @@ -1186,6 +1188,8 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) + # Initialize for tearDown() self.image_creator = None self.network_creators = list() @@ -1387,7 +1391,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) ip = self.inst_creator.get_port_ip(ports_settings[0].name) - self.assertTrue(check_dhcp_lease(vm_inst, ip)) + self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip)) # Add security group to VM self.inst_creator.add_security_group( @@ -1539,15 +1543,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_creators.append(sec_grp_creator) # Check that group has not been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.inst_creator.add_security_group(sec_grp) # Validate that security group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_add_invalid_security_group(self): """ @@ -1574,15 +1578,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_creators.append(sec_grp_creator) # Check that group has not been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertFalse(self.inst_creator.add_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_remove_security_group(self): """ @@ -1610,14 +1614,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertTrue(inst_has_sec_grp(vm_inst, sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, vm_inst, sec_grp_settings.name)) # Add security group to instance after activated self.assertTrue(self.inst_creator.remove_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_remove_security_group_never_added(self): """ @@ -1644,15 +1649,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertFalse(self.inst_creator.remove_security_group(sec_grp)) # Validate that security group has been added - self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertFalse(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) def test_add_same_security_group(self): """ @@ -1680,27 +1685,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Check that group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) # Add security group to instance after activated self.assertTrue(self.inst_creator.add_security_group(sec_grp)) # Validate that security group has been added - self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(), - sec_grp_settings.name)) + self.assertTrue(inst_has_sec_grp( + self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name)) -def inst_has_sec_grp(vm_inst, sec_grp_name): +def inst_has_sec_grp(nova, vm_inst, sec_grp_name): """ Returns true if instance has a security group of a given name + :param nova: the nova client + :param vm_inst: the VmInst domain object + :param sec_grp_name: the name of the security group to validate :return: """ - if not hasattr(vm_inst, 'security_groups'): + vm = nova_utils.get_latest_server_os_object(nova, vm_inst) + if not hasattr(vm, 'security_groups'): return False found = False - for sec_grp_dict in vm_inst.security_groups: + for sec_grp_dict in vm.security_groups: if sec_grp_name in sec_grp_dict['name']: found = True break @@ -2357,10 +2366,11 @@ class CreateInstanceMockOfflineTests(OSComponentTestCase): self.assertTrue(self.inst_creator.vm_active(block=True)) -def check_dhcp_lease(vm, ip, timeout=160): +def check_dhcp_lease(nova_client, vm_domain, ip, timeout=160): """ Returns true if the expected DHCP lease has been acquired - :param vm: the OpenStack VM instance object + :param nova_client: the nova client + :param vm_domain: the SNAPS VM instance domain object :param ip: the IP address to look for :param timeout: how long to query for IP address :return: @@ -2371,6 +2381,7 @@ def check_dhcp_lease(vm, ip, timeout=160): logger.info("Looking for IP %s in the console log" % ip) full_log = '' while timeout > time.time() - start_time: + vm = nova_utils.get_latest_server_os_object(nova_client, vm_domain) output = vm.get_console_output() full_log = full_log + output if re.search(ip, output): diff --git a/snaps/openstack/tests/openstack_tests.py b/snaps/openstack/tests/openstack_tests.py index 109d2ce..c0b3a4a 100644 --- a/snaps/openstack/tests/openstack_tests.py +++ b/snaps/openstack/tests/openstack_tests.py @@ -158,7 +158,7 @@ def create_image_settings(image_name, image_user, image_format, metadata, logger.debug('Image metadata - ' + str(metadata)) if metadata and 'config' in metadata: - return ImageSettings(config=metadata['config']) + return ImageSettings(**metadata['config']) disk_file = None if metadata: diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py index 3063176..1889748 100644 --- a/snaps/openstack/utils/neutron_utils.py +++ b/snaps/openstack/utils/neutron_utils.py @@ -16,6 +16,7 @@ import logging from neutronclient.common.exceptions import NotFound from neutronclient.neutron.client import Client +from snaps.domain.vm_inst import FloatingIp from snaps.openstack.utils import keystone_utils __author__ = 'spisarski' @@ -29,11 +30,13 @@ Utilities for basic neutron API calls def neutron_client(os_creds): """ - Instantiates and returns a client for communications with OpenStack's Neutron server + Instantiates and returns a client for communications with OpenStack's + Neutron server :param os_creds: the credentials for connecting to the OpenStack remote API :return: the client object """ - return Client(api_version=os_creds.network_api_version, session=keystone_utils.keystone_session(os_creds)) + return Client(api_version=os_creds.network_api_version, + session=keystone_utils.keystone_session(os_creds)) def create_network(neutron, os_creds, network_settings): @@ -41,8 +44,9 @@ def create_network(neutron, os_creds, network_settings): Creates a network for OpenStack :param neutron: the client :param os_creds: the OpenStack credentials - :param network_settings: A dictionary containing the network configuration and is responsible for creating the - network request JSON body + :param network_settings: A dictionary containing the network configuration + and is responsible for creating the network + request JSON body :return: the network object """ if neutron and network_settings: @@ -67,7 +71,8 @@ def delete_network(neutron, network): def get_network(neutron, network_name, project_id=None): """ - Returns an object (dictionary) of the first network found with a given name and project_id (if included) + Returns an object (dictionary) of the first network found with a given name + and project_id (if included) :param neutron: the client :param network_name: the name of the network to retrieve :param project_id: the id of the network's project @@ -110,13 +115,15 @@ def create_subnet(neutron, subnet_settings, os_creds, network=None): Creates a network subnet for OpenStack :param neutron: the client :param network: the network object - :param subnet_settings: A dictionary containing the subnet configuration and is responsible for creating the subnet - request JSON body + :param subnet_settings: A dictionary containing the subnet configuration + and is responsible for creating the subnet request + JSON body :param os_creds: the OpenStack credentials :return: the subnet object """ if neutron and network and subnet_settings: - json_body = {'subnets': [subnet_settings.dict_for_neutron(os_creds, network=network)]} + json_body = {'subnets': [subnet_settings.dict_for_neutron( + os_creds, network=network)]} logger.info('Creating subnet with name ' + subnet_settings.name) subnets = neutron.create_subnet(body=json_body) return {'subnet': subnets['subnets'][0]} @@ -156,8 +163,9 @@ def create_router(neutron, os_creds, router_settings): Creates a router for OpenStack :param neutron: the client :param os_creds: the OpenStack credentials - :param router_settings: A dictionary containing the router configuration and is responsible for creating the subnet - request JSON body + :param router_settings: A dictionary containing the router configuration + and is responsible for creating the subnet request + JSON body :return: the router object """ if neutron: @@ -198,7 +206,8 @@ def get_router_by_name(neutron, router_name): def add_interface_router(neutron, router, subnet=None, port=None): """ - Adds an interface router for OpenStack for either a subnet or port. Exception will be raised if requesting for both. + Adds an interface router for OpenStack for either a subnet or port. + Exception will be raised if requesting for both. :param neutron: the client :param router: the router object :param subnet: the subnet object @@ -206,13 +215,18 @@ def add_interface_router(neutron, router, subnet=None, port=None): :return: the interface router object """ if subnet and port: - raise Exception('Cannot add interface to the router. Both subnet and port were sent in. Either or please.') + raise Exception('Cannot add interface to the router. Both subnet and ' + 'port were sent in. Either or please.') if neutron and router and (router or subnet): - logger.info('Adding interface to router with name ' + router['router']['name']) - return neutron.add_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + logger.info('Adding interface to router with name ' + + router['router']['name']) + return neutron.add_interface_router( + router=router['router']['id'], + body=__create_port_json_body(subnet, port)) else: - raise Exception("Unable to create interface router as neutron client, router or subnet were not created") + raise Exception('Unable to create interface router as neutron client,' + ' router or subnet were not created') def remove_interface_router(neutron, router, subnet=None, port=None): @@ -225,10 +239,14 @@ def remove_interface_router(neutron, router, subnet=None, port=None): """ if router: try: - logger.info('Removing router interface from router named ' + router['router']['name']) - neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + logger.info('Removing router interface from router named ' + + router['router']['name']) + neutron.remove_interface_router( + router=router['router']['id'], + body=__create_port_json_body(subnet, port)) except NotFound as e: - logger.warning('Could not remove router interface. NotFound - ' + str(e)) + logger.warning('Could not remove router interface. NotFound - %s', + e) pass else: logger.warning('Could not remove router interface, No router object') @@ -236,8 +254,9 @@ def remove_interface_router(neutron, router, subnet=None, port=None): def __create_port_json_body(subnet=None, port=None): """ - Returns the dictionary required for creating and deleting router interfaces. Will only work on a subnet or port - object. Will throw and exception if parameters contain both or neither + Returns the dictionary required for creating and deleting router + interfaces. Will only work on a subnet or port object. Will throw and + exception if parameters contain both or neither :param subnet: the subnet object :param port: the port object :return: the dict @@ -262,7 +281,8 @@ def create_port(neutron, os_creds, port_settings): :return: the port object """ json_body = port_settings.dict_for_neutron(neutron, os_creds) - logger.info('Creating port for network with name - ' + port_settings.network_name) + logger.info('Creating port for network with name - %s', + port_settings.network_name) return neutron.create_port(body=json_body) @@ -299,8 +319,10 @@ def create_security_group(neutron, keystone, sec_grp_settings): :param sec_grp_settings: the security group settings :return: the security group object """ - logger.info('Creating security group with name - ' + sec_grp_settings.name) - return neutron.create_security_group(sec_grp_settings.dict_for_neutron(keystone)) + logger.info('Creating security group with name - %s', + sec_grp_settings.name) + return neutron.create_security_group( + sec_grp_settings.dict_for_neutron(keystone)) def delete_security_group(neutron, sec_grp): @@ -309,7 +331,8 @@ def delete_security_group(neutron, sec_grp): :param neutron: the client :param sec_grp: the security group object to delete """ - logger.info('Deleting security group with name - ' + sec_grp['security_group']['name']) + logger.info('Deleting security group with name - %s', + sec_grp['security_group']['name']) return neutron.delete_security_group(sec_grp['security_group']['id']) @@ -350,8 +373,10 @@ def create_security_group_rule(neutron, sec_grp_rule_settings): :param sec_grp_rule_settings: the security group rule settings :return: the security group object """ - logger.info('Creating security group to security group - ' + sec_grp_rule_settings.sec_grp_name) - return neutron.create_security_group_rule(sec_grp_rule_settings.dict_for_neutron(neutron)) + logger.info('Creating security group to security group - %s', + sec_grp_rule_settings.sec_grp_name) + return neutron.create_security_group_rule( + sec_grp_rule_settings.dict_for_neutron(neutron)) def delete_security_group_rule(neutron, sec_grp_rule): @@ -360,8 +385,10 @@ def delete_security_group_rule(neutron, sec_grp_rule): :param neutron: the client :param sec_grp_rule: the security group rule object to delete """ - logger.info('Deleting security group rule with ID - ' + sec_grp_rule['security_group_rule']['id']) - neutron.delete_security_group_rule(sec_grp_rule['security_group_rule']['id']) + logger.info('Deleting security group rule with ID - %s', + sec_grp_rule['security_group_rule']['id']) + neutron.delete_security_group_rule( + sec_grp_rule['security_group_rule']['id']) def get_rules_by_security_group(neutron, sec_grp): @@ -370,10 +397,11 @@ def get_rules_by_security_group(neutron, sec_grp): :param neutron: the client :param sec_grp: the security group object """ - logger.info('Retrieving security group rules associate with the security group - ' + - sec_grp['security_group']['name']) + logger.info('Retrieving security group rules associate with the ' + 'security group - %s', sec_grp['security_group']['name']) out = list() - rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + rules = neutron.list_security_group_rules( + **{'security_group_id': sec_grp['security_group']['id']}) for rule in rules['security_group_rules']: if rule['security_group_id'] == sec_grp['security_group']['id']: out.append({'security_group_rule': rule}) @@ -387,7 +415,8 @@ def get_rule_by_id(neutron, sec_grp, rule_id): :param sec_grp: the security group object :param rule_id: the rule's ID """ - rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + rules = neutron.list_security_group_rules( + **{'security_group_id': sec_grp['security_group']['id']}) for rule in rules['security_group_rules']: if rule['id'] == rule_id: return {'security_group_rule': rule} @@ -396,11 +425,95 @@ def get_rule_by_id(neutron, sec_grp, rule_id): def get_external_networks(neutron): """ - Returns a list of external OpenStack network object/dict for all external networks + Returns a list of external OpenStack network object/dict for all external + networks :param neutron: the client :return: a list of external networks (empty list if none configured) """ out = list() - for network in neutron.list_networks(**{'router:external': True})['networks']: + for network in neutron.list_networks( + **{'router:external': True})['networks']: out.append({'network': network}) return out + + +def get_floating_ips(neutron): + """ + Returns all of the floating IPs + :param neutron: the Neutron client + :return: a list of SNAPS FloatingIp objects + """ + out = list() + fips = neutron.list_floatingips() + for fip in fips['floatingips']: + out.append(FloatingIp(inst_id=fip['id'], + ip=fip['floating_ip_address'])) + + return out + + +def create_floating_ip(neutron, ext_net_name): + """ + Returns the floating IP object that was created with this call + :param neutron: the Neutron client + :param ext_net_name: the name of the external network on which to apply the + floating IP address + :return: the SNAPS FloatingIp object + """ + logger.info('Creating floating ip to external network - ' + ext_net_name) + ext_net = get_network(neutron, ext_net_name) + if ext_net: + fip = neutron.create_floatingip( + body={'floatingip': + {'floating_network_id': ext_net['network']['id']}}) + + return FloatingIp(inst_id=fip['floatingip']['id'], + ip=fip['floatingip']['floating_ip_address']) + else: + raise Exception('Cannot create floating IP, ' + 'external network not found') + + +def get_floating_ip(neutron, floating_ip): + """ + Returns a floating IP object that should be identical to the floating_ip + parameter + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: hopefully the same floating IP object input + """ + logger.debug('Attempting to retrieve existing floating ip with IP - %s', + floating_ip.ip) + os_fip = get_os_floating_ip(neutron, floating_ip) + if os_fip: + return FloatingIp( + inst_id=os_fip['id'], ip=os_fip['floating_ip_address']) + + +def get_os_floating_ip(neutron, floating_ip): + """ + Returns an OpenStack floating IP object + parameter + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: hopefully the same floating IP object input + """ + logger.debug('Attempting to retrieve existing floating ip with IP - %s', + floating_ip.ip) + fips = neutron.list_floatingips(ip=floating_ip.id) + + for fip in fips['floatingips']: + if fip['id'] == floating_ip.id: + return fip + + +def delete_floating_ip(neutron, floating_ip): + """ + Responsible for deleting a floating IP + :param neutron: the Neutron client + :param floating_ip: the SNAPS FloatingIp object + :return: + """ + logger.debug('Attempting to delete existing floating ip with IP - %s', + floating_ip.ip) + return neutron.delete_floatingip(floating_ip.id) diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py index a1b959a..90eec13 100644 --- a/snaps/openstack/utils/nova_utils.py +++ b/snaps/openstack/utils/nova_utils.py @@ -13,16 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cryptography.hazmat.primitives import serialization -from cryptography.hazmat.primitives.asymmetric import rsa -from cryptography.hazmat.backends import default_backend - -import os import logging -from snaps.openstack.utils import keystone_utils +import os +from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa from novaclient.client import Client from novaclient.exceptions import NotFound +from snaps.domain.vm_inst import VmInst +from snaps.openstack.utils import keystone_utils, glance_utils, neutron_utils __author__ = 'spisarski' @@ -35,12 +35,69 @@ Utilities for basic OpenStack Nova API calls def nova_client(os_creds): """ - Instantiates and returns a client for communications with OpenStack's Nova server + Instantiates and returns a client for communications with OpenStack's Nova + server :param os_creds: The connection credentials to the OpenStack API :return: the client object """ logger.debug('Retrieving Nova Client') - return Client(os_creds.compute_api_version, session=keystone_utils.keystone_session(os_creds)) + return Client(os_creds.compute_api_version, + session=keystone_utils.keystone_session(os_creds)) + + +def create_server(nova, neutron, glance, instance_settings, image_settings, + keypair_settings=None): + """ + Creates a VM instance + :param nova: the nova client (required) + :param neutron: the neutron client for retrieving ports (required) + :param glance: the glance client (required) + :param instance_settings: the VM instance settings object (required) + :param image_settings: the VM's image settings object (required) + :param keypair_settings: the VM's keypair settings object (optional) + :return: a snaps.domain.VmInst object + """ + + ports = list() + + for port_setting in instance_settings.port_settings: + ports.append(neutron_utils.get_port_by_name( + neutron, port_setting.name)) + nics = [] + for port in ports: + kv = dict() + kv['port-id'] = port['port']['id'] + nics.append(kv) + + logger.info('Creating VM with name - ' + instance_settings.name) + keypair_name = None + if keypair_settings: + keypair_name = keypair_settings.name + + flavor = get_flavor_by_name(nova, instance_settings.flavor) + if not flavor: + raise Exception( + 'Flavor not found with name - %s', + instance_settings.flavor) + + image = glance_utils.get_image(glance, image_settings.name) + if image: + args = {'name': instance_settings.name, + 'flavor': flavor, + 'image': image, + 'nics': nics, + 'key_name': keypair_name, + 'security_groups': + instance_settings.security_group_names, + 'userdata': instance_settings.userdata, + 'availability_zone': + instance_settings.availability_zone} + server = nova.servers.create(**args) + return VmInst(name=server.name, inst_id=server.id) + else: + raise Exception( + 'Cannot create instance, image cannot be located with name %s', + image_settings.name) def get_servers_by_name(nova, name): @@ -50,7 +107,21 @@ def get_servers_by_name(nova, name): :param name: the server name :return: the list of servers """ - return nova.servers.list(search_opts={'name': name}) + out = list() + servers = nova.servers.list(search_opts={'name': name}) + for server in servers: + out.append(VmInst(name=server.name, inst_id=server.id)) + return out + + +def get_latest_server_os_object(nova, server): + """ + Returns a server with a given id + :param nova: the Nova client + :param server: the domain VmInst object + :return: the list of servers or None if not found + """ + return nova.servers.get(server.id) def get_latest_server_object(nova, server): @@ -60,7 +131,8 @@ def get_latest_server_object(nova, server): :param server: the old server object :return: the list of servers or None if not found """ - return nova.servers.get(server) + server = get_latest_server_os_object(nova, server) + return VmInst(name=server.name, inst_id=server.id) def create_keys(key_size=2048): @@ -69,7 +141,8 @@ def create_keys(key_size=2048): :param key_size: the number of bytes for the key size :return: the cryptography keys """ - return rsa.generate_private_key(backend=default_backend(), public_exponent=65537, + return rsa.generate_private_key(backend=default_backend(), + public_exponent=65537, key_size=key_size) @@ -79,7 +152,8 @@ def public_key_openssh(keys): :param keys: the keys generated by create_keys() from cryptography :return: the OpenSSH public key """ - return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH) def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): @@ -97,7 +171,8 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): os.mkdir(pub_dir) public_handle = open(pub_file_path, 'wb') public_bytes = keys.public_key().public_bytes( - serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + serialization.Encoding.OpenSSH, + serialization.PublicFormat.OpenSSH) public_handle.write(public_bytes) public_handle.close() os.chmod(pub_file_path, 0o400) @@ -107,9 +182,11 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): if not os.path.isdir(priv_dir): os.mkdir(priv_dir) private_handle = open(priv_file_path, 'wb') - private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, - encryption_algorithm=serialization.NoEncryption())) + private_handle.write( + keys.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption())) private_handle.close() os.chmod(priv_file_path, 0o400) logger.info("Saved private key to - " + priv_file_path) @@ -179,57 +256,6 @@ def delete_keypair(nova, key): nova.keypairs.delete(key) -def get_floating_ip_pools(nova): - """ - Returns all of the available floating IP pools - :param nova: the Nova client - :return: a list of pools - """ - return nova.floating_ip_pools.list() - - -def get_floating_ips(nova): - """ - Returns all of the floating IPs - :param nova: the Nova client - :return: a list of floating IPs - """ - return nova.floating_ips.list() - - -def create_floating_ip(nova, ext_net_name): - """ - Returns the floating IP object that was created with this call - :param nova: the Nova client - :param ext_net_name: the name of the external network on which to apply the floating IP address - :return: the floating IP object - """ - logger.info('Creating floating ip to external network - ' + ext_net_name) - return nova.floating_ips.create(ext_net_name) - - -def get_floating_ip(nova, floating_ip): - """ - Returns a floating IP object that should be identical to the floating_ip parameter - :param nova: the Nova client - :param floating_ip: the floating IP object to lookup - :return: hopefully the same floating IP object input - """ - logger.debug('Attempting to retrieve existing floating ip with IP - ' + floating_ip.ip) - return nova.floating_ips.get(floating_ip) - - -def delete_floating_ip(nova, floating_ip): - """ - Responsible for deleting a floating IP - :param nova: the Nova client - :param floating_ip: the floating IP object to delete - :return: - """ - logger.debug('Attempting to delete existing floating ip with IP - ' + floating_ip.ip) - return nova.floating_ips.delete(floating_ip) - - def get_nova_availability_zones(nova): """ Returns the names of all nova active compute servers @@ -251,9 +277,9 @@ def delete_vm_instance(nova, vm_inst): """ Deletes a VM instance :param nova: the nova client - :param vm_inst: the OpenStack instance object to delete + :param vm_inst: the snaps.domain.VmInst object """ - nova.servers.delete(vm_inst) + nova.servers.delete(vm_inst.id) def get_flavor_by_name(nova, name): @@ -276,10 +302,15 @@ def create_flavor(nova, flavor_settings): :param flavor_settings: the flavor settings :return: the Flavor """ - return nova.flavors.create(name=flavor_settings.name, flavorid=flavor_settings.flavor_id, ram=flavor_settings.ram, - vcpus=flavor_settings.vcpus, disk=flavor_settings.disk, - ephemeral=flavor_settings.ephemeral, swap=flavor_settings.swap, - rxtx_factor=flavor_settings.rxtx_factor, is_public=flavor_settings.is_public) + return nova.flavors.create(name=flavor_settings.name, + flavorid=flavor_settings.flavor_id, + ram=flavor_settings.ram, + vcpus=flavor_settings.vcpus, + disk=flavor_settings.disk, + ephemeral=flavor_settings.ephemeral, + swap=flavor_settings.swap, + rxtx_factor=flavor_settings.rxtx_factor, + is_public=flavor_settings.is_public) def delete_flavor(nova, flavor): @@ -308,4 +339,17 @@ def remove_security_group(nova, vm, security_group): :param vm: the OpenStack server object (VM) to alter :param security_group: the OpenStack security group object to add """ - nova.servers.remove_security_group(str(vm.id), security_group['security_group']['name']) + nova.servers.remove_security_group( + str(vm.id), security_group['security_group']['name']) + + +def add_floating_ip_to_server(nova, vm, floating_ip, ip_addr): + """ + Adds a floating IP to a server instance + :param nova: the nova client + :param vm: VmInst domain object + :param floating_ip: FloatingIp domain object + :param ip_addr: the IP to which to bind the floating IP to + """ + vm = get_latest_server_os_object(nova, vm) + vm.add_floating_ip(floating_ip.ip, ip_addr) diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py index e90b94c..1e89dda 100644 --- a/snaps/openstack/utils/tests/neutron_utils_tests.py +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -14,14 +14,16 @@ # limitations under the License. import uuid -from snaps.openstack.utils import keystone_utils -from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction -from snaps.openstack.tests import openstack_tests -from snaps.openstack.utils import neutron_utils -from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings from snaps.openstack import create_router -from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \ + PortSettings +from snaps.openstack.create_security_group import SecurityGroupSettings, \ + SecurityGroupRuleSettings, Direction +from snaps.openstack.tests import openstack_tests from snaps.openstack.tests import validation_utils +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.utils import keystone_utils +from snaps.openstack.utils import neutron_utils __author__ = 'spisarski' @@ -57,13 +59,14 @@ class NeutronSmokeTests(OSComponentTestCase): with self.assertRaises(Exception): neutron = neutron_utils.neutron_client( - OSCreds(username='user', password='pass', auth_url='url', project_name='project')) + OSCreds(username='user', password='pass', auth_url='url', + project_name='project')) neutron.list_networks() def test_retrieve_ext_network_name(self): """ - Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained - within the returned list + Tests the neutron_utils.get_external_network_names to ensure the + configured self.ext_net_name is contained within the returned list :return: """ neutron = neutron_utils.neutron_client(self.os_creds) @@ -86,7 +89,8 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): self.port_name = str(guid) + '-port' self.neutron = neutron_utils.neutron_client(self.os_creds) self.network = None - self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net') + self.net_config = openstack_tests.get_pub_net_config( + net_name=guid + '-pub-net') def tearDown(self): """ @@ -94,31 +98,40 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): """ if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_network(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) def test_create_network_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty network name + Tests the neutron_utils.create_neutron_net() function with an empty + network name """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings(name='')) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings(name='')) def test_create_network_null_name(self): """ - Tests the neutron_utils.create_neutron_net() function when the network name is None + Tests the neutron_utils.create_neutron_net() function when the network + name is None """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, self.os_creds, - network_settings=NetworkSettings()) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, + network_settings=NetworkSettings()) class NeutronUtilsSubnetTests(OSComponentTestCase): @@ -133,7 +146,8 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): self.network = None self.subnet = None self.net_config = openstack_tests.get_pub_net_config( - net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name) + net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', + external_net=self.ext_net_name) def tearDown(self): """ @@ -142,71 +156,108 @@ class NeutronUtilsSubnetTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_subnet(self): """ Tests the neutron_utils.create_neutron_net() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) def test_create_subnet_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet name is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): SubnetSettings(cidr=self.net_config.subnet_cidr) def test_create_subnet_empty_name(self): """ - Tests the neutron_utils.create_neutron_net() function with an empty name + Tests the neutron_utils.create_neutron_net() function with an empty + name """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, network=self.network) - validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, network=self.network) + validate_subnet(self.neutron, '', + subnet_setting.cidr, True) def test_create_subnet_null_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr=None, + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) def test_create_subnet_empty_cidr(self): """ - Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty + Tests the neutron_utils.create_neutron_subnet() function for an + Exception when the subnet CIDR value is empty """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) with self.assertRaises(Exception): - sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name) - neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + sub_sets = SubnetSettings(cidr='', + name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, + network=self.network) class NeutronUtilsRouterTests(OSComponentTestCase): @@ -232,7 +283,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase): Cleans the remote OpenStack objects """ if self.interface_router: - neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet) + neutron_utils.remove_interface_router(self.neutron, self.router, + self.subnet) if self.router: neutron_utils.delete_router(self.neutron, self.router) @@ -244,30 +296,40 @@ class NeutronUtilsRouterTests(OSComponentTestCase): if self.subnet: neutron_utils.delete_subnet(self.neutron, self.subnet) validate_subnet(self.neutron, self.subnet.get('name'), - self.net_config.network_settings.subnet_settings[0].cidr, False) + self.net_config.network_settings.subnet_settings[ + 0].cidr, False) if self.network: neutron_utils.delete_network(self.neutron, self.network) - validate_network(self.neutron, self.network['network']['name'], False) + validate_network(self.neutron, self.network['network']['name'], + False) def test_create_router_simple(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) def test_create_router_with_public_interface(self): """ - Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + Tests the neutron_utils.create_neutron_net() function when an external + gateway is requested """ + subnet_setting = self.net_config.network_settings.subnet_settings[0] self.net_config = openstack_tests.OSNetworkConfig( self.net_config.network_settings.name, - self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name, + subnet_setting.name, + subnet_setting.cidr, + self.net_config.router_settings.name, self.ext_net_name) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) # TODO - Add validation that the router gatway has been set def test_create_router_empty_name(self): @@ -276,83 +338,125 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings(name='') - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) def test_create_router_null_name(self): """ - Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None + Tests the neutron_utils.create_neutron_subnet() function when the + subnet CIDR value is None """ with self.assertRaises(Exception): this_router_settings = create_router.RouterSettings() - self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + self.router = neutron_utils.create_router(self.neutron, + self.os_creds, + this_router_settings) validate_router(self.neutron, None, True) def test_add_interface_router(self): """ Tests the neutron_utils.add_interface_router() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) - - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) - validate_interface_router(self.interface_router, self.router, self.subnet) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) + + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) + + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) + validate_interface_router(self.interface_router, self.router, + self.subnet) def test_add_interface_router_null_router(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.add_interface_router() function for an + Exception when the router value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_add_interface_router_null_subnet(self): """ - Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None + Tests the neutron_utils.add_interface_router() function for an + Exception when the subnet value is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) - validate_router(self.neutron, self.net_config.router_settings.name, True) + self.router = neutron_utils.create_router( + self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, + True) with self.assertRaises(Exception): - self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + self.interface_router = neutron_utils.add_interface_router( + self.neutron, self.router, self.subnet) def test_create_port(self): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, + subnet_setting.cidr, True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( name=self.port_name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}], + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}], network_name=self.net_config.network_settings.name)) validate_port(self.neutron, self.port, self.port_name) @@ -360,111 +464,172 @@ class NeutronUtilsRouterTests(OSComponentTestCase): """ Tests the neutron_utils.create_port() function """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, self.os_creds, self.network) + validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr, + True) self.port = neutron_utils.create_port( self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}])) + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) validate_port(self.neutron, self.port, self.port_name) def test_create_port_null_name(self): """ - Tests the neutron_utils.create_port() function for an Exception when the port name value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the port name value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': ip_1}])) def test_create_port_null_network_object(self): """ - Tests the neutron_utils.create_port() function for an Exception when the network object is None + Tests the neutron_utils.create_port() function for an Exception when + the network object is None """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) - with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - self.neutron, self.port_name, self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': + self.net_config.network_settings.subnet_settings[ + 0].name, + 'ip': ip_1}])) def test_create_port_null_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, subnet_setting, + self.os_creds, self.network) + validate_subnet( + self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': None}])) def test_create_port_invalid_ip(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': 'foo'}])) def test_create_port_invalid_ip_to_subnet(self): """ - Tests the neutron_utils.create_port() function for an Exception when the IP value is None - """ - self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) - self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) - self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) - - self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], - self.os_creds, self.network) - validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, - self.net_config.network_settings.subnet_settings[0].cidr, True) + Tests the neutron_utils.create_port() function for an Exception when + the IP value is None + """ + self.network = neutron_utils.create_network( + self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, + self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, + self.net_config.network_settings.name, + True)) + + subnet_setting = self.net_config.network_settings.subnet_settings[0] + self.subnet = neutron_utils.create_subnet( + self.neutron, + subnet_setting, + self.os_creds, self.network) + validate_subnet(self.neutron, + subnet_setting.name, + subnet_setting.cidr, True) with self.assertRaises(Exception): - self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( - name=self.port_name, network_name=self.net_config.network_settings.name, - ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, - 'ip': '10.197.123.100'}])) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, + PortSettings( + name=self.port_name, + network_name=self.net_config.network_settings.name, + ip_addrs=[{ + 'subnet_name': subnet_setting.name, + 'ip': '10.197.123.100'}])) class NeutronUtilsSecurityGroupTests(OSComponentTestCase): @@ -490,7 +655,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): for security_group in self.security_groups: try: - neutron_utils.delete_security_group(self.neutron, security_group) + neutron_utils.delete_security_group(self.neutron, + security_group) except: pass @@ -499,72 +665,102 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): Tests the neutron_utils.create_security_group() function """ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name) - security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + security_group = neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) neutron_utils.delete_security_group(self.neutron, security_group) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNone(sec_grp_get) def test_create_sec_grp_no_name(self): """ - Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure - that attempting to create a security group without a name will raise an exception + Tests the SecurityGroupSettings constructor and + neutron_utils.create_security_group() function to ensure that + attempting to create a security group without a name will raise an + exception """ with self.assertRaises(Exception): sec_grp_settings = SecurityGroupSettings() self.security_groups.append( - neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + neutron_utils.create_security_group(self.neutron, + self.keystone, + sec_grp_settings)) def test_create_sec_grp_no_rules(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, + description='hello group') + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) - self.assertTrue(sec_grp_settings.name, self.security_groups[0]['security_group']['name']) - self.assertTrue(sec_grp_settings.description, self.security_groups[0]['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + self.security_groups[0]['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + self.security_groups[0]['security_group'][ + 'description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( - self.security_groups[0]['security_group'], sec_grp_get['security_group'])) + self.security_groups[0]['security_group'], + sec_grp_get['security_group'])) def test_create_sec_grp_one_rule(self): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=[sec_grp_rule_settings]) + sec_grp_rule_settings = SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=[sec_grp_rule_settings]) - self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) - free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0]) + self.security_groups.append( + neutron_utils.create_security_group(self.neutron, self.keystone, + sec_grp_settings)) + free_rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.security_groups[0]) for free_rule in free_rules: self.security_group_rules.append(free_rule) self.security_group_rules.append( - neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0])) + neutron_utils.create_security_group_rule( + self.neutron, sec_grp_settings.rule_settings[0])) # Refresh object so it is populated with the newly added rule - security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + security_group = neutron_utils.get_security_group( + self.neutron, sec_grp_settings.name) - rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + security_group) - self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules)) + self.assertTrue( + validation_utils.objects_equivalent(self.security_group_rules, + rules)) - self.assertTrue(sec_grp_settings.name, security_group['security_group']['name']) - self.assertTrue(sec_grp_settings.description, security_group['security_group']['description']) + self.assertTrue(sec_grp_settings.name, + security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.description, + security_group['security_group']['description']) - sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + sec_grp_get = neutron_utils.get_security_group(self.neutron, + sec_grp_settings.name) self.assertIsNotNone(sec_grp_get) self.assertTrue(validation_utils.objects_equivalent( security_group['security_group'], sec_grp_get['security_group'])) @@ -576,18 +772,59 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-1', + description='hello group'))) self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group'))) + SecurityGroupSettings(name=self.sec_grp_name + '-2', + description='hello group'))) sec_grp_1b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[0]['security_group']['id']) sec_grp_2b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[1]['security_group']['id']) - self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id']) - self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id']) + self.assertEqual(self.security_groups[0]['security_group']['id'], + sec_grp_1b['security_group']['id']) + self.assertEqual(self.security_groups[1]['security_group']['id'], + sec_grp_2b['security_group']['id']) + + +class NeutronUtilsFloatingIpTests(OSComponentTestCase): + """ + Test basic nova keypair functionality + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack + """ + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.floating_ip = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.floating_ip: + neutron_utils.delete_floating_ip(self.neutron, self.floating_ip) + + def test_floating_ips(self): + """ + Tests the creation of a floating IP + :return: + """ + initial_fips = neutron_utils.get_floating_ips(self.neutron) + + self.floating_ip = neutron_utils.create_floating_ip(self.neutron, + self.ext_net_name) + all_fips = neutron_utils.get_floating_ips(self.neutron) + self.assertEqual(len(initial_fips) + 1, len(all_fips)) + returned = neutron_utils.get_floating_ip(self.neutron, + self.floating_ip) + self.assertEqual(self.floating_ip.id, returned.id) + self.assertEqual(self.floating_ip.ip, returned.ip) """ @@ -597,8 +834,9 @@ Validation routines def validate_network(neutron, name, exists): """ - Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a network for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a network for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a network for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected network name :param exists: Whether or not the network name should exist or not @@ -614,8 +852,9 @@ def validate_network(neutron, name, exists): def validate_subnet(neutron, name, cidr, exists): """ - Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a subnet for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a subnet for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected subnet name :param cidr: The expected CIDR value @@ -632,8 +871,9 @@ def validate_subnet(neutron, name, cidr, exists): def validate_router(neutron, name, exists): """ - Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a router for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a router for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a router for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param name: The expected router name :param exists: Whether or not the network name should exist or not @@ -647,7 +887,8 @@ def validate_router(neutron, name, exists): def validate_interface_router(interface_router, router, subnet): """ - Returns true if the router ID & subnet ID have been properly included into the interface router object + Returns true if the router ID & subnet ID have been properly included into + the interface router object :param interface_router: the object to validate :param router: to validate against the interface_router :param subnet: to validate against the interface_router @@ -661,8 +902,9 @@ def validate_interface_router(interface_router, router, subnet): def validate_port(neutron, port_obj, this_port_name): """ - Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true. - Returns false if a port for a given name DOES exist if the exists parameter is true conversely false. + Returns true if a port for a given name DOES NOT exist if the exists + parameter is false conversely true. Returns false if a port for a given + name DOES exist if the exists parameter is true conversely false. :param neutron: The neutron client :param port_obj: The port object to lookup :param this_port_name: The expected router name diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py index 98aa889..552ffc7 100644 --- a/snaps/openstack/utils/tests/nova_utils_tests.py +++ b/snaps/openstack/utils/tests/nova_utils_tests.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import os import uuid -from snaps.openstack.utils import nova_utils +import os +from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor +from snaps.openstack.create_image import OpenStackImage +from snaps.openstack.create_instance import VmInstanceSettings +from snaps.openstack.create_network import OpenStackNetwork, PortSettings +from snaps.openstack.tests import openstack_tests from snaps.openstack.tests.os_source_file_test import OSComponentTestCase -from snaps.openstack.create_flavor import FlavorSettings +from snaps.openstack.utils import nova_utils, neutron_utils, glance_utils __author__ = 'spisarski' @@ -46,8 +50,10 @@ class NovaSmokeTests(OSComponentTestCase): from snaps.openstack.os_credentials import OSCreds nova = nova_utils.nova_client( - OSCreds(username='user', password='pass', auth_url=self.os_creds.auth_url, - project_name=self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings)) + OSCreds(username='user', password='pass', + auth_url=self.os_creds.auth_url, + project_name=self.os_creds.project_name, + proxy_settings=self.os_creds.proxy_settings)) # This should throw an exception with self.assertRaises(Exception): @@ -61,8 +67,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase): def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.priv_key_file_path = 'tmp/' + guid @@ -73,7 +79,6 @@ class NovaUtilsKeypairTests(OSComponentTestCase): self.public_key = nova_utils.public_key_openssh(self.keys) self.keypair_name = guid self.keypair = None - self.floating_ip = None def tearDown(self): """ @@ -97,14 +102,12 @@ class NovaUtilsKeypairTests(OSComponentTestCase): except: pass - if self.floating_ip: - nova_utils.delete_floating_ip(self.nova, self.floating_ip) - def test_create_keypair(self): """ Tests the creation of an OpenStack keypair that does not exist. """ - self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, + self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) self.assertEqual(self.keypair, result) keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name) @@ -114,7 +117,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase): """ Tests the creation of an OpenStack keypair that does not exist. """ - self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, + self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) self.assertEqual(self.keypair, result) nova_utils.delete_keypair(self.nova, self.keypair) @@ -126,25 +130,16 @@ class NovaUtilsKeypairTests(OSComponentTestCase): Tests that the generated RSA keys are properly saved to files :return: """ - nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path) - self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path) + nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, + self.priv_key_file_path) + self.keypair = nova_utils.upload_keypair_file(self.nova, + self.keypair_name, + self.pub_key_file_path) pub_key_file = open(os.path.expanduser(self.pub_key_file_path)) pub_key = pub_key_file.read() pub_key_file.close() self.assertEqual(self.keypair.public_key, pub_key) - def test_floating_ips(self): - """ - Tests the creation of a floating IP - :return: - """ - ips = nova_utils.get_floating_ips(self.nova) - self.assertIsNotNone(ips) - - self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name) - returned = nova_utils.get_floating_ip(self.nova, self.floating_ip) - self.assertEqual(self.floating_ip, returned) - class NovaUtilsFlavorTests(OSComponentTestCase): """ @@ -153,12 +148,15 @@ class NovaUtilsFlavorTests(OSComponentTestCase): def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) - self.flavor_settings = FlavorSettings(name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1, vcpus=1, - ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False) + self.flavor_settings = FlavorSettings(name=guid + '-name', + flavor_id=guid + '-id', ram=1, + disk=1, vcpus=1, + ephemeral=1, swap=2, + rxtx_factor=3.0, is_public=False) self.nova = nova_utils.nova_client(self.os_creds) self.flavor = None @@ -186,7 +184,8 @@ class NovaUtilsFlavorTests(OSComponentTestCase): self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings) self.validate_flavor() nova_utils.delete_flavor(self.nova, self.flavor) - flavor = nova_utils.get_flavor_by_name(self.nova, self.flavor_settings.name) + flavor = nova_utils.get_flavor_by_name(self.nova, + self.flavor_settings.name) self.assertIsNone(flavor) def validate_flavor(self): @@ -206,5 +205,110 @@ class NovaUtilsFlavorTests(OSComponentTestCase): else: self.assertEqual(self.flavor_settings.swap, self.flavor.swap) - self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor) + self.assertEqual(self.flavor_settings.rxtx_factor, + self.flavor.rxtx_factor) self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public) + + +class NovaUtilsInstanceTests(OSComponentTestCase): + """ + Tests the creation of VM instances via nova_utils.py + """ + + def setUp(self): + """ + Setup objects required by VM instances + :return: + """ + + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + + self.nova = nova_utils.nova_client(self.os_creds) + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.glance = glance_utils.glance_client(self.os_creds) + + self.image_creator = None + self.network_creator = None + self.flavor_creator = None + self.port = None + self.vm_inst = None + + try: + image_settings = openstack_tests.cirros_image_settings( + name=guid + '-image', image_metadata=self.image_metadata) + self.image_creator = OpenStackImage( + self.os_creds, image_settings=image_settings) + self.image_creator.create() + + network_settings = openstack_tests.get_priv_net_config( + guid + '-net', guid + '-subnet').network_settings + self.network_creator = OpenStackNetwork( + self.os_creds, network_settings) + self.network_creator.create() + + self.flavor_creator = OpenStackFlavor( + self.os_creds, + FlavorSettings( + name=guid + '-flavor-name', ram=128, disk=10, vcpus=1)) + self.flavor_creator.create() + + port_settings = PortSettings(name=guid + '-port', + network_name=network_settings.name) + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, port_settings) + + self.instance_settings = VmInstanceSettings( + name=guid + '-vm_inst', + flavor=self.flavor_creator.flavor_settings.name, + port_settings=[port_settings]) + except: + self.tearDown() + raise + + def tearDown(self): + """ + Cleanup deployed resources + :return: + """ + if self.vm_inst: + try: + nova_utils.delete_vm_instance(self.nova, self.vm_inst) + except: + pass + if self.port: + try: + neutron_utils.delete_port(self.neutron, self.port) + except: + pass + if self.flavor_creator: + try: + self.flavor_creator.clean() + except: + pass + if self.network_creator: + try: + self.network_creator.clean() + except: + pass + if self.image_creator: + try: + self.image_creator.clean() + except: + pass + + def test_create_instance(self): + """ + Tests the nova_utils.create_server() method + :return: + """ + + self.vm_inst = nova_utils.create_server( + self.nova, self.neutron, self.glance, self.instance_settings, + self.image_creator.image_settings) + + self.assertIsNotNone(self.vm_inst) + + vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst) + + self.assertEqual(self.vm_inst.name, vm_inst.name) + self.assertEqual(self.vm_inst.id, vm_inst.id) diff --git a/snaps/provisioning/tests/ansible_utils_tests.py b/snaps/provisioning/tests/ansible_utils_tests.py index cddedcd..733068f 100644 --- a/snaps/provisioning/tests/ansible_utils_tests.py +++ b/snaps/provisioning/tests/ansible_utils_tests.py @@ -13,21 +13,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import uuid + import os import pkg_resources -import uuid from scp import SCPClient -from snaps.openstack.create_security_group import SecurityGroupRuleSettings, Direction, Protocol, \ - OpenStackSecurityGroup, SecurityGroupSettings - from snaps.openstack import create_flavor -from snaps.openstack import create_instance from snaps.openstack import create_image +from snaps.openstack import create_instance from snaps.openstack import create_keypairs from snaps.openstack import create_network from snaps.openstack import create_router +from snaps.openstack.create_security_group import ( + SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup, + SecurityGroupSettings) from snaps.openstack.tests import openstack_tests, create_instance_tests from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase +from snaps.openstack.utils import nova_utils from snaps.provisioning import ansible_utils VM_BOOT_TIMEOUT = 600 @@ -38,16 +40,19 @@ ip_2 = '10.0.1.200' class AnsibleProvisioningTests(OSIntegrationTestCase): """ - Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP and eth1 w/o + Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP + and eth1 w/o """ def setUp(self): """ - Instantiates the CreateImage object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateImage object that is responsible for downloading + and creating an OS image file within OpenStack """ super(self.__class__, self).__start__() + self.nova = nova_utils.nova_client(self.os_creds) + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) self.keypair_priv_filepath = 'tmp/' + guid self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub' @@ -69,62 +74,78 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): try: # Create Image - os_image_settings = openstack_tests.ubuntu_image_settings(name=guid + '-' + '-image', - image_metadata=self.image_metadata) - self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings) + os_image_settings = openstack_tests.ubuntu_image_settings( + name=guid + '-' + '-image', + image_metadata=self.image_metadata) + self.image_creator = create_image.OpenStackImage(self.os_creds, + os_image_settings) self.image_creator.create() # First network is public self.pub_net_config = openstack_tests.get_pub_net_config( net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', - router_name=guid + '-pub-router', external_net=self.ext_net_name) + router_name=guid + '-pub-router', + external_net=self.ext_net_name) - self.network_creator = create_network.OpenStackNetwork(self.os_creds, self.pub_net_config.network_settings) + self.network_creator = create_network.OpenStackNetwork( + self.os_creds, self.pub_net_config.network_settings) self.network_creator.create() # Create routers - self.router_creator = create_router.OpenStackRouter(self.os_creds, self.pub_net_config.router_settings) + self.router_creator = create_router.OpenStackRouter( + self.os_creds, self.pub_net_config.router_settings) self.router_creator.create() # Create Flavor self.flavor_creator = create_flavor.OpenStackFlavor( self.admin_os_creds, - create_flavor.FlavorSettings(name=guid + '-flavor-name', ram=2048, disk=10, vcpus=2, + create_flavor.FlavorSettings(name=guid + '-flavor-name', + ram=2048, disk=10, vcpus=2, metadata=self.flavor_metadata)) self.flavor_creator.create() # Create Key/Pair self.keypair_creator = create_keypairs.OpenStackKeypair( self.os_creds, create_keypairs.KeypairSettings( - name=self.keypair_name, public_filepath=self.keypair_pub_filepath, + name=self.keypair_name, + public_filepath=self.keypair_pub_filepath, private_filepath=self.keypair_priv_filepath)) self.keypair_creator.create() # Create Security Group sec_grp_name = guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress, + rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, + direction=Direction.ingress, protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress, - protocol=Protocol.tcp, port_range_min=22, port_range_max=22) + rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, + direction=Direction.ingress, + protocol=Protocol.tcp, + port_range_min=22, + port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2])) + SecurityGroupSettings(name=sec_grp_name, + rule_settings=[rule1, rule2])) self.sec_grp_creator.create() # Create instance ports_settings = list() ports_settings.append( - create_network.PortSettings(name=self.port_1_name, - network_name=self.pub_net_config.network_settings.name)) + create_network.PortSettings( + name=self.port_1_name, + network_name=self.pub_net_config.network_settings.name)) instance_settings = create_instance.VmInstanceSettings( - name=self.vm_inst_name, flavor=self.flavor_creator.flavor_settings.name, port_settings=ports_settings, + name=self.vm_inst_name, + flavor=self.flavor_creator.flavor_settings.name, + port_settings=ports_settings, floating_ip_settings=[create_instance.FloatingIpSettings( name=self.floating_ip_name, port_name=self.port_1_name, router_name=self.pub_net_config.router_settings.name)]) self.inst_creator = create_instance.OpenStackVmInstance( - self.os_creds, instance_settings, self.image_creator.image_settings, + self.os_creds, instance_settings, + self.image_creator.image_settings, keypair_settings=self.keypair_creator.keypair_settings) except: self.tearDown() @@ -165,10 +186,13 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): def test_apply_simple_playbook(self): """ - Tests application of an Ansible playbook that simply copies over a file: - 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False - 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False - Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail. + Tests application of an Ansible playbook that simply copies over a file + 1. Have a ~/.ansible.cfg (or alternate means) to + set host_key_checking = False + 2. Set the following environment variable in your executing shell: + ANSIBLE_HOST_KEY_CHECKING=False + Should this not be performed, the creation of the host ssh key will + cause your ansible calls to fail. """ vm = self.inst_creator.create(block=True) @@ -176,10 +200,12 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_ssh_active(block=True)) priv_ip = self.inst_creator.get_port_ip(self.port_1_name) - self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip)) + self.assertTrue(create_instance_tests.check_dhcp_lease( + self.nova, vm, priv_ip)) # Apply Security Group - self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group()) + self.inst_creator.add_security_group( + self.sec_grp_creator.get_security_group()) ssh_client = self.inst_creator.ssh_client() self.assertIsNotNone(ssh_client) @@ -187,16 +213,19 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertIsNotNone(out) self.assertGreater(len(out), 1) - # Need to use the first floating IP as subsequent ones are currently broken with Apex CO + # Need to use the first floating IP as subsequent ones are currently + # broken with Apex CO ip = self.inst_creator.get_floating_ip().ip user = self.inst_creator.get_image_user() priv_key = self.inst_creator.keypair_settings.private_filepath - relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks', 'simple_playbook.yml') + relative_pb_path = pkg_resources.resource_filename( + 'snaps.provisioning.tests.playbooks', 'simple_playbook.yml') retval = self.inst_creator.apply_ansible_playbook(relative_pb_path) self.assertEqual(0, retval) - ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings) + ssh = ansible_utils.ssh_client(ip, user, priv_key, + self.os_creds.proxy_settings) self.assertIsNotNone(ssh) scp = SCPClient(ssh.get_transport()) scp.get('~/hello.txt', self.test_file_local_path) @@ -209,10 +238,14 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): def test_apply_template_playbook(self): """ - Tests application of an Ansible playbook that applies a template to a file: - 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False - 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False - Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail. + Tests application of an Ansible playbook that applies a template to a + file: + 1. Have a ~/.ansible.cfg (or alternate means) to set + host_key_checking = False + 2. Set the following environment variable in your executing shell: + ANSIBLE_HOST_KEY_CHECKING=False + Should this not be performed, the creation of the host ssh key will + cause your ansible calls to fail. """ vm = self.inst_creator.create(block=True) @@ -220,22 +253,29 @@ class AnsibleProvisioningTests(OSIntegrationTestCase): self.assertTrue(self.inst_creator.vm_ssh_active(block=True)) priv_ip = self.inst_creator.get_port_ip(self.port_1_name) - self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip)) + self.assertTrue(create_instance_tests.check_dhcp_lease( + self.nova, vm, priv_ip)) # Apply Security Group - self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group()) + self.inst_creator.add_security_group( + self.sec_grp_creator.get_security_group()) - # Need to use the first floating IP as subsequent ones are currently broken with Apex CO + # Need to use the first floating IP as subsequent ones are currently + # broken with Apex CO ip = self.inst_creator.get_floating_ip().ip user = self.inst_creator.get_image_user() priv_key = self.inst_creator.keypair_settings.private_filepath - relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks', - 'template_playbook.yml') - retval = self.inst_creator.apply_ansible_playbook(relative_pb_path, variables={'name': 'Foo'}) + relative_pb_path = pkg_resources.resource_filename( + 'snaps.provisioning.tests.playbooks', + 'template_playbook.yml') + retval = self.inst_creator.apply_ansible_playbook(relative_pb_path, + variables={ + 'name': 'Foo'}) self.assertEqual(0, retval) - ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings) + ssh = ansible_utils.ssh_client(ip, user, priv_key, + self.os_creds.proxy_settings) self.assertIsNotNone(ssh) scp = SCPClient(ssh.get_transport()) scp.get('/tmp/hello.txt', self.test_file_local_path) diff --git a/snaps/test_suite_builder.py b/snaps/test_suite_builder.py index e4f5685..d87ae5a 100644 --- a/snaps/test_suite_builder.py +++ b/snaps/test_suite_builder.py @@ -18,34 +18,54 @@ import unittest from snaps.domain.test.image_tests import ImageDomainObjectTests from snaps.domain.test.stack_tests import StackDomainObjectTests -from snaps.openstack.tests.create_stack_tests import StackSettingsUnitTests, CreateStackSuccessTests, \ - CreateStackNegativeTests -from snaps.openstack.utils.tests.glance_utils_tests import GlanceSmokeTests, GlanceUtilsTests +from snaps.domain.test.vm_inst_tests import (VmInstDomainObjectTests, + FloatingIpDomainObjectTests) from snaps.openstack.tests.create_flavor_tests import CreateFlavorTests -from snaps.openstack.utils.tests.heat_utils_tests import HeatUtilsCreateStackTests, HeatSmokeTests +from snaps.openstack.tests.create_image_tests import ( + CreateImageSuccessTests, CreateImageNegativeTests, ImageSettingsUnitTests, + CreateMultiPartImageTests) +from snaps.openstack.tests.create_instance_tests import ( + CreateInstanceSingleNetworkTests, CreateInstancePubPrivNetTests, + CreateInstanceOnComputeHost, CreateInstanceSimpleTests, + FloatingIpSettingsUnitTests, InstanceSecurityGroupTests, + VmInstanceSettingsUnitTests, CreateInstancePortManipulationTests, + SimpleHealthCheck, CreateInstanceFromThreePartImage, + CreateInstanceMockOfflineTests) +from snaps.openstack.tests.create_keypairs_tests import ( + CreateKeypairsTests, KeypairSettingsUnitTests) +from snaps.openstack.tests.create_network_tests import ( + CreateNetworkSuccessTests, NetworkSettingsUnitTests, PortSettingsUnitTests, + SubnetSettingsUnitTests, CreateNetworkTypeTests) +from snaps.openstack.tests.create_project_tests import ( + CreateProjectSuccessTests, ProjectSettingsUnitTests, + CreateProjectUserTests) +from snaps.openstack.tests.create_router_tests import ( + CreateRouterSuccessTests, CreateRouterNegativeTests) +from snaps.openstack.tests.create_security_group_tests import ( + CreateSecurityGroupTests, SecurityGroupRuleSettingsUnitTests, + SecurityGroupSettingsUnitTests) +from snaps.openstack.tests.create_stack_tests import ( + StackSettingsUnitTests, CreateStackSuccessTests, CreateStackNegativeTests) +from snaps.openstack.tests.create_user_tests import ( + UserSettingsUnitTests, CreateUserSuccessTests) +from snaps.openstack.tests.os_source_file_test import ( + OSComponentTestCase, OSIntegrationTestCase) +from snaps.openstack.utils.tests.glance_utils_tests import ( + GlanceSmokeTests, GlanceUtilsTests) +from snaps.openstack.utils.tests.heat_utils_tests import ( + HeatUtilsCreateStackTests, HeatSmokeTests) +from snaps.openstack.utils.tests.keystone_utils_tests import ( + KeystoneSmokeTests, KeystoneUtilsTests) +from snaps.openstack.utils.tests.neutron_utils_tests import ( + NeutronSmokeTests, NeutronUtilsNetworkTests, NeutronUtilsSubnetTests, + NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests, + NeutronUtilsFloatingIpTests) +from snaps.openstack.utils.tests.nova_utils_tests import ( + NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests, + NovaUtilsInstanceTests) +from snaps.provisioning.tests.ansible_utils_tests import ( + AnsibleProvisioningTests) from snaps.tests.file_utils_tests import FileUtilsTests -from snaps.openstack.tests.create_security_group_tests import CreateSecurityGroupTests, \ - SecurityGroupRuleSettingsUnitTests, SecurityGroupSettingsUnitTests -from snaps.openstack.tests.create_project_tests import CreateProjectSuccessTests, ProjectSettingsUnitTests, \ - CreateProjectUserTests -from snaps.openstack.tests.create_user_tests import UserSettingsUnitTests, CreateUserSuccessTests -from snaps.openstack.utils.tests.keystone_utils_tests import KeystoneSmokeTests, KeystoneUtilsTests -from snaps.openstack.utils.tests.neutron_utils_tests import NeutronSmokeTests, NeutronUtilsNetworkTests, \ - NeutronUtilsSubnetTests, NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests -from snaps.openstack.tests.create_image_tests import CreateImageSuccessTests, CreateImageNegativeTests, \ - ImageSettingsUnitTests, CreateMultiPartImageTests -from snaps.openstack.tests.create_keypairs_tests import CreateKeypairsTests, KeypairSettingsUnitTests -from snaps.openstack.tests.create_network_tests import CreateNetworkSuccessTests, NetworkSettingsUnitTests, \ - PortSettingsUnitTests, SubnetSettingsUnitTests, CreateNetworkTypeTests -from snaps.openstack.tests.create_router_tests import CreateRouterSuccessTests, CreateRouterNegativeTests -from snaps.openstack.tests.create_instance_tests import CreateInstanceSingleNetworkTests, \ - CreateInstancePubPrivNetTests, CreateInstanceOnComputeHost, CreateInstanceSimpleTests, \ - FloatingIpSettingsUnitTests, InstanceSecurityGroupTests, VmInstanceSettingsUnitTests, \ - CreateInstancePortManipulationTests, SimpleHealthCheck, CreateInstanceFromThreePartImage, \ - CreateInstanceMockOfflineTests -from snaps.provisioning.tests.ansible_utils_tests import AnsibleProvisioningTests -from snaps.openstack.tests.os_source_file_test import OSComponentTestCase, OSIntegrationTestCase -from snaps.openstack.utils.tests.nova_utils_tests import NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests __author__ = 'spisarski' @@ -57,59 +77,95 @@ def add_unit_tests(suite): :return: None as the tests will be adding to the 'suite' parameter object """ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FileUtilsTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupRuleSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FloatingIpSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(VmInstanceSettingsUnitTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests)) - suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests)) - - -def add_openstack_client_tests(suite, os_creds, ext_net_name, use_keystone=True, log_level=logging.INFO): + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + SecurityGroupRuleSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + SecurityGroupSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + FloatingIpSettingsUnitTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + VmInstanceSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests)) + suite.addTest( + unittest.TestLoader().loadTestsFromTestCase(VmInstDomainObjectTests)) + suite.addTest(unittest.TestLoader().loadTestsFromTestCase( + FloatingIpDomainObjectTests)) + + +def add_openstack_client_tests(suite, os_creds, ext_net_name, + use_keystone=True, log_level=logging.INFO): """ Adds tests written to exercise OpenStack client retrieval :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network + :param os_creds: and instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ # Basic connection tests - suite.addTest(OSComponentTestCase.parameterize(GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) if use_keystone: - suite.addTest(OSComponentTestCase.parameterize(KeystoneSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + KeystoneSmokeTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) - suite.addTest(OSComponentTestCase.parameterize(HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, - log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest( + OSComponentTestCase.parameterize( + HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) -def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, image_metadata=None, - log_level=logging.INFO): +def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, + image_metadata=None, log_level=logging.INFO): """ Adds tests written to exercise all existing OpenStack APIs :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network - :param image_metadata: dict() object containing metadata for creating an image with custom config + :param os_creds: Instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network + :param image_metadata: dict() object containing metadata for creating an + image with custom config (see YAML files in examples/image-metadata) :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object @@ -117,131 +173,203 @@ def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, im # Tests the OpenStack API calls if use_keystone: suite.addTest(OSComponentTestCase.parameterize( - KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateUserSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateUserSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateProjectSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateProjectSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateProjectUserTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateProjectUserTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, image_metadata=image_metadata, + GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, + image_metadata=image_metadata, + log_level=log_level)) + suite.addTest(OSComponentTestCase.parameterize( + NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) + suite.addTest(OSComponentTestCase.parameterize( + NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsSecurityGroupTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NeutronUtilsFloatingIpTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NeutronUtilsSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + NovaUtilsInstanceTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - HeatUtilsCreateStackTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level, + HeatUtilsCreateStackTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level, image_metadata=image_metadata)) -def add_openstack_integration_tests(suite, os_creds, ext_net_name, use_keystone=True, flavor_metadata=None, - image_metadata=None, use_floating_ips=True, log_level=logging.INFO): +def add_openstack_integration_tests(suite, os_creds, ext_net_name, + use_keystone=True, flavor_metadata=None, + image_metadata=None, use_floating_ips=True, + log_level=logging.INFO): """ - Adds tests written to exercise all long-running OpenStack integration tests meaning they will be creating VM - instances and potentially performing some SSH functions through floating IPs + Adds tests written to exercise all long-running OpenStack integration tests + meaning they will be creating VM instances and potentially performing some + SSH functions through floatingIPs :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test - :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on - a host that has access to the cloud's private network - :param image_metadata: dict() object containing metadata for creating an image with custom config + :param os_creds: and instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test + :param use_keystone: when True, tests requiring direct access to Keystone + are added as these need to be running on a host that + has access to the cloud's private network + :param image_metadata: dict() object containing metadata for creating an + image with custom config (see YAML files in examples/image-metadata) - :param flavor_metadata: dict() object containing the metadata required by your flavor based on your configuration: + :param flavor_metadata: dict() object containing the metadata required by + your flavor based on your configuration: (i.e. {'hw:mem_page_size': 'large'}) - :param use_floating_ips: when true, all tests requiring Floating IPs will be added to the suite + :param use_floating_ips: when true, all tests requiring Floating IPs will + be added to the suite :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ - # Tests the OpenStack API calls via a creator. If use_keystone, objects will be created with a custom user - # and project + # Tests the OpenStack API calls via a creator. If use_keystone, objects + # will be created with a custom user and project # Creator Object tests suite.addTest(OSIntegrationTestCase.parameterize( - CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateMultiPartImageTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateMultiPartImageTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateNetworkSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateNetworkSuccessTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateRouterNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateRouterNegativeTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) # VM Instances suite.addTest(OSIntegrationTestCase.parameterize( - SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceSimpleTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceSimpleTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstancePortManipulationTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstancePortManipulationTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - InstanceSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + InstanceSecurityGroupTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceOnComputeHost, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceOnComputeHost, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceFromThreePartImage, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceFromThreePartImage, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, + use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) if use_floating_ips: suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstanceSingleNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstanceSingleNetworkTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - CreateInstancePubPrivNetTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + CreateInstancePubPrivNetTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) suite.addTest(OSIntegrationTestCase.parameterize( - AnsibleProvisioningTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone, - flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level)) + AnsibleProvisioningTests, os_creds=os_creds, + ext_net_name=ext_net_name, use_keystone=use_keystone, + flavor_metadata=flavor_metadata, image_metadata=image_metadata, + log_level=log_level)) -def add_openstack_staging_tests(suite, os_creds, ext_net_name, log_level=logging.INFO): +def add_openstack_staging_tests(suite, os_creds, ext_net_name, + log_level=logging.INFO): """ - Adds tests that are still in development have not been designed to run successfully against all OpenStack pods + Adds tests that are still in development have not been designed to run + successfully against all OpenStack pods :param suite: the unittest.TestSuite object to which to add the tests - :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack - :param ext_net_name: the name of an external network on the cloud under test + :param os_creds: Instance of OSCreds that holds the credentials + required by OpenStack + :param ext_net_name: the name of an external network on the cloud under + test :param log_level: the logging level :return: None as the tests will be adding to the 'suite' parameter object """ suite.addTest(OSComponentTestCase.parameterize( - CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name, + log_level=log_level)) suite.addTest(OSComponentTestCase.parameterize( - CreateInstanceMockOfflineTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level)) + CreateInstanceMockOfflineTests, os_creds=os_creds, + ext_net_name=ext_net_name, log_level=log_level)) |