summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/utils
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/openstack/utils')
-rw-r--r--snaps/openstack/utils/__init__.py15
-rw-r--r--snaps/openstack/utils/deploy_utils.py151
-rw-r--r--snaps/openstack/utils/glance_utils.py78
-rw-r--r--snaps/openstack/utils/keystone_utils.py204
-rw-r--r--snaps/openstack/utils/neutron_utils.py405
-rw-r--r--snaps/openstack/utils/nova_utils.py282
-rw-r--r--snaps/openstack/utils/tests/__init__.py15
-rw-r--r--snaps/openstack/utils/tests/glance_utils_tests.py115
-rw-r--r--snaps/openstack/utils/tests/keystone_utils_tests.py100
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py651
-rw-r--r--snaps/openstack/utils/tests/nova_utils_tests.py208
11 files changed, 2224 insertions, 0 deletions
diff --git a/snaps/openstack/utils/__init__.py b/snaps/openstack/utils/__init__.py
new file mode 100644
index 0000000..7f92908
--- /dev/null
+++ b/snaps/openstack/utils/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+__author__ = 'spisarski' \ No newline at end of file
diff --git a/snaps/openstack/utils/deploy_utils.py b/snaps/openstack/utils/deploy_utils.py
new file mode 100644
index 0000000..ade8811
--- /dev/null
+++ b/snaps/openstack/utils/deploy_utils.py
@@ -0,0 +1,151 @@
+#
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This utility makes it easy to create OpenStack objects
+import logging
+
+from snaps.openstack.create_project import OpenStackProject
+from snaps.openstack.create_user import OpenStackUser
+from snaps.openstack.create_image import OpenStackImage
+from snaps.openstack.create_network import OpenStackNetwork
+from snaps.openstack.create_router import OpenStackRouter
+from snaps.openstack.create_keypairs import OpenStackKeypair
+from snaps.openstack.create_instance import OpenStackVmInstance
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
+
+logger = logging.getLogger('deploy_utils')
+
+
+def create_image(os_creds, image_settings, cleanup=False):
+ """
+ Creates an image in OpenStack if necessary
+ :param os_creds: The OpenStack credentials object
+ :param image_settings: The image settings object
+ :param cleanup: Denotes whether or not this is being called for cleanup or not
+ :return: A reference to the image creator object from which the image object can be accessed
+ """
+ image_creator = OpenStackImage(os_creds, image_settings)
+ image_creator.create(cleanup)
+ return image_creator
+
+
+def create_network(os_creds, network_settings, cleanup=False):
+ """
+ Creates a network on which the CMTSs can attach
+ :param os_creds: The OpenStack credentials object
+ :param network_settings: The network settings object
+ :param cleanup: Denotes whether or not this is being called for cleanup or not
+ :return: A reference to the network creator objects for each network from which network elements such as the
+ subnet, router, interface router, and network objects can be accessed.
+ """
+ # Check for OS for network existence
+ # If exists return network instance data
+ # Else, create network and return instance data
+
+ logger.info('Attempting to create network with name - ' + network_settings.name)
+
+ network_creator = OpenStackNetwork(os_creds, network_settings)
+ network_creator.create(cleanup)
+ logger.info('Created network ')
+ return network_creator
+
+
+def create_router(os_creds, router_settings, cleanup=False):
+ """
+ Creates a network on which the CMTSs can attach
+ :param os_creds: The OpenStack credentials object
+ :param router_settings: The RouterSettings instance
+ :param cleanup: Denotes whether or not this is being called for cleanup or not
+ :return: A reference to the network creator objects for each network from which network elements such as the
+ subnet, router, interface router, and network objects can be accessed.
+ """
+ # Check for OS for network existence
+ # If exists return network instance data
+ # Else, create network and return instance data
+ logger.info('Attempting to create router with name - ' + router_settings.name)
+ router_creator = OpenStackRouter(os_creds, router_settings)
+ router_creator.create(cleanup)
+ logger.info('Created router ')
+ return router_creator
+
+
+def create_keypair(os_creds, keypair_settings, cleanup=False):
+ """
+ Creates a keypair that can be applied to an instance
+ :param os_creds: The OpenStack credentials object
+ :param keypair_settings: The KeypairSettings object
+ :param cleanup: Denotes whether or not this is being called for cleanup or not
+ :return: A reference to the keypair creator object
+ """
+ keypair_creator = OpenStackKeypair(os_creds, keypair_settings)
+ keypair_creator.create(cleanup)
+ return keypair_creator
+
+
+def create_vm_instance(os_creds, instance_settings, image_settings, keypair_creator=None, cleanup=False):
+ """
+ Creates a VM instance
+ :param os_creds: The OpenStack credentials
+ :param instance_settings: Instance of VmInstanceSettings
+ :param image_settings: The object containing image settings
+ :param keypair_creator: The object responsible for creating the keypair associated with this VM instance. (optional)
+ :param sg_names: The names of the security groups to apply to VM. (optional)
+ :param cleanup: Denotes whether or not this is being called for cleanup or not (default False)
+ :return: A reference to the VM instance object
+ """
+ kp_settings = None
+ if keypair_creator:
+ kp_settings = keypair_creator.keypair_settings
+ vm_creator = OpenStackVmInstance(os_creds, instance_settings, image_settings, kp_settings)
+ vm_creator.create(cleanup=cleanup)
+ return vm_creator
+
+
+def create_user(os_creds, user_settings):
+ """
+ Creates an OpenStack user
+ :param os_creds: The OpenStack credentials
+ :param user_settings: The user configuration settings
+ :return: A reference to the user instance object
+ """
+ user_creator = OpenStackUser(os_creds, user_settings)
+ user_creator.create()
+ return user_creator
+
+
+def create_project(os_creds, project_settings):
+ """
+ Creates an OpenStack user
+ :param os_creds: The OpenStack credentials
+ :param project_settings: The user project configuration settings
+ :return: A reference to the project instance object
+ """
+ project_creator = OpenStackProject(os_creds, project_settings)
+ project_creator.create()
+ return project_creator
+
+
+def create_security_group(os_creds, sec_grp_settings):
+ """
+ Creates an OpenStack Security Group
+ :param os_creds: The OpenStack credentials
+ :param sec_grp_settings: The security group settings
+ :return: A reference to the project instance object
+ """
+ sg_creator = OpenStackSecurityGroup(os_creds, sec_grp_settings)
+ sg_creator.create()
+ return sg_creator
+
diff --git a/snaps/openstack/utils/glance_utils.py b/snaps/openstack/utils/glance_utils.py
new file mode 100644
index 0000000..6d90d3e
--- /dev/null
+++ b/snaps/openstack/utils/glance_utils.py
@@ -0,0 +1,78 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from snaps import file_utils
+from glanceclient.client import Client
+from snaps.openstack.utils import keystone_utils
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('glance_utils')
+
+"""
+Utilities for basic neutron API calls
+"""
+
+
+def glance_client(os_creds):
+ """
+ Creates and returns a glance client object
+ :return: the glance client
+ """
+ return Client(version=os_creds.image_api_version, session=keystone_utils.keystone_session(os_creds))
+
+
+def get_image(nova, glance, image_name):
+ """
+ Returns an OpenStack image object for a given name
+ :param nova: the Nova client
+ :param glance: the Glance client
+ :param image_name: the image name to lookup
+ :return: the image object or None
+ """
+ try:
+ image_dict = nova.images.find(name=image_name)
+ if image_dict:
+ return glance.images.get(image_dict.id)
+ except:
+ pass
+ return None
+
+
+def create_image(glance, image_settings):
+ """
+ Creates and returns OpenStack image object with an external URL
+ :param glance: the glance client
+ :param image_settings: the image settings object
+ :return: the OpenStack image object
+ :raise Exception if using a file and it cannot be found
+ """
+ if image_settings.url:
+ return glance.images.create(name=image_settings.name, disk_format=image_settings.format,
+ container_format="bare", location=image_settings.url)
+ elif image_settings.image_file:
+ image_file = file_utils.get_file(image_settings.image_file)
+ return glance.images.create(name=image_settings.name, disk_format=image_settings.format,
+ container_format="bare", data=image_file)
+
+
+def delete_image(glance, image):
+ """
+ Deletes an image from OpenStack
+ :param glance: the glance client
+ :param image: the image to delete
+ """
+ glance.images.delete(image)
diff --git a/snaps/openstack/utils/keystone_utils.py b/snaps/openstack/utils/keystone_utils.py
new file mode 100644
index 0000000..8175b9a
--- /dev/null
+++ b/snaps/openstack/utils/keystone_utils.py
@@ -0,0 +1,204 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import requests
+from keystoneclient.client import Client
+from keystoneauth1.identity import v3, v2
+from keystoneauth1 import session
+import logging
+
+
+logger = logging.getLogger('keystone_utils')
+
+V2_VERSION = 'v2.0'
+
+
+def keystone_session(os_creds):
+ """
+ Creates a keystone session used for authenticating OpenStack clients
+ :param os_creds: The connection credentials to the OpenStack API
+ :return: the client object
+ """
+ logger.debug('Retrieving Keystone Session')
+
+ if os_creds.identity_api_version == 3:
+ auth = v3.Password(auth_url=os_creds.auth_url, username=os_creds.username, password=os_creds.password,
+ project_name=os_creds.project_name, user_domain_id=os_creds.user_domain_id,
+ project_domain_id=os_creds.project_domain_id)
+ else:
+ auth = v2.Password(auth_url=os_creds.auth_url, username=os_creds.username, password=os_creds.password,
+ tenant_name=os_creds.project_name)
+
+ req_session = None
+ if os_creds.proxy_settings:
+ req_session = requests.Session()
+ req_session.proxies = {'http': os_creds.proxy_settings.host + ':' + os_creds.proxy_settings.port}
+ return session.Session(auth=auth, session=req_session)
+
+
+def keystone_client(os_creds):
+ """
+ Returns the keystone client
+ :param os_creds: the OpenStack credentials (OSCreds) object
+ :return: the client
+ """
+ return Client(version=os_creds.identity_api_version, session=keystone_session(os_creds))
+
+
+def get_project(keystone=None, os_creds=None, project_name=None):
+ """
+ Returns the first project object or None if not found
+ :param keystone: the Keystone client
+ :param os_creds: the OpenStack credentials used to obtain the Keystone client if the keystone parameter is None
+ :param project_name: the name to query
+ :return: the ID or None
+ """
+ if not project_name:
+ return None
+
+ if not keystone:
+ if os_creds:
+ keystone = keystone_client(os_creds)
+ else:
+ raise Exception('Cannot lookup project without the proper credentials')
+
+ if keystone.version == V2_VERSION:
+ projects = keystone.tenants.list()
+ else:
+ projects = keystone.projects.list(**{'name': project_name})
+
+ for project in projects:
+ if project.name == project_name:
+ return project
+
+ return None
+
+
+def create_project(keystone, project_settings):
+ """
+ Creates a project
+ :param keystone: the Keystone client
+ :param project_settings: the project configuration
+ :return:
+ """
+ if keystone.version == V2_VERSION:
+ return keystone.tenants.create(project_settings.name, project_settings.description, project_settings.enabled)
+
+ return keystone.projects.create(project_settings.name, project_settings.domain,
+ description=project_settings.description,
+ enabled=project_settings.enabled)
+
+
+def delete_project(keystone, project):
+ """
+ Deletes a project
+ :param keystone: the Keystone clien
+ :param project: the OpenStack project object
+ """
+ if keystone.version == V2_VERSION:
+ keystone.tenants.delete(project)
+ else:
+ keystone.projects.delete(project)
+
+
+def get_user(keystone, username, project_name=None):
+ """
+ Returns a user for a given name and optionally project
+ :param keystone: the keystone client
+ :param username: the username to lookup
+ :param project_name: the associated project (optional)
+ :return:
+ """
+ project = get_project(keystone=keystone, project_name=project_name)
+
+ if project:
+ users = keystone.users.list(tenant_id=project.id)
+ else:
+ users = keystone.users.list()
+
+ for user in users:
+ if user.name == username:
+ return user
+
+ return None
+
+
+def create_user(keystone, user_settings):
+ """
+ Creates a user
+ :param keystone: the Keystone client
+ :param user_settings: the user configuration
+ :return:
+ """
+ project = None
+ if user_settings.project_name:
+ project = get_project(keystone=keystone, project_name=user_settings.project_name)
+
+ if keystone.version == V2_VERSION:
+ project_id = None
+ if project:
+ project_id = project.id
+ return keystone.users.create(name=user_settings.name, password=user_settings.password,
+ email=user_settings.email, tenant_id=project_id, enabled=user_settings.enabled)
+ else:
+ # TODO - need to support groups
+ return keystone.users.create(name=user_settings.name, password=user_settings.password,
+ email=user_settings.email, project=project,
+ # email=user_settings.email, project=project, group='default',
+ domain=user_settings.domain_name,
+ enabled=user_settings.enabled)
+
+
+def delete_user(keystone, user):
+ """
+ Deletes a user
+ :param keystone: the Keystone client
+ :param user: the OpenStack user object
+ """
+ keystone.users.delete(user)
+
+
+def create_role(keystone, name):
+ """
+ Creates an OpenStack role
+ :param keystone: the keystone client
+ :param name: the role name
+ :return:
+ """
+ return keystone.roles.create(name)
+
+
+def delete_role(keystone, role):
+ """
+ Deletes an OpenStack role
+ :param keystone: the keystone client
+ :param role: the role to delete
+ :return:
+ """
+ keystone.roles.delete(role)
+
+
+def assoc_user_to_project(keystone, role, user, project):
+ """
+ Adds a user to a project
+ :param keystone: the Keystone client
+ :param role: the role used to join a project/user
+ :param user: the user to add to the project
+ :param project: the project to which to add a user
+ :return:
+ """
+ if keystone.version == V2_VERSION:
+ keystone.roles.add_user_role(user, role, tenant=project)
+ else:
+ keystone.roles.grant(role, user=user, project=project)
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
new file mode 100644
index 0000000..6c92d2e
--- /dev/null
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -0,0 +1,405 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from neutronclient.common.exceptions import NotFound
+from neutronclient.neutron.client import Client
+import keystone_utils
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('neutron_utils')
+
+"""
+Utilities for basic neutron API calls
+"""
+
+
+def neutron_client(os_creds):
+ """
+ Instantiates and returns a client for communications with OpenStack's Neutron server
+ :param os_creds: the credentials for connecting to the OpenStack remote API
+ :return: the client object
+ """
+ return Client(api_version=os_creds.network_api_version, session=keystone_utils.keystone_session(os_creds))
+
+
+def create_network(neutron, os_creds, network_settings):
+ """
+ Creates a network for OpenStack
+ :param neutron: the client
+ :param os_creds: the OpenStack credentials
+ :param network_settings: A dictionary containing the network configuration and is responsible for creating the
+ network request JSON body
+ :return: the network object
+ """
+ if neutron and network_settings:
+ logger.info('Creating network with name ' + network_settings.name)
+ json_body = network_settings.dict_for_neutron(os_creds)
+ return neutron.create_network(body=json_body)
+ else:
+ logger.error("Failed to create network")
+ raise Exception
+
+
+def delete_network(neutron, network):
+ """
+ Deletes a network for OpenStack
+ :param neutron: the client
+ :param network: the network object
+ """
+ if neutron and network:
+ logger.info('Deleting network with name ' + network['network']['name'])
+ neutron.delete_network(network['network']['id'])
+
+
+def get_network(neutron, network_name, project_id=None):
+ """
+ Returns an object (dictionary) of the first network found with a given name and project_id (if included)
+ :param neutron: the client
+ :param network_name: the name of the network to retrieve
+ :param project_id: the id of the network's project
+ :return:
+ """
+ net_filter = dict()
+ if network_name:
+ net_filter['name'] = network_name
+ if project_id:
+ net_filter['project_id'] = project_id
+
+ networks = neutron.list_networks(**net_filter)
+ for network, netInsts in networks.iteritems():
+ for inst in netInsts:
+ if inst.get('name') == network_name:
+ if project_id and inst.get('project_id') == project_id:
+ return {'network': inst}
+ else:
+ return {'network': inst}
+ return None
+
+
+def get_network_by_id(neutron, network_id):
+ """
+ Returns the network object (dictionary) with the given ID
+ :param neutron: the client
+ :param network_id: the id of the network to retrieve
+ :return:
+ """
+ networks = neutron.list_networks(**{'id': network_id})
+ for network, netInsts in networks.iteritems():
+ for inst in netInsts:
+ if inst.get('id') == network_id:
+ return {'network': inst}
+ return None
+
+
+def create_subnet(neutron, subnet_settings, os_creds, network=None):
+ """
+ Creates a network subnet for OpenStack
+ :param neutron: the client
+ :param network: the network object
+ :param subnet_settings: A dictionary containing the subnet configuration and is responsible for creating the subnet
+ request JSON body
+ :param os_creds: the OpenStack credentials
+ :return: the subnet object
+ """
+ if neutron and network and subnet_settings:
+ json_body = {'subnets': [subnet_settings.dict_for_neutron(os_creds, network=network)]}
+ logger.info('Creating subnet with name ' + subnet_settings.name)
+ subnets = neutron.create_subnet(body=json_body)
+ return {'subnet': subnets['subnets'][0]}
+ else:
+ logger.error("Failed to create subnet.")
+ raise Exception
+
+
+def delete_subnet(neutron, subnet):
+ """
+ Deletes a network subnet for OpenStack
+ :param neutron: the client
+ :param subnet: the subnet object
+ """
+ if neutron and subnet:
+ logger.info('Deleting subnet with name ' + subnet['subnet']['name'])
+ neutron.delete_subnet(subnet['subnet']['id'])
+
+
+def get_subnet_by_name(neutron, subnet_name):
+ """
+ Returns the first subnet object (dictionary) found with a given name
+ :param neutron: the client
+ :param subnet_name: the name of the network to retrieve
+ :return:
+ """
+ subnets = neutron.list_subnets(**{'name': subnet_name})
+ for subnet, subnetInst in subnets.iteritems():
+ for inst in subnetInst:
+ if inst.get('name') == subnet_name:
+ return {'subnet': inst}
+ return None
+
+
+def create_router(neutron, os_creds, router_settings):
+ """
+ Creates a router for OpenStack
+ :param neutron: the client
+ :param os_creds: the OpenStack credentials
+ :param router_settings: A dictionary containing the router configuration and is responsible for creating the subnet
+ request JSON body
+ :return: the router object
+ """
+ if neutron:
+ json_body = router_settings.dict_for_neutron(neutron, os_creds)
+ logger.info('Creating router with name - ' + router_settings.name)
+ return neutron.create_router(json_body)
+ else:
+ logger.error("Failed to create router.")
+ raise Exception
+
+
+def delete_router(neutron, router):
+ """
+ Deletes a router for OpenStack
+ :param neutron: the client
+ :param router: the router object
+ """
+ if neutron and router:
+ logger.info('Deleting router with name - ' + router['router']['name'])
+ neutron.delete_router(router=router['router']['id'])
+ return True
+
+
+def get_router_by_name(neutron, router_name):
+ """
+ Returns the first router object (dictionary) found with a given name
+ :param neutron: the client
+ :param router_name: the name of the network to retrieve
+ :return:
+ """
+ routers = neutron.list_routers(**{'name': router_name})
+ for router, routerInst in routers.iteritems():
+ for inst in routerInst:
+ if inst.get('name') == router_name:
+ return {'router': inst}
+ return None
+
+
+def add_interface_router(neutron, router, subnet=None, port=None):
+ """
+ Adds an interface router for OpenStack for either a subnet or port. Exception will be raised if requesting for both.
+ :param neutron: the client
+ :param router: the router object
+ :param subnet: the subnet object
+ :param port: the port object
+ :return: the interface router object
+ """
+ if subnet and port:
+ raise Exception('Cannot add interface to the router. Both subnet and port were sent in. Either or please.')
+
+ if neutron and router and (router or subnet):
+ logger.info('Adding interface to router with name ' + router['router']['name'])
+ return neutron.add_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
+ else:
+ raise Exception("Unable to create interface router as neutron client, router or subnet were not created")
+
+
+def remove_interface_router(neutron, router, subnet=None, port=None):
+ """
+ Removes an interface router for OpenStack
+ :param neutron: the client
+ :param router: the router object
+ :param subnet: the subnet object (either subnet or port, not both)
+ :param port: the port object
+ """
+ if router:
+ try:
+ logger.info('Removing router interface from router named ' + router['router']['name'])
+ neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
+ except NotFound as e:
+ logger.warn('Could not remove router interface. NotFound - ' + e.message)
+ pass
+ else:
+ logger.warn('Could not remove router interface, No router object')
+
+
+def __create_port_json_body(subnet=None, port=None):
+ """
+ Returns the dictionary required for creating and deleting router interfaces. Will only work on a subnet or port
+ object. Will throw and exception if parameters contain both or neither
+ :param subnet: the subnet object
+ :param port: the port object
+ :return: the dict
+ """
+ if subnet and port:
+ raise Exception('Cannot create JSON body with both subnet and port')
+ if not subnet and not port:
+ raise Exception('Cannot create JSON body without subnet or port')
+
+ if subnet:
+ return {"subnet_id": subnet['subnet']['id']}
+ else:
+ return {"port_id": port['port']['id']}
+
+
+def create_port(neutron, os_creds, port_settings):
+ """
+ Creates a port for OpenStack
+ :param neutron: the client
+ :param os_creds: the OpenStack credentials
+ :param port_settings: the settings object for port configuration
+ :return: the port object
+ """
+ json_body = port_settings.dict_for_neutron(neutron, os_creds)
+ logger.info('Creating port for network with name - ' + port_settings.network_name)
+ return neutron.create_port(body=json_body)
+
+
+def delete_port(neutron, port):
+ """
+ Removes an OpenStack port
+ :param neutron: the client
+ :param port: the port object
+ :return:
+ """
+ logger.info('Deleting port with name ' + port['port']['name'])
+ neutron.delete_port(port['port']['id'])
+
+
+def get_port_by_name(neutron, port_name):
+ """
+ Returns the first port object (dictionary) found with a given name
+ :param neutron: the client
+ :param port_name: the name of the port to retrieve
+ :return:
+ """
+ ports = neutron.list_ports(**{'name': port_name})
+ for port in ports['ports']:
+ if port['name'] == port_name:
+ return {'port': port}
+ return None
+
+
+def create_security_group(neutron, keystone, sec_grp_settings):
+ """
+ Creates a security group object in OpenStack
+ :param neutron: the Neutron client
+ :param keystone: the Keystone client
+ :param sec_grp_settings: the security group settings
+ :return: the security group object
+ """
+ logger.info('Creating security group with name - ' + sec_grp_settings.name)
+ return neutron.create_security_group(sec_grp_settings.dict_for_neutron(keystone))
+
+
+def delete_security_group(neutron, sec_grp):
+ """
+ Deletes a security group object from OpenStack
+ :param neutron: the client
+ :param sec_grp: the security group object to delete
+ """
+ logger.info('Deleting security group with name - ' + sec_grp['security_group']['name'])
+ return neutron.delete_security_group(sec_grp['security_group']['id'])
+
+
+def get_security_group(neutron, name):
+ """
+ Returns the first security group object of the given name else None
+ :param neutron: the client
+ :param name: the name of security group object to retrieve
+ """
+ logger.info('Retrieving security group with name - ' + name)
+
+ groups = neutron.list_security_groups(**{'name': name})
+ for group in groups['security_groups']:
+ if group['name'] == name:
+ return {'security_group': group}
+ return None
+
+
+def get_security_group_by_id(neutron, sec_grp_id):
+ """
+ Returns the first security group object of the given name else None
+ :param neutron: the client
+ :param sec_grp_id: the id of the security group to retrieve
+ """
+ logger.info('Retrieving security group with ID - ' + sec_grp_id)
+
+ groups = neutron.list_security_groups(**{'sec_grp_id': sec_grp_id})
+ for group in groups['security_groups']:
+ return {'security_group': group}
+ return None
+
+
+def create_security_group_rule(neutron, sec_grp_rule_settings):
+ """
+ Creates a security group object in OpenStack
+ :param neutron: the client
+ :param sec_grp_rule_settings: the security group rule settings
+ :return: the security group object
+ """
+ logger.info('Creating security group to security group - ' + sec_grp_rule_settings.sec_grp_name)
+ return neutron.create_security_group_rule(sec_grp_rule_settings.dict_for_neutron(neutron))
+
+
+def delete_security_group_rule(neutron, sec_grp_rule):
+ """
+ Deletes a security group object from OpenStack
+ :param neutron: the client
+ :param sec_grp_rule: the security group rule object to delete
+ """
+ logger.info('Deleting security group rule with ID - ' + sec_grp_rule['security_group_rule']['id'])
+ neutron.delete_security_group_rule(sec_grp_rule['security_group_rule']['id'])
+
+
+def get_rules_by_security_group(neutron, sec_grp):
+ """
+ Retrieves all of the rules for a given security group
+ :param neutron: the client
+ :param sec_grp: the security group object
+ """
+ logger.info('Retrieving security group rules associate with the security group - ' +
+ sec_grp['security_group']['name'])
+ out = list()
+ rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']})
+ for rule in rules['security_group_rules']:
+ if rule['security_group_id'] == sec_grp['security_group']['id']:
+ out.append({'security_group_rule': rule})
+ return out
+
+
+def get_rule_by_id(neutron, sec_grp, rule_id):
+ """
+ Deletes a security group object from OpenStack
+ :param neutron: the client
+ :param sec_grp: the security group object
+ :param rule_id: the rule's ID
+ """
+ rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']})
+ for rule in rules['security_group_rules']:
+ if rule['id'] == rule_id:
+ return {'security_group_rule': rule}
+ return None
+
+
+def get_external_networks(neutron):
+ """
+ Returns a list of external OpenStack network object/dict for all external networks
+ :param neutron: the client
+ :return: a list of external networks (empty list if none configured)
+ """
+ out = list()
+ for network in neutron.list_networks(**{'router:external': True})['networks']:
+ out.append({'network': network})
+ return out
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
new file mode 100644
index 0000000..9d0f70f
--- /dev/null
+++ b/snaps/openstack/utils/nova_utils.py
@@ -0,0 +1,282 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import logging
+import keystone_utils
+
+from novaclient.client import Client
+from novaclient.exceptions import NotFound
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('nova_utils')
+
+"""
+Utilities for basic OpenStack Nova API calls
+"""
+
+
+def nova_client(os_creds):
+ """
+ Instantiates and returns a client for communications with OpenStack's Nova server
+ :param os_creds: The connection credentials to the OpenStack API
+ :return: the client object
+ """
+ logger.debug('Retrieving Nova Client')
+ return Client(os_creds.compute_api_version, session=keystone_utils.keystone_session(os_creds))
+
+
+def get_servers_by_name(nova, name):
+ """
+ Returns a list of servers with a given name
+ :param nova: the Nova client
+ :param name: the server name
+ :return: the list of servers
+ """
+ return nova.servers.list(search_opts={'name': name})
+
+
+def get_latest_server_object(nova, server):
+ """
+ Returns a server with a given id
+ :param nova: the Nova client
+ :param server: the old server object
+ :return: the list of servers or None if not found
+ """
+ return nova.servers.get(server)
+
+
+def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
+ """
+ Saves the generated RSA generated keys to the filesystem
+ :param keys: the keys to save
+ :param pub_file_path: the path to the public keys
+ :param priv_file_path: the path to the private keys
+ :return: None
+ """
+ if keys:
+ if pub_file_path:
+ pub_dir = os.path.dirname(pub_file_path)
+ if not os.path.isdir(pub_dir):
+ os.mkdir(pub_dir)
+ public_handle = open(pub_file_path, 'wb')
+ public_handle.write(keys.publickey().exportKey('OpenSSH'))
+ public_handle.close()
+ os.chmod(pub_file_path, 0o400)
+ logger.info("Saved public key to - " + pub_file_path)
+ if priv_file_path:
+ priv_dir = os.path.dirname(priv_file_path)
+ if not os.path.isdir(priv_dir):
+ os.mkdir(priv_dir)
+ private_handle = open(priv_file_path, 'wb')
+ private_handle.write(keys.exportKey())
+ private_handle.close()
+ os.chmod(priv_file_path, 0o400)
+ logger.info("Saved private key to - " + priv_file_path)
+
+
+def upload_keypair_file(nova, name, file_path):
+ """
+ Uploads a public key from a file
+ :param nova: the Nova client
+ :param name: the keypair name
+ :param file_path: the path to the public key file
+ :return: the keypair object
+ """
+ with open(os.path.expanduser(file_path)) as fpubkey:
+ logger.info('Saving keypair to - ' + file_path)
+ return upload_keypair(nova, name, fpubkey.read())
+
+
+def upload_keypair(nova, name, key):
+ """
+ Uploads a public key from a file
+ :param nova: the Nova client
+ :param name: the keypair name
+ :param key: the public key object
+ :return: the keypair object
+ """
+ logger.info('Creating keypair with name - ' + name)
+ return nova.keypairs.create(name=name, public_key=key)
+
+
+def keypair_exists(nova, keypair_obj):
+ """
+ Returns a copy of the keypair object if found
+ :param nova: the Nova client
+ :param keypair_obj: the keypair object
+ :return: the keypair object or None if not found
+ """
+ try:
+ return nova.keypairs.get(keypair_obj)
+ except:
+ return None
+
+
+def get_keypair_by_name(nova, name):
+ """
+ Returns a list of all available keypairs
+ :param nova: the Nova client
+ :param name: the name of the keypair to lookup
+ :return: the keypair object or None if not found
+ """
+ keypairs = nova.keypairs.list()
+
+ for keypair in keypairs:
+ if keypair.name == name:
+ return keypair
+
+ return None
+
+
+def delete_keypair(nova, key):
+ """
+ Deletes a keypair object from OpenStack
+ :param nova: the Nova client
+ :param key: the keypair object to delete
+ """
+ logger.debug('Deleting keypair - ' + key.name)
+ nova.keypairs.delete(key)
+
+
+def get_floating_ip_pools(nova):
+ """
+ Returns all of the available floating IP pools
+ :param nova: the Nova client
+ :return: a list of pools
+ """
+ return nova.floating_ip_pools.list()
+
+
+def get_floating_ips(nova):
+ """
+ Returns all of the floating IPs
+ :param nova: the Nova client
+ :return: a list of floating IPs
+ """
+ return nova.floating_ips.list()
+
+
+def create_floating_ip(nova, ext_net_name):
+ """
+ Returns the floating IP object that was created with this call
+ :param nova: the Nova client
+ :param ext_net_name: the name of the external network on which to apply the floating IP address
+ :return: the floating IP object
+ """
+ logger.info('Creating floating ip to external network - ' + ext_net_name)
+ return nova.floating_ips.create(ext_net_name)
+
+
+def get_floating_ip(nova, floating_ip):
+ """
+ Returns a floating IP object that should be identical to the floating_ip parameter
+ :param nova: the Nova client
+ :param floating_ip: the floating IP object to lookup
+ :return: hopefully the same floating IP object input
+ """
+ logger.debug('Attempting to retrieve existing floating ip with IP - ' + floating_ip.ip)
+ return nova.floating_ips.get(floating_ip)
+
+
+def delete_floating_ip(nova, floating_ip):
+ """
+ Responsible for deleting a floating IP
+ :param nova: the Nova client
+ :param floating_ip: the floating IP object to delete
+ :return:
+ """
+ logger.debug('Attempting to delete existing floating ip with IP - ' + floating_ip.ip)
+ return nova.floating_ips.delete(floating_ip)
+
+
+def get_nova_availability_zones(nova):
+ """
+ Returns the names of all nova compute servers
+ :param nova: the Nova client
+ :return: a list of compute server names
+ """
+ out = list()
+ zones = nova.availability_zones.list()
+ for zone in zones:
+ if zone.zoneName == 'nova':
+ for key, host in zone.hosts.iteritems():
+ out.append(zone.zoneName + ':' + key)
+
+ return out
+
+
+def delete_vm_instance(nova, vm_inst):
+ """
+ Deletes a VM instance
+ :param nova: the nova client
+ :param vm_inst: the OpenStack instance object to delete
+ """
+ nova.servers.delete(vm_inst)
+
+
+def get_flavor_by_name(nova, name):
+ """
+ Returns a flavor by name
+ :param nova: the Nova client
+ :param name: the flavor name to return
+ :return: the OpenStack flavor object or None if not exists
+ """
+ try:
+ return nova.flavors.find(name=name)
+ except NotFound:
+ return None
+
+
+def create_flavor(nova, flavor_settings):
+ """
+ Creates and returns and OpenStack flavor object
+ :param nova: the Nova client
+ :param flavor_settings: the flavor settings
+ :return: the Flavor
+ """
+ return nova.flavors.create(name=flavor_settings.name, flavorid=flavor_settings.flavor_id, ram=flavor_settings.ram,
+ vcpus=flavor_settings.vcpus, disk=flavor_settings.disk,
+ ephemeral=flavor_settings.ephemeral, swap=flavor_settings.swap,
+ rxtx_factor=flavor_settings.rxtx_factor, is_public=flavor_settings.is_public)
+
+
+def delete_flavor(nova, flavor):
+ """
+ Deletes a flavor
+ :param nova: the Nova client
+ :param flavor: the OpenStack flavor object
+ """
+ nova.flavors.delete(flavor)
+
+
+def add_security_group(nova, vm, security_group_name):
+ """
+ Adds a security group to an existing VM
+ :param nova: the nova client
+ :param vm: the OpenStack server object (VM) to alter
+ :param security_group_name: the name of the security group to add
+ """
+ nova.servers.add_security_group(vm.id, security_group_name)
+
+
+def remove_security_group(nova, vm, security_group):
+ """
+ Removes a security group from an existing VM
+ :param nova: the nova client
+ :param vm: the OpenStack server object (VM) to alter
+ :param security_group: the OpenStack security group object to add
+ """
+ nova.servers.remove_security_group(vm.id, security_group)
diff --git a/snaps/openstack/utils/tests/__init__.py b/snaps/openstack/utils/tests/__init__.py
new file mode 100644
index 0000000..7f92908
--- /dev/null
+++ b/snaps/openstack/utils/tests/__init__.py
@@ -0,0 +1,15 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+__author__ = 'spisarski' \ No newline at end of file
diff --git a/snaps/openstack/utils/tests/glance_utils_tests.py b/snaps/openstack/utils/tests/glance_utils_tests.py
new file mode 100644
index 0000000..d13908b
--- /dev/null
+++ b/snaps/openstack/utils/tests/glance_utils_tests.py
@@ -0,0 +1,115 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import os
+import shutil
+import uuid
+
+from snaps import file_utils
+from snaps.openstack.tests import openstack_tests
+
+from snaps.openstack.utils import nova_utils
+from snaps.openstack.tests import validation_utils
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.utils import glance_utils
+
+__author__ = 'spisarski'
+
+
+class GlanceSmokeTests(OSComponentTestCase):
+ """
+ Tests to ensure that the neutron client can communicate with the cloud
+ """
+
+ def test_glance_connect_success(self):
+ """
+ Tests to ensure that the proper credentials can connect.
+ """
+ glance = glance_utils.glance_client(self.os_creds)
+
+ users = glance.images.list()
+ self.assertIsNotNone(users)
+
+ def test_glance_connect_fail(self):
+ """
+ Tests to ensure that the improper credentials cannot connect.
+ """
+ from snaps.openstack.os_credentials import OSCreds
+
+ with self.assertRaises(Exception):
+ neutron = glance_utils.glance_client(OSCreds('user', 'pass', 'url', 'project'))
+ neutron.list_networks()
+
+
+class GlanceUtilsTests(OSComponentTestCase):
+ """
+ Test for the CreateImage class defined in create_image.py
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
+ within OpenStack
+ """
+ guid = uuid.uuid4()
+ self.image_name = self.__class__.__name__ + '-' + str(guid)
+ self.image = None
+ self.nova = nova_utils.nova_client(self.os_creds)
+ self.glance = glance_utils.glance_client(self.os_creds)
+
+ self.tmp_dir = 'tmp/' + str(guid)
+ if not os.path.exists(self.tmp_dir):
+ os.makedirs(self.tmp_dir)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.image:
+ glance_utils.delete_image(self.glance, self.image)
+
+ if os.path.exists(self.tmp_dir) and os.path.isdir(self.tmp_dir):
+ shutil.rmtree(self.tmp_dir)
+
+ def test_create_image_minimal_url(self):
+ """
+ Tests the glance_utils.create_image() function with a URL
+ """
+ os_image_settings = openstack_tests.cirros_url_image(name=self.image_name)
+
+ self.image = glance_utils.create_image(self.glance, os_image_settings)
+ self.assertIsNotNone(self.image)
+
+ self.assertEqual(self.image_name, self.image.name)
+
+ image = glance_utils.get_image(self.nova, self.glance, os_image_settings.name)
+ self.assertIsNotNone(image)
+
+ validation_utils.objects_equivalent(self.image, image)
+
+ def test_create_image_minimal_file(self):
+ """
+ Tests the glance_utils.create_image() function with a file
+ """
+ url_image_settings = openstack_tests.cirros_url_image('foo')
+ image_file = file_utils.download(url_image_settings.url, self.tmp_dir)
+ file_image_settings = openstack_tests.file_image_test_settings(name=self.image_name, file_path=image_file.name)
+
+ self.image = glance_utils.create_image(self.glance, file_image_settings)
+ self.assertIsNotNone(self.image)
+ self.assertEqual(self.image_name, self.image.name)
+
+ image = glance_utils.get_image(self.nova, self.glance, file_image_settings.name)
+ self.assertIsNotNone(image)
+ validation_utils.objects_equivalent(self.image, image)
diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py
new file mode 100644
index 0000000..76a43ef
--- /dev/null
+++ b/snaps/openstack/utils/tests/keystone_utils_tests.py
@@ -0,0 +1,100 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+
+from snaps.openstack.create_project import ProjectSettings
+from snaps.openstack.create_user import UserSettings
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.utils import keystone_utils
+
+__author__ = 'spisarski'
+
+
+class KeystoneSmokeTests(OSComponentTestCase):
+ """
+ Tests to ensure that the neutron client can communicate with the cloud
+ """
+
+ def test_keystone_connect_success(self):
+ """
+ Tests to ensure that the proper credentials can connect.
+ """
+ keystone = keystone_utils.keystone_client(self.os_creds)
+
+ users = keystone.users.list()
+ self.assertIsNotNone(users)
+
+ def test_keystone_connect_fail(self):
+ """
+ Tests to ensure that the improper credentials cannot connect.
+ """
+ from snaps.openstack.os_credentials import OSCreds
+
+ with self.assertRaises(Exception):
+ keystone = keystone_utils.keystone_client(OSCreds('user', 'pass', 'url', 'project'))
+ keystone.users.list()
+
+
+class KeystoneUtilsTests(OSComponentTestCase):
+ """
+ Test for the CreateImage class defined in create_image.py
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
+ within OpenStack
+ """
+ guid = uuid.uuid4()
+ self.username = self.__class__.__name__ + '-' + str(guid)
+ self.user = None
+
+ self.project_name = self.__class__.__name__ + '-' + str(guid)
+ self.project = None
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.project:
+ keystone_utils.delete_project(self.keystone, self.project)
+
+ if self.user:
+ keystone_utils.delete_user(self.keystone, self.user)
+
+ def test_create_user_minimal(self):
+ """
+ Tests the keystone_utils.create_user() function
+ """
+ user_settings = UserSettings(name=self.username, password='test123')
+ self.user = keystone_utils.create_user(self.keystone, user_settings)
+ self.assertEqual(self.username, self.user.name)
+
+ user = keystone_utils.get_user(self.keystone, self.username)
+ self.assertIsNotNone(user)
+ self.assertEqual(self.user, user)
+
+ def test_create_project_minimal(self):
+ """
+ Tests the keyston_utils.create_project() funtion
+ """
+ project_settings = ProjectSettings(name=self.project_name)
+ self.project = keystone_utils.create_project(self.keystone, project_settings)
+ self.assertEquals(self.project_name, self.project.name)
+
+ project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name)
+ self.assertIsNotNone(project)
+ self.assertEquals(self.project_name, self.project.name)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
new file mode 100644
index 0000000..5f95fc9
--- /dev/null
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -0,0 +1,651 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+
+from snaps.openstack.utils import keystone_utils
+from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
+from snaps.openstack.tests import openstack_tests
+from snaps.openstack.utils import neutron_utils
+from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
+from snaps.openstack import create_router
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.tests import validation_utils
+
+__author__ = 'spisarski'
+
+ip_1 = '10.55.1.100'
+ip_2 = '10.55.1.200'
+
+
+class NeutronSmokeTests(OSComponentTestCase):
+ """
+ Tests to ensure that the neutron client can communicate with the cloud
+ """
+
+ def test_neutron_connect_success(self):
+ """
+ Tests to ensure that the proper credentials can connect.
+ """
+ neutron = neutron_utils.neutron_client(self.os_creds)
+
+ networks = neutron.list_networks()
+
+ found = False
+ networks = networks.get('networks')
+ for network in networks:
+ if network.get('name') == self.ext_net_name:
+ found = True
+ self.assertTrue(found)
+
+ def test_neutron_connect_fail(self):
+ """
+ Tests to ensure that the improper credentials cannot connect.
+ """
+ from snaps.openstack.os_credentials import OSCreds
+
+ with self.assertRaises(Exception):
+ neutron = neutron_utils.neutron_client(
+ OSCreds(username='user', password='pass', auth_url='url', project_name='project'))
+ neutron.list_networks()
+
+ def test_retrieve_ext_network_name(self):
+ """
+ Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained
+ within the returned list
+ :return:
+ """
+ neutron = neutron_utils.neutron_client(self.os_creds)
+ ext_networks = neutron_utils.get_external_networks(neutron)
+ found = False
+ for network in ext_networks:
+ if network['network']['name'] == self.ext_net_name:
+ found = True
+ break
+ self.assertTrue(found)
+
+
+class NeutronUtilsNetworkTests(OSComponentTestCase):
+ """
+ Test for creating networks via neutron_utils.py
+ """
+
+ def setUp(self):
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.port_name = str(guid) + '-port'
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.network = None
+ self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.network:
+ neutron_utils.delete_network(self.neutron, self.network)
+ validate_network(self.neutron, self.network['network']['name'], False)
+
+ def test_create_network(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ def test_create_network_empty_name(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function with an empty network name
+ """
+ with self.assertRaises(Exception):
+ self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name=''))
+
+ def test_create_network_null_name(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function when the network name is None
+ """
+ with self.assertRaises(Exception):
+ self.network = neutron_utils.create_network(self.neutron, NetworkSettings())
+
+
+class NeutronUtilsSubnetTests(OSComponentTestCase):
+ """
+ Test for creating networks with subnets via neutron_utils.py
+ """
+
+ def setUp(self):
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.port_name = str(guid) + '-port'
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.network = None
+ self.subnet = None
+ self.net_config = openstack_tests.get_pub_net_config(
+ net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.subnet:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ validate_subnet(self.neutron, self.subnet.get('name'),
+ self.net_config.network_settings.subnet_settings[0].cidr, False)
+
+ if self.network:
+ neutron_utils.delete_network(self.neutron, self.network)
+ validate_network(self.neutron, self.network['network']['name'], False)
+
+ def test_create_subnet(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, network=self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ def test_create_subnet_null_name(self):
+ """
+ Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ with self.assertRaises(Exception):
+ SubnetSettings(cidr=self.net_config.subnet_cidr)
+
+ def test_create_subnet_empty_name(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function with an empty name
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, network=self.network)
+ validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ def test_create_subnet_null_cidr(self):
+ """
+ Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ with self.assertRaises(Exception):
+ sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+
+ def test_create_subnet_empty_cidr(self):
+ """
+ Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ with self.assertRaises(Exception):
+ sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+
+
+class NeutronUtilsRouterTests(OSComponentTestCase):
+ """
+ Test for creating routers via neutron_utils.py
+ """
+
+ def setUp(self):
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.port_name = str(guid) + '-port'
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.network = None
+ self.subnet = None
+ self.port = None
+ self.router = None
+ self.interface_router = None
+ self.net_config = openstack_tests.get_pub_net_config(
+ net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
+ router_name=guid + '-pub-router', external_net=self.ext_net_name)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.interface_router:
+ neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)
+
+ if self.router:
+ neutron_utils.delete_router(self.neutron, self.router)
+ validate_router(self.neutron, self.router.get('name'), False)
+
+ if self.port:
+ neutron_utils.delete_port(self.neutron, self.port)
+
+ if self.subnet:
+ neutron_utils.delete_subnet(self.neutron, self.subnet)
+ validate_subnet(self.neutron, self.subnet.get('name'),
+ self.net_config.network_settings.subnet_settings[0].cidr, False)
+
+ if self.network:
+ neutron_utils.delete_network(self.neutron, self.network)
+ validate_network(self.neutron, self.network['network']['name'], False)
+
+ def test_create_router_simple(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ """
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name, True)
+
+ def test_create_router_with_public_interface(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ """
+ self.net_config = openstack_tests.OSNetworkConfig(
+ self.net_config.network_settings.name,
+ self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name,
+ self.ext_net_name)
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name, True)
+ # TODO - Add validation that the router gatway has been set
+
+ def test_create_router_empty_name(self):
+ """
+ Tests the neutron_utils.create_neutron_net() function
+ """
+ with self.assertRaises(Exception):
+ this_router_settings = create_router.RouterSettings(name='')
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+
+ def test_create_router_null_name(self):
+ """
+ Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None
+ """
+ with self.assertRaises(Exception):
+ this_router_settings = create_router.RouterSettings()
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+ validate_router(self.neutron, None, True)
+
+ def test_add_interface_router(self):
+ """
+ Tests the neutron_utils.add_interface_router() function
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name, True)
+
+ self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+ validate_interface_router(self.interface_router, self.router, self.subnet)
+
+ def test_add_interface_router_null_router(self):
+ """
+ Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+
+ def test_add_interface_router_null_subnet(self):
+ """
+ Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name, True)
+
+ with self.assertRaises(Exception):
+ self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+
+ def test_create_port(self):
+ """
+ Tests the neutron_utils.create_port() function
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds, PortSettings(
+ name=self.port_name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}],
+ network_name=self.net_config.network_settings.name))
+ validate_port(self.neutron, self.port, self.port_name)
+
+ def test_create_port_empty_name(self):
+ """
+ Tests the neutron_utils.create_port() function
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds, PortSettings(
+ name=self.port_name, network_name=self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}]))
+ validate_port(self.neutron, self.port, self.port_name)
+
+ def test_create_port_null_name(self):
+ """
+ Tests the neutron_utils.create_port() function for an Exception when the port name value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+
+ def test_create_port_null_network_object(self):
+ """
+ Tests the neutron_utils.create_port() function for an Exception when the network object is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
+ self.neutron, self.port_name, self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+
+ def test_create_port_null_ip(self):
+ """
+ Tests the neutron_utils.create_port() function for an Exception when the IP value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
+ name=self.port_name, network_name=self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}]))
+
+ def test_create_port_invalid_ip(self):
+ """
+ Tests the neutron_utils.create_port() function for an Exception when the IP value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
+ name=self.port_name, network_name=self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}]))
+
+ def test_create_port_invalid_ip_to_subnet(self):
+ """
+ Tests the neutron_utils.create_port() function for an Exception when the IP value is None
+ """
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+
+ self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
+ self.os_creds, self.network)
+ validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
+ self.net_config.network_settings.subnet_settings[0].cidr, True)
+
+ with self.assertRaises(Exception):
+ self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
+ name=self.port_name, network_name=self.net_config.network_settings.name,
+ ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name,
+ 'ip': '10.197.123.100'}]))
+
+
+class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
+ """
+ Test for creating security groups via neutron_utils.py
+ """
+
+ def setUp(self):
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.sec_grp_name = guid + 'name'
+
+ self.security_group = None
+ self.security_group_rules = list()
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ for rule in self.security_group_rules:
+ neutron_utils.delete_security_group_rule(self.neutron, rule)
+
+ if self.security_group:
+ neutron_utils.delete_security_group(self.neutron, self.security_group)
+
+ def test_create_delete_simple_sec_grp(self):
+ """
+ Tests the neutron_utils.create_security_group() function
+ """
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
+ self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+
+ self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name'])
+
+ sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ self.assertIsNotNone(sec_grp_get)
+ self.assertTrue(validation_utils.objects_equivalent(
+ self.security_group['security_group'], sec_grp_get['security_group']))
+
+ neutron_utils.delete_security_group(self.neutron, self.security_group)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ self.assertIsNone(sec_grp_get)
+ self.security_group = None
+
+ def test_create_sec_grp_no_name(self):
+ """
+ Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
+ that attempting to create a security group without a name will raise an exception
+ """
+ with self.assertRaises(Exception):
+ sec_grp_settings = SecurityGroupSettings()
+ self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+
+ def test_create_sec_grp_no_rules(self):
+ """
+ Tests the neutron_utils.create_security_group() function
+ """
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
+ self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+
+ self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description, self.security_group['security_group']['description'])
+
+ sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ self.assertIsNotNone(sec_grp_get)
+ self.assertTrue(validation_utils.objects_equivalent(
+ self.security_group['security_group'], sec_grp_get['security_group']))
+
+ def test_create_sec_grp_one_rule(self):
+ """
+ Tests the neutron_utils.create_security_group() function
+ """
+
+ sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=[sec_grp_rule_settings])
+
+ self.security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+ free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)
+ for free_rule in free_rules:
+ self.security_group_rules.append(free_rule)
+
+ self.security_group_rules.append(
+ neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))
+
+ # Refresh object so it is populated with the newly added rule
+ self.security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+
+ rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_group)
+
+ self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))
+
+ self.assertTrue(sec_grp_settings.name, self.security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description, self.security_group['security_group']['description'])
+
+ sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ self.assertIsNotNone(sec_grp_get)
+ self.assertTrue(validation_utils.objects_equivalent(
+ self.security_group['security_group'], sec_grp_get['security_group']))
+
+
+"""
+Validation routines
+"""
+
+
+def validate_network(neutron, name, exists):
+ """
+ Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true.
+ Returns false if a network for a given name DOES exist if the exists parameter is true conversely false.
+ :param neutron: The neutron client
+ :param name: The expected network name
+ :param exists: Whether or not the network name should exist or not
+ :return: True/False
+ """
+ network = neutron_utils.get_network(neutron, name)
+ if exists and network:
+ return True
+ if not exists and not network:
+ return True
+ return False
+
+
+def validate_subnet(neutron, name, cidr, exists):
+ """
+ Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true.
+ Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false.
+ :param neutron: The neutron client
+ :param name: The expected subnet name
+ :param cidr: The expected CIDR value
+ :param exists: Whether or not the network name should exist or not
+ :return: True/False
+ """
+ subnet = neutron_utils.get_subnet_by_name(neutron, name)
+ if exists and subnet:
+ return subnet.get('cidr') == cidr
+ if not exists and not subnet:
+ return True
+ return False
+
+
+def validate_router(neutron, name, exists):
+ """
+ Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true.
+ Returns false if a router for a given name DOES exist if the exists parameter is true conversely false.
+ :param neutron: The neutron client
+ :param name: The expected router name
+ :param exists: Whether or not the network name should exist or not
+ :return: True/False
+ """
+ router = neutron_utils.get_router_by_name(neutron, name)
+ if exists and router:
+ return True
+ return False
+
+
+def validate_interface_router(interface_router, router, subnet):
+ """
+ Returns true if the router ID & subnet ID have been properly included into the interface router object
+ :param interface_router: the object to validate
+ :param router: to validate against the interface_router
+ :param subnet: to validate against the interface_router
+ :return: True if both IDs match else False
+ """
+ subnet_id = interface_router.get('subnet_id')
+ router_id = interface_router.get('port_id')
+
+ return subnet.get('id') == subnet_id and router.get('id') == router_id
+
+
+def validate_port(neutron, port_obj, this_port_name):
+ """
+ Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true.
+ Returns false if a port for a given name DOES exist if the exists parameter is true conversely false.
+ :param neutron: The neutron client
+ :param port_obj: The port object to lookup
+ :param this_port_name: The expected router name
+ :return: True/False
+ """
+ ports = neutron.list_ports()
+ for port, port_insts in ports.iteritems():
+ for inst in port_insts:
+ if inst['id'] == port_obj['port']['id']:
+ return inst['name'] == this_port_name
+ return False
diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py
new file mode 100644
index 0000000..f6c9156
--- /dev/null
+++ b/snaps/openstack/utils/tests/nova_utils_tests.py
@@ -0,0 +1,208 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import os
+import uuid
+
+from Crypto.PublicKey import RSA
+
+from snaps.openstack.utils import nova_utils
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.create_flavor import FlavorSettings
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('nova_utils_tests')
+
+
+class NovaSmokeTests(OSComponentTestCase):
+ """
+ Tests to ensure that the nova client can communicate with the cloud
+ """
+
+ def test_nova_connect_success(self):
+ """
+ Tests to ensure that the proper credentials can connect.
+ """
+ nova = nova_utils.nova_client(self.os_creds)
+
+ # This should not throw an exception
+ nova.flavors.list()
+
+ def test_nova_connect_fail(self):
+ """
+ Tests to ensure that the improper credentials cannot connect.
+ """
+ from snaps.openstack.os_credentials import OSCreds
+
+ nova = nova_utils.nova_client(
+ OSCreds(username='user', password='pass', auth_url=self.os_creds.auth_url,
+ project_name=self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings))
+
+ # This should throw an exception
+ with self.assertRaises(Exception):
+ nova.flavors.list()
+
+
+class NovaUtilsKeypairTests(OSComponentTestCase):
+ """
+ Test basic nova keypair functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
+ within OpenStack
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.priv_key_file_path = 'tmp/' + guid
+ self.pub_key_file_path = self.priv_key_file_path + '.pub'
+
+ self.nova = nova_utils.nova_client(self.os_creds)
+ self.keys = RSA.generate(1024)
+ self.public_key = self.keys.publickey().exportKey('OpenSSH')
+ self.keypair_name = guid
+ self.keypair = None
+ self.floating_ip = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.keypair:
+ try:
+ nova_utils.delete_keypair(self.nova, self.keypair)
+ except:
+ pass
+
+ try:
+ os.remove(self.priv_key_file_path)
+ except:
+ pass
+
+ try:
+ os.remove(self.pub_key_file_path)
+ except:
+ pass
+
+ if self.floating_ip:
+ nova_utils.delete_floating_ip(self.nova, self.floating_ip)
+
+ def test_create_keypair(self):
+ """
+ Tests the creation of an OpenStack keypair that does not exist.
+ """
+ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
+ result = nova_utils.keypair_exists(self.nova, self.keypair)
+ self.assertEquals(self.keypair, result)
+ keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
+ self.assertEquals(self.keypair, keypair)
+
+ def test_create_delete_keypair(self):
+ """
+ Tests the creation of an OpenStack keypair that does not exist.
+ """
+ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
+ result = nova_utils.keypair_exists(self.nova, self.keypair)
+ self.assertEquals(self.keypair, result)
+ nova_utils.delete_keypair(self.nova, self.keypair)
+ result2 = nova_utils.keypair_exists(self.nova, self.keypair)
+ self.assertIsNone(result2)
+
+ def test_create_key_from_file(self):
+ """
+ Tests that the generated RSA keys are properly saved to files
+ :return:
+ """
+ nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path)
+ self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path)
+ pub_key = open(os.path.expanduser(self.pub_key_file_path)).read()
+ self.assertEquals(self.keypair.public_key, pub_key)
+
+ def test_floating_ips(self):
+ """
+ Tests the creation of a floating IP
+ :return:
+ """
+ ips = nova_utils.get_floating_ips(self.nova)
+ self.assertIsNotNone(ips)
+
+ self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name)
+ returned = nova_utils.get_floating_ip(self.nova, self.floating_ip)
+ self.assertEquals(self.floating_ip, returned)
+
+
+class NovaUtilsFlavorTests(OSComponentTestCase):
+ """
+ Test basic nova flavor functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
+ within OpenStack
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.flavor_settings = FlavorSettings(name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1, vcpus=1,
+ ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False)
+ self.nova = nova_utils.nova_client(self.os_creds)
+ self.flavor = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.flavor:
+ try:
+ nova_utils.delete_flavor(self.nova, self.flavor)
+ except:
+ pass
+
+ def test_create_flavor(self):
+ """
+ Tests the creation of an OpenStack keypair that does not exist.
+ """
+ self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
+ self.validate_flavor()
+
+ def test_create_delete_flavor(self):
+ """
+ Tests the creation of an OpenStack keypair that does not exist.
+ """
+ self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
+ self.validate_flavor()
+ nova_utils.delete_flavor(self.nova, self.flavor)
+ flavor = nova_utils.get_flavor_by_name(self.nova, self.flavor_settings.name)
+ self.assertIsNone(flavor)
+
+ def validate_flavor(self):
+ """
+ Validates the flavor_settings against the OpenStack flavor object
+ """
+ self.assertIsNotNone(self.flavor)
+ self.assertEquals(self.flavor_settings.name, self.flavor.name)
+ self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id)
+ self.assertEquals(self.flavor_settings.ram, self.flavor.ram)
+ self.assertEquals(self.flavor_settings.disk, self.flavor.disk)
+ self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus)
+ self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral)
+
+ if self.flavor_settings.swap == 0:
+ self.assertEquals('', self.flavor.swap)
+ else:
+ self.assertEquals(self.flavor_settings.swap, self.flavor.swap)
+
+ self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
+ self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public)