summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/utils
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/openstack/utils')
-rw-r--r--snaps/openstack/utils/cinder_utils.py95
-rw-r--r--snaps/openstack/utils/deploy_utils.py6
-rw-r--r--snaps/openstack/utils/heat_utils.py218
-rw-r--r--snaps/openstack/utils/keystone_utils.py4
-rw-r--r--snaps/openstack/utils/launch_utils.py815
-rw-r--r--snaps/openstack/utils/magnum_utils.py126
-rw-r--r--snaps/openstack/utils/neutron_utils.py184
-rw-r--r--snaps/openstack/utils/nova_utils.py241
-rw-r--r--snaps/openstack/utils/settings_utils.py257
-rw-r--r--snaps/openstack/utils/tests/cinder_utils_tests.py159
-rw-r--r--snaps/openstack/utils/tests/heat_utils_tests.py528
-rw-r--r--snaps/openstack/utils/tests/keystone_utils_tests.py12
-rw-r--r--snaps/openstack/utils/tests/magnum_utils_tests.py299
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py439
-rw-r--r--snaps/openstack/utils/tests/nova_utils_tests.py185
-rw-r--r--snaps/openstack/utils/tests/settings_utils_tests.py153
16 files changed, 3256 insertions, 465 deletions
diff --git a/snaps/openstack/utils/cinder_utils.py b/snaps/openstack/utils/cinder_utils.py
index d13277d..c50a166 100644
--- a/snaps/openstack/utils/cinder_utils.py
+++ b/snaps/openstack/utils/cinder_utils.py
@@ -17,7 +17,8 @@ import logging
from cinderclient.client import Client
from cinderclient.exceptions import NotFound
-from snaps.domain.volume import QoSSpec, VolumeType, VolumeTypeEncryption
+from snaps.domain.volume import (
+ QoSSpec, VolumeType, VolumeTypeEncryption, Volume)
from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
@@ -42,6 +43,98 @@ def cinder_client(os_creds):
region_name=os_creds.region_name)
+def get_volume(cinder, volume_name=None, volume_settings=None):
+ """
+ Returns an OpenStack volume object for a given name
+ :param cinder: the Cinder client
+ :param volume_name: the volume name to lookup
+ :param volume_settings: the volume settings used for lookups
+ :return: the volume object or None
+ """
+ if volume_settings:
+ volume_name = volume_settings.name
+
+ volumes = cinder.volumes.list()
+ for volume in volumes:
+ if volume.name == volume_name:
+ return Volume(
+ name=volume.name, volume_id=volume.id,
+ description=volume.description, size=volume.size,
+ vol_type=volume.volume_type,
+ availability_zone=volume.availability_zone,
+ multi_attach=volume.multiattach,
+ attachments=volume.attachments)
+
+
+def __get_os_volume_by_id(cinder, volume_id):
+ """
+ Returns an OpenStack volume object for a given name
+ :param cinder: the Cinder client
+ :param volume_id: the volume ID to lookup
+ :return: the SNAPS-OO Domain Volume object or None
+ """
+ return cinder.volumes.get(volume_id)
+
+
+def get_volume_by_id(cinder, volume_id):
+ """
+ Returns an OpenStack volume object for a given name
+ :param cinder: the Cinder client
+ :param volume_id: the volume ID to lookup
+ :return: the SNAPS-OO Domain Volume object or None
+ """
+ volume = __get_os_volume_by_id(cinder, volume_id)
+ return Volume(
+ name=volume.name, volume_id=volume.id, description=volume.description,
+ size=volume.size, vol_type=volume.volume_type,
+ availability_zone=volume.availability_zone,
+ multi_attach=volume.multiattach, attachments=volume.attachments)
+
+
+def get_volume_status(cinder, volume):
+ """
+ Returns a new OpenStack Volume object for a given OpenStack volume object
+ :param cinder: the Cinder client
+ :param volume: the domain Volume object
+ :return: the OpenStack Volume object
+ """
+ os_volume = cinder.volumes.get(volume.id)
+ return os_volume.status
+
+
+def create_volume(cinder, volume_settings):
+ """
+ Creates and returns OpenStack volume object with an external URL
+ :param cinder: the cinder client
+ :param volume_settings: the volume settings object
+ :return: the OpenStack volume object
+ :raise Exception if using a file and it cannot be found
+ """
+ volume = cinder.volumes.create(
+ name=volume_settings.name, description=volume_settings.description,
+ size=volume_settings.size, imageRef=volume_settings.image_name,
+ volume_type=volume_settings.type_name,
+ availability_zone=volume_settings.availability_zone,
+ multiattach=volume_settings.multi_attach)
+
+ return Volume(
+ name=volume.name, volume_id=volume.id,
+ description=volume.description,
+ size=volume.size, vol_type=volume.volume_type,
+ availability_zone=volume.availability_zone,
+ multi_attach=volume.multiattach, attachments=volume.attachments)
+
+
+def delete_volume(cinder, volume):
+ """
+ Deletes an volume from OpenStack
+ :param cinder: the cinder client
+ :param volume: the volume to delete
+ """
+ logger.info('Deleting volume named - %s', volume.name)
+ return cinder.volumes.delete(volume.id)
+
+
def get_volume_type(cinder, volume_type_name=None, volume_type_settings=None):
"""
Returns an OpenStack volume type object for a given name
diff --git a/snaps/openstack/utils/deploy_utils.py b/snaps/openstack/utils/deploy_utils.py
index c936c1f..8cd6dd3 100644
--- a/snaps/openstack/utils/deploy_utils.py
+++ b/snaps/openstack/utils/deploy_utils.py
@@ -78,7 +78,7 @@ def create_router(os_creds, router_settings, cleanup=False):
"""
Creates a network on which the CMTSs can attach
:param os_creds: The OpenStack credentials object
- :param router_settings: The RouterSettings instance
+ :param router_settings: The RouterConfig instance
:param cleanup: Denotes whether or not this is being called for cleanup
:return: A reference to the network creator objects for each network from
which network elements such as the subnet, router, interface
@@ -103,7 +103,7 @@ def create_keypair(os_creds, keypair_settings, cleanup=False):
"""
Creates a keypair that can be applied to an instance
:param os_creds: The OpenStack credentials object
- :param keypair_settings: The KeypairSettings object
+ :param keypair_settings: The KeypairConfig object
:param cleanup: Denotes whether or not this is being called for cleanup
:return: A reference to the keypair creator object
"""
@@ -121,7 +121,7 @@ def create_vm_instance(os_creds, instance_settings, image_settings,
"""
Creates a VM instance
:param os_creds: The OpenStack credentials
- :param instance_settings: Instance of VmInstanceSettings
+ :param instance_settings: Instance of VmInstanceConfig
:param image_settings: The object containing image settings
:param keypair_creator: The object responsible for creating the keypair
associated with this VM instance. (optional)
diff --git a/snaps/openstack/utils/heat_utils.py b/snaps/openstack/utils/heat_utils.py
index 8b9395b..e440717 100644
--- a/snaps/openstack/utils/heat_utils.py
+++ b/snaps/openstack/utils/heat_utils.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import os
import yaml
from heatclient.client import Client
@@ -23,7 +24,8 @@ from oslo_serialization import jsonutils
from snaps import file_utils
from snaps.domain.stack import Stack, Resource, Output
-from snaps.openstack.utils import keystone_utils, neutron_utils, nova_utils
+from snaps.openstack.utils import (
+ keystone_utils, neutron_utils, nova_utils, cinder_utils)
__author__ = 'spisarski'
@@ -36,7 +38,7 @@ def heat_client(os_creds):
:param os_creds: the OpenStack credentials
:return: the client
"""
- logger.debug('Retrieving Nova Client')
+ logger.debug('Retrieving Heat Client')
return Client(os_creds.heat_api_version,
session=keystone_utils.keystone_session(os_creds),
region_name=os_creds.region_name)
@@ -116,6 +118,16 @@ def create_stack(heat_cli, stack_settings):
if stack_settings.env_values:
args['parameters'] = stack_settings.env_values
+ if stack_settings.resource_files:
+ resources = dict()
+ for res_file in stack_settings.resource_files:
+ heat_resource_contents = file_utils.read_file(res_file)
+ base_filename = os.path.basename(res_file)
+
+ if heat_resource_contents and base_filename:
+ resources[base_filename] = heat_resource_contents
+ args['files'] = resources
+
stack = heat_cli.stacks.create(**args)
return get_stack_by_id(heat_cli, stack_id=stack['stack']['id'])
@@ -130,30 +142,37 @@ def delete_stack(heat_cli, stack):
heat_cli.stacks.delete(stack.id)
-def __get_os_resources(heat_cli, stack):
+def __get_os_resources(heat_cli, res_id):
"""
Returns all of the OpenStack resource objects for a given stack
:param heat_cli: the OpenStack heat client
- :param stack: the SNAPS-OO Stack domain object
+ :param res_id: the resource ID
:return: a list
"""
- return heat_cli.resources.list(stack.id)
+ return heat_cli.resources.list(res_id)
-def get_resources(heat_cli, stack):
+def get_resources(heat_cli, res_id, res_type=None):
"""
Returns all of the OpenStack resource objects for a given stack
:param heat_cli: the OpenStack heat client
- :param stack: the SNAPS-OO Stack domain object
- :return: a list
+ :param res_id: the SNAPS-OO Stack domain object
+ :param res_type: the type name to filter
+ :return: a list of Resource domain objects
"""
- os_resources = __get_os_resources(heat_cli, stack)
+ os_resources = __get_os_resources(heat_cli, res_id)
if os_resources:
out = list()
for os_resource in os_resources:
- out.append(Resource(resource_type=os_resource.resource_type,
- resource_id=os_resource.physical_resource_id))
+ if ((res_type and os_resource.resource_type == res_type)
+ or not res_type):
+ out.append(Resource(
+ name=os_resource.resource_name,
+ resource_type=os_resource.resource_type,
+ resource_id=os_resource.physical_resource_id,
+ status=os_resource.resource_status,
+ status_reason=os_resource.resource_status_reason))
return out
@@ -163,7 +182,7 @@ def get_outputs(heat_cli, stack):
for given stack
:param heat_cli: the OpenStack heat client
:param stack: the SNAPS-OO Stack domain object
- :return: a list
+ :return: a list of Output domain objects
"""
out = list()
@@ -182,46 +201,183 @@ def get_outputs(heat_cli, stack):
def get_stack_networks(heat_cli, neutron, stack):
"""
- Returns an instance of NetworkSettings for each network owned by this stack
+ Returns a list of Network domain objects deployed by this stack
:param heat_cli: the OpenStack heat client object
:param neutron: the OpenStack neutron client object
:param stack: the SNAPS-OO Stack domain object
- :return: a list of NetworkSettings
+ :return: a list of Network objects
"""
out = list()
- resources = get_resources(heat_cli, stack)
+ resources = get_resources(heat_cli, stack.id, 'OS::Neutron::Net')
for resource in resources:
- if resource.type == 'OS::Neutron::Net':
- network = neutron_utils.get_network_by_id(
- neutron, resource.id)
- if network:
- out.append(network)
+ network = neutron_utils.get_network_by_id(neutron, resource.id)
+ if network:
+ out.append(network)
return out
-def get_stack_servers(heat_cli, nova, stack):
+def get_stack_routers(heat_cli, neutron, stack):
"""
- Returns an instance of NetworkSettings for each network owned by this stack
+ Returns a list of Network domain objects deployed by this stack
:param heat_cli: the OpenStack heat client object
- :param nova: the OpenStack nova client object
+ :param neutron: the OpenStack neutron client object
:param stack: the SNAPS-OO Stack domain object
- :return: a list of NetworkSettings
+ :return: a list of Network objects
"""
out = list()
- resources = get_resources(heat_cli, stack)
+ resources = get_resources(heat_cli, stack.id, 'OS::Neutron::Router')
for resource in resources:
- if resource.type == 'OS::Nova::Server':
- try:
+ router = neutron_utils.get_router_by_id(neutron, resource.id)
+ if router:
+ out.append(router)
+
+ return out
+
+
+def get_stack_security_groups(heat_cli, neutron, stack):
+ """
+ Returns a list of SecurityGroup domain objects deployed by this stack
+ :param heat_cli: the OpenStack heat client object
+ :param neutron: the OpenStack neutron client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of SecurityGroup objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack.id, 'OS::Neutron::SecurityGroup')
+ for resource in resources:
+ security_group = neutron_utils.get_security_group_by_id(
+ neutron, resource.id)
+ if security_group:
+ out.append(security_group)
+
+ return out
+
+
+def get_stack_servers(heat_cli, nova, neutron, stack):
+ """
+ Returns a list of VMInst domain objects associated with a Stack
+ :param heat_cli: the OpenStack heat client object
+ :param nova: the OpenStack nova client object
+ :param neutron: the OpenStack neutron client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of VMInst domain objects
+ """
+
+ out = list()
+ srvr_res = get_resources(heat_cli, stack.id, 'OS::Nova::Server')
+ for resource in srvr_res:
+ try:
+ server = nova_utils.get_server_object_by_id(
+ nova, neutron, resource.id)
+ if server:
+ out.append(server)
+ except NotFound:
+ logger.warn('VmInst cannot be located with ID %s', resource.id)
+
+ res_grps = get_resources(heat_cli, stack.id, 'OS::Heat::ResourceGroup')
+ for res_grp in res_grps:
+ res_ress = get_resources(heat_cli, res_grp.id)
+ for res_res in res_ress:
+ res_res_srvrs = get_resources(
+ heat_cli, res_res.id, 'OS::Nova::Server')
+ for res_srvr in res_res_srvrs:
server = nova_utils.get_server_object_by_id(
- nova, resource.id)
+ nova, neutron, res_srvr.id)
if server:
out.append(server)
- except NotFound:
- logger.warn(
- 'VmInst cannot be located with ID %s', resource.id)
+
+ return out
+
+
+def get_stack_keypairs(heat_cli, nova, stack):
+ """
+ Returns a list of Keypair domain objects associated with a Stack
+ :param heat_cli: the OpenStack heat client object
+ :param nova: the OpenStack nova client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of VMInst domain objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack.id, 'OS::Nova::KeyPair')
+ for resource in resources:
+ try:
+ keypair = nova_utils.get_keypair_by_id(nova, resource.id)
+ if keypair:
+ out.append(keypair)
+ except NotFound:
+ logger.warn('Keypair cannot be located with ID %s', resource.id)
+
+ return out
+
+
+def get_stack_volumes(heat_cli, cinder, stack):
+ """
+ Returns an instance of Volume domain objects created by this stack
+ :param heat_cli: the OpenStack heat client object
+ :param cinder: the OpenStack cinder client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of Volume domain objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack.id, 'OS::Cinder::Volume')
+ for resource in resources:
+ try:
+ server = cinder_utils.get_volume_by_id(cinder, resource.id)
+ if server:
+ out.append(server)
+ except NotFound:
+ logger.warn('Volume cannot be located with ID %s', resource.id)
+
+ return out
+
+
+def get_stack_volume_types(heat_cli, cinder, stack):
+ """
+ Returns an instance of VolumeType domain objects created by this stack
+ :param heat_cli: the OpenStack heat client object
+ :param cinder: the OpenStack cinder client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of VolumeType domain objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack.id, 'OS::Cinder::VolumeType')
+ for resource in resources:
+ try:
+ vol_type = cinder_utils.get_volume_type_by_id(cinder, resource.id)
+ if vol_type:
+ out.append(vol_type)
+ except NotFound:
+ logger.warn('VolumeType cannot be located with ID %s', resource.id)
+
+ return out
+
+
+def get_stack_flavors(heat_cli, nova, stack):
+ """
+ Returns an instance of Flavor SNAPS domain object for each flavor created
+ by this stack
+ :param heat_cli: the OpenStack heat client object
+ :param nova: the OpenStack cinder client object
+ :param stack: the SNAPS-OO Stack domain object
+ :return: a list of Volume domain objects
+ """
+
+ out = list()
+ resources = get_resources(heat_cli, stack.id, 'OS::Nova::Flavor')
+ for resource in resources:
+ try:
+ flavor = nova_utils.get_flavor_by_id(nova, resource.id)
+ if flavor:
+ out.append(flavor)
+ except NotFound:
+ logger.warn('Flavor cannot be located with ID %s', resource.id)
return out
diff --git a/snaps/openstack/utils/keystone_utils.py b/snaps/openstack/utils/keystone_utils.py
index 46f6fb8..b8769c0 100644
--- a/snaps/openstack/utils/keystone_utils.py
+++ b/snaps/openstack/utils/keystone_utils.py
@@ -114,7 +114,7 @@ def get_project(keystone=None, os_creds=None, project_settings=None,
:param keystone: the Keystone client
:param os_creds: the OpenStack credentials used to obtain the Keystone
client if the keystone parameter is None
- :param project_settings: a ProjectSettings object
+ :param project_settings: a ProjectConfig object
:param project_name: the name to query
:return: the SNAPS-OO Project domain object or None
"""
@@ -367,7 +367,7 @@ def grant_user_role_to_project(keystone, role, user, project):
"""
os_role = get_role_by_id(keystone, role.id)
- logger.info('Granting role %s to project %s', role.name, project)
+ logger.info('Granting role %s to project %s', role.name, project.name)
if keystone.version == V2_VERSION_STR:
keystone.roles.add_user_role(user, os_role, tenant=project)
else:
diff --git a/snaps/openstack/utils/launch_utils.py b/snaps/openstack/utils/launch_utils.py
new file mode 100644
index 0000000..e10cf48
--- /dev/null
+++ b/snaps/openstack/utils/launch_utils.py
@@ -0,0 +1,815 @@
+#
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This utility makes it easy to create OpenStack objects
+import logging
+import re
+import socket
+import struct
+
+import os
+import time
+from keystoneauth1.exceptions import Unauthorized
+
+from snaps.config.flavor import FlavorConfig
+from snaps.config.image import ImageConfig
+from snaps.config.keypair import KeypairConfig
+from snaps.config.network import PortConfig, NetworkConfig
+from snaps.config.project import ProjectConfig
+from snaps.config.qos import QoSConfig
+from snaps.config.router import RouterConfig
+from snaps.config.security_group import SecurityGroupConfig
+from snaps.config.user import UserConfig
+from snaps.config.vm_inst import VmInstanceConfig
+from snaps.config.volume import VolumeConfig
+from snaps.config.volume_type import VolumeTypeConfig
+from snaps.openstack.create_flavor import OpenStackFlavor
+from snaps.openstack.create_image import OpenStackImage
+from snaps.openstack.create_keypairs import OpenStackKeypair
+from snaps.openstack.create_network import OpenStackNetwork
+from snaps.openstack.create_project import OpenStackProject
+from snaps.openstack.create_qos import OpenStackQoS
+from snaps.openstack.create_router import OpenStackRouter
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
+from snaps.openstack.create_user import OpenStackUser
+from snaps.openstack.create_volume import OpenStackVolume
+from snaps.openstack.create_volume_type import OpenStackVolumeType
+from snaps.openstack.os_credentials import OSCreds, ProxySettings
+from snaps.openstack.utils import deploy_utils, neutron_utils
+from snaps.provisioning import ansible_utils
+
+logger = logging.getLogger('lanuch_utils')
+DEFAULT_CREDS_KEY = 'admin'
+
+
+def launch_config(config, tmplt_file, deploy, clean, clean_image):
+ """
+ Launches all objects and applies any configured ansible playbooks
+ :param config: the environment configuration dict object
+ :param tmplt_file: the path to the SNAPS-OO template file
+ :param deploy: when True deploy
+ :param clean: when True clean
+ :param clean_image: when True clean the image when clean is True
+ """
+ os_config = config.get('openstack')
+
+ creators = list()
+ vm_dict = dict()
+ images_dict = dict()
+ flavors_dict = dict()
+ networks_dict = dict()
+ routers_dict = dict()
+ os_creds_dict = dict()
+
+ if os_config:
+ os_creds_dict = __get_creds_dict(os_config)
+
+ # Create projects
+ projects_dict = __create_instances(
+ os_creds_dict, OpenStackProject, ProjectConfig,
+ os_config.get('projects'), 'project', clean)
+ creators.append(projects_dict)
+
+ # Create users
+ users_dict = __create_instances(
+ os_creds_dict, OpenStackUser, UserConfig,
+ os_config.get('users'), 'user', clean)
+ creators.append(users_dict)
+
+ # Associate new users to projects
+ if not clean:
+ for project_creator in projects_dict.values():
+ users = project_creator.project_settings.users
+ for user_name in users:
+ user_creator = users_dict.get(user_name)
+ if user_creator:
+ project_creator.assoc_user(
+ user_creator.get_user())
+
+ # Create flavors
+ flavors_dict = __create_instances(
+ os_creds_dict, OpenStackFlavor, FlavorConfig,
+ os_config.get('flavors'), 'flavor', clean, users_dict)
+ creators.append(flavors_dict)
+
+ # Create QoS specs
+ qos_dict = __create_instances(
+ os_creds_dict, OpenStackQoS, QoSConfig,
+ os_config.get('qos_specs'), 'qos_spec', clean, users_dict)
+ creators.append(qos_dict)
+
+ # Create volume types
+ vol_type_dict = __create_instances(
+ os_creds_dict, OpenStackVolumeType, VolumeTypeConfig,
+ os_config.get('volume_types'), 'volume_type', clean,
+ users_dict)
+ creators.append(vol_type_dict)
+
+ # Create volume types
+ vol_dict = __create_instances(
+ os_creds_dict, OpenStackVolume, VolumeConfig,
+ os_config.get('volumes'), 'volume', clean, users_dict)
+ creators.append(vol_dict)
+
+ # Create images
+ images_dict = __create_instances(
+ os_creds_dict, OpenStackImage, ImageConfig,
+ os_config.get('images'), 'image', clean, users_dict)
+ creators.append(images_dict)
+
+ # Create networks
+ networks_dict = __create_instances(
+ os_creds_dict, OpenStackNetwork, NetworkConfig,
+ os_config.get('networks'), 'network', clean, users_dict)
+ creators.append(networks_dict)
+
+ # Create routers
+ routers_dict = __create_instances(
+ os_creds_dict, OpenStackRouter, RouterConfig,
+ os_config.get('routers'), 'router', clean, users_dict)
+ creators.append(routers_dict)
+
+ # Create keypairs
+ keypairs_dict = __create_instances(
+ os_creds_dict, OpenStackKeypair, KeypairConfig,
+ os_config.get('keypairs'), 'keypair', clean, users_dict)
+ creators.append(keypairs_dict)
+
+ # Create security groups
+ creators.append(__create_instances(
+ os_creds_dict, OpenStackSecurityGroup,
+ SecurityGroupConfig,
+ os_config.get('security_groups'), 'security_group', clean,
+ users_dict))
+
+ # Create instance
+ vm_dict = __create_vm_instances(
+ os_creds_dict, users_dict, os_config.get('instances'),
+ images_dict, keypairs_dict, clean)
+ creators.append(vm_dict)
+ logger.info(
+ 'Completed creating/retrieving all configured instances')
+
+ # Must enter either block
+ if clean:
+ # Cleanup Environment
+ __cleanup(creators, clean_image)
+ elif deploy:
+ # Provision VMs
+ ansible_config = config.get('ansible')
+ if ansible_config and vm_dict:
+ if not __apply_ansible_playbooks(
+ ansible_config, os_creds_dict, vm_dict, images_dict,
+ flavors_dict, networks_dict, routers_dict, tmplt_file):
+ logger.error("Problem applying ansible playbooks")
+
+
+def __get_creds_dict(os_conn_config):
+ """
+ Returns a dict of OSCreds where the key is the creds name.
+ For backwards compatibility, credentials not contained in a list (only
+ one) will be returned with the key of None
+ :param os_conn_config: the credential configuration
+ :return: a dict of OSCreds objects
+ """
+ if 'connection' in os_conn_config:
+ return {DEFAULT_CREDS_KEY: __get_os_credentials(os_conn_config)}
+ elif 'connections' in os_conn_config:
+ out = dict()
+ for os_conn_dict in os_conn_config['connections']:
+ config = os_conn_dict.get('connection')
+ if not config:
+ raise Exception('Invalid connection format')
+
+ name = config.get('name')
+ if not name:
+ raise Exception('Connection config requires a name field')
+
+ out[name] = __get_os_credentials(os_conn_dict)
+ return out
+
+
+def __get_creds(os_creds_dict, os_user_dict, inst_config):
+ """
+ Returns the appropriate credentials
+ :param os_creds_dict: a dictionary of OSCreds objects where the name is the
+ key
+ :param os_user_dict: a dictionary of OpenStackUser objects where the name
+ is the key
+ :param inst_config:
+ :return: an OSCreds instance or None
+ """
+ os_creds = os_creds_dict.get(DEFAULT_CREDS_KEY)
+ if 'os_user' in inst_config:
+ os_user_conf = inst_config['os_user']
+ if 'name' in os_user_conf:
+ user_creator = os_user_dict.get(os_user_conf['name'])
+ if user_creator:
+ return user_creator.get_os_creds(
+ project_name=os_user_conf.get('project_name'))
+ elif 'os_creds_name' in inst_config:
+ if 'os_creds_name' in inst_config:
+ os_creds = os_creds_dict[inst_config['os_creds_name']]
+ return os_creds
+
+
+def __get_os_credentials(os_conn_config):
+ """
+ Returns an object containing all of the information required to access
+ OpenStack APIs
+ :param os_conn_config: The configuration holding the credentials
+ :return: an OSCreds instance
+ """
+ config = os_conn_config.get('connection')
+ if not config:
+ raise Exception('Invalid connection configuration')
+
+ proxy_settings = None
+ http_proxy = config.get('http_proxy')
+ if http_proxy:
+ tokens = re.split(':', http_proxy)
+ ssh_proxy_cmd = config.get('ssh_proxy_cmd')
+ proxy_settings = ProxySettings(host=tokens[0], port=tokens[1],
+ ssh_proxy_cmd=ssh_proxy_cmd)
+ else:
+ if 'proxy_settings' in config:
+ host = config['proxy_settings'].get('host')
+ port = config['proxy_settings'].get('port')
+ if host and host != 'None' and port and port != 'None':
+ proxy_settings = ProxySettings(**config['proxy_settings'])
+
+ if proxy_settings:
+ config['proxy_settings'] = proxy_settings
+ else:
+ if config.get('proxy_settings'):
+ del config['proxy_settings']
+
+ return OSCreds(**config)
+
+
+def __parse_ports_config(config):
+ """
+ Parses the "ports" configuration
+ :param config: The dictionary to parse
+ :return: a list of PortConfig objects
+ """
+ out = list()
+ for port_config in config:
+ out.append(PortConfig(**port_config.get('port')))
+ return out
+
+
+def __create_instances(os_creds_dict, creator_class, config_class, config,
+ config_key, cleanup=False, os_users_dict=None):
+ """
+ Returns a dictionary of SNAPS creator objects where the key is the name
+ :param os_creds_dict: Dictionary of OSCreds objects where the key is the
+ name
+ :param config: The list of configurations for the same type
+ :param config_key: The list of configurations for the same type
+ :param cleanup: Denotes whether or not this is being called for cleanup
+ :return: dictionary
+ """
+ out = {}
+
+ if config:
+ for config_dict in config:
+ inst_config = config_dict.get(config_key)
+ if inst_config:
+ creds = __get_creds(os_creds_dict, os_users_dict, inst_config)
+ if creds:
+ creator = creator_class(
+ creds,
+ config_class(**inst_config))
+
+ if creator:
+ if cleanup:
+ try:
+ creator.initialize()
+ except Unauthorized as e:
+ logger.warn(
+ 'Unable to initialize creator [%s] - %s',
+ creator, e)
+ else:
+ creator.create()
+
+ out[inst_config['name']] = creator
+
+ logger.info('Initialized configured %ss', config_key)
+
+ return out
+
+
+def __create_vm_instances(os_creds_dict, os_users_dict, instances_config,
+ image_dict, keypairs_dict, cleanup=False):
+ """
+ Returns a dictionary of OpenStackVmInstance objects where the key is the
+ instance name
+ :param os_creds_dict: Dictionary of OSCreds objects where the key is the
+ name
+ :param os_users_dict: Dictionary of OpenStackUser objects where the key is
+ the username
+ :param instances_config: The list of VM instance configurations
+ :param image_dict: A dictionary of images that will probably be used to
+ instantiate the VM instance
+ :param keypairs_dict: A dictionary of keypairs that will probably be used
+ to instantiate the VM instance
+ :param cleanup: Denotes whether or not this is being called for cleanup
+ :return: dictionary
+ """
+ vm_dict = {}
+
+ if instances_config:
+ for instance_config in instances_config:
+ conf = instance_config.get('instance')
+ if conf:
+ if image_dict:
+ image_creator = image_dict.get(conf.get('imageName'))
+ if image_creator:
+ instance_settings = VmInstanceConfig(
+ **instance_config['instance'])
+ kp_creator = keypairs_dict.get(
+ conf.get('keypair_name'))
+
+ try:
+ vm_dict[conf[
+ 'name']] = deploy_utils.create_vm_instance(
+ __get_creds(
+ os_creds_dict, os_users_dict, conf),
+ instance_settings,
+ image_creator.image_settings,
+ keypair_creator=kp_creator,
+ init_only=cleanup)
+ except Unauthorized as e:
+ if not cleanup:
+ logger.warn('Unable to initialize VM - %s', e)
+ raise
+ else:
+ raise Exception('Image creator instance not found.'
+ ' Cannot instantiate')
+ else:
+ if not cleanup:
+ raise Exception('Image dictionary is None. Cannot '
+ 'instantiate')
+ else:
+ raise Exception('Instance configuration is None. Cannot '
+ 'instantiate')
+ logger.info('Created configured instances')
+
+ return vm_dict
+
+
+def __apply_ansible_playbooks(ansible_configs, os_creds_dict, vm_dict,
+ image_dict, flavor_dict, networks_dict,
+ routers_dict, tmplt_file):
+ """
+ Applies ansible playbooks to running VMs with floating IPs
+ :param ansible_configs: a list of Ansible configurations
+ :param os_creds_dict: Dictionary of OSCreds objects where the key is the
+ name
+ :param vm_dict: the dictionary of newly instantiated VMs where the name is
+ the key
+ :param image_dict: the dictionary of newly instantiated images where the
+ name is the key
+ :param flavor_dict: the dictionary of newly instantiated flavors where the
+ name is the key
+ :param networks_dict: the dictionary of newly instantiated networks where
+ the name is the key
+ :param routers_dict: the dictionary of newly instantiated routers where
+ the name is the key
+ :param tmplt_file: the path of the SNAPS-OO template file for setting the
+ CWD so playbook location is relative to the deployment
+ file
+ :return: t/f - true if successful
+ """
+ logger.info("Applying Ansible Playbooks")
+ if ansible_configs:
+ # Set CWD so the deployment file's playbook location can leverage
+ # relative paths
+ orig_cwd = os.getcwd()
+ env_dir = os.path.dirname(tmplt_file)
+ os.chdir(env_dir)
+
+ # Apply playbooks
+ for ansible_config in ansible_configs:
+ # Ensure all hosts are accepting SSH session requests
+ for vm_name in ansible_config['hosts']:
+ vm_inst = vm_dict.get(vm_name)
+ if vm_inst:
+ if not vm_inst.vm_ssh_active(block=True):
+ logger.warning(
+ 'Timeout waiting for instance to respond to '
+ 'SSH requests')
+ return False
+
+ os_creds = os_creds_dict.get('admin-creds')
+ __apply_ansible_playbook(
+ ansible_config, os_creds, vm_dict, image_dict, flavor_dict,
+ networks_dict, routers_dict)
+
+ # Return to original directory
+ os.chdir(orig_cwd)
+
+ return True
+
+
+def __apply_ansible_playbook(ansible_config, os_creds, vm_dict, image_dict,
+ flavor_dict, networks_dict, routers_dict):
+ """
+ Applies an Ansible configuration setting
+ :param ansible_config: the configuration settings
+ :param os_creds: the OpenStack admin credentials object
+ :param vm_dict: the dictionary of newly instantiated VMs where the name is
+ the key
+ :param image_dict: the dictionary of newly instantiated images where the
+ name is the key
+ :param flavor_dict: the dictionary of newly instantiated flavors where the
+ name is the key
+ :param networks_dict: the dictionary of newly instantiated networks where
+ the name is the key
+ :param routers_dict: the dictionary of newly instantiated routers where
+ the name is the key
+ """
+ if ansible_config:
+ (remote_user, floating_ips, private_key_filepath,
+ proxy_settings) = __get_connection_info(
+ ansible_config, vm_dict)
+ if floating_ips:
+ for key, vm_creator in vm_dict.items():
+ fip = vm_creator.get_floating_ip()
+ if fip and fip.ip in floating_ips:
+ if not vm_creator.cloud_init_complete(block=True):
+ raise Exception(
+ 'Cannot apply playbooks as cloud-init has not '
+ 'completed')
+
+ variables = __get_variables(
+ ansible_config.get('variables'), os_creds, vm_dict, image_dict,
+ flavor_dict, networks_dict, routers_dict)
+
+ retval = ansible_utils.apply_playbook(
+ ansible_config['playbook_location'], floating_ips, remote_user,
+ private_key_filepath,
+ variables=variables,
+ proxy_setting=proxy_settings)
+ if retval != 0:
+ # Not a fatal type of event
+ raise Exception(
+ 'Error applying playbook found at location - %s',
+ ansible_config.get('playbook_location'))
+ elif ansible_config.get('post_processing'):
+ post_proc_config = ansible_config['post_processing']
+ if 'sleep' in post_proc_config:
+ time.sleep(post_proc_config['sleep'])
+ if 'reboot' in post_proc_config:
+ for vm_name in post_proc_config['reboot']:
+ if vm_name in vm_dict:
+ logger.info('Rebooting VM - %s', vm_name)
+ vm_dict[vm_name].reboot()
+
+ return retval
+
+
+def __get_connection_info(ansible_config, vm_dict):
+ """
+ Returns a tuple of data required for connecting to the running VMs
+ (remote_user, [floating_ips], private_key_filepath, proxy_settings)
+ :param ansible_config: the configuration settings
+ :param vm_dict: the dictionary of VMs where the VM name is the key
+ :return: tuple where the first element is the user and the second is a list
+ of floating IPs and the third is the
+ private key file location and the fourth is an instance of the
+ snaps.ProxySettings class
+ (note: in order to work, each of the hosts need to have the same sudo_user
+ and private key file location values)
+ """
+ if ansible_config.get('hosts'):
+ hosts = ansible_config['hosts']
+ if len(hosts) > 0:
+ floating_ips = list()
+ remote_user = None
+ pk_file = None
+ proxy_settings = None
+ for host in hosts:
+ vm = vm_dict.get(host)
+ if vm:
+ fip = vm.get_floating_ip()
+ if fip:
+ remote_user = vm.get_image_user()
+
+ if fip:
+ floating_ips.append(fip.ip)
+ else:
+ raise Exception(
+ 'Could not find floating IP for VM - ' +
+ vm.name)
+
+ pk_file = vm.keypair_settings.private_filepath
+ proxy_settings = vm.get_os_creds().proxy_settings
+ else:
+ logger.error('Could not locate VM with name - ' + host)
+
+ return remote_user, floating_ips, pk_file, proxy_settings
+ return None
+
+
+def __get_variables(var_config, os_creds, vm_dict, image_dict, flavor_dict,
+ networks_dict, routers_dict):
+ """
+ Returns a dictionary of substitution variables to be used for Ansible
+ templates
+ :param var_config: the variable configuration settings
+ :param os_creds: the OpenStack admin credentials object
+ :param vm_dict: the dictionary of newly instantiated VMs where the name is
+ the key
+ :param image_dict: the dictionary of newly instantiated images where the
+ name is the key
+ :param flavor_dict: the dictionary of newly instantiated flavors where the
+ name is the key
+ :param networks_dict: the dictionary of newly instantiated networks where
+ the name is the key
+ :param routers_dict: the dictionary of newly instantiated routers where
+ the name is the key
+ :return: dictionary or None
+ """
+ if var_config and vm_dict and len(vm_dict) > 0:
+ variables = dict()
+ for key, value in var_config.items():
+ value = __get_variable_value(
+ value, os_creds, vm_dict, image_dict, flavor_dict,
+ networks_dict, routers_dict)
+ if key and value:
+ variables[key] = value
+ logger.info(
+ "Set Jinga2 variable with key [%s] the value [%s]",
+ key, value)
+ else:
+ raise Exception(
+ 'Key - [' + str(key) + '] or Value [' + str(value)
+ + '] must not be None')
+ return variables
+ return None
+
+
+def __get_variable_value(var_config_values, os_creds, vm_dict, image_dict,
+ flavor_dict, networks_dict, routers_dict):
+ """
+ Returns the associated variable value for use by Ansible for substitution
+ purposes
+ :param var_config_values: the configuration dictionary
+ :param os_creds: the OpenStack admin credentials object
+ :param vm_dict: the dictionary of newly instantiated VMs where the name is
+ the key
+ :param image_dict: the dictionary of newly instantiated images where the
+ name is the key
+ :param flavor_dict: the dictionary of newly instantiated flavors where the
+ name is the key
+ :param networks_dict: the dictionary of newly instantiated networks where
+ the name is the key
+ :param routers_dict: the dictionary of newly instantiated routers where
+ the name is the key
+ :return:
+ """
+ if var_config_values['type'] == 'string':
+ return __get_string_variable_value(var_config_values)
+ if var_config_values['type'] == 'vm-attr':
+ return __get_vm_attr_variable_value(var_config_values, vm_dict)
+ if var_config_values['type'] == 'os_creds':
+ return __get_os_creds_variable_value(var_config_values, os_creds)
+ if var_config_values['type'] == 'network':
+ return __get_network_variable_value(var_config_values, networks_dict)
+ if var_config_values['type'] == 'router':
+ return __get_router_variable_value(var_config_values, routers_dict,
+ os_creds)
+ if var_config_values['type'] == 'port':
+ return __get_vm_port_variable_value(var_config_values, vm_dict)
+ if var_config_values['type'] == 'floating_ip':
+ return __get_vm_fip_variable_value(var_config_values, vm_dict)
+ if var_config_values['type'] == 'image':
+ return __get_image_variable_value(var_config_values, image_dict)
+ if var_config_values['type'] == 'flavor':
+ return __get_flavor_variable_value(var_config_values, flavor_dict)
+ return None
+
+
+def __get_string_variable_value(var_config_values):
+ """
+ Returns the associated string value
+ :param var_config_values: the configuration dictionary
+ :return: the value contained in the dictionary with the key 'value'
+ """
+ return var_config_values['value']
+
+
+def __get_vm_attr_variable_value(var_config_values, vm_dict):
+ """
+ Returns the associated value contained on a VM instance
+ :param var_config_values: the configuration dictionary
+ :param vm_dict: the dictionary containing all VMs where the key is the VM's
+ name
+ :return: the value
+ """
+ vm = vm_dict.get(var_config_values['vm_name'])
+ if vm:
+ if var_config_values['value'] == 'floating_ip':
+ return vm.get_floating_ip().ip
+ if var_config_values['value'] == 'image_user':
+ return vm.get_image_user()
+
+
+def __get_os_creds_variable_value(var_config_values, os_creds):
+ """
+ Returns the associated OS credentials value
+ :param var_config_values: the configuration dictionary
+ :param os_creds: the admin OpenStack OSCreds object
+ :return: the value
+ """
+ if os_creds:
+ if var_config_values['value'] == 'username':
+ logger.info("Returning OS username")
+ return os_creds.username
+ elif var_config_values['value'] == 'password':
+ logger.info("Returning OS password")
+ return os_creds.password
+ elif var_config_values['value'] == 'auth_url':
+ logger.info("Returning OS auth_url")
+ return os_creds.auth_url
+ elif var_config_values['value'] == 'project_name':
+ logger.info("Returning OS project_name")
+ return os_creds.project_name
+
+
+def __get_network_variable_value(var_config_values, networks_dict):
+ """
+ Returns the associated network value
+ :param var_config_values: the configuration dictionary
+ :param networks_dict: the dictionary containing all networks where the key
+ is the network name
+ :return: the value
+ """
+ net_name = var_config_values.get('network_name')
+
+ if net_name and networks_dict.get(net_name):
+ network_creator = networks_dict[net_name]
+
+ if 'subnet_name' in var_config_values:
+ subnet_name = var_config_values.get('subnet_name')
+ if subnet_name:
+ for subnet in network_creator.get_network().subnets:
+ if subnet_name == subnet.name:
+ if 'value' in var_config_values:
+ if 'gateway_ip' == var_config_values['value']:
+ return subnet.gateway_ip
+ if 'ip_range' == var_config_values['value']:
+ return subnet.start + ' ' + subnet.end
+ if 'cidr_ip' == var_config_values['value']:
+ cidr_split = subnet.cidr.split('/')
+ return cidr_split[0]
+ if 'netmask' == var_config_values['value']:
+ cidr_split = subnet.cidr.split('/')
+ cidr_bits = 32 - int(cidr_split[1])
+ netmask = socket.inet_ntoa(
+ struct.pack(
+ '!I', (1 << 32) - (1 << cidr_bits)))
+ return netmask
+ if 'broadcast_ip' == var_config_values['value']:
+ end_split = subnet.end.split('.')
+ broadcast_ip = (
+ end_split[0] + '.' + end_split[1] + '.'
+ + end_split[2] + '.255')
+ return broadcast_ip
+
+
+def __get_router_variable_value(var_config_values, routers_dict, os_creds):
+ """
+ Returns the associated network value
+ :param var_config_values: the configuration dictionary
+ :param routers_dict: the dictionary containing all networks where the key
+ is the network name
+ :param os_creds: the admin OpenStack credentials
+ :return: the value
+ """
+ router_name = var_config_values.get('router_name')
+ router_creator = routers_dict[router_name]
+
+ if router_creator:
+ if 'external_fixed_ip' == var_config_values.get('attr'):
+ neutron = neutron_utils.neutron_client(os_creds)
+ ext_nets = neutron_utils.get_external_networks(neutron)
+
+ subnet_name = var_config_values.get('subnet_name')
+
+ for ext_net in ext_nets:
+ for subnet in ext_net.subnets:
+ if subnet_name == subnet.name:
+ router = router_creator.get_router()
+ for fixed_ips in router.external_fixed_ips:
+ if subnet.id == fixed_ips['subnet_id']:
+ return fixed_ips['ip_address']
+
+
+def __get_vm_port_variable_value(var_config_values, vm_dict):
+ """
+ Returns the associated OS credentials value
+ :param var_config_values: the configuration dictionary
+ :param vm_dict: the dictionary containing all VMs where the key is the VM's
+ name
+ :return: the value
+ """
+ port_name = var_config_values.get('port_name')
+ vm_name = var_config_values.get('vm_name')
+
+ if port_name and vm_name:
+ vm = vm_dict.get(vm_name)
+ if vm:
+ for vm_port in vm.get_vm_inst().ports:
+ if vm_port.name == port_name:
+ port_value_id = var_config_values.get('port_value')
+ if port_value_id:
+ if port_value_id == 'mac_address':
+ return vm_port.mac_address
+ if port_value_id == 'ip_address':
+ return vm_port.ips[0]['ip_address']
+
+
+def __get_vm_fip_variable_value(var_config_values, vm_dict):
+ """
+ Returns the floating IP value if found
+ :param var_config_values: the configuration dictionary
+ :param vm_dict: the dictionary containing all VMs where the key is the VM's
+ name
+ :return: the floating IP string value or None
+ """
+ fip_name = var_config_values.get('fip_name')
+ vm_name = var_config_values.get('vm_name')
+
+ if vm_name:
+ vm = vm_dict.get(vm_name)
+ if vm:
+ fip = vm.get_floating_ip(fip_name)
+ if fip:
+ return fip.ip
+
+
+def __get_image_variable_value(var_config_values, image_dict):
+ """
+ Returns the associated image value
+ :param var_config_values: the configuration dictionary
+ :param image_dict: the dictionary containing all images where the key is
+ the name
+ :return: the value
+ """
+ if image_dict:
+ if var_config_values.get('image_name'):
+ image_creator = image_dict.get(var_config_values['image_name'])
+ if image_creator:
+ if (var_config_values.get('value')
+ and var_config_values['value'] == 'id'):
+ return image_creator.get_image().id
+ if (var_config_values.get('value')
+ and var_config_values['value'] == 'user'):
+ return image_creator.image_settings.image_user
+
+
+def __get_flavor_variable_value(var_config_values, flavor_dict):
+ """
+ Returns the associated flavor value
+ :param var_config_values: the configuration dictionary
+ :param flavor_dict: the dictionary containing all flavor creators where the
+ key is the name
+ :return: the value or None
+ """
+ if flavor_dict:
+ if var_config_values.get('flavor_name'):
+ flavor_creator = flavor_dict.get(var_config_values['flavor_name'])
+ if flavor_creator:
+ if (var_config_values.get('value')
+ and var_config_values['value'] == 'id'):
+ return flavor_creator.get_flavor().id
+
+
+def __cleanup(creators, clean_image=False):
+ """
+ Cleans up environment
+ :param creators: the list of creators by type
+ :param clean_image: when true
+ :return:
+ """
+ for creator_dict in reversed(creators):
+ for key, creator in creator_dict.items():
+ if ((isinstance(creator, OpenStackImage) and clean_image)
+ or not isinstance(creator, OpenStackImage)):
+ creator.clean()
diff --git a/snaps/openstack/utils/magnum_utils.py b/snaps/openstack/utils/magnum_utils.py
new file mode 100644
index 0000000..96ba6d1
--- /dev/null
+++ b/snaps/openstack/utils/magnum_utils.py
@@ -0,0 +1,126 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from magnumclient.client import Client
+
+from snaps.domain.cluster_template import ClusterTemplate
+from snaps.openstack.utils import keystone_utils
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('magnum_utils')
+
+
+def magnum_client(os_creds):
+ """
+ Retrieves the Magnum client
+ :param os_creds: the OpenStack credentialsf
+ :return: the client
+ """
+ logger.debug('Retrieving Magnum Client')
+ return Client(str(os_creds.magnum_api_version),
+ session=keystone_utils.keystone_session(os_creds))
+
+
+def get_cluster_template(magnum, template_config=None, template_name=None):
+ """
+ Returns the first ClusterTemplate domain object that matches the parameters
+ :param magnum: the Magnum client
+ :param template_config: a ClusterTemplateConfig object (optional)
+ :param template_name: the name of the template to lookup
+ :return: ClusterTemplate object or None
+ """
+ name = None
+ if template_config:
+ name = template_config.name
+ elif template_name:
+ name = template_name
+
+ os_templates = magnum.cluster_templates.list()
+ for os_template in os_templates:
+ if os_template.name == name:
+ return __map_os_cluster_template(os_template)
+
+
+def get_cluster_template_by_id(magnum, tmplt_id):
+ """
+ Returns the first ClusterTemplate domain object that matches the parameters
+ :param magnum: the Magnum client
+ :param tmplt_id: the template's ID
+ :return: ClusterTemplate object or None
+ """
+ return __map_os_cluster_template(magnum.cluster_templates.get(tmplt_id))
+
+
+def create_cluster_template(magnum, cluster_template_config):
+ """
+ Creates a Magnum Cluster Template object in OpenStack
+ :param magnum: the Magnum client
+ :param cluster_template_config: a ClusterTemplateConfig object
+ :return: a SNAPS ClusterTemplate domain object
+ """
+ config_dict = cluster_template_config.magnum_dict()
+ os_cluster_template = magnum.cluster_templates.create(**config_dict)
+ logger.info('Creating cluster template named [%s]',
+ cluster_template_config.name)
+ return __map_os_cluster_template(os_cluster_template)
+
+
+def delete_cluster_template(magnum, tmplt_id):
+ """
+ Deletes a Cluster Template from OpenStack
+ :param magnum: the Magnum client
+ :param tmplt_id: the cluster template ID to delete
+ """
+ logger.info('Deleting cluster template with ID [%s]', tmplt_id)
+ magnum.cluster_templates.delete(tmplt_id)
+
+
+def __map_os_cluster_template(os_tmplt):
+ """
+ Returns a SNAPS ClusterTemplate object from an OpenStack ClusterTemplate
+ object
+ :param os_tmplt: the OpenStack ClusterTemplate object
+ :return: SNAPS ClusterTemplate object
+ """
+ return ClusterTemplate(
+ id=os_tmplt.uuid,
+ name=os_tmplt.name,
+ image=os_tmplt.image_id,
+ keypair=os_tmplt.keypair_id,
+ network_driver=os_tmplt.network_driver,
+ external_net=os_tmplt.external_network_id,
+ floating_ip_enabled=os_tmplt.floating_ip_enabled,
+ docker_volume_size=os_tmplt.docker_volume_size,
+ server_type=os_tmplt.server_type,
+ flavor=os_tmplt.flavor_id,
+ master_flavor=os_tmplt.master_flavor_id,
+ coe=os_tmplt.coe,
+ fixed_net=os_tmplt.fixed_network,
+ fixed_subnet=os_tmplt.fixed_subnet,
+ registry_enabled=os_tmplt.registry_enabled,
+ insecure_registry=os_tmplt.insecure_registry,
+ docker_storage_driver=os_tmplt.docker_storage_driver,
+ dns_nameserver=os_tmplt.dns_nameserver,
+ public=os_tmplt.public,
+ tls_disabled=os_tmplt.tls_disabled,
+ http_proxy=os_tmplt.http_proxy,
+ https_proxy=os_tmplt.https_proxy,
+ no_proxy=os_tmplt.no_proxy,
+ volume_driver=os_tmplt.volume_driver,
+ master_lb_enabled=os_tmplt.master_lb_enabled,
+ labels=os_tmplt.labels
+ )
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 806bb53..e94a40e 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -53,15 +53,33 @@ def create_network(neutron, os_creds, network_settings):
:param network_settings: A dictionary containing the network configuration
and is responsible for creating the network
request JSON body
- :return: a SNAPS-OO Network domain object
+ :return: a SNAPS-OO Network domain object if found else None
"""
- if neutron and network_settings:
- logger.info('Creating network with name ' + network_settings.name)
- json_body = network_settings.dict_for_neutron(os_creds)
- os_network = neutron.create_network(body=json_body)
- return Network(**os_network['network'])
- else:
- raise NeutronException('Failded to create network')
+ logger.info('Creating network with name ' + network_settings.name)
+ json_body = network_settings.dict_for_neutron(os_creds)
+ os_network = neutron.create_network(body=json_body)
+
+ if os_network:
+ network = get_network_by_id(neutron, os_network['network']['id'])
+
+ subnets = list()
+ for subnet_settings in network_settings.subnet_settings:
+ try:
+ subnets.append(
+ create_subnet(neutron, subnet_settings, os_creds, network))
+ except:
+ logger.error(
+ 'Unexpected error creating subnet [%s] for network [%s]',
+ subnet_settings.name, network.name)
+
+ for subnet in subnets:
+ delete_subnet(neutron, subnet)
+
+ delete_network(neutron, network)
+
+ raise
+
+ return get_network_by_id(neutron, network.id)
def delete_network(neutron, network):
@@ -71,6 +89,14 @@ def delete_network(neutron, network):
:param network: a SNAPS-OO Network domain object
"""
if neutron and network:
+ if network.subnets:
+ for subnet in network.subnets:
+ logger.info('Deleting subnet with name ' + subnet.name)
+ try:
+ delete_subnet(neutron, subnet)
+ except NotFound:
+ pass
+
logger.info('Deleting network with name ' + network.name)
neutron.delete_network(network.id)
@@ -83,7 +109,7 @@ def get_network(neutron, network_settings=None, network_name=None,
else the query will use just the name from the network_name parameter.
When the project_id is included, that will be added to the query filter.
:param neutron: the client
- :param network_settings: the NetworkSettings object used to create filter
+ :param network_settings: the NetworkConfig object used to create filter
:param network_name: the name of the network to retrieve
:param project_id: the id of the network's project
:return: a SNAPS-OO Network domain object
@@ -100,12 +126,13 @@ def get_network(neutron, network_settings=None, network_name=None,
networks = neutron.list_networks(**net_filter)
for network, netInsts in networks.items():
for inst in netInsts:
- return Network(**inst)
+ return __map_network(neutron, inst)
-def get_network_by_id(neutron, network_id):
+def __get_os_network_by_id(neutron, network_id):
"""
- Returns the network object (dictionary) with the given ID else None
+ Returns the OpenStack network object (dictionary) with the given ID else
+ None
:param neutron: the client
:param network_id: the id of the network to retrieve
:return: a SNAPS-OO Network domain object
@@ -113,18 +140,42 @@ def get_network_by_id(neutron, network_id):
networks = neutron.list_networks(**{'id': network_id})
for network in networks['networks']:
if network['id'] == network_id:
- return Network(**network)
+ return network
+
+
+def get_network_by_id(neutron, network_id):
+ """
+ Returns the SNAPS Network domain object for the given ID else None
+ :param neutron: the client
+ :param network_id: the id of the network to retrieve
+ :return: a SNAPS-OO Network domain object
+ """
+ os_network = __get_os_network_by_id(neutron, network_id)
+ if os_network:
+ return __map_network(neutron, os_network)
-def create_subnet(neutron, subnet_settings, os_creds, network=None):
+def __map_network(neutron, os_network):
+ """
+ Returns the network object (dictionary) with the given ID else None
+ :param neutron: the client
+ :param os_network: the OpenStack Network dict
+ :return: a SNAPS-OO Network domain object
+ """
+ subnets = get_subnets_by_network_id(neutron, os_network['id'])
+ os_network['subnets'] = subnets
+ return Network(**os_network)
+
+
+def create_subnet(neutron, subnet_settings, os_creds, network):
"""
Creates a network subnet for OpenStack
:param neutron: the client
- :param network: the network object
:param subnet_settings: A dictionary containing the subnet configuration
and is responsible for creating the subnet request
JSON body
:param os_creds: the OpenStack credentials
+ :param network: the network object
:return: a SNAPS-OO Subnet domain object
"""
if neutron and network and subnet_settings:
@@ -207,9 +258,19 @@ def get_subnets_by_network(neutron, network):
:param network: the SNAPS-OO Network domain object
:return: a list of Subnet objects
"""
+ return get_subnets_by_network_id(neutron, network.id)
+
+
+def get_subnets_by_network_id(neutron, network_id):
+ """
+ Returns a list of SNAPS-OO Subnet domain objects
+ :param neutron: the OpenStack neutron client
+ :param network_id: the subnet's ID
+ :return: a list of Subnet objects
+ """
out = list()
- os_subnets = neutron.list_subnets(network_id=network.id)
+ os_subnets = neutron.list_subnets(network_id=network_id)
for os_subnet in os_subnets['subnets']:
out.append(Subnet(**os_subnet))
@@ -231,7 +292,7 @@ def create_router(neutron, os_creds, router_settings):
json_body = router_settings.dict_for_neutron(neutron, os_creds)
logger.info('Creating router with name - ' + router_settings.name)
os_router = neutron.create_router(json_body)
- return Router(**os_router['router'])
+ return __map_router(neutron, os_router['router'])
else:
logger.error("Failed to create router.")
raise NeutronException('Failed to create router')
@@ -257,7 +318,7 @@ def get_router_by_id(neutron, router_id):
"""
router = neutron.show_router(router_id)
if router:
- return Router(**router['router'])
+ return __map_router(neutron, router['router'])
def get_router(neutron, router_settings=None, router_name=None):
@@ -266,7 +327,7 @@ def get_router(neutron, router_settings=None, router_name=None):
values if not None, else finds the first with the value of the router_name
parameter, else None
:param neutron: the client
- :param router_settings: the RouterSettings object
+ :param router_settings: the RouterConfig object
:param router_name: the name of the network to retrieve
:return: a SNAPS-OO Router domain object
"""
@@ -281,11 +342,41 @@ def get_router(neutron, router_settings=None, router_name=None):
return None
routers = neutron.list_routers(**router_filter)
+
for routerInst in routers['routers']:
- return Router(**routerInst)
+ return __map_router(neutron, routerInst)
+
return None
+def __map_router(neutron, os_router):
+ """
+ Takes an OpenStack router instance and maps it to a SNAPS Router domain
+ object
+ :param neutron: the neutron client
+ :param os_router: the OpenStack Router object
+ :return:
+ """
+ device_ports = neutron.list_ports(
+ **{'device_id': os_router['id']})['ports']
+ port_subnets = list()
+
+ # Order by create date
+ sorted_ports = sorted(
+ device_ports, key=lambda dev_port: dev_port['created_at'])
+
+ for port in sorted_ports:
+ subnets = list()
+ for fixed_ip in port['fixed_ips']:
+ subnet = get_subnet_by_id(neutron, fixed_ip['subnet_id'])
+ if subnet and subnet.network_id == port['network_id']:
+ subnets.append(subnet)
+ port_subnets.append((Port(**port), subnets))
+
+ os_router['port_subnets'] = port_subnets
+ return Router(**os_router)
+
+
def add_interface_router(neutron, router, subnet=None, port=None):
"""
Adds an interface router for OpenStack for either a subnet or port.
@@ -389,7 +480,7 @@ def get_port(neutron, port_settings=None, port_name=None):
"""
Returns the first port object (dictionary) found for the given query
:param neutron: the client
- :param port_settings: the PortSettings object used for generating the query
+ :param port_settings: the PortConfig object used for generating the query
:param port_name: if port_settings is None, this name is the value to place
into the query
:return: a SNAPS-OO Port domain object
@@ -408,7 +499,8 @@ def get_port(neutron, port_settings=None, port_name=None):
if port_settings.network_name:
network = get_network(neutron,
network_name=port_settings.network_name)
- port_filter['network_id'] = network.id
+ if network:
+ port_filter['network_id'] = network.id
elif port_name:
port_filter['name'] = port_name
@@ -467,7 +559,7 @@ def create_security_group(neutron, keystone, sec_grp_settings):
sec_grp_settings.name)
os_group = neutron.create_security_group(
sec_grp_settings.dict_for_neutron(keystone))
- return SecurityGroup(**os_group['security_group'])
+ return __map_os_security_group(neutron, os_group['security_group'])
def delete_security_group(neutron, sec_grp):
@@ -488,7 +580,7 @@ def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None,
the security group will be used, else if the query parameters are None then
None will be returned
:param neutron: the client
- :param sec_grp_settings: an instance of SecurityGroupSettings config object
+ :param sec_grp_settings: an instance of SecurityGroupConfig object
:param sec_grp_name: the name of security group object to retrieve
:param project_id: the ID of the project/tentant object that owns the
secuity group to retrieve
@@ -511,7 +603,20 @@ def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None,
groups = neutron.list_security_groups(**sec_grp_filter)
for group in groups['security_groups']:
- return SecurityGroup(**group)
+ return __map_os_security_group(neutron, group)
+
+
+def __map_os_security_group(neutron, os_sec_grp):
+ """
+ Creates a SecurityGroup SNAPS domain object from an OpenStack Security
+ Group dict
+ :param neutron: the neutron client for performing rule lookups
+ :param os_sec_grp: the OpenStack Security Group dict object
+ :return: a SecurityGroup object
+ """
+ os_sec_grp['rules'] = get_rules_by_security_group_id(
+ neutron, os_sec_grp['id'])
+ return SecurityGroup(**os_sec_grp)
def get_security_group_by_id(neutron, sec_grp_id):
@@ -526,13 +631,13 @@ def get_security_group_by_id(neutron, sec_grp_id):
groups = neutron.list_security_groups(**{'id': sec_grp_id})
for group in groups['security_groups']:
if group['id'] == sec_grp_id:
- return SecurityGroup(**group)
+ return __map_os_security_group(neutron, group)
return None
def create_security_group_rule(neutron, sec_grp_rule_settings):
"""
- Creates a security group object in OpenStack
+ Creates a security group rule in OpenStack
:param neutron: the client
:param sec_grp_rule_settings: the security group rule settings
:return: a SNAPS-OO SecurityGroupRule domain object
@@ -546,7 +651,7 @@ def create_security_group_rule(neutron, sec_grp_rule_settings):
def delete_security_group_rule(neutron, sec_grp_rule):
"""
- Deletes a security group object from OpenStack
+ Deletes a security group rule object from OpenStack
:param neutron: the client
:param sec_grp_rule: the SNAPS SecurityGroupRule object to delete
"""
@@ -561,20 +666,29 @@ def get_rules_by_security_group(neutron, sec_grp):
:param neutron: the client
:param sec_grp: a list of SNAPS SecurityGroupRule domain objects
"""
+ return get_rules_by_security_group_id(neutron, sec_grp.id)
+
+
+def get_rules_by_security_group_id(neutron, sec_grp_id):
+ """
+ Retrieves all of the rules for a given security group by it's ID
+ :param neutron: the client
+ :param sec_grp_id: the ID of the associated security group
+ """
logger.info('Retrieving security group rules associate with the '
- 'security group - %s', sec_grp.name)
+ 'security group with ID - %s', sec_grp_id)
out = list()
rules = neutron.list_security_group_rules(
- **{'security_group_id': sec_grp.id})
+ **{'security_group_id': sec_grp_id})
for rule in rules['security_group_rules']:
- if rule['security_group_id'] == sec_grp.id:
+ if rule['security_group_id'] == sec_grp_id:
out.append(SecurityGroupRule(**rule))
return out
def get_rule_by_id(neutron, sec_grp, rule_id):
"""
- Deletes a security group object from OpenStack
+ Returns a SecurityGroupRule object from OpenStack
:param neutron: the client
:param sec_grp: the SNAPS SecurityGroup domain object
:param rule_id: the rule's ID
@@ -598,7 +712,7 @@ def get_external_networks(neutron):
out = list()
for network in neutron.list_networks(
**{'router:external': True})['networks']:
- out.append(Network(**network))
+ out.append(__map_network(neutron, network))
return out
@@ -614,7 +728,7 @@ def get_floating_ips(neutron, ports=None):
:param ports: a list of tuple 2 where index 0 is the port name and index 1
is the SNAPS-OO Port object
:return: a list of tuple 2 (port_id, SNAPS FloatingIp) objects when ports
- is not None else a list of Port objects
+ is not None else a list of FloatingIp objects
"""
out = list()
fips = neutron.list_floatingips()
@@ -698,7 +812,7 @@ def delete_floating_ip(neutron, floating_ip):
def get_network_quotas(neutron, project_id):
"""
- Returns a list of all available keypairs
+ Returns a list of NetworkQuotas objects
:param neutron: the neutron client
:param project_id: the project's ID of the quotas to lookup
:return: an object of type NetworkQuotas or None if not found
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index 1665fd0..e15484c 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -15,12 +15,14 @@
import logging
+import enum
import os
+import time
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from novaclient.client import Client
-from novaclient.exceptions import NotFound
+from novaclient.exceptions import NotFound, ClientException
from snaps import file_utils
from snaps.domain.flavor import Flavor
@@ -51,22 +53,22 @@ def nova_client(os_creds):
region_name=os_creds.region_name)
-def create_server(nova, neutron, glance, instance_settings, image_settings,
- keypair_settings=None):
+def create_server(nova, neutron, glance, instance_config, image_config,
+ keypair_config=None):
"""
Creates a VM instance
:param nova: the nova client (required)
:param neutron: the neutron client for retrieving ports (required)
:param glance: the glance client (required)
- :param instance_settings: the VM instance settings object (required)
- :param image_settings: the VM's image settings object (required)
- :param keypair_settings: the VM's keypair settings object (optional)
+ :param instance_config: the VMInstConfig object (required)
+ :param image_config: the VM's ImageConfig object (required)
+ :param keypair_config: the VM's KeypairConfig object (optional)
:return: a snaps.domain.VmInst object
"""
ports = list()
- for port_setting in instance_settings.port_settings:
+ for port_setting in instance_config.port_settings:
ports.append(neutron_utils.get_port(
neutron, port_settings=port_setting))
nics = []
@@ -75,56 +77,57 @@ def create_server(nova, neutron, glance, instance_settings, image_settings,
kv['port-id'] = port.id
nics.append(kv)
- logger.info('Creating VM with name - ' + instance_settings.name)
+ logger.info('Creating VM with name - ' + instance_config.name)
keypair_name = None
- if keypair_settings:
- keypair_name = keypair_settings.name
+ if keypair_config:
+ keypair_name = keypair_config.name
- flavor = get_flavor_by_name(nova, instance_settings.flavor)
+ flavor = get_flavor_by_name(nova, instance_config.flavor)
if not flavor:
raise NovaException(
- 'Flavor not found with name - %s', instance_settings.flavor)
+ 'Flavor not found with name - %s', instance_config.flavor)
- image = glance_utils.get_image(glance, image_settings=image_settings)
+ image = glance_utils.get_image(glance, image_settings=image_config)
if image:
userdata = None
- if instance_settings.userdata:
- if isinstance(instance_settings.userdata, str):
- userdata = instance_settings.userdata + '\n'
- elif (isinstance(instance_settings.userdata, dict) and
- 'script_file' in instance_settings.userdata):
+ if instance_config.userdata:
+ if isinstance(instance_config.userdata, str):
+ userdata = instance_config.userdata + '\n'
+ elif (isinstance(instance_config.userdata, dict) and
+ 'script_file' in instance_config.userdata):
try:
userdata = file_utils.read_file(
- instance_settings.userdata['script_file'])
+ instance_config.userdata['script_file'])
except Exception as e:
logger.warn('error reading userdata file %s - %s',
- instance_settings.userdata, e)
- args = {'name': instance_settings.name,
+ instance_config.userdata, e)
+ args = {'name': instance_config.name,
'flavor': flavor,
'image': image,
'nics': nics,
'key_name': keypair_name,
'security_groups':
- instance_settings.security_group_names,
+ instance_config.security_group_names,
'userdata': userdata}
- if instance_settings.availability_zone:
- args['availability_zone'] = instance_settings.availability_zone
+ if instance_config.availability_zone:
+ args['availability_zone'] = instance_config.availability_zone
server = nova.servers.create(**args)
- return __map_os_server_obj_to_vm_inst(server)
+ return __map_os_server_obj_to_vm_inst(neutron, server)
else:
raise NovaException(
'Cannot create instance, image cannot be located with name %s',
- image_settings.name)
+ image_config.name)
-def get_server(nova, vm_inst_settings=None, server_name=None):
+def get_server(nova, neutron, vm_inst_settings=None, server_name=None):
"""
Returns a VmInst object for the first server instance found.
:param nova: the Nova client
- :param vm_inst_settings: the VmInstanceSettings object from which to build
+ :param neutron: the Neutron client
+ :param vm_inst_settings: the VmInstanceConfig object from which to build
the query if not None
:param server_name: the server with this name to return if vm_inst_settings
is not None
@@ -138,12 +141,34 @@ def get_server(nova, vm_inst_settings=None, server_name=None):
servers = nova.servers.list(search_opts=search_opts)
for server in servers:
- return __map_os_server_obj_to_vm_inst(server)
+ return __map_os_server_obj_to_vm_inst(neutron, server)
-def __map_os_server_obj_to_vm_inst(os_server):
+def get_server_connection(nova, vm_inst_settings=None, server_name=None):
+ """
+ Returns a VmInst object for the first server instance found.
+ :param nova: the Nova client
+ :param vm_inst_settings: the VmInstanceConfig object from which to build
+ the query if not None
+ :param server_name: the server with this name to return if vm_inst_settings
+ is not None
+ :return: a snaps.domain.VmInst object or None if not found
+ """
+ search_opts = dict()
+ if vm_inst_settings:
+ search_opts['name'] = vm_inst_settings.name
+ elif server_name:
+ search_opts['name'] = server_name
+
+ servers = nova.servers.list(search_opts=search_opts)
+ for server in servers:
+ return server.links[0]
+
+
+def __map_os_server_obj_to_vm_inst(neutron, os_server):
"""
Returns a VmInst object for an OpenStack Server object
+ :param neutron: the Neutron client (when None, ports will be empty)
:param os_server: the OpenStack server object
:return: an equivalent SNAPS-OO VmInst domain object
"""
@@ -154,11 +179,23 @@ def __map_os_server_obj_to_vm_inst(os_server):
if sec_group.get('name'):
sec_grp_names.append(sec_group.get('name'))
+ out_ports = list()
+ if len(os_server.networks) > 0:
+ for net_name, ips in os_server.networks.items():
+ network = neutron_utils.get_network(neutron, network_name=net_name)
+ ports = neutron_utils.get_ports(neutron, network, ips)
+ for port in ports:
+ out_ports.append(port)
+
+ volumes = None
+ if hasattr(os_server, 'os-extended-volumes:volumes_attached'):
+ volumes = getattr(os_server, 'os-extended-volumes:volumes_attached')
+
return VmInst(
name=os_server.name, inst_id=os_server.id,
image_id=os_server.image['id'], flavor_id=os_server.flavor['id'],
- networks=os_server.networks, keypair_name=os_server.key_name,
- sec_grp_names=sec_grp_names)
+ ports=out_ports, keypair_name=os_server.key_name,
+ sec_grp_names=sec_grp_names, volume_ids=volumes)
def __get_latest_server_os_object(nova, server):
@@ -207,26 +244,28 @@ def get_server_console_output(nova, server):
return None
-def get_latest_server_object(nova, server):
+def get_latest_server_object(nova, neutron, server):
"""
Returns a server with a given id
:param nova: the Nova client
+ :param neutron: the Neutron client
:param server: the old server object
:return: the list of servers or None if not found
"""
server = __get_latest_server_os_object(nova, server)
- return __map_os_server_obj_to_vm_inst(server)
+ return __map_os_server_obj_to_vm_inst(neutron, server)
-def get_server_object_by_id(nova, server_id):
+def get_server_object_by_id(nova, neutron, server_id):
"""
Returns a server with a given id
:param nova: the Nova client
+ :param neutron: the Neutron client
:param server_id: the server's id
:return: an SNAPS-OO VmInst object or None if not found
"""
server = __get_latest_server_os_object_by_id(nova, server_id)
- return __map_os_server_obj_to_vm_inst(server)
+ return __map_os_server_obj_to_vm_inst(neutron, server)
def get_server_security_group_names(nova, server):
@@ -256,6 +295,22 @@ def get_server_info(nova, server):
return None
+def reboot_server(nova, server, reboot_type=None):
+ """
+ Returns a dictionary of a VMs info as returned by OpenStack
+ :param nova: the Nova client
+ :param server: the old server object
+ :param reboot_type: Acceptable values 'SOFT', 'HARD'
+ (api uses SOFT as the default)
+ :return: a dict of the info if VM exists else None
+ """
+ vm = __get_latest_server_os_object(nova, server)
+ if vm:
+ vm.reboot(reboot_type=reboot_type.value)
+ else:
+ raise ServerNotFoundError('Cannot locate server')
+
+
def create_keys(key_size=2048):
"""
Generates public and private keys
@@ -393,6 +448,18 @@ def get_keypair_by_name(nova, name):
return None
+def get_keypair_by_id(nova, kp_id):
+ """
+ Returns a list of all available keypairs
+ :param nova: the Nova client
+ :param kp_id: the ID of the keypair to return
+ :return: the keypair object
+ """
+ keypair = nova.keypairs.get(kp_id)
+ return Keypair(name=keypair.name, kp_id=keypair.id,
+ public_key=keypair.public_key)
+
+
def delete_keypair(nova, key):
"""
Deletes a keypair object from OpenStack
@@ -421,6 +488,21 @@ def get_availability_zone_hosts(nova, zone_name='nova'):
return out
+def get_hypervisor_hosts(nova):
+ """
+ Returns the host names of all nova nodes with active hypervisors
+ :param nova: the Nova client
+ :return: a list of hypervisor host names
+ """
+ out = list()
+ hypervisors = nova.hypervisors.list()
+ for hypervisor in hypervisors:
+ if hypervisor.state == "up":
+ out.append(hypervisor.hypervisor_hostname)
+
+ return out
+
+
def delete_vm_instance(nova, vm_inst):
"""
Deletes a VM instance
@@ -567,7 +649,18 @@ def add_security_group(nova, vm, security_group_name):
:param vm: the OpenStack server object (VM) to alter
:param security_group_name: the name of the security group to add
"""
- nova.servers.add_security_group(str(vm.id), security_group_name)
+ try:
+ nova.servers.add_security_group(str(vm.id), security_group_name)
+ except ClientException as e:
+ sec_grp_names = get_server_security_group_names(nova, vm)
+ if security_group_name in sec_grp_names:
+ logger.warn('Security group [%s] already added to VM [%s]',
+ security_group_name, vm.name)
+ return
+
+ logger.error('Unexpected error while adding security group [%s] - %s',
+ security_group_name, e)
+ raise
def remove_security_group(nova, vm, security_group):
@@ -618,7 +711,8 @@ def update_quotas(nova, project_id, compute_quotas):
update_values['cores'] = compute_quotas.cores
update_values['instances'] = compute_quotas.instances
update_values['injected_files'] = compute_quotas.injected_files
- update_values['injected_file_content_bytes'] = compute_quotas.injected_file_content_bytes
+ update_values['injected_file_content_bytes'] = (
+ compute_quotas.injected_file_content_bytes)
update_values['ram'] = compute_quotas.ram
update_values['fixed_ips'] = compute_quotas.fixed_ips
update_values['key_pairs'] = compute_quotas.key_pairs
@@ -626,7 +720,78 @@ def update_quotas(nova, project_id, compute_quotas):
return nova.quotas.update(project_id, **update_values)
+def attach_volume(nova, neutron, server, volume, timeout=None):
+ """
+ Attaches a volume to a server
+ :param nova: the nova client
+ :param neutron: the neutron client
+ :param server: the VMInst domain object
+ :param volume: the Volume domain object
+ :param timeout: denotes the amount of time to block to determine if the
+ has been properly attached. When None, do not wait.
+ :return: the value from the nova call
+ """
+ nova.volumes.create_server_volume(server.id, volume.id)
+
+ if timeout:
+ start_time = time.time()
+ while time.time() < start_time + timeout:
+ vm = get_server_object_by_id(nova, neutron, server.id)
+ for vol_dict in vm.volume_ids:
+ if volume.id == vol_dict['id']:
+ return vm
+
+ return None
+ else:
+ return get_server_object_by_id(nova, neutron, server.id)
+
+
+def detach_volume(nova, neutron, server, volume, timeout=None):
+ """
+ Attaches a volume to a server
+ :param nova: the nova client
+ :param neutron: the neutron client
+ :param server: the VMInst domain object
+ :param volume: the Volume domain object
+ :param timeout: denotes the amount of time to block to determine if the
+ has been properly detached. When None, do not wait.
+ :return: the value from the nova call
+ """
+ nova.volumes.delete_server_volume(server.id, volume.id)
+
+ if timeout:
+ start_time = time.time()
+ while time.time() < start_time + timeout:
+ vm = get_server_object_by_id(nova, neutron, server.id)
+ found = False
+ for vol_dict in vm.volume_ids:
+ if volume.id == vol_dict['id']:
+ found = True
+
+ if not found:
+ return vm
+
+ return None
+ else:
+ return get_server_object_by_id(nova, neutron, server.id)
+
+
+class RebootType(enum.Enum):
+ """
+ A rule's direction
+ """
+ soft = 'SOFT'
+ hard = 'HARD'
+
+
class NovaException(Exception):
"""
Exception when calls to the Keystone client cannot be served properly
"""
+
+
+class ServerNotFoundError(Exception):
+ """
+ Exception when operations to a VM/Server is requested and the OpenStack
+ Server instance cannot be located
+ """
diff --git a/snaps/openstack/utils/settings_utils.py b/snaps/openstack/utils/settings_utils.py
index 7f00075..2cf6047 100644
--- a/snaps/openstack/utils/settings_utils.py
+++ b/snaps/openstack/utils/settings_utils.py
@@ -15,30 +15,59 @@
import uuid
from snaps import file_utils
-from snaps.openstack.create_instance import (
- VmInstanceSettings, FloatingIpSettings)
-from snaps.openstack.create_keypairs import KeypairSettings
-from snaps.openstack.create_network import (
- PortSettings, SubnetSettings, NetworkSettings)
+from snaps.config.flavor import FlavorConfig
+from snaps.config.keypair import KeypairConfig
+from snaps.config.network import SubnetConfig, PortConfig, NetworkConfig
+from snaps.config.router import RouterConfig
+from snaps.config.security_group import (
+ SecurityGroupRuleConfig, SecurityGroupConfig)
+from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
+from snaps.config.volume import VolumeConfig
+from snaps.config.volume_type import (
+ ControlLocation, VolumeTypeEncryptionConfig, VolumeTypeConfig)
from snaps.openstack.utils import (
neutron_utils, nova_utils, heat_utils, glance_utils)
-def create_network_settings(neutron, network):
+def create_network_config(neutron, network):
"""
- Returns a NetworkSettings object
+ Returns a NetworkConfig object
:param neutron: the neutron client
:param network: a SNAPS-OO Network domain object
:return:
"""
- return NetworkSettings(
+ return NetworkConfig(
name=network.name, network_type=network.type,
- subnet_settings=create_subnet_settings(neutron, network))
+ subnet_settings=create_subnet_config(neutron, network))
-def create_subnet_settings(neutron, network):
+def create_security_group_config(neutron, security_group):
"""
- Returns a list of SubnetSettings objects for a given network
+ Returns a SecurityGroupConfig object
+ :param neutron: the neutron client
+ :param security_group: a SNAPS-OO SecurityGroup domain object
+ :return:
+ """
+ rules = neutron_utils.get_rules_by_security_group(neutron, security_group)
+
+ rule_settings = list()
+ for rule in rules:
+ rule_settings.append(SecurityGroupRuleConfig(
+ sec_grp_name=security_group.name, description=rule.description,
+ direction=rule.direction, ethertype=rule.ethertype,
+ port_range_min=rule.port_range_min,
+ port_range_max=rule.port_range_max, protocol=rule.protocol,
+ remote_group_id=rule.remote_group_id,
+ remote_ip_prefix=rule.remote_ip_prefix))
+
+ return SecurityGroupConfig(
+ name=security_group.name, description=security_group.description,
+ rule_settings=rule_settings)
+
+
+def create_subnet_config(neutron, network):
+ """
+ Returns a list of SubnetConfig objects for a given network
:param neutron: the OpenStack neutron client
:param network: the SNAPS-OO Network domain object
:return: a list
@@ -59,13 +88,147 @@ def create_subnet_settings(neutron, network):
kwargs['host_routes'] = subnet.host_routes
kwargs['ipv6_ra_mode'] = subnet.ipv6_ra_mode
kwargs['ipv6_address_mode'] = subnet.ipv6_address_mode
- out.append(SubnetSettings(**kwargs))
+ out.append(SubnetConfig(**kwargs))
return out
-def create_vm_inst_settings(nova, neutron, server):
+def create_router_config(neutron, router):
+ """
+ Returns a RouterConfig object
+ :param neutron: the neutron client
+ :param router: a SNAPS-OO Router domain object
+ :return:
+ """
+ ext_net_name = None
+
+ if router.external_network_id:
+ network = neutron_utils.get_network_by_id(
+ neutron, router.external_network_id)
+ if network:
+ ext_net_name = network.name
+
+ out_ports = list()
+ if router.port_subnets:
+ for port, subnets in router.port_subnets:
+ network = neutron_utils.get_network_by_id(
+ neutron, port.network_id)
+
+ ip_addrs = list()
+ if network and router.external_fixed_ips:
+ for ext_fixed_ips in router.external_fixed_ips:
+ for subnet in subnets:
+ if ext_fixed_ips['subnet_id'] == subnet.id:
+ ip_addrs.append(ext_fixed_ips['ip_address'])
+ else:
+ for ip in port.ips:
+ ip_addrs.append(ip['ip_address'])
+
+ ports = neutron_utils.get_ports(neutron, network, ip_addrs)
+ for out_port in ports:
+ out_ports.append(out_port)
+
+ port_settings = __create_port_configs(neutron, out_ports)
+
+ filtered_settings = list()
+ for port_setting in port_settings:
+ if port_setting.network_name != ext_net_name:
+ filtered_settings.append(port_setting)
+
+ return RouterConfig(
+ name=router.name, external_gateway=ext_net_name,
+ admin_state_up=router.admin_state_up,
+ port_settings=filtered_settings)
+
+
+def create_volume_config(volume):
+ """
+ Returns a VolumeConfig object
+ :param volume: a SNAPS-OO Volume object
+ """
+
+ return VolumeConfig(
+ name=volume.name, description=volume.description,
+ size=volume.size, type_name=volume.type,
+ availability_zone=volume.availability_zone,
+ multi_attach=volume.multi_attach)
+
+
+def create_volume_type_config(volume_type):
"""
- Returns a NetworkSettings object
+ Returns a VolumeTypeConfig object
+ :param volume_type: a SNAPS-OO VolumeType object
+ """
+
+ control = None
+ if volume_type.encryption:
+ if (volume_type.encryption.control_location
+ == ControlLocation.front_end.value):
+ control = ControlLocation.front_end
+ else:
+ control = ControlLocation.back_end
+
+ if volume_type and volume_type.encryption:
+ encrypt_settings = VolumeTypeEncryptionConfig(
+ name=volume_type.encryption.__class__,
+ provider_class=volume_type.encryption.provider,
+ control_location=control,
+ cipher=volume_type.encryption.cipher,
+ key_size=volume_type.encryption.key_size)
+ else:
+ encrypt_settings = None
+
+ qos_spec_name = None
+ if volume_type.qos_spec:
+ qos_spec_name = volume_type.qos_spec.name
+
+ return VolumeTypeConfig(
+ name=volume_type.name, encryption=encrypt_settings,
+ qos_spec_name=qos_spec_name, public=volume_type.public)
+
+
+def create_flavor_config(flavor):
+ """
+ Returns a FlavorConfig object
+ :param flavor: a FlavorConfig object
+ """
+ return FlavorConfig(
+ name=flavor.name, flavor_id=flavor.id, ram=flavor.ram,
+ disk=flavor.disk, vcpus=flavor.vcpus, ephemeral=flavor.ephemeral,
+ swap=flavor.swap, rxtx_factor=flavor.rxtx_factor,
+ is_public=flavor.is_public)
+
+
+def create_keypair_config(heat_cli, stack, keypair, pk_output_key):
+ """
+ Instantiates a KeypairConfig object from a Keypair domain objects
+ :param heat_cli: the heat client
+ :param stack: the Stack domain object
+ :param keypair: the Keypair SNAPS domain object
+ :param pk_output_key: the key to the heat template's outputs for retrieval
+ of the private key file
+ :return: a KeypairConfig object
+ """
+ if pk_output_key:
+ outputs = heat_utils.get_outputs(heat_cli, stack)
+ for output in outputs:
+ if output.key == pk_output_key:
+ # Save to file
+ guid = uuid.uuid4()
+ key_file = file_utils.save_string_to_file(
+ output.value, str(guid), 0o400)
+
+ # Use outputs, file and resources for the KeypairConfig
+ return KeypairConfig(
+ name=keypair.name, private_filepath=key_file.name)
+
+ return KeypairConfig(name=keypair.name)
+
+
+def create_vm_inst_config(nova, neutron, server):
+ """
+ Returns a VmInstanceConfig object
+ note: if the server instance is not active, the PortSettings objects will
+ not be generated resulting in an invalid configuration
:param nova: the nova client
:param neutron: the neutron client
:param server: a SNAPS-OO VmInst domain object
@@ -77,29 +240,34 @@ def create_vm_inst_settings(nova, neutron, server):
kwargs = dict()
kwargs['name'] = server.name
kwargs['flavor'] = flavor_name
- kwargs['port_settings'] = __create_port_settings(
- neutron, server.networks)
+
+ kwargs['port_settings'] = __create_port_configs(neutron, server.ports)
kwargs['security_group_names'] = server.sec_grp_names
- kwargs['floating_ip_settings'] = __create_floatingip_settings(
+ kwargs['floating_ip_settings'] = __create_floatingip_config(
neutron, kwargs['port_settings'])
- return VmInstanceSettings(**kwargs)
+ return VmInstanceConfig(**kwargs)
-def __create_port_settings(neutron, networks):
+def __create_port_configs(neutron, ports):
"""
- Returns a list of port settings based on the networks parameter
+ Returns a list of PortConfig objects based on the networks parameter
:param neutron: the neutron client
- :param networks: a dict where the key is the network name and the value
- is a list of IP addresses
+ :param ports: a list of SNAPS-OO Port domain objects
:return:
"""
out = list()
- for net_name, ips in networks.items():
- network = neutron_utils.get_network(neutron, network_name=net_name)
- ports = neutron_utils.get_ports(neutron, network, ips)
- for port in ports:
+ for port in ports:
+ if port.device_owner != 'network:dhcp':
+ ip_addrs = list()
+ for ip_dict in port.ips:
+ subnet = neutron_utils.get_subnet_by_id(
+ neutron, ip_dict['subnet_id'])
+ ip_addrs.append({'subnet_name': subnet.name,
+ 'ip': ip_dict['ip_address']})
+
+ network = neutron_utils.get_network_by_id(neutron, port.network_id)
kwargs = dict()
if port.name:
kwargs['name'] = port.name
@@ -107,18 +275,19 @@ def __create_port_settings(neutron, networks):
kwargs['mac_address'] = port.mac_address
kwargs['allowed_address_pairs'] = port.allowed_address_pairs
kwargs['admin_state_up'] = port.admin_state_up
- out.append(PortSettings(**kwargs))
+ kwargs['ip_addrs'] = ip_addrs
+ out.append(PortConfig(**kwargs))
return out
-def __create_floatingip_settings(neutron, port_settings):
+def __create_floatingip_config(neutron, port_settings):
"""
- Returns a list of FloatingIPSettings objects as they pertain to an
+ Returns a list of FloatingIpConfig objects as they pertain to an
existing deployed server instance
:param neutron: the neutron client
- :param port_settings: list of SNAPS-OO PortSettings objects
- :return: a list of FloatingIPSettings objects or an empty list if no
+ :param port_settings: list of SNAPS-OO PortConfig objects
+ :return: a list of FloatingIpConfig objects or an empty list if no
floating IPs have been created
"""
base_fip_name = 'fip-'
@@ -160,21 +329,21 @@ def __create_floatingip_settings(neutron, port_settings):
if subnet:
kwargs['subnet_name'] = subnet.name
- out.append(FloatingIpSettings(**kwargs))
+ out.append(FloatingIpConfig(**kwargs))
fip_ctr += 1
return out
-def determine_image_settings(glance, server, image_settings):
+def determine_image_config(glance, server, image_settings):
"""
- Returns a ImageSettings object from the list that matches the name in one
+ Returns a ImageConfig object from the list that matches the name in one
of the image_settings parameter
:param glance: the glance client
:param server: a SNAPS-OO VmInst domain object
- :param image_settings: list of ImageSettings objects
- :return: ImageSettings or None
+ :param image_settings: list of ImageConfig objects
+ :return: ImageConfig or None
"""
if image_settings:
for image_setting in image_settings:
@@ -183,20 +352,20 @@ def determine_image_settings(glance, server, image_settings):
return image_setting
-def determine_keypair_settings(heat_cli, stack, server, keypair_settings=None,
- priv_key_key=None):
+def determine_keypair_config(heat_cli, stack, server, keypair_settings=None,
+ priv_key_key=None):
"""
- Returns a KeypairSettings object from the list that matches the
+ Returns a KeypairConfig object from the list that matches the
server.keypair_name value in the keypair_settings parameter if not None,
else if the output_key is not None, the output's value when contains the
string 'BEGIN RSA PRIVATE KEY', this value will be stored into a file and
- encoded into the KeypairSettings object returned
+ encoded into the KeypairConfig object returned
:param heat_cli: the OpenStack heat client
:param stack: a SNAPS-OO Stack domain object
:param server: a SNAPS-OO VmInst domain object
- :param keypair_settings: list of KeypairSettings objects
+ :param keypair_settings: list of KeypairConfig objects
:param priv_key_key: the stack options that holds the private key value
- :return: KeypairSettings or None
+ :return: KeypairConfig or None
"""
# Existing keypair being used by Heat Template
if keypair_settings:
@@ -214,6 +383,6 @@ def determine_keypair_settings(heat_cli, stack, server, keypair_settings=None,
key_file = file_utils.save_string_to_file(
output.value, str(guid), 0o400)
- # Use outputs, file and resources for the KeypairSettings
- return KeypairSettings(
+ # Use outputs, file and resources for the KeypairConfig
+ return KeypairConfig(
name=server.keypair_name, private_filepath=key_file.name)
diff --git a/snaps/openstack/utils/tests/cinder_utils_tests.py b/snaps/openstack/utils/tests/cinder_utils_tests.py
index a45167e..b624b09 100644
--- a/snaps/openstack/utils/tests/cinder_utils_tests.py
+++ b/snaps/openstack/utils/tests/cinder_utils_tests.py
@@ -15,11 +15,15 @@
import logging
import uuid
+import time
from cinderclient.exceptions import NotFound, BadRequest
-from snaps.openstack.create_qos import QoSSettings, Consumer
-from snaps.openstack.create_volume_type import (
- VolumeTypeSettings, VolumeTypeEncryptionSettings, ControlLocation)
+from snaps.config.volume import VolumeConfig
+from snaps.config.volume_type import (
+ VolumeTypeConfig, ControlLocation, VolumeTypeEncryptionConfig)
+from snaps.config.qos import Consumer, QoSConfig
+from snaps.openstack import create_volume
+from snaps.openstack.create_qos import Consumer
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import cinder_utils
@@ -57,6 +61,113 @@ class CinderSmokeTests(OSComponentTestCase):
cinder.volumes.list()
+class CinderUtilsVolumeTests(OSComponentTestCase):
+ """
+ Test for the CreateVolume class defined in create_volume.py
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateVolume object that is responsible for
+ downloading and creating an OS volume file within OpenStack
+ """
+ guid = uuid.uuid4()
+ self.volume_name = self.__class__.__name__ + '-' + str(guid)
+ self.volume = None
+ self.cinder = cinder_utils.cinder_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.volume:
+ try:
+ cinder_utils.delete_volume(self.cinder, self.volume)
+ except NotFound:
+ pass
+
+ self.assertTrue(volume_deleted(self.cinder, self.volume))
+
+ def test_create_simple_volume(self):
+ """
+ Tests the cinder_utils.create_volume()
+ """
+ volume_settings = VolumeConfig(name=self.volume_name)
+ self.volume = cinder_utils.create_volume(
+ self.cinder, volume_settings)
+ self.assertIsNotNone(self.volume)
+ self.assertEqual(self.volume_name, self.volume.name)
+
+ self.assertTrue(volume_active(self.cinder, self.volume))
+
+ volume = cinder_utils.get_volume(
+ self.cinder, volume_settings=volume_settings)
+ self.assertIsNotNone(volume)
+ validation_utils.objects_equivalent(self.volume, volume)
+
+ def test_create_delete_volume(self):
+ """
+ Tests the cinder_utils.create_volume()
+ """
+ volume_settings = VolumeConfig(name=self.volume_name)
+ self.volume = cinder_utils.create_volume(
+ self.cinder, volume_settings)
+ self.assertIsNotNone(self.volume)
+ self.assertEqual(self.volume_name, self.volume.name)
+
+ self.assertTrue(volume_active(self.cinder, self.volume))
+
+ volume = cinder_utils.get_volume(
+ self.cinder, volume_settings=volume_settings)
+ self.assertIsNotNone(volume)
+ validation_utils.objects_equivalent(self.volume, volume)
+
+ cinder_utils.delete_volume(self.cinder, self.volume)
+ self.assertTrue(volume_deleted(self.cinder, self.volume))
+ self.assertIsNone(
+ cinder_utils.get_volume(self.cinder, volume_settings))
+
+
+def volume_active(cinder, volume):
+ """
+ Returns true if volume becomes active
+ :param cinder:
+ :param volume:
+ :return:
+ """
+ end_time = time.time() + create_volume.VOLUME_ACTIVE_TIMEOUT
+ while time.time() < end_time:
+ status = cinder_utils.get_volume_status(cinder, volume)
+ if status == create_volume.STATUS_ACTIVE:
+ return True
+ elif status == create_volume.STATUS_FAILED:
+ return False
+ time.sleep(3)
+
+ return False
+
+
+def volume_deleted(cinder, volume):
+ """
+ Returns true if volume becomes active
+ :param cinder:
+ :param volume:
+ :return:
+ """
+ end_time = time.time() + create_volume.VOLUME_ACTIVE_TIMEOUT
+ while time.time() < end_time:
+ try:
+ status = cinder_utils.get_volume_status(cinder, volume)
+ if status == create_volume.STATUS_DELETED:
+ return True
+ except NotFound:
+ return True
+
+ time.sleep(3)
+
+ return False
+
+
class CinderUtilsQoSTests(OSComponentTestCase):
"""
Test for the CreateQos class defined in create_qos.py
@@ -86,8 +197,8 @@ class CinderUtilsQoSTests(OSComponentTestCase):
"""
Tests the cinder_utils.create_qos()
"""
- qos_settings = QoSSettings(name=self.qos_name, specs=self.specs,
- consumer=Consumer.both)
+ qos_settings = QoSConfig(
+ name=self.qos_name, specs=self.specs, consumer=Consumer.both)
self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
self.assertIsNotNone(self.qos)
@@ -103,8 +214,8 @@ class CinderUtilsQoSTests(OSComponentTestCase):
"""
Tests the cinder_utils.create_qos()
"""
- qos_settings = QoSSettings(name=self.qos_name, specs=self.specs,
- consumer=Consumer.front_end)
+ qos_settings = QoSConfig(
+ name=self.qos_name, specs=self.specs, consumer=Consumer.front_end)
self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
self.assertIsNotNone(self.qos)
@@ -120,8 +231,8 @@ class CinderUtilsQoSTests(OSComponentTestCase):
"""
Tests the cinder_utils.create_qos()
"""
- qos_settings = QoSSettings(name=self.qos_name, specs=self.specs,
- consumer=Consumer.back_end)
+ qos_settings = QoSConfig(
+ name=self.qos_name, specs=self.specs, consumer=Consumer.back_end)
self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
self.assertIsNotNone(self.qos)
@@ -137,7 +248,7 @@ class CinderUtilsQoSTests(OSComponentTestCase):
"""
Tests the cinder_utils.create_qos()
"""
- qos_settings = QoSSettings(name=self.qos_name, consumer=Consumer.both)
+ qos_settings = QoSConfig(name=self.qos_name, consumer=Consumer.both)
self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
self.assertIsNotNone(self.qos)
self.assertEqual(self.qos_name, self.qos.name)
@@ -165,7 +276,7 @@ class CinderUtilsSimpleVolumeTypeTests(OSComponentTestCase):
"""
guid = uuid.uuid4()
volume_type_name = self.__class__.__name__ + '-' + str(guid)
- self.volume_type_settings = VolumeTypeSettings(name=volume_type_name)
+ self.volume_type_settings = VolumeTypeConfig(name=volume_type_name)
self.volume_type = None
self.cinder = cinder_utils.cinder_client(self.os_creds)
@@ -239,7 +350,7 @@ class CinderUtilsAddEncryptionTests(OSComponentTestCase):
volume_type_name = self.__class__.__name__ + '-' + str(guid) + '-type'
self.volume_type = cinder_utils.create_volume_type(
- self.cinder, VolumeTypeSettings(name=volume_type_name))
+ self.cinder, VolumeTypeConfig(name=volume_type_name))
def tearDown(self):
"""
@@ -263,7 +374,7 @@ class CinderUtilsAddEncryptionTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_encryption(),
get_volume_encryption(), and get_volume_encryption_by_id()
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name=self.encryption_name, provider_class='foo',
control_location=ControlLocation.front_end)
self.encryption = cinder_utils.create_volume_encryption(
@@ -281,7 +392,7 @@ class CinderUtilsAddEncryptionTests(OSComponentTestCase):
"""
Primarily tests the cinder_utils.delete_volume_type_encryption()
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name=self.encryption_name, provider_class='LuksEncryptor',
control_location=ControlLocation.back_end)
self.encryption = cinder_utils.create_volume_encryption(
@@ -307,7 +418,7 @@ class CinderUtilsAddEncryptionTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_encryption() with all valid
settings
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name=self.encryption_name, provider_class='foo',
cipher='bar', control_location=ControlLocation.back_end,
key_size=1)
@@ -329,7 +440,7 @@ class CinderUtilsAddEncryptionTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_encryption() raises an exception
when the provider class does not exist
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name=self.encryption_name, provider_class='foo',
cipher='bar', control_location=ControlLocation.back_end,
key_size=-1)
@@ -353,8 +464,8 @@ class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
self.vol_type_name = self.__class__.__name__ + '-' + str(guid)
self.specs = {'foo': 'bar'}
self.cinder = cinder_utils.cinder_client(self.os_creds)
- qos_settings = QoSSettings(name=self.qos_name, specs=self.specs,
- consumer=Consumer.both)
+ qos_settings = QoSConfig(
+ name=self.qos_name, specs=self.specs, consumer=Consumer.both)
self.qos = cinder_utils.create_qos(self.cinder, qos_settings)
self.volume_type = None
@@ -385,10 +496,10 @@ class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_type() where encryption has been
configured
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
- volume_type_settings = VolumeTypeSettings(
+ volume_type_settings = VolumeTypeConfig(
name=self.vol_type_name, encryption=encryption_settings)
self.volume_type = cinder_utils.create_volume_type(
self.cinder, volume_type_settings)
@@ -404,7 +515,7 @@ class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
"""
Tests the cinder_utils.create_volume_type() with an associated QoS Spec
"""
- volume_type_settings = VolumeTypeSettings(
+ volume_type_settings = VolumeTypeConfig(
name=self.vol_type_name, qos_spec_name=self.qos_name)
self.volume_type = cinder_utils.create_volume_type(
self.cinder, volume_type_settings)
@@ -419,7 +530,7 @@ class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_type() when the QoS Spec name
does not exist
"""
- volume_type_settings = VolumeTypeSettings(
+ volume_type_settings = VolumeTypeConfig(
name=self.vol_type_name, qos_spec_name='foo')
self.volume_type = cinder_utils.create_volume_type(
@@ -432,10 +543,10 @@ class CinderUtilsVolumeTypeCompleteTests(OSComponentTestCase):
Tests the cinder_utils.create_volume_type() with encryption and an
associated QoS Spec
"""
- encryption_settings = VolumeTypeEncryptionSettings(
+ encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
- volume_type_settings = VolumeTypeSettings(
+ volume_type_settings = VolumeTypeConfig(
name=self.vol_type_name, qos_spec_name=self.qos_name,
encryption=encryption_settings)
self.volume_type = cinder_utils.create_volume_type(
diff --git a/snaps/openstack/utils/tests/heat_utils_tests.py b/snaps/openstack/utils/tests/heat_utils_tests.py
index 44235ff..67fbdec 100644
--- a/snaps/openstack/utils/tests/heat_utils_tests.py
+++ b/snaps/openstack/utils/tests/heat_utils_tests.py
@@ -13,21 +13,25 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import os
+
import pkg_resources
import uuid
import time
-from snaps.openstack import create_stack
-from snaps.openstack.create_flavor import OpenStackFlavor, FlavorSettings
+import snaps.config.stack as stack_config
+from snaps.config.flavor import FlavorConfig
+from snaps.openstack.create_flavor import OpenStackFlavor
from snaps.openstack.create_image import OpenStackImage
from snaps.openstack.create_instance import OpenStackVmInstance
-from snaps.openstack.create_stack import StackSettings
+from snaps.openstack.create_stack import StackConfig
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
- heat_utils, neutron_utils, nova_utils, settings_utils, glance_utils)
+ heat_utils, neutron_utils, nova_utils, settings_utils, glance_utils,
+ cinder_utils)
__author__ = 'spisarski'
@@ -93,7 +97,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
# Create Flavor
self.flavor_creator = OpenStackFlavor(
self.os_creds,
- FlavorSettings(name=guid + '-flavor', ram=256, disk=10, vcpus=1))
+ FlavorConfig(name=guid + '-flavor', ram=256, disk=10, vcpus=1))
self.flavor_creator.create()
env_values = {'image_name': self.image_creator.image_settings.name,
@@ -103,10 +107,10 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
'inst_name': self.vm_inst_name}
heat_tmplt_path = pkg_resources.resource_filename(
'snaps.openstack.tests.heat', 'test_heat_template.yaml')
- self.stack_settings1 = StackSettings(
+ self.stack_settings1 = StackConfig(
name=stack_name1, template_path=heat_tmplt_path,
env_values=env_values)
- self.stack_settings2 = StackSettings(
+ self.stack_settings2 = StackConfig(
name=stack_name2, template_path=heat_tmplt_path,
env_values=env_values)
self.stack1 = None
@@ -115,7 +119,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack and image
"""
if self.stack1:
try:
@@ -160,7 +164,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
self.stack1.id)
self.assertEqual(self.stack1, stack_query_3)
- resources = heat_utils.get_resources(self.heat_client, self.stack1)
+ resources = heat_utils.get_resources(self.heat_client, self.stack1.id)
self.assertIsNotNone(resources)
self.assertEqual(4, len(resources))
@@ -168,22 +172,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
self.assertIsNotNone(outputs)
self.assertEqual(0, len(outputs))
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack1.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack1))
neutron = neutron_utils.neutron_client(self.os_creds)
networks = heat_utils.get_stack_networks(
@@ -198,7 +187,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
nova = nova_utils.nova_client(self.os_creds)
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, self.stack1)
+ self.heat_client, nova, neutron, self.stack1)
self.assertIsNotNone(servers)
self.assertEqual(1, len(servers))
self.assertEqual(self.vm_inst_name, servers[0].name)
@@ -222,21 +211,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
self.stack1.id)
self.assertEqual(self.stack1, stack1_query_3)
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack1.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack1))
self.stack2 = heat_utils.create_stack(self.heat_client,
self.stack_settings2)
@@ -253,22 +228,7 @@ class HeatUtilsCreateSimpleStackTests(OSComponentTestCase):
self.stack2.id)
self.assertEqual(self.stack2, stack2_query_3)
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
-
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack2.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
-
- time.sleep(3)
-
- self.assertTrue(is_active)
+ self.assertTrue(stack_active(self.heat_client, self.stack2))
class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
@@ -312,45 +272,35 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
'external_net_name': self.ext_net_name}
heat_tmplt_path = pkg_resources.resource_filename(
'snaps.openstack.tests.heat', 'floating_ip_heat_template.yaml')
- stack_settings = StackSettings(
+ stack_settings = StackConfig(
name=stack_name, template_path=heat_tmplt_path,
env_values=env_values)
self.heat_client = heat_utils.heat_client(self.os_creds)
self.stack = heat_utils.create_stack(self.heat_client, stack_settings)
- # Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
- is_active = False
- while time.time() < end_time:
- status = heat_utils.get_stack_status(self.heat_client,
- self.stack.id)
- if status == create_stack.STATUS_CREATE_COMPLETE:
- is_active = True
- break
- elif status == create_stack.STATUS_CREATE_FAILED:
- is_active = False
- break
+ self.assertTrue(stack_active(self.heat_client, self.stack))
- time.sleep(3)
- self.assertTrue(is_active)
+ self.keypair1_settings = None
+ self.keypair2_settings = None
def tearDown(self):
"""
- Cleans the image and downloaded image file
+ Cleans the stack and image
"""
if self.stack:
try:
heat_utils.delete_stack(self.heat_client, self.stack)
# Wait until stack deployment has completed
- end_time = time.time() + create_stack.STACK_COMPLETE_TIMEOUT
+ end_time = (time.time() +
+ stack_config.STACK_COMPLETE_TIMEOUT)
is_deleted = False
while time.time() < end_time:
status = heat_utils.get_stack_status(self.heat_client,
self.stack.id)
- if status == create_stack.STATUS_DELETE_COMPLETE:
+ if status == stack_config.STATUS_DELETE_COMPLETE:
is_deleted = True
break
- elif status == create_stack.STATUS_DELETE_FAILED:
+ elif status == stack_config.STATUS_DELETE_FAILED:
is_deleted = False
break
@@ -361,11 +311,11 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
neutron = neutron_utils.neutron_client(self.os_creds)
glance = glance_utils.glance_client(self.os_creds)
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, self.stack)
+ self.heat_client, nova, neutron, self.stack)
for server in servers:
- vm_settings = settings_utils.create_vm_inst_settings(
+ vm_settings = settings_utils.create_vm_inst_config(
nova, neutron, server)
- img_settings = settings_utils.determine_image_settings(
+ img_settings = settings_utils.determine_image_config(
glance, server,
[self.image_creator1.image_settings,
self.image_creator2.image_settings])
@@ -392,14 +342,26 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
except:
pass
+ if self.keypair1_settings:
+ expanded_path = os.path.expanduser(
+ self.keypair1_settings.private_filepath)
+ os.chmod(expanded_path, 0o755)
+ os.remove(expanded_path)
+
+ if self.keypair2_settings:
+ expanded_path = os.path.expanduser(
+ self.keypair2_settings.private_filepath)
+ os.chmod(expanded_path, 0o755)
+ os.remove(expanded_path)
+
def test_get_settings_from_stack(self):
"""
Tests that a heat template with floating IPs and can have the proper
settings derived from settings_utils.py.
"""
- resources = heat_utils.get_resources(self.heat_client, self.stack)
+ resources = heat_utils.get_resources(self.heat_client, self.stack.id)
self.assertIsNotNone(resources)
- self.assertEqual(11, len(resources))
+ self.assertEqual(12, len(resources))
options = heat_utils.get_outputs(self.heat_client, self.stack)
self.assertIsNotNone(options)
@@ -412,7 +374,7 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
self.assertEqual(1, len(networks))
self.assertEqual(self.network_name, networks[0].name)
- network_settings = settings_utils.create_network_settings(
+ network_settings = settings_utils.create_network_config(
neutron, networks[0])
self.assertIsNotNone(network_settings)
self.assertEqual(self.network_name, network_settings.name)
@@ -421,11 +383,11 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
glance = glance_utils.glance_client(self.os_creds)
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, self.stack)
+ self.heat_client, nova, neutron, self.stack)
self.assertIsNotNone(servers)
self.assertEqual(2, len(servers))
- image_settings = settings_utils.determine_image_settings(
+ image_settings = settings_utils.determine_image_config(
glance, servers[0],
[self.image_creator1.image_settings,
self.image_creator2.image_settings])
@@ -438,7 +400,7 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
self.assertEqual(
self.image_creator2.image_settings.name, image_settings.name)
- image_settings = settings_utils.determine_image_settings(
+ image_settings = settings_utils.determine_image_config(
glance, servers[1],
[self.image_creator1.image_settings,
self.image_creator2.image_settings])
@@ -449,14 +411,398 @@ class HeatUtilsCreateComplexStackTests(OSComponentTestCase):
self.assertEqual(
self.image_creator2.image_settings.name, image_settings.name)
- keypair1_settings = settings_utils.determine_keypair_settings(
+ self.keypair1_settings = settings_utils.determine_keypair_config(
self.heat_client, self.stack, servers[0],
priv_key_key='private_key')
- self.assertIsNotNone(keypair1_settings)
- self.assertEqual(self.keypair_name, keypair1_settings.name)
+ self.assertIsNotNone(self.keypair1_settings)
+ self.assertEqual(self.keypair_name, self.keypair1_settings.name)
- keypair2_settings = settings_utils.determine_keypair_settings(
+ self.keypair2_settings = settings_utils.determine_keypair_config(
self.heat_client, self.stack, servers[1],
priv_key_key='private_key')
- self.assertIsNotNone(keypair2_settings)
- self.assertEqual(self.keypair_name, keypair2_settings.name)
+ self.assertIsNotNone(self.keypair2_settings)
+ self.assertEqual(self.keypair_name, self.keypair2_settings.name)
+
+
+class HeatUtilsRouterTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+
+ self.net_name = guid + '-net'
+ self.subnet_name = guid + '-subnet'
+ self.router_name = guid + '-router'
+
+ env_values = {
+ 'net_name': self.net_name,
+ 'subnet_name': self.subnet_name,
+ 'router_name': self.router_name,
+ 'external_net_name': self.ext_net_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'router_heat_template.yaml')
+ self.stack_settings = StackConfig(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_router_with_stack(self):
+ """
+ Tests the creation of an OpenStack router with Heat and the retrieval
+ of the Router Domain objects from heat_utils#get_stack_routers().
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+
+ # Wait until stack deployment has completed
+ end_time = time.time() + stack_config.STACK_COMPLETE_TIMEOUT
+ is_active = False
+ while time.time() < end_time:
+ status = heat_utils.get_stack_status(self.heat_client,
+ self.stack.id)
+ if status == stack_config.STATUS_CREATE_COMPLETE:
+ is_active = True
+ break
+ elif status == stack_config.STATUS_CREATE_FAILED:
+ is_active = False
+ break
+
+ time.sleep(3)
+
+ self.assertTrue(is_active)
+
+ routers = heat_utils.get_stack_routers(
+ self.heat_client, self.neutron, self.stack)
+
+ self.assertEqual(1, len(routers))
+
+ router = routers[0]
+ self.assertEqual(self.router_name, router.name)
+
+ ext_net = neutron_utils.get_network(
+ self.neutron, network_name=self.ext_net_name)
+ self.assertEqual(ext_net.id, router.external_network_id)
+
+
+class HeatUtilsVolumeTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+ self.volume_name = guid + '-vol'
+ self.volume_type_name = guid + '-vol-type'
+
+ env_values = {
+ 'volume_name': self.volume_name,
+ 'volume_type_name': self.volume_type_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'volume_heat_template.yaml')
+ self.stack_settings = StackConfig(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.cinder = cinder_utils.cinder_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the stack
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_vol_with_stack(self):
+ """
+ Tests the creation of an OpenStack volume with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ volumes = heat_utils.get_stack_volumes(
+ self.heat_client, self.cinder, self.stack)
+
+ self.assertEqual(1, len(volumes))
+
+ volume = volumes[0]
+ self.assertEqual(self.volume_name, volume.name)
+ self.assertEqual(self.volume_type_name, volume.type)
+ self.assertEqual(1, volume.size)
+ self.assertEqual(False, volume.multi_attach)
+
+ def test_create_vol_types_with_stack(self):
+ """
+ Tests the creation of an OpenStack volume with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ volume_types = heat_utils.get_stack_volume_types(
+ self.heat_client, self.cinder, self.stack)
+
+ self.assertEqual(1, len(volume_types))
+
+ volume_type = volume_types[0]
+
+ self.assertEqual(self.volume_type_name, volume_type.name)
+ self.assertTrue(volume_type.public)
+ self.assertIsNone(volume_type.qos_spec)
+
+ # TODO - Add encryption back and find out why it broke in Pike
+ # encryption = volume_type.encryption
+ # self.assertIsNotNone(encryption)
+ # self.assertIsNone(encryption.cipher)
+ # self.assertEqual('front-end', encryption.control_location)
+ # self.assertIsNone(encryption.key_size)
+ # self.assertEqual(u'nova.volume.encryptors.luks.LuksEncryptor',
+ # encryption.provider)
+ # self.assertEqual(volume_type.id, encryption.volume_type_id)
+
+
+class HeatUtilsFlavorTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.name_prefix = guid
+ stack_name = guid + '-stack'
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'flavor_heat_template.yaml')
+ self.stack_settings = StackConfig(
+ name=stack_name, template_path=heat_tmplt_path)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.nova = nova_utils.nova_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the stack
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_flavor_with_stack(self):
+ """
+ Tests the creation of an OpenStack volume with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ flavors = heat_utils.get_stack_flavors(
+ self.heat_client, self.nova, self.stack)
+
+ self.assertEqual(1, len(flavors))
+
+ flavor = flavors[0]
+ self.assertTrue(flavor.name.startswith(self.name_prefix))
+ self.assertEqual(1024, flavor.ram)
+ self.assertEqual(200, flavor.disk)
+ self.assertEqual(8, flavor.vcpus)
+ self.assertEqual(0, flavor.ephemeral)
+ self.assertIsNone(flavor.swap)
+ self.assertEqual(1.0, flavor.rxtx_factor)
+ self.assertTrue(flavor.is_public)
+
+
+class HeatUtilsKeypairTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+ self.keypair_name = guid + '-kp'
+
+ env_values = {'keypair_name': self.keypair_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'keypair_heat_template.yaml')
+ self.stack_settings = StackConfig(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.nova = nova_utils.nova_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the stack
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_keypair_with_stack(self):
+ """
+ Tests the creation of an OpenStack keypair with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ keypairs = heat_utils.get_stack_keypairs(
+ self.heat_client, self.nova, self.stack)
+
+ self.assertEqual(1, len(keypairs))
+ keypair = keypairs[0]
+
+ self.assertEqual(self.keypair_name, keypair.name)
+
+ outputs = heat_utils.get_outputs(self.heat_client, self.stack)
+
+ for output in outputs:
+ if output.key == 'private_key':
+ self.assertTrue(output.value.startswith(
+ '-----BEGIN RSA PRIVATE KEY-----'))
+
+ keypair = nova_utils.get_keypair_by_id(self.nova, keypair.id)
+ self.assertIsNotNone(keypair)
+
+ self.assertEqual(self.keypair_name, keypair.name)
+
+
+class HeatUtilsSecurityGroupTests(OSComponentTestCase):
+ """
+ Test Heat volume functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates OpenStack instances that cannot be spawned by Heat
+ """
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ stack_name = guid + '-stack'
+ self.sec_grp_name = guid + '-sec-grp'
+
+ env_values = {'security_group_name': self.sec_grp_name}
+
+ heat_tmplt_path = pkg_resources.resource_filename(
+ 'snaps.openstack.tests.heat', 'security_group_heat_template.yaml')
+ self.stack_settings = StackConfig(
+ name=stack_name, template_path=heat_tmplt_path,
+ env_values=env_values)
+ self.stack = None
+ self.heat_client = heat_utils.heat_client(self.os_creds)
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+
+ def tearDown(self):
+ """
+ Cleans the stack
+ """
+ if self.stack:
+ try:
+ heat_utils.delete_stack(self.heat_client, self.stack)
+ except:
+ pass
+
+ def test_create_security_group_with_stack(self):
+ """
+ Tests the creation of an OpenStack SecurityGroup with Heat.
+ """
+ self.stack = heat_utils.create_stack(
+ self.heat_client, self.stack_settings)
+ self.assertTrue(stack_active(self.heat_client, self.stack))
+
+ sec_grp = heat_utils.get_stack_security_groups(
+ self.heat_client, self.neutron, self.stack)[0]
+
+ self.assertEqual(self.sec_grp_name, sec_grp.name)
+ self.assertEqual('Test description', sec_grp.description)
+ self.assertEqual(2, len(sec_grp.rules))
+
+ has_ssh_rule = False
+ has_icmp_rule = False
+
+ for rule in sec_grp.rules:
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'egress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min == 22
+ and rule.port_range_max == 22
+ and rule.protocol == 'tcp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_ssh_rule = True
+ if (rule.security_group_id == sec_grp.id
+ and rule.direction == 'ingress'
+ and rule.ethertype == 'IPv4'
+ and rule.port_range_min is None
+ and rule.port_range_max is None
+ and rule.protocol == 'icmp'
+ and rule.remote_group_id is None
+ and rule.remote_ip_prefix == '0.0.0.0/0'):
+ has_icmp_rule = True
+
+ self.assertTrue(has_ssh_rule)
+ self.assertTrue(has_icmp_rule)
+
+
+def stack_active(heat_cli, stack):
+ """
+ Blocks until stack application has successfully completed or failed
+ :param heat_cli: the Heat client
+ :param stack: the Stack domain object
+ :return: T/F
+ """
+ # Wait until stack deployment has completed
+ end_time = time.time() + stack_config.STACK_COMPLETE_TIMEOUT
+ is_active = False
+ while time.time() < end_time:
+ status = heat_utils.get_stack_status(heat_cli, stack.id)
+ if status == stack_config.STATUS_CREATE_COMPLETE:
+ is_active = True
+ break
+ elif status == stack_config.STATUS_CREATE_FAILED:
+ is_active = False
+ break
+
+ time.sleep(3)
+
+ return is_active
diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py
index bd0086b..b7f024d 100644
--- a/snaps/openstack/utils/tests/keystone_utils_tests.py
+++ b/snaps/openstack/utils/tests/keystone_utils_tests.py
@@ -14,8 +14,8 @@
# limitations under the License.
import uuid
-from snaps.openstack.create_project import ProjectSettings
-from snaps.openstack.create_user import UserSettings
+from snaps.config.project import ProjectConfig
+from snaps.config.user import UserConfig
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import keystone_utils, neutron_utils
@@ -96,7 +96,7 @@ class KeystoneUtilsTests(OSComponentTestCase):
"""
Tests the keystone_utils.create_user() function
"""
- user_settings = UserSettings(
+ user_settings = UserConfig(
name=self.username,
password=str(uuid.uuid4()),
domain_name=self.os_creds.user_domain_name)
@@ -111,7 +111,7 @@ class KeystoneUtilsTests(OSComponentTestCase):
"""
Tests the keyston_utils.create_project() funtion
"""
- project_settings = ProjectSettings(
+ project_settings = ProjectConfig(
name=self.project_name, domain=self.os_creds.project_domain_name)
self.project = keystone_utils.create_project(self.keystone,
project_settings)
@@ -180,13 +180,13 @@ class KeystoneUtilsTests(OSComponentTestCase):
Tests the keystone_utils function grant_user_role_to_project()
:return:
"""
- user_settings = UserSettings(
+ user_settings = UserConfig(
name=self.username, password=str(uuid.uuid4()),
domain_name=self.os_creds.user_domain_name)
self.user = keystone_utils.create_user(self.keystone, user_settings)
self.assertEqual(self.username, self.user.name)
- project_settings = ProjectSettings(
+ project_settings = ProjectConfig(
name=self.project_name, domain=self.os_creds.project_domain_name)
self.project = keystone_utils.create_project(self.keystone,
project_settings)
diff --git a/snaps/openstack/utils/tests/magnum_utils_tests.py b/snaps/openstack/utils/tests/magnum_utils_tests.py
new file mode 100644
index 0000000..f841c48
--- /dev/null
+++ b/snaps/openstack/utils/tests/magnum_utils_tests.py
@@ -0,0 +1,299 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import uuid
+
+from magnumclient.common.apiclient.exceptions import BadRequest
+
+from snaps.config.cluster_template import (
+ ClusterTemplateConfig, ServerType, ContainerOrchestrationEngine,
+ DockerStorageDriver)
+from snaps.config.flavor import FlavorConfig
+from snaps.config.keypair import KeypairConfig
+from snaps.openstack.create_flavor import OpenStackFlavor
+from snaps.openstack.create_image import OpenStackImage
+from snaps.openstack.create_keypairs import OpenStackKeypair
+from snaps.openstack.os_credentials import OSCreds
+from snaps.openstack.tests import openstack_tests
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.utils import magnum_utils
+
+__author__ = 'spisarski'
+
+logger = logging.getLogger('magnum_utils_tests')
+
+
+class MagnumSmokeTests(OSComponentTestCase):
+ """
+ Tests to ensure that the magnum client can communicate with the cloud
+ """
+
+ def test_connect_success(self):
+ """
+ Tests to ensure that the proper credentials can connect.
+ """
+ magnum = magnum_utils.magnum_client(self.os_creds)
+
+ # This should not throw an exception
+ self.assertIsNotNone(magnum.clusters.list())
+
+ def test_nova_connect_fail(self):
+ """
+ Tests to ensure that the improper credentials cannot connect.
+ """
+
+ with self.assertRaises(RuntimeError):
+ magnum_utils.magnum_client(
+ OSCreds(username='user', password='pass',
+ auth_url=self.os_creds.auth_url,
+ project_name=self.os_creds.project_name,
+ proxy_settings=self.os_creds.proxy_settings))
+
+
+class MagnumUtilsClusterTypeTests(OSComponentTestCase):
+ """
+ Tests individual functions within magnum_utils.py
+ """
+
+ def setUp(self):
+ self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.cluster_type_name = self.guid + '-cluster-type'
+ self.magnum = magnum_utils.magnum_client(self.os_creds)
+
+ metadata = self.image_metadata
+ if not metadata:
+ metadata = dict()
+ if 'extra_properties' not in metadata:
+ metadata['extra_properties'] = dict()
+ metadata['extra_properties']['os_distro'] = 'cirros'
+
+ os_image_settings = openstack_tests.cirros_image_settings(
+ name=self.guid + '-image', image_metadata=metadata)
+
+ self.image_creator = OpenStackImage(self.os_creds, os_image_settings)
+
+ self.flavor_creator = OpenStackFlavor(
+ self.os_creds, FlavorConfig(
+ name=self.guid + '-flavor', ram=512, disk=10, vcpus=1))
+
+ keypair_priv_filepath = 'tmp/' + self.guid
+ keypair_pub_filepath = keypair_priv_filepath + '.pub'
+
+ self.keypair_creator = OpenStackKeypair(
+ self.os_creds, KeypairConfig(
+ name=self.guid + '-keypair',
+ public_filepath=keypair_pub_filepath,
+ private_filepath=keypair_priv_filepath))
+
+ self.cluster_template = None
+
+ try:
+ self.image_creator.create()
+ self.flavor_creator.create()
+ self.keypair_creator.create()
+ except:
+ self.tearDown()
+ raise
+
+ def tearDown(self):
+ if self.cluster_template:
+ try:
+ magnum_utils.delete_cluster_template(
+ self.magnum, self.cluster_template.id)
+ except:
+ pass
+ if self.keypair_creator:
+ try:
+ self.keypair_creator.clean()
+ except:
+ pass
+ if self.flavor_creator:
+ try:
+ self.flavor_creator.clean()
+ except:
+ pass
+ if self.image_creator:
+ try:
+ self.image_creator.clean()
+ except:
+ pass
+
+ def test_create_cluster_template_simple(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ flavor=self.flavor_creator.flavor_settings.name)
+
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+ self.assertIsNotNone(self.cluster_template)
+ self.assertTrue(
+ validate_cluster_template(config, self.cluster_template))
+
+ template_by_name = magnum_utils.get_cluster_template(
+ self.magnum, template_name=config.name)
+ self.assertEqual(self.cluster_template, template_by_name)
+ template_by_id = magnum_utils.get_cluster_template_by_id(
+ self.magnum, self.cluster_template.id)
+ self.assertEqual(self.cluster_template, template_by_id)
+
+ def test_create_cluster_template_all(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ network_driver='flannel', external_net=self.ext_net_name,
+ floating_ip_enabled=True, docker_volume_size=100,
+ server_type=ServerType.vm,
+ flavor=self.flavor_creator.flavor_settings.name,
+ master_flavor=self.flavor_creator.flavor_settings.name,
+ coe=ContainerOrchestrationEngine.kubernetes,
+ fixed_net='foo', fixed_subnet='bar',
+ registry_enabled=True, insecure_registry='localhost',
+ docker_storage_driver=DockerStorageDriver.overlay,
+ dns_nameserver='8.8.4.4', public=True, tls_disabled=True,
+ http_proxy=None, https_proxy=None, volume_driver='cinder',
+ master_lb_enabled=False, labels={'foo': 'bar'})
+
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+ self.assertIsNotNone(self.cluster_template)
+ self.assertTrue(
+ validate_cluster_template(config, self.cluster_template))
+
+ template_by_name = magnum_utils.get_cluster_template(
+ self.magnum, template_name=config.name)
+ self.assertEqual(self.cluster_template, template_by_name)
+ template_by_id = magnum_utils.get_cluster_template_by_id(
+ self.magnum, self.cluster_template.id)
+ self.assertEqual(self.cluster_template, template_by_id)
+
+ def test_create_cluster_template_bad_image(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image='foo',
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ flavor=self.flavor_creator.flavor_settings.name)
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+ def test_create_cluster_template_bad_ext_net(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net='foo',
+ flavor=self.flavor_creator.flavor_settings.name)
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+ def test_create_cluster_template_bad_flavor(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ flavor='foo')
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+ def test_create_cluster_template_bad_master_flavor(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ flavor=self.flavor_creator.flavor_settings.name,
+ master_flavor='foo')
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+ def test_create_cluster_template_bad_network_driver(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ network_driver='foo')
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+ def test_create_cluster_template_bad_volume_driver(self):
+ config = ClusterTemplateConfig(
+ name=self.cluster_type_name,
+ image=self.image_creator.image_settings.name,
+ keypair=self.keypair_creator.keypair_settings.name,
+ external_net=self.ext_net_name,
+ volume_driver='foo')
+
+ with self.assertRaises(BadRequest):
+ self.cluster_template = magnum_utils.create_cluster_template(
+ self.magnum, config)
+
+
+def validate_cluster_template(tmplt_config, tmplt_obj):
+ """
+ Returns true if the configuration matches the ClusterTemplate object
+ :param tmplt_config: the ClusterTemplateConfig object
+ :param tmplt_obj: the ClusterTemplate domain object
+ :return: T/F
+ """
+ if not tmplt_config.network_driver:
+ network_driver = 'flannel'
+ else:
+ network_driver = tmplt_config.network_driver
+
+ return (
+ tmplt_config.coe.value == tmplt_obj.coe and
+ tmplt_config.dns_nameserver == tmplt_obj.dns_nameserver and
+ tmplt_config.docker_storage_driver.value
+ == tmplt_obj.docker_storage_driver and
+ tmplt_config.docker_volume_size == tmplt_obj.docker_volume_size and
+ tmplt_config.external_net == tmplt_obj.external_net and
+ tmplt_config.fixed_net == tmplt_obj.fixed_net and
+ tmplt_config.fixed_subnet == tmplt_obj.fixed_subnet and
+ tmplt_config.flavor == tmplt_obj.flavor and
+ tmplt_config.floating_ip_enabled == tmplt_obj.floating_ip_enabled and
+ tmplt_config.http_proxy == tmplt_obj.http_proxy and
+ tmplt_config.https_proxy == tmplt_obj.https_proxy and
+ tmplt_config.no_proxy == tmplt_obj.no_proxy and
+ tmplt_config.image == tmplt_obj.image and
+ tmplt_config.insecure_registry == tmplt_obj.insecure_registry and
+ tmplt_config.keypair == tmplt_obj.keypair and
+ tmplt_config.labels == tmplt_obj.labels and
+ tmplt_config.master_flavor == tmplt_obj.master_flavor and
+ tmplt_config.master_lb_enabled == tmplt_obj.master_lb_enabled and
+ tmplt_config.name == tmplt_obj.name and
+ network_driver == tmplt_obj.network_driver and
+ tmplt_config.no_proxy == tmplt_obj.no_proxy and
+ tmplt_config.public == tmplt_obj.public and
+ tmplt_config.registry_enabled == tmplt_obj.registry_enabled and
+ tmplt_config.server_type.value == tmplt_obj.server_type and
+ tmplt_config.tls_disabled == tmplt_obj.tls_disabled and
+ tmplt_config.volume_driver == tmplt_obj.volume_driver
+ )
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 05d508d..38faf71 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -14,11 +14,11 @@
# limitations under the License.
import uuid
-from snaps.openstack import create_router
-from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
- PortSettings
-from snaps.openstack.create_security_group import SecurityGroupSettings, \
- SecurityGroupRuleSettings, Direction
+from neutronclient.common.exceptions import NotFound, BadRequest
+
+from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
@@ -102,7 +102,7 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
def test_create_network(self):
"""
- Tests the neutron_utils.create_neutron_net() function
+ Tests the neutron_utils.create_network() function
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
@@ -110,26 +110,28 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
self.network.name)
self.assertTrue(validate_network(
self.neutron, self.net_config.network_settings.name, True))
+ self.assertEqual(len(self.net_config.network_settings.subnet_settings),
+ len(self.network.subnets))
def test_create_network_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty
+ Tests the neutron_utils.create_network() function with an empty
network name
"""
with self.assertRaises(Exception):
self.network = neutron_utils.create_network(
self.neutron, self.os_creds,
- network_settings=NetworkSettings(name=''))
+ network_settings=NetworkConfig(name=''))
def test_create_network_null_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function when the network
+ Tests the neutron_utils.create_network() function when the network
name is None
"""
with self.assertRaises(Exception):
self.network = neutron_utils.create_network(
self.neutron, self.os_creds,
- network_settings=NetworkSettings())
+ network_settings=NetworkConfig())
class NeutronUtilsSubnetTests(OSComponentTestCase):
@@ -142,7 +144,6 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.network = None
- self.subnet = None
self.net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
external_net=self.ext_net_name)
@@ -151,11 +152,6 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
"""
Cleans the remote OpenStack objects
"""
- if self.subnet:
- try:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- except:
- pass
if self.network:
try:
neutron_utils.delete_network(self.neutron, self.network)
@@ -164,7 +160,7 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
def test_create_subnet(self):
"""
- Tests the neutron_utils.create_neutron_net() function
+ Tests the neutron_utils.create_network() function
"""
self.network = neutron_utils.create_network(
self.neutron, self.os_creds, self.net_config.network_settings)
@@ -174,20 +170,18 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, network=self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
subnet_query1 = neutron_utils.get_subnet(
self.neutron, subnet_name=subnet_setting.name)
- self.assertEqual(self.subnet, subnet_query1)
+ self.assertEqual(self.network.subnets[0], subnet_query1)
subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
self.network)
self.assertIsNotNone(subnet_query2)
self.assertEqual(1, len(subnet_query2))
- self.assertEqual(self.subnet, subnet_query2[0])
+ self.assertEqual(self.network.subnets[0], subnet_query2[0])
def test_create_subnet_null_name(self):
"""
@@ -202,11 +196,11 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
with self.assertRaises(Exception):
- SubnetSettings(cidr=self.net_config.subnet_cidr)
+ SubnetConfig(cidr=self.net_config.subnet_cidr)
def test_create_subnet_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty
+ Tests the neutron_utils.create_network() function with an empty
name
"""
self.network = neutron_utils.create_network(
@@ -217,8 +211,6 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, network=self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.assertFalse(validate_subnet(
@@ -226,49 +218,265 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
subnet_query1 = neutron_utils.get_subnet(
self.neutron, subnet_name=subnet_setting.name)
- self.assertEqual(self.subnet, subnet_query1)
+ self.assertEqual(self.network.subnets[0], subnet_query1)
subnet_query2 = neutron_utils.get_subnets_by_network(self.neutron,
self.network)
self.assertIsNotNone(subnet_query2)
self.assertEqual(1, len(subnet_query2))
- self.assertEqual(self.subnet, subnet_query2[0])
+ self.assertEqual(self.network.subnets[0], subnet_query2[0])
def test_create_subnet_null_cidr(self):
"""
Tests the neutron_utils.create_neutron_subnet() function for an
Exception when the subnet CIDR value is None
"""
- self.network = neutron_utils.create_network(
- self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name,
- self.network.name)
- self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True))
-
+ self.net_config.network_settings.subnet_settings[0].cidr = None
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(
- cidr=None, name=self.net_config.subnet_name)
- neutron_utils.create_subnet(
- self.neutron, sub_sets, self.os_creds, network=self.network)
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
def test_create_subnet_empty_cidr(self):
"""
Tests the neutron_utils.create_neutron_subnet() function for an
Exception when the subnet CIDR value is empty
"""
+ self.net_config.network_settings.subnet_settings[0].cidr = ''
+ with self.assertRaises(Exception):
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+
+
+class NeutronUtilsIPv6Tests(OSComponentTestCase):
+ """
+ Test for creating IPv6 networks with subnets via neutron_utils.py
+ """
+
+ def setUp(self):
+ self.guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.network = None
+
+ def tearDown(self):
+ """
+ Cleans the remote OpenStack objects
+ """
+ if self.network:
+ try:
+ neutron_utils.delete_network(self.neutron, self.network)
+ except:
+ pass
+
+ def test_create_network_slaac(self):
+ """
+ Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
+ is True and IPv6 modes are slaac
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
+ ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
+ gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
+ enable_dhcp=True, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
self.network = neutron_utils.create_network(
- self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name,
- self.network.name)
- self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True))
+ self.neutron, self.os_creds, self.network_settings)
+ self.assertEqual(self.network_settings.name, self.network.name)
+
+ subnet_settings = self.network_settings.subnet_settings[0]
+ self.assertEqual(1, len(self.network.subnets))
+ subnet = self.network.subnets[0]
+
+ self.assertEqual(self.network.id, subnet.network_id)
+ self.assertEqual(subnet_settings.name, subnet.name)
+ self.assertEqual(subnet_settings.start, subnet.start)
+ self.assertEqual(subnet_settings.end, subnet.end)
+ self.assertEqual('1:1::/64', subnet.cidr)
+ self.assertEqual(6, subnet.ip_version)
+ self.assertEqual(1, len(subnet.dns_nameservers))
+ self.assertEqual(
+ sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
+ self.assertTrue(subnet.enable_dhcp)
+ self.assertEqual(
+ subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
+ self.assertEqual(
+ subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
- with self.assertRaises(Exception):
- sub_sets = SubnetSettings(
- cidr='', name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
- network=self.network)
+ def test_create_network_stateful(self):
+ """
+ Tests the neutron_utils.create_network() with an IPv6 subnet where DHCP
+ is True and IPv6 modes are stateful
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
+ ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
+ gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
+ enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateful',
+ ipv6_address_mode='dhcpv6-stateful')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ self.assertEqual(self.network_settings.name, self.network.name)
+
+ subnet_settings = self.network_settings.subnet_settings[0]
+ self.assertEqual(1, len(self.network.subnets))
+ subnet = self.network.subnets[0]
+
+ self.assertEqual(self.network.id, subnet.network_id)
+ self.assertEqual(subnet_settings.name, subnet.name)
+ self.assertEqual(subnet_settings.start, subnet.start)
+ self.assertEqual(subnet_settings.end, subnet.end)
+ self.assertEqual('1:1::/64', subnet.cidr)
+ self.assertEqual(6, subnet.ip_version)
+ self.assertEqual(1, len(subnet.dns_nameservers))
+ self.assertEqual(
+ sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
+ self.assertTrue(subnet.enable_dhcp)
+ self.assertEqual(
+ subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
+ self.assertEqual(
+ subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
+
+ def test_create_network_stateless(self):
+ """
+ Tests the neutron_utils.create_network() when DHCP is enabled and
+ the RA and address modes are both 'slaac'
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
+ ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
+ gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
+ enable_dhcp=True, ipv6_ra_mode='dhcpv6-stateless',
+ ipv6_address_mode='dhcpv6-stateless')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ self.assertEqual(self.network_settings.name, self.network.name)
+
+ subnet_settings = self.network_settings.subnet_settings[0]
+ self.assertEqual(1, len(self.network.subnets))
+ subnet = self.network.subnets[0]
+
+ self.assertEqual(self.network.id, subnet.network_id)
+ self.assertEqual(subnet_settings.name, subnet.name)
+ self.assertEqual(subnet_settings.start, subnet.start)
+ self.assertEqual(subnet_settings.end, subnet.end)
+ self.assertEqual('1:1::/64', subnet.cidr)
+ self.assertEqual(6, subnet.ip_version)
+ self.assertEqual(1, len(subnet.dns_nameservers))
+ self.assertEqual(
+ sub_setting.dns_nameservers[0], subnet.dns_nameservers[0])
+ self.assertTrue(subnet.enable_dhcp)
+ self.assertEqual(
+ subnet_settings.ipv6_ra_mode.value, subnet.ipv6_ra_mode)
+ self.assertEqual(
+ subnet_settings.ipv6_address_mode.value, subnet.ipv6_address_mode)
+
+ def test_create_network_no_dhcp_slaac(self):
+ """
+ Tests the neutron_utils.create_network() for a BadRequest when
+ DHCP is not enabled and the RA and address modes are both 'slaac'
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1:0:0:0:0:0:0/64',
+ ip_version=6, dns_nameservers=['2620:0:ccc:0:0:0:0:2'],
+ gateway_ip='1:1:0:0:0:0:0:1', start='1:1::ff', end='1:1::ffff',
+ enable_dhcp=False, ipv6_ra_mode='slaac', ipv6_address_mode='slaac')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ with self.assertRaises(BadRequest):
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ def test_create_network_invalid_start_ip(self):
+ """
+ Tests the neutron_utils.create_network() that contains one IPv6 subnet
+ with an invalid start IP to ensure Neutron assigns it the smallest IP
+ possible
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
+ start='foo')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ self.assertEqual('1:1::2', self.network.subnets[0].start)
+ self.assertEqual(
+ '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
+
+ def test_create_network_invalid_end_ip(self):
+ """
+ Tests the neutron_utils.create_network() that contains one IPv6 subnet
+ with an invalid end IP to ensure Neutron assigns it the largest IP
+ possible
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
+ end='bar')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ self.assertEqual('1:1::2', self.network.subnets[0].start)
+ self.assertEqual(
+ '1:1:0:ffff:ffff:ffff:ffff:ffff', self.network.subnets[0].end)
+
+ def test_create_network_with_bad_cidr(self):
+ """
+ Tests the neutron_utils.create_network() for a BadRequest when
+ the subnet CIDR is invalid
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1:1:/48', ip_version=6)
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ with self.assertRaises(BadRequest):
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ def test_create_network_invalid_gateway_ip(self):
+ """
+ Tests the neutron_utils.create_network() for a BadRequest when
+ the subnet gateway IP is invalid
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
+ gateway_ip='1:2::1')
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ with self.assertRaises(BadRequest):
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
+
+ def test_create_network_with_bad_dns(self):
+ """
+ Tests the neutron_utils.create_network() for a BadRequest when
+ the DNS IP is invalid
+ """
+ sub_setting = SubnetConfig(
+ name=self.guid + '-subnet', cidr='1:1::/48', ip_version=6,
+ dns_nameservers=['foo'])
+ self.network_settings = NetworkConfig(
+ name=self.guid + '-net', subnet_settings=[sub_setting])
+
+ with self.assertRaises(BadRequest):
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.network_settings)
class NeutronUtilsRouterTests(OSComponentTestCase):
@@ -281,7 +489,6 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.network = None
- self.subnet = None
self.port = None
self.router = None
self.interface_router = None
@@ -294,8 +501,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
Cleans the remote OpenStack objects
"""
if self.interface_router:
- neutron_utils.remove_interface_router(self.neutron, self.router,
- self.subnet)
+ neutron_utils.remove_interface_router(
+ self.neutron, self.router, self.network.subnets[0])
if self.router:
try:
@@ -310,22 +517,12 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
except:
pass
- if self.subnet:
- try:
- neutron_utils.delete_subnet(self.neutron, self.subnet)
- except:
- pass
-
if self.network:
- try:
- neutron_utils.delete_network(self.neutron, self.network)
- except:
- pass
+ neutron_utils.delete_network(self.neutron, self.network)
def test_create_router_simple(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external
- gateway is requested
+ Tests the neutron_utils.create_router()
"""
self.router = neutron_utils.create_router(
self.neutron, self.os_creds, self.net_config.router_settings)
@@ -334,8 +531,7 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
def test_create_router_with_public_interface(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external
- gateway is requested
+ Tests the neutron_utils.create_router() function with a pubic interface
"""
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.net_config = openstack_tests.OSNetworkConfig(
@@ -351,30 +547,7 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
ext_net = neutron_utils.get_network(
self.neutron, network_name=self.ext_net_name)
- self.assertEqual(
- self.router.external_gateway_info['network_id'], ext_net.id)
-
- def test_create_router_empty_name(self):
- """
- Tests the neutron_utils.create_neutron_net() function
- """
- with self.assertRaises(Exception):
- this_router_settings = create_router.RouterSettings(name='')
- self.router = neutron_utils.create_router(self.neutron,
- self.os_creds,
- this_router_settings)
-
- def test_create_router_null_name(self):
- """
- Tests the neutron_utils.create_neutron_subnet() function when the
- subnet CIDR value is None
- """
- with self.assertRaises(Exception):
- this_router_settings = create_router.RouterSettings()
- self.router = neutron_utils.create_router(self.neutron,
- self.os_creds,
- this_router_settings)
- validate_router(self.neutron, None, True)
+ self.assertEqual(self.router.external_network_id, ext_net.id)
def test_add_interface_router(self):
"""
@@ -388,9 +561,6 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
@@ -400,9 +570,9 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
True)
self.interface_router = neutron_utils.add_interface_router(
- self.neutron, self.router, self.subnet)
+ self.neutron, self.router, self.network.subnets[0])
validate_interface_router(self.interface_router, self.router,
- self.subnet)
+ self.network.subnets[0])
def test_add_interface_router_null_router(self):
"""
@@ -417,15 +587,12 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(NeutronException):
self.interface_router = neutron_utils.add_interface_router(
- self.neutron, self.router, self.subnet)
+ self.neutron, self.router, self.network.subnets[0])
def test_add_interface_router_null_subnet(self):
"""
@@ -446,7 +613,31 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
with self.assertRaises(NeutronException):
self.interface_router = neutron_utils.add_interface_router(
- self.neutron, self.router, self.subnet)
+ self.neutron, self.router, None)
+
+ def test_add_interface_router_missing_subnet(self):
+ """
+ Tests the neutron_utils.add_interface_router() function for an
+ Exception when the subnet object has been deleted
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network.name)
+ self.assertTrue(validate_network(
+ self.neutron, self.net_config.network_settings.name, True))
+
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
+
+ for subnet in self.network.subnets:
+ neutron_utils.delete_subnet(self.neutron, subnet)
+
+ with self.assertRaises(NotFound):
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.network.subnets[0])
def test_create_port(self):
"""
@@ -460,13 +651,11 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
- self.neutron, self.os_creds, PortSettings(
+ self.neutron, self.os_creds, PortConfig(
name=self.port_name,
ip_addrs=[{
'subnet_name': subnet_setting.name,
@@ -486,13 +675,11 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, self.network)
self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
- self.neutron, self.os_creds, PortSettings(
+ self.neutron, self.os_creds, PortConfig(
name=self.port_name,
network_name=self.net_config.network_settings.name,
ip_addrs=[{
@@ -512,15 +699,12 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.port = neutron_utils.create_port(
self.neutron, self.os_creds,
- PortSettings(
+ PortConfig(
network_name=self.net_config.network_settings.name,
ip_addrs=[{
'subnet_name': subnet_setting.name,
@@ -537,7 +721,7 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.neutron, self.os_creds,
- PortSettings(
+ PortConfig(
name=self.port_name,
network_name=self.net_config.network_settings.name,
ip_addrs=[{
@@ -559,16 +743,13 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting,
- self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.neutron, self.os_creds,
- PortSettings(
+ PortConfig(
name=self.port_name,
network_name=self.net_config.network_settings.name,
ip_addrs=[{
@@ -588,15 +769,13 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.neutron, self.os_creds,
- PortSettings(
+ PortConfig(
name=self.port_name,
network_name=self.net_config.network_settings.name,
ip_addrs=[{
@@ -616,15 +795,13 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
self.neutron, self.net_config.network_settings.name, True))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
- self.subnet = neutron_utils.create_subnet(
- self.neutron, subnet_setting, self.os_creds, self.network)
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
with self.assertRaises(Exception):
self.port = neutron_utils.create_port(
self.neutron, self.os_creds,
- PortSettings(
+ PortConfig(
name=self.port_name,
network_name=self.net_config.network_settings.name,
ip_addrs=[{
@@ -664,7 +841,7 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
security_group = neutron_utils.create_security_group(self.neutron,
self.keystone,
sec_grp_settings)
@@ -684,13 +861,13 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
def test_create_sec_grp_no_name(self):
"""
- Tests the SecurityGroupSettings constructor and
+ Tests the SecurityGroupConfig constructor and
neutron_utils.create_security_group() function to ensure that
attempting to create a security group without a name will raise an
exception
"""
with self.assertRaises(Exception):
- sec_grp_settings = SecurityGroupSettings()
+ sec_grp_settings = SecurityGroupConfig()
self.security_groups.append(
neutron_utils.create_security_group(self.neutron,
self.keystone,
@@ -700,8 +877,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group')
self.security_groups.append(
neutron_utils.create_security_group(self.neutron, self.keystone,
sec_grp_settings))
@@ -719,9 +896,9 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_rule_settings = SecurityGroupRuleSettings(
+ sec_grp_rule_settings = SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=[sec_grp_rule_settings])
@@ -762,12 +939,12 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-1',
- description='hello group')))
+ SecurityGroupConfig(
+ name=self.sec_grp_name + '-1', description='hello group')))
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-2',
- description='hello group')))
+ SecurityGroupConfig(
+ name=self.sec_grp_name + '-2', description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0].id)
diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py
index c5b29b5..8cb0812 100644
--- a/snaps/openstack/utils/tests/nova_utils_tests.py
+++ b/snaps/openstack/utils/tests/nova_utils_tests.py
@@ -13,20 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import time
import uuid
import os
-import time
from snaps import file_utils
+from snaps.config.flavor import FlavorConfig
+from snaps.config.network import PortConfig
+from snaps.config.vm_inst import VmInstanceConfig
+from snaps.config.volume import VolumeConfig
from snaps.openstack import create_instance
-from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
+from snaps.openstack.create_flavor import OpenStackFlavor
from snaps.openstack.create_image import OpenStackImage
-from snaps.openstack.create_instance import VmInstanceSettings
-from snaps.openstack.create_network import OpenStackNetwork, PortSettings
+from snaps.openstack.create_instance import OpenStackVmInstance
+from snaps.openstack.create_network import OpenStackNetwork
+from snaps.openstack.create_volume import OpenStackVolume
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
-from snaps.openstack.utils import nova_utils, neutron_utils, glance_utils
+from snaps.openstack.utils import (
+ nova_utils, neutron_utils, glance_utils, cinder_utils)
__author__ = 'spisarski'
@@ -47,6 +53,16 @@ class NovaSmokeTests(OSComponentTestCase):
# This should not throw an exception
nova.flavors.list()
+ def test_nova_get_hypervisor_hosts(self):
+ """
+ Tests to ensure that get_hypervisors() function works.
+ """
+ nova = nova_utils.nova_client(self.os_creds)
+
+ hosts = nova_utils.get_hypervisor_hosts(nova)
+ # This should not throw an exception
+ self.assertGreaterEqual(len(hosts), 1)
+
def test_nova_connect_fail(self):
"""
Tests to ensure that the improper credentials cannot connect.
@@ -156,11 +172,9 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
and creating an OS image file within OpenStack
"""
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
- self.flavor_settings = FlavorSettings(name=guid + '-name',
- flavor_id=guid + '-id', ram=1,
- disk=1, vcpus=1,
- ephemeral=1, swap=2,
- rxtx_factor=3.0, is_public=False)
+ self.flavor_settings = FlavorConfig(
+ name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1,
+ vcpus=1, ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False)
self.nova = nova_utils.nova_client(self.os_creds)
self.flavor = None
@@ -252,16 +266,16 @@ class NovaUtilsInstanceTests(OSComponentTestCase):
self.flavor_creator = OpenStackFlavor(
self.os_creds,
- FlavorSettings(
+ FlavorConfig(
name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
self.flavor_creator.create()
- port_settings = PortSettings(name=guid + '-port',
- network_name=network_settings.name)
+ port_settings = PortConfig(
+ name=guid + '-port', network_name=network_settings.name)
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, port_settings)
- self.instance_settings = VmInstanceSettings(
+ self.instance_settings = VmInstanceConfig(
name=guid + '-vm_inst',
flavor=self.flavor_creator.flavor_settings.name,
port_settings=[port_settings])
@@ -325,7 +339,148 @@ class NovaUtilsInstanceTests(OSComponentTestCase):
iters += 1
self.assertTrue(active)
- vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst)
+ vm_inst = nova_utils.get_latest_server_object(
+ self.nova, self.neutron, self.vm_inst)
self.assertEqual(self.vm_inst.name, vm_inst.name)
self.assertEqual(self.vm_inst.id, vm_inst.id)
+
+
+class NovaUtilsInstanceVolumeTests(OSComponentTestCase):
+ """
+ Tests the creation of VM instances via nova_utils.py
+ """
+
+ def setUp(self):
+ """
+ Setup objects required by VM instances
+ :return:
+ """
+
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+
+ self.nova = nova_utils.nova_client(self.os_creds)
+ self.cinder = cinder_utils.cinder_client(self.os_creds)
+
+ self.image_creator = None
+ self.network_creator = None
+ self.flavor_creator = None
+ self.volume_creator = None
+ self.instance_creator = None
+
+ try:
+ image_settings = openstack_tests.cirros_image_settings(
+ name=guid + '-image', image_metadata=self.image_metadata)
+ self.image_creator = OpenStackImage(
+ self.os_creds, image_settings=image_settings)
+ self.image_creator.create()
+
+ network_settings = openstack_tests.get_priv_net_config(
+ guid + '-net', guid + '-subnet').network_settings
+ self.network_creator = OpenStackNetwork(
+ self.os_creds, network_settings)
+ self.network_creator.create()
+
+ self.flavor_creator = OpenStackFlavor(
+ self.os_creds,
+ FlavorConfig(
+ name=guid + '-flavor-name', ram=256, disk=10, vcpus=1))
+ self.flavor_creator.create()
+
+ # Create Volume
+ volume_settings = VolumeConfig(
+ name=self.__class__.__name__ + '-' + str(guid))
+ self.volume_creator = OpenStackVolume(
+ self.os_creds, volume_settings)
+ self.volume_creator.create(block=True)
+
+ port_settings = PortConfig(
+ name=guid + '-port', network_name=network_settings.name)
+ instance_settings = VmInstanceConfig(
+ name=guid + '-vm_inst',
+ flavor=self.flavor_creator.flavor_settings.name,
+ port_settings=[port_settings])
+ self.instance_creator = OpenStackVmInstance(
+ self.os_creds, instance_settings, image_settings)
+ self.instance_creator.create(block=True)
+ except:
+ self.tearDown()
+ raise
+
+ def tearDown(self):
+ """
+ Cleanup deployed resources
+ :return:
+ """
+ if self.instance_creator:
+ try:
+ self.instance_creator.clean()
+ except:
+ pass
+ if self.volume_creator:
+ try:
+ self.volume_creator.clean()
+ except:
+ pass
+ if self.flavor_creator:
+ try:
+ self.flavor_creator.clean()
+ except:
+ pass
+ if self.network_creator:
+ try:
+ self.network_creator.clean()
+ except:
+ pass
+ if self.image_creator:
+ try:
+ self.image_creator.clean()
+ except:
+ pass
+
+ def test_add_remove_volume(self):
+ """
+ Tests the nova_utils.create_server() method
+ :return:
+ """
+
+ self.assertIsNotNone(self.volume_creator.get_volume())
+ self.assertEqual(0, len(self.volume_creator.get_volume().attachments))
+
+ # Attach volume to VM
+ neutron = neutron_utils.neutron_client(self.os_creds)
+ nova_utils.attach_volume(
+ self.nova, neutron, self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume())
+
+ time.sleep(10)
+
+ vol_attach = cinder_utils.get_volume_by_id(
+ self.cinder, self.volume_creator.get_volume().id)
+ vm_attach = nova_utils.get_server_object_by_id(
+ self.nova, neutron, self.instance_creator.get_vm_inst().id)
+
+ # Detach volume to VM
+ nova_utils.detach_volume(
+ self.nova, neutron, self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume())
+
+ time.sleep(10)
+
+ vol_detach = cinder_utils.get_volume_by_id(
+ self.cinder, self.volume_creator.get_volume().id)
+ vm_detach = nova_utils.get_server_object_by_id(
+ self.nova, neutron, self.instance_creator.get_vm_inst().id)
+
+ # Validate Attachment
+ self.assertIsNotNone(vol_attach)
+ self.assertEqual(self.volume_creator.get_volume().id, vol_attach.id)
+ self.assertEqual(1, len(vol_attach.attachments))
+ self.assertEqual(vm_attach.volume_ids[0]['id'],
+ vol_attach.attachments[0]['volume_id'])
+
+ # Validate Detachment
+ self.assertIsNotNone(vol_detach)
+ self.assertEqual(self.volume_creator.get_volume().id, vol_detach.id)
+ self.assertEqual(0, len(vol_detach.attachments))
+ self.assertEqual(0, len(vm_detach.volume_ids))
diff --git a/snaps/openstack/utils/tests/settings_utils_tests.py b/snaps/openstack/utils/tests/settings_utils_tests.py
index f84e6a0..cbd78d8 100644
--- a/snaps/openstack/utils/tests/settings_utils_tests.py
+++ b/snaps/openstack/utils/tests/settings_utils_tests.py
@@ -13,17 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+import unittest
+
import os
import uuid
+from snaps.config.network import SubnetConfig, NetworkConfig, PortConfig
+from snaps.config.flavor import FlavorConfig
+from snaps.config.keypair import KeypairConfig
+from snaps.config.qos import Consumer
+from snaps.config.security_group import (
+ SecurityGroupRuleConfig, Direction, Protocol, SecurityGroupConfig)
+from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
+from snaps.domain.flavor import Flavor
+from snaps.domain.volume import (
+ Volume, VolumeType, VolumeTypeEncryption, QoSSpec)
from snaps.openstack import (
create_image, create_network, create_router, create_flavor,
create_keypairs, create_instance)
-from snaps.openstack.create_network import (
- NetworkSettings, OpenStackNetwork, SubnetSettings)
-from snaps.openstack.create_security_group import (
- SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup,
- SecurityGroupSettings)
+from snaps.openstack.create_qos import Consumer
+from snaps.openstack.create_network import OpenStackNetwork
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
@@ -36,7 +46,7 @@ logger = logging.getLogger('nova_utils_tests')
class SettingsUtilsNetworkingTests(OSComponentTestCase):
"""
- Tests the ability to reverse engineer NetworkSettings objects from existing
+ Tests the ability to reverse engineer NetworkConfig objects from existing
networks deployed to OpenStack
"""
@@ -62,16 +72,16 @@ class SettingsUtilsNetworkingTests(OSComponentTestCase):
def test_derive_net_settings_no_subnet(self):
"""
- Validates the utility function settings_utils#create_network_settings
- returns an acceptable NetworkSettings object and ensures that the
+ Validates the utility function settings_utils#create_network_config
+ returns an acceptable NetworkConfig object and ensures that the
new settings object will not cause the new OpenStackNetwork instance
to create another network
"""
- net_settings = NetworkSettings(name=self.network_name)
+ net_settings = NetworkConfig(name=self.network_name)
self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
network = self.net_creator.create()
- derived_settings = settings_utils.create_network_settings(
+ derived_settings = settings_utils.create_network_config(
self.neutron, network)
self.assertIsNotNone(derived_settings)
@@ -89,18 +99,18 @@ class SettingsUtilsNetworkingTests(OSComponentTestCase):
def test_derive_net_settings_two_subnets(self):
"""
- Validates the utility function settings_utils#create_network_settings
- returns an acceptable NetworkSettings object
+ Validates the utility function settings_utils#create_network_config
+ returns an acceptable NetworkConfig object
"""
subnet_settings = list()
- subnet_settings.append(SubnetSettings(name='sub1', cidr='10.0.0.0/24'))
- subnet_settings.append(SubnetSettings(name='sub2', cidr='10.0.1.0/24'))
- net_settings = NetworkSettings(name=self.network_name,
- subnet_settings=subnet_settings)
+ subnet_settings.append(SubnetConfig(name='sub1', cidr='10.0.0.0/24'))
+ subnet_settings.append(SubnetConfig(name='sub2', cidr='10.0.1.0/24'))
+ net_settings = NetworkConfig(
+ name=self.network_name, subnet_settings=subnet_settings)
self.net_creator = OpenStackNetwork(self.os_creds, net_settings)
network = self.net_creator.create()
- derived_settings = settings_utils.create_network_settings(
+ derived_settings = settings_utils.create_network_config(
self.neutron, network)
self.assertIsNotNone(derived_settings)
@@ -135,7 +145,7 @@ class SettingsUtilsNetworkingTests(OSComponentTestCase):
class SettingsUtilsVmInstTests(OSComponentTestCase):
"""
- Tests the ability to reverse engineer VmInstanceSettings objects from
+ Tests the ability to reverse engineer VmInstanceConfig objects from
existing VMs/servers deployed to OpenStack
"""
@@ -144,8 +154,6 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
Instantiates the CreateImage object that is responsible for downloading
and creating an OS image file within OpenStack
"""
- # super(self.__class__, self).__start__()
-
self.nova = nova_utils.nova_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.neutron = neutron_utils.neutron_client(self.os_creds)
@@ -196,13 +204,13 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
# Create Flavor
self.flavor_creator = create_flavor.OpenStackFlavor(
self.os_creds,
- create_flavor.FlavorSettings(name=guid + '-flavor-name',
- ram=256, disk=1, vcpus=1))
+ FlavorConfig(
+ name=guid + '-flavor-name', ram=256, disk=1, vcpus=1))
self.flavor_creator.create()
# Create Key/Pair
self.keypair_creator = create_keypairs.OpenStackKeypair(
- self.os_creds, create_keypairs.KeypairSettings(
+ self.os_creds, KeypairConfig(
name=self.keypair_name,
public_filepath=self.keypair_pub_filepath,
private_filepath=self.keypair_priv_filepath))
@@ -210,32 +218,30 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
# Create Security Group
sec_grp_name = guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
# Create instance
ports_settings = list()
ports_settings.append(
- create_network.PortSettings(
+ PortConfig(
name=self.port_1_name,
network_name=self.pub_net_config.network_settings.name))
- instance_settings = create_instance.VmInstanceSettings(
+ instance_settings = VmInstanceConfig(
name=self.vm_inst_name,
flavor=self.flavor_creator.flavor_settings.name,
port_settings=ports_settings,
- floating_ip_settings=[create_instance.FloatingIpSettings(
+ floating_ip_settings=[FloatingIpConfig(
name=self.floating_ip_name, port_name=self.port_1_name,
router_name=self.pub_net_config.router_settings.name)])
@@ -310,16 +316,17 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
# super(self.__class__, self).__clean__()
- def test_derive_vm_inst_settings(self):
+ def test_derive_vm_inst_config(self):
"""
- Validates the utility function settings_utils#create_vm_inst_settings
- returns an acceptable VmInstanceSettings object
+ Validates the utility function settings_utils#create_vm_inst_config
+ returns an acceptable VmInstanceConfig object
"""
self.inst_creator.create(block=True)
server = nova_utils.get_server(
- self.nova, vm_inst_settings=self.inst_creator.instance_settings)
- derived_vm_settings = settings_utils.create_vm_inst_settings(
+ self.nova, self.neutron,
+ vm_inst_settings=self.inst_creator.instance_settings)
+ derived_vm_settings = settings_utils.create_vm_inst_config(
self.nova, self.neutron, server)
self.assertIsNotNone(derived_vm_settings)
self.assertIsNotNone(derived_vm_settings.port_settings)
@@ -328,14 +335,72 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
def test_derive_image_settings(self):
"""
Validates the utility function settings_utils#create_image_settings
- returns an acceptable ImageSettings object
+ returns an acceptable ImageConfig object
"""
self.inst_creator.create(block=True)
server = nova_utils.get_server(
- self.nova, vm_inst_settings=self.inst_creator.instance_settings)
- derived_image_settings = settings_utils.determine_image_settings(
+ self.nova, self.neutron,
+ vm_inst_settings=self.inst_creator.instance_settings)
+ derived_image_settings = settings_utils.determine_image_config(
self.glance, server, [self.image_creator.image_settings])
self.assertIsNotNone(derived_image_settings)
self.assertEqual(self.image_creator.image_settings.name,
derived_image_settings.name)
+
+
+class SettingsUtilsUnitTests(unittest.TestCase):
+ """
+ Exercises the settings_utils.py functions around volumes
+ """
+
+ def test_vol_settings_from_vol(self):
+ volume = Volume(
+ name='vol-name', volume_id='vol-id', description='desc', size=99,
+ vol_type='vol-type', availability_zone='zone1', multi_attach=True)
+ settings = settings_utils.create_volume_config(volume)
+ self.assertEqual(volume.name, settings.name)
+ self.assertEqual(volume.description, settings.description)
+ self.assertEqual(volume.size, settings.size)
+ self.assertEqual(volume.type, settings.type_name)
+ self.assertEqual(volume.availability_zone, settings.availability_zone)
+ self.assertEqual(volume.multi_attach, settings.multi_attach)
+
+ def test_vol_type_settings_from_vol(self):
+ encryption = VolumeTypeEncryption(
+ volume_encryption_id='vol-encrypt-id', volume_type_id='vol-typ-id',
+ control_location='front-end', provider='FooClass', cipher='1',
+ key_size=1)
+ qos_spec = QoSSpec(name='qos-spec-name', spec_id='qos-spec-id',
+ consumer=Consumer.back_end)
+ volume_type = VolumeType(
+ name='vol-type-name', volume_type_id='vol-type-id', public=True,
+ encryption=encryption, qos_spec=qos_spec)
+
+ settings = settings_utils.create_volume_type_config(volume_type)
+ self.assertEqual(volume_type.name, settings.name)
+ self.assertEqual(volume_type.public, settings.public)
+
+ encrypt_settings = settings.encryption
+ self.assertIsNotNone(encrypt_settings)
+ self.assertEqual(encryption.control_location,
+ encrypt_settings.control_location.value)
+ self.assertEqual(encryption.cipher, encrypt_settings.cipher)
+ self.assertEqual(encryption.key_size, encrypt_settings.key_size)
+
+ self.assertEqual(qos_spec.name, settings.qos_spec_name)
+
+ def test_flavor_settings_from_flavor(self):
+ flavor = Flavor(
+ name='flavor-name', flavor_id='flavor-id', ram=99, disk=101,
+ vcpus=9, ephemeral=3, swap=5, rxtx_factor=7, is_public=False)
+ settings = settings_utils.create_flavor_config(flavor)
+ self.assertEqual(flavor.name, settings.name)
+ self.assertEqual(flavor.id, settings.flavor_id)
+ self.assertEqual(flavor.ram, settings.ram)
+ self.assertEqual(flavor.disk, settings.disk)
+ self.assertEqual(flavor.vcpus, settings.vcpus)
+ self.assertEqual(flavor.ephemeral, settings.ephemeral)
+ self.assertEqual(flavor.swap, settings.swap)
+ self.assertEqual(flavor.rxtx_factor, settings.rxtx_factor)
+ self.assertEqual(flavor.is_public, settings.is_public)