diff options
Diffstat (limited to 'snaps/openstack/utils')
-rw-r--r-- | snaps/openstack/utils/__init__.py | 15 | ||||
-rw-r--r-- | snaps/openstack/utils/deploy_utils.py | 151 | ||||
-rw-r--r-- | snaps/openstack/utils/glance_utils.py | 78 | ||||
-rw-r--r-- | snaps/openstack/utils/keystone_utils.py | 204 | ||||
-rw-r--r-- | snaps/openstack/utils/neutron_utils.py | 405 | ||||
-rw-r--r-- | snaps/openstack/utils/nova_utils.py | 282 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/__init__.py | 15 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/glance_utils_tests.py | 115 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/keystone_utils_tests.py | 100 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/neutron_utils_tests.py | 651 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/nova_utils_tests.py | 208 |
11 files changed, 2224 insertions, 0 deletions
diff --git a/snaps/openstack/utils/__init__.py b/snaps/openstack/utils/__init__.py new file mode 100644 index 0000000..7f92908 --- /dev/null +++ b/snaps/openstack/utils/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +__author__ = 'spisarski'
\ No newline at end of file diff --git a/snaps/openstack/utils/deploy_utils.py b/snaps/openstack/utils/deploy_utils.py new file mode 100644 index 0000000..ade8811 --- /dev/null +++ b/snaps/openstack/utils/deploy_utils.py @@ -0,0 +1,151 @@ +# +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This utility makes it easy to create OpenStack objects +import logging + +from snaps.openstack.create_project import OpenStackProject +from snaps.openstack.create_user import OpenStackUser +from snaps.openstack.create_image import OpenStackImage +from snaps.openstack.create_network import OpenStackNetwork +from snaps.openstack.create_router import OpenStackRouter +from snaps.openstack.create_keypairs import OpenStackKeypair +from snaps.openstack.create_instance import OpenStackVmInstance +from snaps.openstack.create_security_group import OpenStackSecurityGroup + +logger = logging.getLogger('deploy_utils') + + +def create_image(os_creds, image_settings, cleanup=False): + """ + Creates an image in OpenStack if necessary + :param os_creds: The OpenStack credentials object + :param image_settings: The image settings object + :param cleanup: Denotes whether or not this is being called for cleanup or not + :return: A reference to the image creator object from which the image object can be accessed + """ + image_creator = OpenStackImage(os_creds, image_settings) + image_creator.create(cleanup) + return image_creator + + +def create_network(os_creds, network_settings, cleanup=False): + """ + Creates a network on which the CMTSs can attach + :param os_creds: The OpenStack credentials object + :param network_settings: The network settings object + :param cleanup: Denotes whether or not this is being called for cleanup or not + :return: A reference to the network creator objects for each network from which network elements such as the + subnet, router, interface router, and network objects can be accessed. + """ + # Check for OS for network existence + # If exists return network instance data + # Else, create network and return instance data + + logger.info('Attempting to create network with name - ' + network_settings.name) + + network_creator = OpenStackNetwork(os_creds, network_settings) + network_creator.create(cleanup) + logger.info('Created network ') + return network_creator + + +def create_router(os_creds, router_settings, cleanup=False): + """ + Creates a network on which the CMTSs can attach + :param os_creds: The OpenStack credentials object + :param router_settings: The RouterSettings instance + :param cleanup: Denotes whether or not this is being called for cleanup or not + :return: A reference to the network creator objects for each network from which network elements such as the + subnet, router, interface router, and network objects can be accessed. + """ + # Check for OS for network existence + # If exists return network instance data + # Else, create network and return instance data + logger.info('Attempting to create router with name - ' + router_settings.name) + router_creator = OpenStackRouter(os_creds, router_settings) + router_creator.create(cleanup) + logger.info('Created router ') + return router_creator + + +def create_keypair(os_creds, keypair_settings, cleanup=False): + """ + Creates a keypair that can be applied to an instance + :param os_creds: The OpenStack credentials object + :param keypair_settings: The KeypairSettings object + :param cleanup: Denotes whether or not this is being called for cleanup or not + :return: A reference to the keypair creator object + """ + keypair_creator = OpenStackKeypair(os_creds, keypair_settings) + keypair_creator.create(cleanup) + return keypair_creator + + +def create_vm_instance(os_creds, instance_settings, image_settings, keypair_creator=None, cleanup=False): + """ + Creates a VM instance + :param os_creds: The OpenStack credentials + :param instance_settings: Instance of VmInstanceSettings + :param image_settings: The object containing image settings + :param keypair_creator: The object responsible for creating the keypair associated with this VM instance. (optional) + :param sg_names: The names of the security groups to apply to VM. (optional) + :param cleanup: Denotes whether or not this is being called for cleanup or not (default False) + :return: A reference to the VM instance object + """ + kp_settings = None + if keypair_creator: + kp_settings = keypair_creator.keypair_settings + vm_creator = OpenStackVmInstance(os_creds, instance_settings, image_settings, kp_settings) + vm_creator.create(cleanup=cleanup) + return vm_creator + + +def create_user(os_creds, user_settings): + """ + Creates an OpenStack user + :param os_creds: The OpenStack credentials + :param user_settings: The user configuration settings + :return: A reference to the user instance object + """ + user_creator = OpenStackUser(os_creds, user_settings) + user_creator.create() + return user_creator + + +def create_project(os_creds, project_settings): + """ + Creates an OpenStack user + :param os_creds: The OpenStack credentials + :param project_settings: The user project configuration settings + :return: A reference to the project instance object + """ + project_creator = OpenStackProject(os_creds, project_settings) + project_creator.create() + return project_creator + + +def create_security_group(os_creds, sec_grp_settings): + """ + Creates an OpenStack Security Group + :param os_creds: The OpenStack credentials + :param sec_grp_settings: The security group settings + :return: A reference to the project instance object + """ + sg_creator = OpenStackSecurityGroup(os_creds, sec_grp_settings) + sg_creator.create() + return sg_creator + diff --git a/snaps/openstack/utils/glance_utils.py b/snaps/openstack/utils/glance_utils.py new file mode 100644 index 0000000..6d90d3e --- /dev/null +++ b/snaps/openstack/utils/glance_utils.py @@ -0,0 +1,78 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from snaps import file_utils +from glanceclient.client import Client +from snaps.openstack.utils import keystone_utils + +__author__ = 'spisarski' + +logger = logging.getLogger('glance_utils') + +""" +Utilities for basic neutron API calls +""" + + +def glance_client(os_creds): + """ + Creates and returns a glance client object + :return: the glance client + """ + return Client(version=os_creds.image_api_version, session=keystone_utils.keystone_session(os_creds)) + + +def get_image(nova, glance, image_name): + """ + Returns an OpenStack image object for a given name + :param nova: the Nova client + :param glance: the Glance client + :param image_name: the image name to lookup + :return: the image object or None + """ + try: + image_dict = nova.images.find(name=image_name) + if image_dict: + return glance.images.get(image_dict.id) + except: + pass + return None + + +def create_image(glance, image_settings): + """ + Creates and returns OpenStack image object with an external URL + :param glance: the glance client + :param image_settings: the image settings object + :return: the OpenStack image object + :raise Exception if using a file and it cannot be found + """ + if image_settings.url: + return glance.images.create(name=image_settings.name, disk_format=image_settings.format, + container_format="bare", location=image_settings.url) + elif image_settings.image_file: + image_file = file_utils.get_file(image_settings.image_file) + return glance.images.create(name=image_settings.name, disk_format=image_settings.format, + container_format="bare", data=image_file) + + +def delete_image(glance, image): + """ + Deletes an image from OpenStack + :param glance: the glance client + :param image: the image to delete + """ + glance.images.delete(image) diff --git a/snaps/openstack/utils/keystone_utils.py b/snaps/openstack/utils/keystone_utils.py new file mode 100644 index 0000000..8175b9a --- /dev/null +++ b/snaps/openstack/utils/keystone_utils.py @@ -0,0 +1,204 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import requests +from keystoneclient.client import Client +from keystoneauth1.identity import v3, v2 +from keystoneauth1 import session +import logging + + +logger = logging.getLogger('keystone_utils') + +V2_VERSION = 'v2.0' + + +def keystone_session(os_creds): + """ + Creates a keystone session used for authenticating OpenStack clients + :param os_creds: The connection credentials to the OpenStack API + :return: the client object + """ + logger.debug('Retrieving Keystone Session') + + if os_creds.identity_api_version == 3: + auth = v3.Password(auth_url=os_creds.auth_url, username=os_creds.username, password=os_creds.password, + project_name=os_creds.project_name, user_domain_id=os_creds.user_domain_id, + project_domain_id=os_creds.project_domain_id) + else: + auth = v2.Password(auth_url=os_creds.auth_url, username=os_creds.username, password=os_creds.password, + tenant_name=os_creds.project_name) + + req_session = None + if os_creds.proxy_settings: + req_session = requests.Session() + req_session.proxies = {'http': os_creds.proxy_settings.host + ':' + os_creds.proxy_settings.port} + return session.Session(auth=auth, session=req_session) + + +def keystone_client(os_creds): + """ + Returns the keystone client + :param os_creds: the OpenStack credentials (OSCreds) object + :return: the client + """ + return Client(version=os_creds.identity_api_version, session=keystone_session(os_creds)) + + +def get_project(keystone=None, os_creds=None, project_name=None): + """ + Returns the first project object or None if not found + :param keystone: the Keystone client + :param os_creds: the OpenStack credentials used to obtain the Keystone client if the keystone parameter is None + :param project_name: the name to query + :return: the ID or None + """ + if not project_name: + return None + + if not keystone: + if os_creds: + keystone = keystone_client(os_creds) + else: + raise Exception('Cannot lookup project without the proper credentials') + + if keystone.version == V2_VERSION: + projects = keystone.tenants.list() + else: + projects = keystone.projects.list(**{'name': project_name}) + + for project in projects: + if project.name == project_name: + return project + + return None + + +def create_project(keystone, project_settings): + """ + Creates a project + :param keystone: the Keystone client + :param project_settings: the project configuration + :return: + """ + if keystone.version == V2_VERSION: + return keystone.tenants.create(project_settings.name, project_settings.description, project_settings.enabled) + + return keystone.projects.create(project_settings.name, project_settings.domain, + description=project_settings.description, + enabled=project_settings.enabled) + + +def delete_project(keystone, project): + """ + Deletes a project + :param keystone: the Keystone clien + :param project: the OpenStack project object + """ + if keystone.version == V2_VERSION: + keystone.tenants.delete(project) + else: + keystone.projects.delete(project) + + +def get_user(keystone, username, project_name=None): + """ + Returns a user for a given name and optionally project + :param keystone: the keystone client + :param username: the username to lookup + :param project_name: the associated project (optional) + :return: + """ + project = get_project(keystone=keystone, project_name=project_name) + + if project: + users = keystone.users.list(tenant_id=project.id) + else: + users = keystone.users.list() + + for user in users: + if user.name == username: + return user + + return None + + +def create_user(keystone, user_settings): + """ + Creates a user + :param keystone: the Keystone client + :param user_settings: the user configuration + :return: + """ + project = None + if user_settings.project_name: + project = get_project(keystone=keystone, project_name=user_settings.project_name) + + if keystone.version == V2_VERSION: + project_id = None + if project: + project_id = project.id + return keystone.users.create(name=user_settings.name, password=user_settings.password, + email=user_settings.email, tenant_id=project_id, enabled=user_settings.enabled) + else: + # TODO - need to support groups + return keystone.users.create(name=user_settings.name, password=user_settings.password, + email=user_settings.email, project=project, + # email=user_settings.email, project=project, group='default', + domain=user_settings.domain_name, + enabled=user_settings.enabled) + + +def delete_user(keystone, user): + """ + Deletes a user + :param keystone: the Keystone client + :param user: the OpenStack user object + """ + keystone.users.delete(user) + + +def create_role(keystone, name): + """ + Creates an OpenStack role + :param keystone: the keystone client + :param name: the role name + :return: + """ + return keystone.roles.create(name) + + +def delete_role(keystone, role): + """ + Deletes an OpenStack role + :param keystone: the keystone client + :param role: the role to delete + :return: + """ + keystone.roles.delete(role) + + +def assoc_user_to_project(keystone, role, user, project): + """ + Adds a user to a project + :param keystone: the Keystone client + :param role: the role used to join a project/user + :param user: the user to add to the project + :param project: the project to which to add a user + :return: + """ + if keystone.version == V2_VERSION: + keystone.roles.add_user_role(user, role, tenant=project) + else: + keystone.roles.grant(role, user=user, project=project) diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py new file mode 100644 index 0000000..6c92d2e --- /dev/null +++ b/snaps/openstack/utils/neutron_utils.py @@ -0,0 +1,405 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from neutronclient.common.exceptions import NotFound +from neutronclient.neutron.client import Client +import keystone_utils + +__author__ = 'spisarski' + +logger = logging.getLogger('neutron_utils') + +""" +Utilities for basic neutron API calls +""" + + +def neutron_client(os_creds): + """ + Instantiates and returns a client for communications with OpenStack's Neutron server + :param os_creds: the credentials for connecting to the OpenStack remote API + :return: the client object + """ + return Client(api_version=os_creds.network_api_version, session=keystone_utils.keystone_session(os_creds)) + + +def create_network(neutron, os_creds, network_settings): + """ + Creates a network for OpenStack + :param neutron: the client + :param os_creds: the OpenStack credentials + :param network_settings: A dictionary containing the network configuration and is responsible for creating the + network request JSON body + :return: the network object + """ + if neutron and network_settings: + logger.info('Creating network with name ' + network_settings.name) + json_body = network_settings.dict_for_neutron(os_creds) + return neutron.create_network(body=json_body) + else: + logger.error("Failed to create network") + raise Exception + + +def delete_network(neutron, network): + """ + Deletes a network for OpenStack + :param neutron: the client + :param network: the network object + """ + if neutron and network: + logger.info('Deleting network with name ' + network['network']['name']) + neutron.delete_network(network['network']['id']) + + +def get_network(neutron, network_name, project_id=None): + """ + Returns an object (dictionary) of the first network found with a given name and project_id (if included) + :param neutron: the client + :param network_name: the name of the network to retrieve + :param project_id: the id of the network's project + :return: + """ + net_filter = dict() + if network_name: + net_filter['name'] = network_name + if project_id: + net_filter['project_id'] = project_id + + networks = neutron.list_networks(**net_filter) + for network, netInsts in networks.iteritems(): + for inst in netInsts: + if inst.get('name') == network_name: + if project_id and inst.get('project_id') == project_id: + return {'network': inst} + else: + return {'network': inst} + return None + + +def get_network_by_id(neutron, network_id): + """ + Returns the network object (dictionary) with the given ID + :param neutron: the client + :param network_id: the id of the network to retrieve + :return: + """ + networks = neutron.list_networks(**{'id': network_id}) + for network, netInsts in networks.iteritems(): + for inst in netInsts: + if inst.get('id') == network_id: + return {'network': inst} + return None + + +def create_subnet(neutron, subnet_settings, os_creds, network=None): + """ + Creates a network subnet for OpenStack + :param neutron: the client + :param network: the network object + :param subnet_settings: A dictionary containing the subnet configuration and is responsible for creating the subnet + request JSON body + :param os_creds: the OpenStack credentials + :return: the subnet object + """ + if neutron and network and subnet_settings: + json_body = {'subnets': [subnet_settings.dict_for_neutron(os_creds, network=network)]} + logger.info('Creating subnet with name ' + subnet_settings.name) + subnets = neutron.create_subnet(body=json_body) + return {'subnet': subnets['subnets'][0]} + else: + logger.error("Failed to create subnet.") + raise Exception + + +def delete_subnet(neutron, subnet): + """ + Deletes a network subnet for OpenStack + :param neutron: the client + :param subnet: the subnet object + """ + if neutron and subnet: + logger.info('Deleting subnet with name ' + subnet['subnet']['name']) + neutron.delete_subnet(subnet['subnet']['id']) + + +def get_subnet_by_name(neutron, subnet_name): + """ + Returns the first subnet object (dictionary) found with a given name + :param neutron: the client + :param subnet_name: the name of the network to retrieve + :return: + """ + subnets = neutron.list_subnets(**{'name': subnet_name}) + for subnet, subnetInst in subnets.iteritems(): + for inst in subnetInst: + if inst.get('name') == subnet_name: + return {'subnet': inst} + return None + + +def create_router(neutron, os_creds, router_settings): + """ + Creates a router for OpenStack + :param neutron: the client + :param os_creds: the OpenStack credentials + :param router_settings: A dictionary containing the router configuration and is responsible for creating the subnet + request JSON body + :return: the router object + """ + if neutron: + json_body = router_settings.dict_for_neutron(neutron, os_creds) + logger.info('Creating router with name - ' + router_settings.name) + return neutron.create_router(json_body) + else: + logger.error("Failed to create router.") + raise Exception + + +def delete_router(neutron, router): + """ + Deletes a router for OpenStack + :param neutron: the client + :param router: the router object + """ + if neutron and router: + logger.info('Deleting router with name - ' + router['router']['name']) + neutron.delete_router(router=router['router']['id']) + return True + + +def get_router_by_name(neutron, router_name): + """ + Returns the first router object (dictionary) found with a given name + :param neutron: the client + :param router_name: the name of the network to retrieve + :return: + """ + routers = neutron.list_routers(**{'name': router_name}) + for router, routerInst in routers.iteritems(): + for inst in routerInst: + if inst.get('name') == router_name: + return {'router': inst} + return None + + +def add_interface_router(neutron, router, subnet=None, port=None): + """ + Adds an interface router for OpenStack for either a subnet or port. Exception will be raised if requesting for both. + :param neutron: the client + :param router: the router object + :param subnet: the subnet object + :param port: the port object + :return: the interface router object + """ + if subnet and port: + raise Exception('Cannot add interface to the router. Both subnet and port were sent in. Either or please.') + + if neutron and router and (router or subnet): + logger.info('Adding interface to router with name ' + router['router']['name']) + return neutron.add_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + else: + raise Exception("Unable to create interface router as neutron client, router or subnet were not created") + + +def remove_interface_router(neutron, router, subnet=None, port=None): + """ + Removes an interface router for OpenStack + :param neutron: the client + :param router: the router object + :param subnet: the subnet object (either subnet or port, not both) + :param port: the port object + """ + if router: + try: + logger.info('Removing router interface from router named ' + router['router']['name']) + neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) + except NotFound as e: + logger.warn('Could not remove router interface. NotFound - ' + e.message) + pass + else: + logger.warn('Could not remove router interface, No router object') + + +def __create_port_json_body(subnet=None, port=None): + """ + Returns the dictionary required for creating and deleting router interfaces. Will only work on a subnet or port + object. Will throw and exception if parameters contain both or neither + :param subnet: the subnet object + :param port: the port object + :return: the dict + """ + if subnet and port: + raise Exception('Cannot create JSON body with both subnet and port') + if not subnet and not port: + raise Exception('Cannot create JSON body without subnet or port') + + if subnet: + return {"subnet_id": subnet['subnet']['id']} + else: + return {"port_id": port['port']['id']} + + +def create_port(neutron, os_creds, port_settings): + """ + Creates a port for OpenStack + :param neutron: the client + :param os_creds: the OpenStack credentials + :param port_settings: the settings object for port configuration + :return: the port object + """ + json_body = port_settings.dict_for_neutron(neutron, os_creds) + logger.info('Creating port for network with name - ' + port_settings.network_name) + return neutron.create_port(body=json_body) + + +def delete_port(neutron, port): + """ + Removes an OpenStack port + :param neutron: the client + :param port: the port object + :return: + """ + logger.info('Deleting port with name ' + port['port']['name']) + neutron.delete_port(port['port']['id']) + + +def get_port_by_name(neutron, port_name): + """ + Returns the first port object (dictionary) found with a given name + :param neutron: the client + :param port_name: the name of the port to retrieve + :return: + """ + ports = neutron.list_ports(**{'name': port_name}) + for port in ports['ports']: + if port['name'] == port_name: + return {'port': port} + return None + + +def create_security_group(neutron, keystone, sec_grp_settings): + """ + Creates a security group object in OpenStack + :param neutron: the Neutron client + :param keystone: the Keystone client + :param sec_grp_settings: the security group settings + :return: the security group object + """ + logger.info('Creating security group with name - ' + sec_grp_settings.name) + return neutron.create_security_group(sec_grp_settings.dict_for_neutron(keystone)) + + +def delete_security_group(neutron, sec_grp): + """ + Deletes a security group object from OpenStack + :param neutron: the client + :param sec_grp: the security group object to delete + """ + logger.info('Deleting security group with name - ' + sec_grp['security_group']['name']) + return neutron.delete_security_group(sec_grp['security_group']['id']) + + +def get_security_group(neutron, name): + """ + Returns the first security group object of the given name else None + :param neutron: the client + :param name: the name of security group object to retrieve + """ + logger.info('Retrieving security group with name - ' + name) + + groups = neutron.list_security_groups(**{'name': name}) + for group in groups['security_groups']: + if group['name'] == name: + return {'security_group': group} + return None + + +def get_security_group_by_id(neutron, sec_grp_id): + """ + Returns the first security group object of the given name else None + :param neutron: the client + :param sec_grp_id: the id of the security group to retrieve + """ + logger.info('Retrieving security group with ID - ' + sec_grp_id) + + groups = neutron.list_security_groups(**{'sec_grp_id': sec_grp_id}) + for group in groups['security_groups']: + return {'security_group': group} + return None + + +def create_security_group_rule(neutron, sec_grp_rule_settings): + """ + Creates a security group object in OpenStack + :param neutron: the client + :param sec_grp_rule_settings: the security group rule settings + :return: the security group object + """ + logger.info('Creating security group to security group - ' + sec_grp_rule_settings.sec_grp_name) + return neutron.create_security_group_rule(sec_grp_rule_settings.dict_for_neutron(neutron)) + + +def delete_security_group_rule(neutron, sec_grp_rule): + """ + Deletes a security group object from OpenStack + :param neutron: the client + :param sec_grp_rule: the security group rule object to delete + """ + logger.info('Deleting security group rule with ID - ' + sec_grp_rule['security_group_rule']['id']) + neutron.delete_security_group_rule(sec_grp_rule['security_group_rule']['id']) + + +def get_rules_by_security_group(neutron, sec_grp): + """ + Retrieves all of the rules for a given security group + :param neutron: the client + :param sec_grp: the security group object + """ + logger.info('Retrieving security group rules associate with the security group - ' + + sec_grp['security_group']['name']) + out = list() + rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + for rule in rules['security_group_rules']: + if rule['security_group_id'] == sec_grp['security_group']['id']: + out.append({'security_group_rule': rule}) + return out + + +def get_rule_by_id(neutron, sec_grp, rule_id): + """ + Deletes a security group object from OpenStack + :param neutron: the client + :param sec_grp: the security group object + :param rule_id: the rule's ID + """ + rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']}) + for rule in rules['security_group_rules']: + if rule['id'] == rule_id: + return {'security_group_rule': rule} + return None + + +def get_external_networks(neutron): + """ + Returns a list of external OpenStack network object/dict for all external networks + :param neutron: the client + :return: a list of external networks (empty list if none configured) + """ + out = list() + for network in neutron.list_networks(**{'router:external': True})['networks']: + out.append({'network': network}) + return out diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py new file mode 100644 index 0000000..9d0f70f --- /dev/null +++ b/snaps/openstack/utils/nova_utils.py @@ -0,0 +1,282 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import logging +import keystone_utils + +from novaclient.client import Client +from novaclient.exceptions import NotFound + +__author__ = 'spisarski' + +logger = logging.getLogger('nova_utils') + +""" +Utilities for basic OpenStack Nova API calls +""" + + +def nova_client(os_creds): + """ + Instantiates and returns a client for communications with OpenStack's Nova server + :param os_creds: The connection credentials to the OpenStack API + :return: the client object + """ + logger.debug('Retrieving Nova Client') + return Client(os_creds.compute_api_version, session=keystone_utils.keystone_session(os_creds)) + + +def get_servers_by_name(nova, name): + """ + Returns a list of servers with a given name + :param nova: the Nova client + :param name: the server name + :return: the list of servers + """ + return nova.servers.list(search_opts={'name': name}) + + +def get_latest_server_object(nova, server): + """ + Returns a server with a given id + :param nova: the Nova client + :param server: the old server object + :return: the list of servers or None if not found + """ + return nova.servers.get(server) + + +def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): + """ + Saves the generated RSA generated keys to the filesystem + :param keys: the keys to save + :param pub_file_path: the path to the public keys + :param priv_file_path: the path to the private keys + :return: None + """ + if keys: + if pub_file_path: + pub_dir = os.path.dirname(pub_file_path) + if not os.path.isdir(pub_dir): + os.mkdir(pub_dir) + public_handle = open(pub_file_path, 'wb') + public_handle.write(keys.publickey().exportKey('OpenSSH')) + public_handle.close() + os.chmod(pub_file_path, 0o400) + logger.info("Saved public key to - " + pub_file_path) + if priv_file_path: + priv_dir = os.path.dirname(priv_file_path) + if not os.path.isdir(priv_dir): + os.mkdir(priv_dir) + private_handle = open(priv_file_path, 'wb') + private_handle.write(keys.exportKey()) + private_handle.close() + os.chmod(priv_file_path, 0o400) + logger.info("Saved private key to - " + priv_file_path) + + +def upload_keypair_file(nova, name, file_path): + """ + Uploads a public key from a file + :param nova: the Nova client + :param name: the keypair name + :param file_path: the path to the public key file + :return: the keypair object + """ + with open(os.path.expanduser(file_path)) as fpubkey: + logger.info('Saving keypair to - ' + file_path) + return upload_keypair(nova, name, fpubkey.read()) + + +def upload_keypair(nova, name, key): + """ + Uploads a public key from a file + :param nova: the Nova client + :param name: the keypair name + :param key: the public key object + :return: the keypair object + """ + logger.info('Creating keypair with name - ' + name) + return nova.keypairs.create(name=name, public_key=key) + + +def keypair_exists(nova, keypair_obj): + """ + Returns a copy of the keypair object if found + :param nova: the Nova client + :param keypair_obj: the keypair object + :return: the keypair object or None if not found + """ + try: + return nova.keypairs.get(keypair_obj) + except: + return None + + +def get_keypair_by_name(nova, name): + """ + Returns a list of all available keypairs + :param nova: the Nova client + :param name: the name of the keypair to lookup + :return: the keypair object or None if not found + """ + keypairs = nova.keypairs.list() + + for keypair in keypairs: + if keypair.name == name: + return keypair + + return None + + +def delete_keypair(nova, key): + """ + Deletes a keypair object from OpenStack + :param nova: the Nova client + :param key: the keypair object to delete + """ + logger.debug('Deleting keypair - ' + key.name) + nova.keypairs.delete(key) + + +def get_floating_ip_pools(nova): + """ + Returns all of the available floating IP pools + :param nova: the Nova client + :return: a list of pools + """ + return nova.floating_ip_pools.list() + + +def get_floating_ips(nova): + """ + Returns all of the floating IPs + :param nova: the Nova client + :return: a list of floating IPs + """ + return nova.floating_ips.list() + + +def create_floating_ip(nova, ext_net_name): + """ + Returns the floating IP object that was created with this call + :param nova: the Nova client + :param ext_net_name: the name of the external network on which to apply the floating IP address + :return: the floating IP object + """ + logger.info('Creating floating ip to external network - ' + ext_net_name) + return nova.floating_ips.create(ext_net_name) + + +def get_floating_ip(nova, floating_ip): + """ + Returns a floating IP object that should be identical to the floating_ip parameter + :param nova: the Nova client + :param floating_ip: the floating IP object to lookup + :return: hopefully the same floating IP object input + """ + logger.debug('Attempting to retrieve existing floating ip with IP - ' + floating_ip.ip) + return nova.floating_ips.get(floating_ip) + + +def delete_floating_ip(nova, floating_ip): + """ + Responsible for deleting a floating IP + :param nova: the Nova client + :param floating_ip: the floating IP object to delete + :return: + """ + logger.debug('Attempting to delete existing floating ip with IP - ' + floating_ip.ip) + return nova.floating_ips.delete(floating_ip) + + +def get_nova_availability_zones(nova): + """ + Returns the names of all nova compute servers + :param nova: the Nova client + :return: a list of compute server names + """ + out = list() + zones = nova.availability_zones.list() + for zone in zones: + if zone.zoneName == 'nova': + for key, host in zone.hosts.iteritems(): + out.append(zone.zoneName + ':' + key) + + return out + + +def delete_vm_instance(nova, vm_inst): + """ + Deletes a VM instance + :param nova: the nova client + :param vm_inst: the OpenStack instance object to delete + """ + nova.servers.delete(vm_inst) + + +def get_flavor_by_name(nova, name): + """ + Returns a flavor by name + :param nova: the Nova client + :param name: the flavor name to return + :return: the OpenStack flavor object or None if not exists + """ + try: + return nova.flavors.find(name=name) + except NotFound: + return None + + +def create_flavor(nova, flavor_settings): + """ + Creates and returns and OpenStack flavor object + :param nova: the Nova client + :param flavor_settings: the flavor settings + :return: the Flavor + """ + return nova.flavors.create(name=flavor_settings.name, flavorid=flavor_settings.flavor_id, ram=flavor_settings.ram, + vcpus=flavor_settings.vcpus, disk=flavor_settings.disk, + ephemeral=flavor_settings.ephemeral, swap=flavor_settings.swap, + rxtx_factor=flavor_settings.rxtx_factor, is_public=flavor_settings.is_public) + + +def delete_flavor(nova, flavor): + """ + Deletes a flavor + :param nova: the Nova client + :param flavor: the OpenStack flavor object + """ + nova.flavors.delete(flavor) + + +def add_security_group(nova, vm, security_group_name): + """ + Adds a security group to an existing VM + :param nova: the nova client + :param vm: the OpenStack server object (VM) to alter + :param security_group_name: the name of the security group to add + """ + nova.servers.add_security_group(vm.id, security_group_name) + + +def remove_security_group(nova, vm, security_group): + """ + Removes a security group from an existing VM + :param nova: the nova client + :param vm: the OpenStack server object (VM) to alter + :param security_group: the OpenStack security group object to add + """ + nova.servers.remove_security_group(vm.id, security_group) diff --git a/snaps/openstack/utils/tests/__init__.py b/snaps/openstack/utils/tests/__init__.py new file mode 100644 index 0000000..7f92908 --- /dev/null +++ b/snaps/openstack/utils/tests/__init__.py @@ -0,0 +1,15 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +__author__ = 'spisarski'
\ No newline at end of file diff --git a/snaps/openstack/utils/tests/glance_utils_tests.py b/snaps/openstack/utils/tests/glance_utils_tests.py new file mode 100644 index 0000000..d13908b --- /dev/null +++ b/snaps/openstack/utils/tests/glance_utils_tests.py @@ -0,0 +1,115 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import shutil +import uuid + +from snaps import file_utils +from snaps.openstack.tests import openstack_tests + +from snaps.openstack.utils import nova_utils +from snaps.openstack.tests import validation_utils +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.utils import glance_utils + +__author__ = 'spisarski' + + +class GlanceSmokeTests(OSComponentTestCase): + """ + Tests to ensure that the neutron client can communicate with the cloud + """ + + def test_glance_connect_success(self): + """ + Tests to ensure that the proper credentials can connect. + """ + glance = glance_utils.glance_client(self.os_creds) + + users = glance.images.list() + self.assertIsNotNone(users) + + def test_glance_connect_fail(self): + """ + Tests to ensure that the improper credentials cannot connect. + """ + from snaps.openstack.os_credentials import OSCreds + + with self.assertRaises(Exception): + neutron = glance_utils.glance_client(OSCreds('user', 'pass', 'url', 'project')) + neutron.list_networks() + + +class GlanceUtilsTests(OSComponentTestCase): + """ + Test for the CreateImage class defined in create_image.py + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading and creating an OS image file + within OpenStack + """ + guid = uuid.uuid4() + self.image_name = self.__class__.__name__ + '-' + str(guid) + self.image = None + self.nova = nova_utils.nova_client(self.os_creds) + self.glance = glance_utils.glance_client(self.os_creds) + + self.tmp_dir = 'tmp/' + str(guid) + if not os.path.exists(self.tmp_dir): + os.makedirs(self.tmp_dir) + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + if self.image: + glance_utils.delete_image(self.glance, self.image) + + if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir): + shutil.rmtree(self.tmp_dir) + + def test_create_image_minimal_url(self): + """ + Tests the glance_utils.create_image() function with a URL + """ + os_image_settings = openstack_tests.cirros_url_image(name=self.image_name) + + self.image = glance_utils.create_image(self.glance, os_image_settings) + self.assertIsNotNone(self.image) + + self.assertEqual(self.image_name, self.image.name) + + image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name) + self.assertIsNotNone(image) + + validation_utils.objects_equivalent(self.image, image) + + def test_create_image_minimal_file(self): + """ + Tests the glance_utils.create_image() function with a file + """ + url_image_settings = openstack_tests.cirros_url_image('foo') + image_file = file_utils.download(url_image_settings.url, self.tmp_dir) + file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name) + + self.image = glance_utils.create_image(self.glance, file_image_settings) + self.assertIsNotNone(self.image) + self.assertEqual(self.image_name, self.image.name) + + image = glance_utils.get_image(self.nova, self.glance, file_image_settings.name) + self.assertIsNotNone(image) + validation_utils.objects_equivalent(self.image, image) diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py new file mode 100644 index 0000000..76a43ef --- /dev/null +++ b/snaps/openstack/utils/tests/keystone_utils_tests.py @@ -0,0 +1,100 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uuid + +from snaps.openstack.create_project import ProjectSettings +from snaps.openstack.create_user import UserSettings +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.utils import keystone_utils + +__author__ = 'spisarski' + + +class KeystoneSmokeTests(OSComponentTestCase): + """ + Tests to ensure that the neutron client can communicate with the cloud + """ + + def test_keystone_connect_success(self): + """ + Tests to ensure that the proper credentials can connect. + """ + keystone = keystone_utils.keystone_client(self.os_creds) + + users = keystone.users.list() + self.assertIsNotNone(users) + + def test_keystone_connect_fail(self): + """ + Tests to ensure that the improper credentials cannot connect. + """ + from snaps.openstack.os_credentials import OSCreds + + with self.assertRaises(Exception): + keystone = keystone_utils.keystone_client(OSCreds('user', 'pass', 'url', 'project')) + keystone.users.list() + + +class KeystoneUtilsTests(OSComponentTestCase): + """ + Test for the CreateImage class defined in create_image.py + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading and creating an OS image file + within OpenStack + """ + guid = uuid.uuid4() + self.username = self.__class__.__name__ + '-' + str(guid) + self.user = None + + self.project_name = self.__class__.__name__ + '-' + str(guid) + self.project = None + self.keystone = keystone_utils.keystone_client(self.os_creds) + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + if self.project: + keystone_utils.delete_project(self.keystone, self.project) + + if self.user: + keystone_utils.delete_user(self.keystone, self.user) + + def test_create_user_minimal(self): + """ + Tests the keystone_utils.create_user() function + """ + user_settings = UserSettings(name=self.username, password='test123') + self.user = keystone_utils.create_user(self.keystone, user_settings) + self.assertEqual(self.username, self.user.name) + + user = keystone_utils.get_user(self.keystone, self.username) + self.assertIsNotNone(user) + self.assertEqual(self.user, user) + + def test_create_project_minimal(self): + """ + Tests the keyston_utils.create_project() funtion + """ + project_settings = ProjectSettings(name=self.project_name) + self.project = keystone_utils.create_project(self.keystone, project_settings) + self.assertEquals(self.project_name, self.project.name) + + project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name) + self.assertIsNotNone(project) + self.assertEquals(self.project_name, self.project.name) diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py new file mode 100644 index 0000000..5f95fc9 --- /dev/null +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -0,0 +1,651 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uuid + +from snaps.openstack.utils import keystone_utils +from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction +from snaps.openstack.tests import openstack_tests +from snaps.openstack.utils import neutron_utils +from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings +from snaps.openstack import create_router +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.tests import validation_utils + +__author__ = 'spisarski' + +ip_1 = '10.55.1.100' +ip_2 = '10.55.1.200' + + +class NeutronSmokeTests(OSComponentTestCase): + """ + Tests to ensure that the neutron client can communicate with the cloud + """ + + def test_neutron_connect_success(self): + """ + Tests to ensure that the proper credentials can connect. + """ + neutron = neutron_utils.neutron_client(self.os_creds) + + networks = neutron.list_networks() + + found = False + networks = networks.get('networks') + for network in networks: + if network.get('name') == self.ext_net_name: + found = True + self.assertTrue(found) + + def test_neutron_connect_fail(self): + """ + Tests to ensure that the improper credentials cannot connect. + """ + from snaps.openstack.os_credentials import OSCreds + + with self.assertRaises(Exception): + neutron = neutron_utils.neutron_client( + OSCreds(username='user', password='pass', auth_url='url', project_name='project')) + neutron.list_networks() + + def test_retrieve_ext_network_name(self): + """ + Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained + within the returned list + :return: + """ + neutron = neutron_utils.neutron_client(self.os_creds) + ext_networks = neutron_utils.get_external_networks(neutron) + found = False + for network in ext_networks: + if network['network']['name'] == self.ext_net_name: + found = True + break + self.assertTrue(found) + + +class NeutronUtilsNetworkTests(OSComponentTestCase): + """ + Test for creating networks via neutron_utils.py + """ + + def setUp(self): + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.port_name = str(guid) + '-port' + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.network = None + self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net') + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + if self.network: + neutron_utils.delete_network(self.neutron, self.network) + validate_network(self.neutron, self.network['network']['name'], False) + + def test_create_network(self): + """ + Tests the neutron_utils.create_neutron_net() function + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + def test_create_network_empty_name(self): + """ + Tests the neutron_utils.create_neutron_net() function with an empty network name + """ + with self.assertRaises(Exception): + self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name='')) + + def test_create_network_null_name(self): + """ + Tests the neutron_utils.create_neutron_net() function when the network name is None + """ + with self.assertRaises(Exception): + self.network = neutron_utils.create_network(self.neutron, NetworkSettings()) + + +class NeutronUtilsSubnetTests(OSComponentTestCase): + """ + Test for creating networks with subnets via neutron_utils.py + """ + + def setUp(self): + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.port_name = str(guid) + '-port' + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.network = None + self.subnet = None + self.net_config = openstack_tests.get_pub_net_config( + net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name) + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + if self.subnet: + neutron_utils.delete_subnet(self.neutron, self.subnet) + validate_subnet(self.neutron, self.subnet.get('name'), + self.net_config.network_settings.subnet_settings[0].cidr, False) + + if self.network: + neutron_utils.delete_network(self.neutron, self.network) + validate_network(self.neutron, self.network['network']['name'], False) + + def test_create_subnet(self): + """ + Tests the neutron_utils.create_neutron_net() function + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, network=self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + def test_create_subnet_null_name(self): + """ + Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + with self.assertRaises(Exception): + SubnetSettings(cidr=self.net_config.subnet_cidr) + + def test_create_subnet_empty_name(self): + """ + Tests the neutron_utils.create_neutron_net() function with an empty name + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, network=self.network) + validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True) + + def test_create_subnet_null_cidr(self): + """ + Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + with self.assertRaises(Exception): + sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + + def test_create_subnet_empty_cidr(self): + """ + Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + with self.assertRaises(Exception): + sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name) + neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network) + + +class NeutronUtilsRouterTests(OSComponentTestCase): + """ + Test for creating routers via neutron_utils.py + """ + + def setUp(self): + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.port_name = str(guid) + '-port' + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.network = None + self.subnet = None + self.port = None + self.router = None + self.interface_router = None + self.net_config = openstack_tests.get_pub_net_config( + net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', + router_name=guid + '-pub-router', external_net=self.ext_net_name) + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + if self.interface_router: + neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet) + + if self.router: + neutron_utils.delete_router(self.neutron, self.router) + validate_router(self.neutron, self.router.get('name'), False) + + if self.port: + neutron_utils.delete_port(self.neutron, self.port) + + if self.subnet: + neutron_utils.delete_subnet(self.neutron, self.subnet) + validate_subnet(self.neutron, self.subnet.get('name'), + self.net_config.network_settings.subnet_settings[0].cidr, False) + + if self.network: + neutron_utils.delete_network(self.neutron, self.network) + validate_network(self.neutron, self.network['network']['name'], False) + + def test_create_router_simple(self): + """ + Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + """ + self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, True) + + def test_create_router_with_public_interface(self): + """ + Tests the neutron_utils.create_neutron_net() function when an external gateway is requested + """ + self.net_config = openstack_tests.OSNetworkConfig( + self.net_config.network_settings.name, + self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name, + self.ext_net_name) + self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, True) + # TODO - Add validation that the router gatway has been set + + def test_create_router_empty_name(self): + """ + Tests the neutron_utils.create_neutron_net() function + """ + with self.assertRaises(Exception): + this_router_settings = create_router.RouterSettings(name='') + self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + + def test_create_router_null_name(self): + """ + Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None + """ + with self.assertRaises(Exception): + this_router_settings = create_router.RouterSettings() + self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings) + validate_router(self.neutron, None, True) + + def test_add_interface_router(self): + """ + Tests the neutron_utils.add_interface_router() function + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, True) + + self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + validate_interface_router(self.interface_router, self.router, self.subnet) + + def test_add_interface_router_null_router(self): + """ + Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + + def test_add_interface_router_null_subnet(self): + """ + Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings) + validate_router(self.neutron, self.net_config.router_settings.name, True) + + with self.assertRaises(Exception): + self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet) + + def test_create_port(self): + """ + Tests the neutron_utils.create_port() function + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, PortSettings( + name=self.port_name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}], + network_name=self.net_config.network_settings.name)) + validate_port(self.neutron, self.port, self.port_name) + + def test_create_port_empty_name(self): + """ + Tests the neutron_utils.create_port() function + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + self.port = neutron_utils.create_port( + self.neutron, self.os_creds, PortSettings( + name=self.port_name, network_name=self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}])) + validate_port(self.neutron, self.port, self.port_name) + + def test_create_port_null_name(self): + """ + Tests the neutron_utils.create_port() function for an Exception when the port name value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( + network_name=self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + + def test_create_port_null_network_object(self): + """ + Tests the neutron_utils.create_port() function for an Exception when the network object is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( + self.neutron, self.port_name, self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}])) + + def test_create_port_null_ip(self): + """ + Tests the neutron_utils.create_port() function for an Exception when the IP value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( + name=self.port_name, network_name=self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}])) + + def test_create_port_invalid_ip(self): + """ + Tests the neutron_utils.create_port() function for an Exception when the IP value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( + name=self.port_name, network_name=self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}])) + + def test_create_port_invalid_ip_to_subnet(self): + """ + Tests the neutron_utils.create_port() function for an Exception when the IP value is None + """ + self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings) + self.assertEqual(self.net_config.network_settings.name, self.network['network']['name']) + self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True)) + + self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0], + self.os_creds, self.network) + validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name, + self.net_config.network_settings.subnet_settings[0].cidr, True) + + with self.assertRaises(Exception): + self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings( + name=self.port_name, network_name=self.net_config.network_settings.name, + ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, + 'ip': '10.197.123.100'}])) + + +class NeutronUtilsSecurityGroupTests(OSComponentTestCase): + """ + Test for creating security groups via neutron_utils.py + """ + + def setUp(self): + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.sec_grp_name = guid + 'name' + + self.security_group = None + self.security_group_rules = list() + self.neutron = neutron_utils.neutron_client(self.os_creds) + self.keystone = keystone_utils.keystone_client(self.os_creds) + + def tearDown(self): + """ + Cleans the remote OpenStack objects + """ + for rule in self.security_group_rules: + neutron_utils.delete_security_group_rule(self.neutron, rule) + + if self.security_group: + neutron_utils.delete_security_group(self.neutron, self.security_group) + + def test_create_delete_simple_sec_grp(self): + """ + Tests the neutron_utils.create_security_group() function + """ + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name) + self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + + self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name']) + + sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + self.assertIsNotNone(sec_grp_get) + self.assertTrue(validation_utils.objects_equivalent( + self.security_group['security_group'], sec_grp_get['security_group'])) + + neutron_utils.delete_security_group(self.neutron, self.security_group) + sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + self.assertIsNone(sec_grp_get) + self.security_group = None + + def test_create_sec_grp_no_name(self): + """ + Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure + that attempting to create a security group without a name will raise an exception + """ + with self.assertRaises(Exception): + sec_grp_settings = SecurityGroupSettings() + self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + + def test_create_sec_grp_no_rules(self): + """ + Tests the neutron_utils.create_security_group() function + """ + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') + self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + + self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.description, self.security_group['security_group']['description']) + + sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + self.assertIsNotNone(sec_grp_get) + self.assertTrue(validation_utils.objects_equivalent( + self.security_group['security_group'], sec_grp_get['security_group'])) + + def test_create_sec_grp_one_rule(self): + """ + Tests the neutron_utils.create_security_group() function + """ + + sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=[sec_grp_rule_settings]) + + self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) + free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group) + for free_rule in free_rules: + self.security_group_rules.append(free_rule) + + self.security_group_rules.append( + neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0])) + + # Refresh object so it is populated with the newly added rule + self.security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + + rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group) + + self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules)) + + self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name']) + self.assertTrue(sec_grp_settings.description, self.security_group['security_group']['description']) + + sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name) + self.assertIsNotNone(sec_grp_get) + self.assertTrue(validation_utils.objects_equivalent( + self.security_group['security_group'], sec_grp_get['security_group'])) + + +""" +Validation routines +""" + + +def validate_network(neutron, name, exists): + """ + Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true. + Returns false if a network for a given name DOES exist if the exists parameter is true conversely false. + :param neutron: The neutron client + :param name: The expected network name + :param exists: Whether or not the network name should exist or not + :return: True/False + """ + network = neutron_utils.get_network(neutron, name) + if exists and network: + return True + if not exists and not network: + return True + return False + + +def validate_subnet(neutron, name, cidr, exists): + """ + Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true. + Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false. + :param neutron: The neutron client + :param name: The expected subnet name + :param cidr: The expected CIDR value + :param exists: Whether or not the network name should exist or not + :return: True/False + """ + subnet = neutron_utils.get_subnet_by_name(neutron, name) + if exists and subnet: + return subnet.get('cidr') == cidr + if not exists and not subnet: + return True + return False + + +def validate_router(neutron, name, exists): + """ + Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true. + Returns false if a router for a given name DOES exist if the exists parameter is true conversely false. + :param neutron: The neutron client + :param name: The expected router name + :param exists: Whether or not the network name should exist or not + :return: True/False + """ + router = neutron_utils.get_router_by_name(neutron, name) + if exists and router: + return True + return False + + +def validate_interface_router(interface_router, router, subnet): + """ + Returns true if the router ID & subnet ID have been properly included into the interface router object + :param interface_router: the object to validate + :param router: to validate against the interface_router + :param subnet: to validate against the interface_router + :return: True if both IDs match else False + """ + subnet_id = interface_router.get('subnet_id') + router_id = interface_router.get('port_id') + + return subnet.get('id') == subnet_id and router.get('id') == router_id + + +def validate_port(neutron, port_obj, this_port_name): + """ + Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true. + Returns false if a port for a given name DOES exist if the exists parameter is true conversely false. + :param neutron: The neutron client + :param port_obj: The port object to lookup + :param this_port_name: The expected router name + :return: True/False + """ + ports = neutron.list_ports() + for port, port_insts in ports.iteritems(): + for inst in port_insts: + if inst['id'] == port_obj['port']['id']: + return inst['name'] == this_port_name + return False diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py new file mode 100644 index 0000000..f6c9156 --- /dev/null +++ b/snaps/openstack/utils/tests/nova_utils_tests.py @@ -0,0 +1,208 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os +import uuid + +from Crypto.PublicKey import RSA + +from snaps.openstack.utils import nova_utils +from snaps.openstack.tests.os_source_file_test import OSComponentTestCase +from snaps.openstack.create_flavor import FlavorSettings + +__author__ = 'spisarski' + +logger = logging.getLogger('nova_utils_tests') + + +class NovaSmokeTests(OSComponentTestCase): + """ + Tests to ensure that the nova client can communicate with the cloud + """ + + def test_nova_connect_success(self): + """ + Tests to ensure that the proper credentials can connect. + """ + nova = nova_utils.nova_client(self.os_creds) + + # This should not throw an exception + nova.flavors.list() + + def test_nova_connect_fail(self): + """ + Tests to ensure that the improper credentials cannot connect. + """ + from snaps.openstack.os_credentials import OSCreds + + nova = nova_utils.nova_client( + OSCreds(username='user', password='pass', auth_url=self.os_creds.auth_url, + project_name=self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings)) + + # This should throw an exception + with self.assertRaises(Exception): + nova.flavors.list() + + +class NovaUtilsKeypairTests(OSComponentTestCase): + """ + Test basic nova keypair functionality + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading and creating an OS image file + within OpenStack + """ + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.priv_key_file_path = 'tmp/' + guid + self.pub_key_file_path = self.priv_key_file_path + '.pub' + + self.nova = nova_utils.nova_client(self.os_creds) + self.keys = RSA.generate(1024) + self.public_key = self.keys.publickey().exportKey('OpenSSH') + self.keypair_name = guid + self.keypair = None + self.floating_ip = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.keypair: + try: + nova_utils.delete_keypair(self.nova, self.keypair) + except: + pass + + try: + os.remove(self.priv_key_file_path) + except: + pass + + try: + os.remove(self.pub_key_file_path) + except: + pass + + if self.floating_ip: + nova_utils.delete_floating_ip(self.nova, self.floating_ip) + + def test_create_keypair(self): + """ + Tests the creation of an OpenStack keypair that does not exist. + """ + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + result = nova_utils.keypair_exists(self.nova, self.keypair) + self.assertEquals(self.keypair, result) + keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name) + self.assertEquals(self.keypair, keypair) + + def test_create_delete_keypair(self): + """ + Tests the creation of an OpenStack keypair that does not exist. + """ + self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) + result = nova_utils.keypair_exists(self.nova, self.keypair) + self.assertEquals(self.keypair, result) + nova_utils.delete_keypair(self.nova, self.keypair) + result2 = nova_utils.keypair_exists(self.nova, self.keypair) + self.assertIsNone(result2) + + def test_create_key_from_file(self): + """ + Tests that the generated RSA keys are properly saved to files + :return: + """ + nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path) + self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path) + pub_key = open(os.path.expanduser(self.pub_key_file_path)).read() + self.assertEquals(self.keypair.public_key, pub_key) + + def test_floating_ips(self): + """ + Tests the creation of a floating IP + :return: + """ + ips = nova_utils.get_floating_ips(self.nova) + self.assertIsNotNone(ips) + + self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name) + returned = nova_utils.get_floating_ip(self.nova, self.floating_ip) + self.assertEquals(self.floating_ip, returned) + + +class NovaUtilsFlavorTests(OSComponentTestCase): + """ + Test basic nova flavor functionality + """ + + def setUp(self): + """ + Instantiates the CreateImage object that is responsible for downloading and creating an OS image file + within OpenStack + """ + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.flavor_settings = FlavorSettings(name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1, vcpus=1, + ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False) + self.nova = nova_utils.nova_client(self.os_creds) + self.flavor = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.flavor: + try: + nova_utils.delete_flavor(self.nova, self.flavor) + except: + pass + + def test_create_flavor(self): + """ + Tests the creation of an OpenStack keypair that does not exist. + """ + self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings) + self.validate_flavor() + + def test_create_delete_flavor(self): + """ + Tests the creation of an OpenStack keypair that does not exist. + """ + self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings) + self.validate_flavor() + nova_utils.delete_flavor(self.nova, self.flavor) + flavor = nova_utils.get_flavor_by_name(self.nova, self.flavor_settings.name) + self.assertIsNone(flavor) + + def validate_flavor(self): + """ + Validates the flavor_settings against the OpenStack flavor object + """ + self.assertIsNotNone(self.flavor) + self.assertEquals(self.flavor_settings.name, self.flavor.name) + self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id) + self.assertEquals(self.flavor_settings.ram, self.flavor.ram) + self.assertEquals(self.flavor_settings.disk, self.flavor.disk) + self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus) + self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral) + + if self.flavor_settings.swap == 0: + self.assertEquals('', self.flavor.swap) + else: + self.assertEquals(self.flavor_settings.swap, self.flavor.swap) + + self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor) + self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public) |