summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--requirements.txt2
-rw-r--r--snaps/domain/test/vm_inst_tests.py49
-rw-r--r--snaps/domain/vm_inst.py44
-rw-r--r--snaps/openstack/create_instance.py84
-rw-r--r--snaps/openstack/tests/create_instance_tests.py69
-rw-r--r--snaps/openstack/tests/openstack_tests.py2
-rw-r--r--snaps/openstack/utils/neutron_utils.py181
-rw-r--r--snaps/openstack/utils/nova_utils.py192
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py686
-rw-r--r--snaps/openstack/utils/tests/nova_utils_tests.py170
-rw-r--r--snaps/provisioning/tests/ansible_utils_tests.py128
-rw-r--r--snaps/test_suite_builder.py396
12 files changed, 1376 insertions, 627 deletions
diff --git a/requirements.txt b/requirements.txt
index c3e748f..c708103 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,4 @@
-python-novaclient!=7.0.0,<8.0.0,>=6.0.0 # Apache-2.0
+python-novaclient>=6.0.0 # Apache-2.0
python-neutronclient>=5.1.0 # Apache-2.0
python-keystoneclient>=3.8.0 # Apache-2.0
python-glanceclient>=2.5.0 # Apache-2.0
diff --git a/snaps/domain/test/vm_inst_tests.py b/snaps/domain/test/vm_inst_tests.py
new file mode 100644
index 0000000..e722a06
--- /dev/null
+++ b/snaps/domain/test/vm_inst_tests.py
@@ -0,0 +1,49 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from snaps.domain.vm_inst import VmInst, FloatingIp
+
+
+class VmInstDomainObjectTests(unittest.TestCase):
+ """
+ Tests the construction of the snaps.domain.test.Image class
+ """
+
+ def test_construction_positional(self):
+ vm_inst = VmInst('name', 'id')
+ self.assertEqual('name', vm_inst.name)
+ self.assertEqual('id', vm_inst.id)
+
+ def test_construction_named(self):
+ vm_inst = VmInst(inst_id='id', name='name')
+ self.assertEqual('name', vm_inst.name)
+ self.assertEqual('id', vm_inst.id)
+
+
+class FloatingIpDomainObjectTests(unittest.TestCase):
+ """
+ Tests the construction of the snaps.domain.test.Image class
+ """
+
+ def test_construction_positional(self):
+ vm_inst = FloatingIp('id-123', '10.0.0.1')
+ self.assertEqual('id-123', vm_inst.id)
+ self.assertEqual('10.0.0.1', vm_inst.ip)
+
+ def test_construction_named(self):
+ vm_inst = FloatingIp(ip='10.0.0.1', inst_id='id-123')
+ self.assertEqual('id-123', vm_inst.id)
+ self.assertEqual('10.0.0.1', vm_inst.ip)
diff --git a/snaps/domain/vm_inst.py b/snaps/domain/vm_inst.py
new file mode 100644
index 0000000..0e12d14
--- /dev/null
+++ b/snaps/domain/vm_inst.py
@@ -0,0 +1,44 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class VmInst:
+ """
+ SNAPS domain object for Images. Should contain attributes that
+ are shared amongst cloud providers
+ """
+ def __init__(self, name, inst_id):
+ """
+ Constructor
+ :param name: the image's name
+ :param inst_id: the instance's id
+ """
+ self.name = name
+ self.id = inst_id
+
+
+class FloatingIp:
+ """
+ SNAPS domain object for Images. Should contain attributes that
+ are shared amongst cloud providers
+ """
+ def __init__(self, inst_id, ip):
+ """
+ Constructor
+ :param inst_id: the floating ip's id
+ :param ip: the IP address
+ """
+ self.id = inst_id
+ self.ip = ip
diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py
index 991aca5..68c421d 100644
--- a/snaps/openstack/create_instance.py
+++ b/snaps/openstack/create_instance.py
@@ -102,7 +102,7 @@ class OpenStackVmInstance:
logger.info(
'Found existing machine with name - %s',
self.instance_settings.name)
- fips = self.__nova.floating_ips.list()
+ fips = neutron_utils.get_floating_ips(self.__nova)
for fip in fips:
if fip.instance_id == server.id:
self.__floating_ips.append(fip)
@@ -116,45 +116,12 @@ class OpenStackVmInstance:
active, error, or timeout waiting. Floating IPs will be
assigned after active when block=True
"""
- nics = []
- for key, port in self.__ports:
- kv = dict()
- kv['port-id'] = port['port']['id']
- nics.append(kv)
-
- logger.info('Creating VM with name - ' + self.instance_settings.name)
- keypair_name = None
- if self.keypair_settings:
- keypair_name = self.keypair_settings.name
-
- flavor = nova_utils.get_flavor_by_name(self.__nova,
- self.instance_settings.flavor)
- if not flavor:
- raise Exception(
- 'Flavor not found with name - %s',
- self.instance_settings.flavor)
-
- image = glance_utils.get_image(
- glance_utils.glance_client(self.__os_creds),
- self.image_settings.name)
- if image:
- self.__vm = self.__nova.servers.create(
- name=self.instance_settings.name,
- flavor=flavor,
- image=image,
- nics=nics,
- key_name=keypair_name,
- security_groups=self.instance_settings.security_group_names,
- userdata=self.instance_settings.userdata,
- availability_zone=self.instance_settings.availability_zone)
-
- else:
- raise Exception(
- 'Cannot create instance, image cannot be located with name %s',
- self.image_settings.name)
-
- logger.info(
- 'Created instance with name - %s', self.instance_settings.name)
+ glance = glance_utils.glance_client(self.__os_creds)
+ self.__vm = nova_utils.create_server(
+ self.__nova, self.__neutron, glance, self.instance_settings,
+ self.image_settings, self.keypair_settings)
+ logger.info('Created instance with name - %s',
+ self.instance_settings.name)
if block:
if not self.vm_active(block=True):
@@ -162,9 +129,7 @@ class OpenStackVmInstance:
'Fatal error, VM did not become ACTIVE within the alloted '
'time')
- # TODO - the call above should add security groups. The return object
- # shows they exist but the association had never been made by
- # OpenStack. This call is here to ensure they have been added
+ # Create server should do this but found it needed to occur here
for sec_grp_name in self.instance_settings.security_group_names:
if self.vm_active(block=True):
nova_utils.add_security_group(self.__nova, self.__vm,
@@ -202,8 +167,8 @@ class OpenStackVmInstance:
if ext_gateway:
subnet = neutron_utils.get_subnet_by_name(
self.__neutron, floating_ip_setting.subnet_name)
- floating_ip = nova_utils.create_floating_ip(self.__nova,
- ext_gateway)
+ floating_ip = neutron_utils.create_floating_ip(
+ self.__neutron, ext_gateway)
self.__floating_ips.append(floating_ip)
self.__floating_ip_dict[floating_ip_setting.name] = floating_ip
@@ -241,7 +206,7 @@ class OpenStackVmInstance:
for floating_ip in self.__floating_ips:
try:
logger.info('Deleting Floating IP - ' + floating_ip.ip)
- nova_utils.delete_floating_ip(self.__nova, floating_ip)
+ neutron_utils.delete_floating_ip(self.__neutron, floating_ip)
except Exception as e:
logger.error('Error deleting Floating IP - ' + str(e))
self.__floating_ips = list()
@@ -345,7 +310,8 @@ class OpenStackVmInstance:
while count > 0:
logger.debug('Attempting to add floating IP to instance')
try:
- self.__vm.add_floating_ip(floating_ip, ip)
+ nova_utils.add_floating_ip_to_server(
+ self.__nova, self.__vm, floating_ip, ip)
logger.info(
'Added floating IP %s to port IP %s on instance %s',
floating_ip.ip, ip, self.instance_settings.name)
@@ -376,7 +342,14 @@ class OpenStackVmInstance:
Returns the latest version of this server object from OpenStack
:return: Server object
"""
- return nova_utils.get_latest_server_object(self.__nova, self.__vm)
+ return self.__vm
+
+ def get_os_vm_server_obj(self):
+ """
+ Returns the OpenStack server object
+ :return: the server object
+ """
+ return nova_utils.get_latest_server_os_object(self.__nova, self.__vm)
def get_port_ip(self, port_name, subnet_name=None):
"""
@@ -466,21 +439,21 @@ class OpenStackVmInstance:
elif len(self.__floating_ips) > 0:
return self.__floating_ips[0]
- def __config_nic(self, nic_name, port, floating_ip):
+ def __config_nic(self, nic_name, port, ip):
"""
Although ports/NICs can contain multiple IPs, this code currently only
supports the first.
:param nic_name: Name of the interface
:param port: The port information containing the expected IP values.
- :param floating_ip: The floating IP on which to apply the playbook.
+ :param ip: The IP on which to apply the playbook.
:return: the return value from ansible
"""
- ip = port['port']['fixed_ips'][0]['ip_address']
+ port_ip = port['port']['fixed_ips'][0]['ip_address']
variables = {
- 'floating_ip': floating_ip,
+ 'floating_ip': ip,
'nic_name': nic_name,
- 'nic_ip': ip
+ 'nic_ip': port_ip
}
if self.image_settings.nic_config_pb_loc and self.keypair_settings:
@@ -594,7 +567,8 @@ class OpenStackVmInstance:
if not self.__vm:
return False
- instance = self.__nova.servers.get(self.__vm.id)
+ instance = nova_utils.get_latest_server_os_object(
+ self.__nova, self.__vm)
if not instance:
logger.warning('Cannot find instance with id - ' + self.__vm.id)
return False
@@ -770,7 +744,7 @@ class VmInstanceSettings:
if kwargs.get('security_group_names'):
if isinstance(kwargs['security_group_names'], list):
- self.security_group_names = kwargs['security_group_names']
+ self.security_group_names = kwargs['security_group_names']
elif isinstance(kwargs['security_group_names'], set):
self.security_group_names = kwargs['security_group_names']
elif isinstance(kwargs['security_group_names'], str):
diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py
index 998fe88..dc8d79b 100644
--- a/snaps/openstack/tests/create_instance_tests.py
+++ b/snaps/openstack/tests/create_instance_tests.py
@@ -261,6 +261,7 @@ class SimpleHealthCheck(OSIntegrationTestCase):
"""
super(self.__class__, self).__start__()
+ self.nova = nova_utils.nova_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.vm_inst_name = guid + '-inst'
self.port_1_name = guid + 'port-1'
@@ -361,7 +362,7 @@ class SimpleHealthCheck(OSIntegrationTestCase):
self.assertTrue(self.inst_creator.vm_active(block=True))
- self.assertTrue(check_dhcp_lease(vm, ip))
+ self.assertTrue(check_dhcp_lease(self.nova, vm, ip))
class CreateInstanceSimpleTests(OSIntegrationTestCase):
@@ -498,6 +499,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
"""
super(self.__class__, self).__start__()
+ self.nova = nova_utils.nova_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.keypair_priv_filepath = 'tmp/' + guid
self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub'
@@ -696,7 +698,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertTrue(inst_creator.vm_active(block=True))
ip = inst_creator.get_port_ip(port_settings.name)
- self.assertTrue(check_dhcp_lease(vm_inst, ip))
+ self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip))
inst_creator.add_security_group(
self.sec_grp_creator.get_security_group())
@@ -734,7 +736,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertTrue(inst_creator.vm_active(block=True))
ip = inst_creator.get_port_ip(port_settings.name)
- self.assertTrue(check_dhcp_lease(vm_inst, ip))
+ self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip))
inst_creator.add_security_group(
self.sec_grp_creator.get_security_group())
@@ -1165,7 +1167,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase):
for zone in zones:
creator = self.inst_creators[index]
self.assertTrue(creator.vm_active(block=True))
- vm = creator.get_vm_inst()
+ vm = creator.get_os_vm_server_obj()
deployed_zone = vm._info['OS-EXT-AZ:availability_zone']
deployed_host = vm._info['OS-EXT-SRV-ATTR:host']
self.assertEqual(zone, deployed_zone + ':' + deployed_host)
@@ -1186,6 +1188,8 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
"""
super(self.__class__, self).__start__()
+ self.nova = nova_utils.nova_client(self.os_creds)
+
# Initialize for tearDown()
self.image_creator = None
self.network_creators = list()
@@ -1387,7 +1391,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
self.assertTrue(self.inst_creator.vm_active(block=True))
ip = self.inst_creator.get_port_ip(ports_settings[0].name)
- self.assertTrue(check_dhcp_lease(vm_inst, ip))
+ self.assertTrue(check_dhcp_lease(self.nova, vm_inst, ip))
# Add security group to VM
self.inst_creator.add_security_group(
@@ -1539,15 +1543,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_creators.append(sec_grp_creator)
# Check that group has not been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
# Add security group to instance after activated
self.inst_creator.add_security_group(sec_grp)
# Validate that security group has been added
- self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertTrue(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
def test_add_invalid_security_group(self):
"""
@@ -1574,15 +1578,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_creators.append(sec_grp_creator)
# Check that group has not been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
# Add security group to instance after activated
self.assertFalse(self.inst_creator.add_security_group(sec_grp))
# Validate that security group has been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
def test_remove_security_group(self):
"""
@@ -1610,14 +1614,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
# Check that group has been added
- self.assertTrue(inst_has_sec_grp(vm_inst, sec_grp_settings.name))
+ self.assertTrue(inst_has_sec_grp(
+ self.nova, vm_inst, sec_grp_settings.name))
# Add security group to instance after activated
self.assertTrue(self.inst_creator.remove_security_group(sec_grp))
# Validate that security group has been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
def test_remove_security_group_never_added(self):
"""
@@ -1644,15 +1649,15 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
# Check that group has been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
# Add security group to instance after activated
self.assertFalse(self.inst_creator.remove_security_group(sec_grp))
# Validate that security group has been added
- self.assertFalse(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertFalse(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
def test_add_same_security_group(self):
"""
@@ -1680,27 +1685,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
# Check that group has been added
- self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertTrue(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
# Add security group to instance after activated
self.assertTrue(self.inst_creator.add_security_group(sec_grp))
# Validate that security group has been added
- self.assertTrue(inst_has_sec_grp(self.inst_creator.get_vm_inst(),
- sec_grp_settings.name))
+ self.assertTrue(inst_has_sec_grp(
+ self.nova, self.inst_creator.get_vm_inst(), sec_grp_settings.name))
-def inst_has_sec_grp(vm_inst, sec_grp_name):
+def inst_has_sec_grp(nova, vm_inst, sec_grp_name):
"""
Returns true if instance has a security group of a given name
+ :param nova: the nova client
+ :param vm_inst: the VmInst domain object
+ :param sec_grp_name: the name of the security group to validate
:return:
"""
- if not hasattr(vm_inst, 'security_groups'):
+ vm = nova_utils.get_latest_server_os_object(nova, vm_inst)
+ if not hasattr(vm, 'security_groups'):
return False
found = False
- for sec_grp_dict in vm_inst.security_groups:
+ for sec_grp_dict in vm.security_groups:
if sec_grp_name in sec_grp_dict['name']:
found = True
break
@@ -2357,10 +2366,11 @@ class CreateInstanceMockOfflineTests(OSComponentTestCase):
self.assertTrue(self.inst_creator.vm_active(block=True))
-def check_dhcp_lease(vm, ip, timeout=160):
+def check_dhcp_lease(nova_client, vm_domain, ip, timeout=160):
"""
Returns true if the expected DHCP lease has been acquired
- :param vm: the OpenStack VM instance object
+ :param nova_client: the nova client
+ :param vm_domain: the SNAPS VM instance domain object
:param ip: the IP address to look for
:param timeout: how long to query for IP address
:return:
@@ -2371,6 +2381,7 @@ def check_dhcp_lease(vm, ip, timeout=160):
logger.info("Looking for IP %s in the console log" % ip)
full_log = ''
while timeout > time.time() - start_time:
+ vm = nova_utils.get_latest_server_os_object(nova_client, vm_domain)
output = vm.get_console_output()
full_log = full_log + output
if re.search(ip, output):
diff --git a/snaps/openstack/tests/openstack_tests.py b/snaps/openstack/tests/openstack_tests.py
index 109d2ce..c0b3a4a 100644
--- a/snaps/openstack/tests/openstack_tests.py
+++ b/snaps/openstack/tests/openstack_tests.py
@@ -158,7 +158,7 @@ def create_image_settings(image_name, image_user, image_format, metadata,
logger.debug('Image metadata - ' + str(metadata))
if metadata and 'config' in metadata:
- return ImageSettings(config=metadata['config'])
+ return ImageSettings(**metadata['config'])
disk_file = None
if metadata:
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 3063176..1889748 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -16,6 +16,7 @@ import logging
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
+from snaps.domain.vm_inst import FloatingIp
from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
@@ -29,11 +30,13 @@ Utilities for basic neutron API calls
def neutron_client(os_creds):
"""
- Instantiates and returns a client for communications with OpenStack's Neutron server
+ Instantiates and returns a client for communications with OpenStack's
+ Neutron server
:param os_creds: the credentials for connecting to the OpenStack remote API
:return: the client object
"""
- return Client(api_version=os_creds.network_api_version, session=keystone_utils.keystone_session(os_creds))
+ return Client(api_version=os_creds.network_api_version,
+ session=keystone_utils.keystone_session(os_creds))
def create_network(neutron, os_creds, network_settings):
@@ -41,8 +44,9 @@ def create_network(neutron, os_creds, network_settings):
Creates a network for OpenStack
:param neutron: the client
:param os_creds: the OpenStack credentials
- :param network_settings: A dictionary containing the network configuration and is responsible for creating the
- network request JSON body
+ :param network_settings: A dictionary containing the network configuration
+ and is responsible for creating the network
+ request JSON body
:return: the network object
"""
if neutron and network_settings:
@@ -67,7 +71,8 @@ def delete_network(neutron, network):
def get_network(neutron, network_name, project_id=None):
"""
- Returns an object (dictionary) of the first network found with a given name and project_id (if included)
+ Returns an object (dictionary) of the first network found with a given name
+ and project_id (if included)
:param neutron: the client
:param network_name: the name of the network to retrieve
:param project_id: the id of the network's project
@@ -110,13 +115,15 @@ def create_subnet(neutron, subnet_settings, os_creds, network=None):
Creates a network subnet for OpenStack
:param neutron: the client
:param network: the network object
- :param subnet_settings: A dictionary containing the subnet configuration and is responsible for creating the subnet
- request JSON body
+ :param subnet_settings: A dictionary containing the subnet configuration
+ and is responsible for creating the subnet request
+ JSON body
:param os_creds: the OpenStack credentials
:return: the subnet object
"""
if neutron and network and subnet_settings:
- json_body = {'subnets': [subnet_settings.dict_for_neutron(os_creds, network=network)]}
+ json_body = {'subnets': [subnet_settings.dict_for_neutron(
+ os_creds, network=network)]}
logger.info('Creating subnet with name ' + subnet_settings.name)
subnets = neutron.create_subnet(body=json_body)
return {'subnet': subnets['subnets'][0]}
@@ -156,8 +163,9 @@ def create_router(neutron, os_creds, router_settings):
Creates a router for OpenStack
:param neutron: the client
:param os_creds: the OpenStack credentials
- :param router_settings: A dictionary containing the router configuration and is responsible for creating the subnet
- request JSON body
+ :param router_settings: A dictionary containing the router configuration
+ and is responsible for creating the subnet request
+ JSON body
:return: the router object
"""
if neutron:
@@ -198,7 +206,8 @@ def get_router_by_name(neutron, router_name):
def add_interface_router(neutron, router, subnet=None, port=None):
"""
- Adds an interface router for OpenStack for either a subnet or port. Exception will be raised if requesting for both.
+ Adds an interface router for OpenStack for either a subnet or port.
+ Exception will be raised if requesting for both.
:param neutron: the client
:param router: the router object
:param subnet: the subnet object
@@ -206,13 +215,18 @@ def add_interface_router(neutron, router, subnet=None, port=None):
:return: the interface router object
"""
if subnet and port:
- raise Exception('Cannot add interface to the router. Both subnet and port were sent in. Either or please.')
+ raise Exception('Cannot add interface to the router. Both subnet and '
+ 'port were sent in. Either or please.')
if neutron and router and (router or subnet):
- logger.info('Adding interface to router with name ' + router['router']['name'])
- return neutron.add_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
+ logger.info('Adding interface to router with name ' +
+ router['router']['name'])
+ return neutron.add_interface_router(
+ router=router['router']['id'],
+ body=__create_port_json_body(subnet, port))
else:
- raise Exception("Unable to create interface router as neutron client, router or subnet were not created")
+ raise Exception('Unable to create interface router as neutron client,'
+ ' router or subnet were not created')
def remove_interface_router(neutron, router, subnet=None, port=None):
@@ -225,10 +239,14 @@ def remove_interface_router(neutron, router, subnet=None, port=None):
"""
if router:
try:
- logger.info('Removing router interface from router named ' + router['router']['name'])
- neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
+ logger.info('Removing router interface from router named ' +
+ router['router']['name'])
+ neutron.remove_interface_router(
+ router=router['router']['id'],
+ body=__create_port_json_body(subnet, port))
except NotFound as e:
- logger.warning('Could not remove router interface. NotFound - ' + str(e))
+ logger.warning('Could not remove router interface. NotFound - %s',
+ e)
pass
else:
logger.warning('Could not remove router interface, No router object')
@@ -236,8 +254,9 @@ def remove_interface_router(neutron, router, subnet=None, port=None):
def __create_port_json_body(subnet=None, port=None):
"""
- Returns the dictionary required for creating and deleting router interfaces. Will only work on a subnet or port
- object. Will throw and exception if parameters contain both or neither
+ Returns the dictionary required for creating and deleting router
+ interfaces. Will only work on a subnet or port object. Will throw and
+ exception if parameters contain both or neither
:param subnet: the subnet object
:param port: the port object
:return: the dict
@@ -262,7 +281,8 @@ def create_port(neutron, os_creds, port_settings):
:return: the port object
"""
json_body = port_settings.dict_for_neutron(neutron, os_creds)
- logger.info('Creating port for network with name - ' + port_settings.network_name)
+ logger.info('Creating port for network with name - %s',
+ port_settings.network_name)
return neutron.create_port(body=json_body)
@@ -299,8 +319,10 @@ def create_security_group(neutron, keystone, sec_grp_settings):
:param sec_grp_settings: the security group settings
:return: the security group object
"""
- logger.info('Creating security group with name - ' + sec_grp_settings.name)
- return neutron.create_security_group(sec_grp_settings.dict_for_neutron(keystone))
+ logger.info('Creating security group with name - %s',
+ sec_grp_settings.name)
+ return neutron.create_security_group(
+ sec_grp_settings.dict_for_neutron(keystone))
def delete_security_group(neutron, sec_grp):
@@ -309,7 +331,8 @@ def delete_security_group(neutron, sec_grp):
:param neutron: the client
:param sec_grp: the security group object to delete
"""
- logger.info('Deleting security group with name - ' + sec_grp['security_group']['name'])
+ logger.info('Deleting security group with name - %s',
+ sec_grp['security_group']['name'])
return neutron.delete_security_group(sec_grp['security_group']['id'])
@@ -350,8 +373,10 @@ def create_security_group_rule(neutron, sec_grp_rule_settings):
:param sec_grp_rule_settings: the security group rule settings
:return: the security group object
"""
- logger.info('Creating security group to security group - ' + sec_grp_rule_settings.sec_grp_name)
- return neutron.create_security_group_rule(sec_grp_rule_settings.dict_for_neutron(neutron))
+ logger.info('Creating security group to security group - %s',
+ sec_grp_rule_settings.sec_grp_name)
+ return neutron.create_security_group_rule(
+ sec_grp_rule_settings.dict_for_neutron(neutron))
def delete_security_group_rule(neutron, sec_grp_rule):
@@ -360,8 +385,10 @@ def delete_security_group_rule(neutron, sec_grp_rule):
:param neutron: the client
:param sec_grp_rule: the security group rule object to delete
"""
- logger.info('Deleting security group rule with ID - ' + sec_grp_rule['security_group_rule']['id'])
- neutron.delete_security_group_rule(sec_grp_rule['security_group_rule']['id'])
+ logger.info('Deleting security group rule with ID - %s',
+ sec_grp_rule['security_group_rule']['id'])
+ neutron.delete_security_group_rule(
+ sec_grp_rule['security_group_rule']['id'])
def get_rules_by_security_group(neutron, sec_grp):
@@ -370,10 +397,11 @@ def get_rules_by_security_group(neutron, sec_grp):
:param neutron: the client
:param sec_grp: the security group object
"""
- logger.info('Retrieving security group rules associate with the security group - ' +
- sec_grp['security_group']['name'])
+ logger.info('Retrieving security group rules associate with the '
+ 'security group - %s', sec_grp['security_group']['name'])
out = list()
- rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']})
+ rules = neutron.list_security_group_rules(
+ **{'security_group_id': sec_grp['security_group']['id']})
for rule in rules['security_group_rules']:
if rule['security_group_id'] == sec_grp['security_group']['id']:
out.append({'security_group_rule': rule})
@@ -387,7 +415,8 @@ def get_rule_by_id(neutron, sec_grp, rule_id):
:param sec_grp: the security group object
:param rule_id: the rule's ID
"""
- rules = neutron.list_security_group_rules(**{'security_group_id': sec_grp['security_group']['id']})
+ rules = neutron.list_security_group_rules(
+ **{'security_group_id': sec_grp['security_group']['id']})
for rule in rules['security_group_rules']:
if rule['id'] == rule_id:
return {'security_group_rule': rule}
@@ -396,11 +425,95 @@ def get_rule_by_id(neutron, sec_grp, rule_id):
def get_external_networks(neutron):
"""
- Returns a list of external OpenStack network object/dict for all external networks
+ Returns a list of external OpenStack network object/dict for all external
+ networks
:param neutron: the client
:return: a list of external networks (empty list if none configured)
"""
out = list()
- for network in neutron.list_networks(**{'router:external': True})['networks']:
+ for network in neutron.list_networks(
+ **{'router:external': True})['networks']:
out.append({'network': network})
return out
+
+
+def get_floating_ips(neutron):
+ """
+ Returns all of the floating IPs
+ :param neutron: the Neutron client
+ :return: a list of SNAPS FloatingIp objects
+ """
+ out = list()
+ fips = neutron.list_floatingips()
+ for fip in fips['floatingips']:
+ out.append(FloatingIp(inst_id=fip['id'],
+ ip=fip['floating_ip_address']))
+
+ return out
+
+
+def create_floating_ip(neutron, ext_net_name):
+ """
+ Returns the floating IP object that was created with this call
+ :param neutron: the Neutron client
+ :param ext_net_name: the name of the external network on which to apply the
+ floating IP address
+ :return: the SNAPS FloatingIp object
+ """
+ logger.info('Creating floating ip to external network - ' + ext_net_name)
+ ext_net = get_network(neutron, ext_net_name)
+ if ext_net:
+ fip = neutron.create_floatingip(
+ body={'floatingip':
+ {'floating_network_id': ext_net['network']['id']}})
+
+ return FloatingIp(inst_id=fip['floatingip']['id'],
+ ip=fip['floatingip']['floating_ip_address'])
+ else:
+ raise Exception('Cannot create floating IP, '
+ 'external network not found')
+
+
+def get_floating_ip(neutron, floating_ip):
+ """
+ Returns a floating IP object that should be identical to the floating_ip
+ parameter
+ :param neutron: the Neutron client
+ :param floating_ip: the SNAPS FloatingIp object
+ :return: hopefully the same floating IP object input
+ """
+ logger.debug('Attempting to retrieve existing floating ip with IP - %s',
+ floating_ip.ip)
+ os_fip = get_os_floating_ip(neutron, floating_ip)
+ if os_fip:
+ return FloatingIp(
+ inst_id=os_fip['id'], ip=os_fip['floating_ip_address'])
+
+
+def get_os_floating_ip(neutron, floating_ip):
+ """
+ Returns an OpenStack floating IP object
+ parameter
+ :param neutron: the Neutron client
+ :param floating_ip: the SNAPS FloatingIp object
+ :return: hopefully the same floating IP object input
+ """
+ logger.debug('Attempting to retrieve existing floating ip with IP - %s',
+ floating_ip.ip)
+ fips = neutron.list_floatingips(ip=floating_ip.id)
+
+ for fip in fips['floatingips']:
+ if fip['id'] == floating_ip.id:
+ return fip
+
+
+def delete_floating_ip(neutron, floating_ip):
+ """
+ Responsible for deleting a floating IP
+ :param neutron: the Neutron client
+ :param floating_ip: the SNAPS FloatingIp object
+ :return:
+ """
+ logger.debug('Attempting to delete existing floating ip with IP - %s',
+ floating_ip.ip)
+ return neutron.delete_floatingip(floating_ip.id)
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index a1b959a..90eec13 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -13,16 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from cryptography.hazmat.primitives import serialization
-from cryptography.hazmat.primitives.asymmetric import rsa
-from cryptography.hazmat.backends import default_backend
-
-import os
import logging
-from snaps.openstack.utils import keystone_utils
+import os
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
from novaclient.client import Client
from novaclient.exceptions import NotFound
+from snaps.domain.vm_inst import VmInst
+from snaps.openstack.utils import keystone_utils, glance_utils, neutron_utils
__author__ = 'spisarski'
@@ -35,12 +35,69 @@ Utilities for basic OpenStack Nova API calls
def nova_client(os_creds):
"""
- Instantiates and returns a client for communications with OpenStack's Nova server
+ Instantiates and returns a client for communications with OpenStack's Nova
+ server
:param os_creds: The connection credentials to the OpenStack API
:return: the client object
"""
logger.debug('Retrieving Nova Client')
- return Client(os_creds.compute_api_version, session=keystone_utils.keystone_session(os_creds))
+ return Client(os_creds.compute_api_version,
+ session=keystone_utils.keystone_session(os_creds))
+
+
+def create_server(nova, neutron, glance, instance_settings, image_settings,
+ keypair_settings=None):
+ """
+ Creates a VM instance
+ :param nova: the nova client (required)
+ :param neutron: the neutron client for retrieving ports (required)
+ :param glance: the glance client (required)
+ :param instance_settings: the VM instance settings object (required)
+ :param image_settings: the VM's image settings object (required)
+ :param keypair_settings: the VM's keypair settings object (optional)
+ :return: a snaps.domain.VmInst object
+ """
+
+ ports = list()
+
+ for port_setting in instance_settings.port_settings:
+ ports.append(neutron_utils.get_port_by_name(
+ neutron, port_setting.name))
+ nics = []
+ for port in ports:
+ kv = dict()
+ kv['port-id'] = port['port']['id']
+ nics.append(kv)
+
+ logger.info('Creating VM with name - ' + instance_settings.name)
+ keypair_name = None
+ if keypair_settings:
+ keypair_name = keypair_settings.name
+
+ flavor = get_flavor_by_name(nova, instance_settings.flavor)
+ if not flavor:
+ raise Exception(
+ 'Flavor not found with name - %s',
+ instance_settings.flavor)
+
+ image = glance_utils.get_image(glance, image_settings.name)
+ if image:
+ args = {'name': instance_settings.name,
+ 'flavor': flavor,
+ 'image': image,
+ 'nics': nics,
+ 'key_name': keypair_name,
+ 'security_groups':
+ instance_settings.security_group_names,
+ 'userdata': instance_settings.userdata,
+ 'availability_zone':
+ instance_settings.availability_zone}
+ server = nova.servers.create(**args)
+ return VmInst(name=server.name, inst_id=server.id)
+ else:
+ raise Exception(
+ 'Cannot create instance, image cannot be located with name %s',
+ image_settings.name)
def get_servers_by_name(nova, name):
@@ -50,7 +107,21 @@ def get_servers_by_name(nova, name):
:param name: the server name
:return: the list of servers
"""
- return nova.servers.list(search_opts={'name': name})
+ out = list()
+ servers = nova.servers.list(search_opts={'name': name})
+ for server in servers:
+ out.append(VmInst(name=server.name, inst_id=server.id))
+ return out
+
+
+def get_latest_server_os_object(nova, server):
+ """
+ Returns a server with a given id
+ :param nova: the Nova client
+ :param server: the domain VmInst object
+ :return: the list of servers or None if not found
+ """
+ return nova.servers.get(server.id)
def get_latest_server_object(nova, server):
@@ -60,7 +131,8 @@ def get_latest_server_object(nova, server):
:param server: the old server object
:return: the list of servers or None if not found
"""
- return nova.servers.get(server)
+ server = get_latest_server_os_object(nova, server)
+ return VmInst(name=server.name, inst_id=server.id)
def create_keys(key_size=2048):
@@ -69,7 +141,8 @@ def create_keys(key_size=2048):
:param key_size: the number of bytes for the key size
:return: the cryptography keys
"""
- return rsa.generate_private_key(backend=default_backend(), public_exponent=65537,
+ return rsa.generate_private_key(backend=default_backend(),
+ public_exponent=65537,
key_size=key_size)
@@ -79,7 +152,8 @@ def public_key_openssh(keys):
:param keys: the keys generated by create_keys() from cryptography
:return: the OpenSSH public key
"""
- return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+ return keys.public_key().public_bytes(serialization.Encoding.OpenSSH,
+ serialization.PublicFormat.OpenSSH)
def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
@@ -97,7 +171,8 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
os.mkdir(pub_dir)
public_handle = open(pub_file_path, 'wb')
public_bytes = keys.public_key().public_bytes(
- serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+ serialization.Encoding.OpenSSH,
+ serialization.PublicFormat.OpenSSH)
public_handle.write(public_bytes)
public_handle.close()
os.chmod(pub_file_path, 0o400)
@@ -107,9 +182,11 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
if not os.path.isdir(priv_dir):
os.mkdir(priv_dir)
private_handle = open(priv_file_path, 'wb')
- private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption()))
+ private_handle.write(
+ keys.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()))
private_handle.close()
os.chmod(priv_file_path, 0o400)
logger.info("Saved private key to - " + priv_file_path)
@@ -179,57 +256,6 @@ def delete_keypair(nova, key):
nova.keypairs.delete(key)
-def get_floating_ip_pools(nova):
- """
- Returns all of the available floating IP pools
- :param nova: the Nova client
- :return: a list of pools
- """
- return nova.floating_ip_pools.list()
-
-
-def get_floating_ips(nova):
- """
- Returns all of the floating IPs
- :param nova: the Nova client
- :return: a list of floating IPs
- """
- return nova.floating_ips.list()
-
-
-def create_floating_ip(nova, ext_net_name):
- """
- Returns the floating IP object that was created with this call
- :param nova: the Nova client
- :param ext_net_name: the name of the external network on which to apply the floating IP address
- :return: the floating IP object
- """
- logger.info('Creating floating ip to external network - ' + ext_net_name)
- return nova.floating_ips.create(ext_net_name)
-
-
-def get_floating_ip(nova, floating_ip):
- """
- Returns a floating IP object that should be identical to the floating_ip parameter
- :param nova: the Nova client
- :param floating_ip: the floating IP object to lookup
- :return: hopefully the same floating IP object input
- """
- logger.debug('Attempting to retrieve existing floating ip with IP - ' + floating_ip.ip)
- return nova.floating_ips.get(floating_ip)
-
-
-def delete_floating_ip(nova, floating_ip):
- """
- Responsible for deleting a floating IP
- :param nova: the Nova client
- :param floating_ip: the floating IP object to delete
- :return:
- """
- logger.debug('Attempting to delete existing floating ip with IP - ' + floating_ip.ip)
- return nova.floating_ips.delete(floating_ip)
-
-
def get_nova_availability_zones(nova):
"""
Returns the names of all nova active compute servers
@@ -251,9 +277,9 @@ def delete_vm_instance(nova, vm_inst):
"""
Deletes a VM instance
:param nova: the nova client
- :param vm_inst: the OpenStack instance object to delete
+ :param vm_inst: the snaps.domain.VmInst object
"""
- nova.servers.delete(vm_inst)
+ nova.servers.delete(vm_inst.id)
def get_flavor_by_name(nova, name):
@@ -276,10 +302,15 @@ def create_flavor(nova, flavor_settings):
:param flavor_settings: the flavor settings
:return: the Flavor
"""
- return nova.flavors.create(name=flavor_settings.name, flavorid=flavor_settings.flavor_id, ram=flavor_settings.ram,
- vcpus=flavor_settings.vcpus, disk=flavor_settings.disk,
- ephemeral=flavor_settings.ephemeral, swap=flavor_settings.swap,
- rxtx_factor=flavor_settings.rxtx_factor, is_public=flavor_settings.is_public)
+ return nova.flavors.create(name=flavor_settings.name,
+ flavorid=flavor_settings.flavor_id,
+ ram=flavor_settings.ram,
+ vcpus=flavor_settings.vcpus,
+ disk=flavor_settings.disk,
+ ephemeral=flavor_settings.ephemeral,
+ swap=flavor_settings.swap,
+ rxtx_factor=flavor_settings.rxtx_factor,
+ is_public=flavor_settings.is_public)
def delete_flavor(nova, flavor):
@@ -308,4 +339,17 @@ def remove_security_group(nova, vm, security_group):
:param vm: the OpenStack server object (VM) to alter
:param security_group: the OpenStack security group object to add
"""
- nova.servers.remove_security_group(str(vm.id), security_group['security_group']['name'])
+ nova.servers.remove_security_group(
+ str(vm.id), security_group['security_group']['name'])
+
+
+def add_floating_ip_to_server(nova, vm, floating_ip, ip_addr):
+ """
+ Adds a floating IP to a server instance
+ :param nova: the nova client
+ :param vm: VmInst domain object
+ :param floating_ip: FloatingIp domain object
+ :param ip_addr: the IP to which to bind the floating IP to
+ """
+ vm = get_latest_server_os_object(nova, vm)
+ vm.add_floating_ip(floating_ip.ip, ip_addr)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index e90b94c..1e89dda 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -14,14 +14,16 @@
# limitations under the License.
import uuid
-from snaps.openstack.utils import keystone_utils
-from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction
-from snaps.openstack.tests import openstack_tests
-from snaps.openstack.utils import neutron_utils
-from snaps.openstack.create_network import NetworkSettings, SubnetSettings, PortSettings
from snaps.openstack import create_router
-from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.create_network import NetworkSettings, SubnetSettings, \
+ PortSettings
+from snaps.openstack.create_security_group import SecurityGroupSettings, \
+ SecurityGroupRuleSettings, Direction
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests import validation_utils
+from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
+from snaps.openstack.utils import keystone_utils
+from snaps.openstack.utils import neutron_utils
__author__ = 'spisarski'
@@ -57,13 +59,14 @@ class NeutronSmokeTests(OSComponentTestCase):
with self.assertRaises(Exception):
neutron = neutron_utils.neutron_client(
- OSCreds(username='user', password='pass', auth_url='url', project_name='project'))
+ OSCreds(username='user', password='pass', auth_url='url',
+ project_name='project'))
neutron.list_networks()
def test_retrieve_ext_network_name(self):
"""
- Tests the neutron_utils.get_external_network_names to ensure the configured self.ext_net_name is contained
- within the returned list
+ Tests the neutron_utils.get_external_network_names to ensure the
+ configured self.ext_net_name is contained within the returned list
:return:
"""
neutron = neutron_utils.neutron_client(self.os_creds)
@@ -86,7 +89,8 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.network = None
- self.net_config = openstack_tests.get_pub_net_config(net_name=guid + '-pub-net')
+ self.net_config = openstack_tests.get_pub_net_config(
+ net_name=guid + '-pub-net')
def tearDown(self):
"""
@@ -94,31 +98,40 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
"""
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_network(self):
"""
Tests the neutron_utils.create_neutron_net() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
def test_create_network_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty network name
+ Tests the neutron_utils.create_neutron_net() function with an empty
+ network name
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, self.os_creds,
- network_settings=NetworkSettings(name=''))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds,
+ network_settings=NetworkSettings(name=''))
def test_create_network_null_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function when the network name is None
+ Tests the neutron_utils.create_neutron_net() function when the network
+ name is None
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, self.os_creds,
- network_settings=NetworkSettings())
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds,
+ network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
@@ -133,7 +146,8 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
self.network = None
self.subnet = None
self.net_config = openstack_tests.get_pub_net_config(
- net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet', external_net=self.ext_net_name)
+ net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
+ external_net=self.ext_net_name)
def tearDown(self):
"""
@@ -142,71 +156,108 @@ class NeutronUtilsSubnetTests(OSComponentTestCase):
if self.subnet:
neutron_utils.delete_subnet(self.neutron, self.subnet)
validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[0].cidr, False)
+ self.net_config.network_settings.subnet_settings[
+ 0].cidr, False)
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_subnet(self):
"""
Tests the neutron_utils.create_neutron_net() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, network=self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, network=self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
def test_create_subnet_null_name(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet name is None
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet name is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
SubnetSettings(cidr=self.net_config.subnet_cidr)
def test_create_subnet_empty_name(self):
"""
- Tests the neutron_utils.create_neutron_net() function with an empty name
+ Tests the neutron_utils.create_neutron_net() function with an empty
+ name
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, network=self.network)
- validate_subnet(self.neutron, '', self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, network=self.network)
+ validate_subnet(self.neutron, '',
+ subnet_setting.cidr, True)
def test_create_subnet_null_cidr(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is None
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet CIDR value is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr=None, name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+ sub_sets = SubnetSettings(cidr=None,
+ name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
+ network=self.network)
def test_create_subnet_empty_cidr(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function for an Exception when the subnet CIDR value is empty
+ Tests the neutron_utils.create_neutron_subnet() function for an
+ Exception when the subnet CIDR value is empty
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
with self.assertRaises(Exception):
- sub_sets = SubnetSettings(cidr='', name=self.net_config.subnet_name)
- neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds, network=self.network)
+ sub_sets = SubnetSettings(cidr='',
+ name=self.net_config.subnet_name)
+ neutron_utils.create_subnet(self.neutron, sub_sets, self.os_creds,
+ network=self.network)
class NeutronUtilsRouterTests(OSComponentTestCase):
@@ -232,7 +283,8 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
Cleans the remote OpenStack objects
"""
if self.interface_router:
- neutron_utils.remove_interface_router(self.neutron, self.router, self.subnet)
+ neutron_utils.remove_interface_router(self.neutron, self.router,
+ self.subnet)
if self.router:
neutron_utils.delete_router(self.neutron, self.router)
@@ -244,30 +296,40 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
if self.subnet:
neutron_utils.delete_subnet(self.neutron, self.subnet)
validate_subnet(self.neutron, self.subnet.get('name'),
- self.net_config.network_settings.subnet_settings[0].cidr, False)
+ self.net_config.network_settings.subnet_settings[
+ 0].cidr, False)
if self.network:
neutron_utils.delete_network(self.neutron, self.network)
- validate_network(self.neutron, self.network['network']['name'], False)
+ validate_network(self.neutron, self.network['network']['name'],
+ False)
def test_create_router_simple(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ Tests the neutron_utils.create_neutron_net() function when an external
+ gateway is requested
"""
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
def test_create_router_with_public_interface(self):
"""
- Tests the neutron_utils.create_neutron_net() function when an external gateway is requested
+ Tests the neutron_utils.create_neutron_net() function when an external
+ gateway is requested
"""
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.net_config = openstack_tests.OSNetworkConfig(
self.net_config.network_settings.name,
- self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, self.net_config.router_settings.name,
+ subnet_setting.name,
+ subnet_setting.cidr,
+ self.net_config.router_settings.name,
self.ext_net_name)
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
# TODO - Add validation that the router gatway has been set
def test_create_router_empty_name(self):
@@ -276,83 +338,125 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
"""
with self.assertRaises(Exception):
this_router_settings = create_router.RouterSettings(name='')
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+ self.router = neutron_utils.create_router(self.neutron,
+ self.os_creds,
+ this_router_settings)
def test_create_router_null_name(self):
"""
- Tests the neutron_utils.create_neutron_subnet() function when the subnet CIDR value is None
+ Tests the neutron_utils.create_neutron_subnet() function when the
+ subnet CIDR value is None
"""
with self.assertRaises(Exception):
this_router_settings = create_router.RouterSettings()
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, this_router_settings)
+ self.router = neutron_utils.create_router(self.neutron,
+ self.os_creds,
+ this_router_settings)
validate_router(self.neutron, None, True)
def test_add_interface_router(self):
"""
Tests the neutron_utils.add_interface_router() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
-
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
-
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
- validate_interface_router(self.interface_router, self.router, self.subnet)
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
+
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
+
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
+ validate_interface_router(self.interface_router, self.router,
+ self.subnet)
def test_add_interface_router_null_router(self):
"""
- Tests the neutron_utils.add_interface_router() function for an Exception when the router value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.add_interface_router() function for an
+ Exception when the router value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
def test_add_interface_router_null_subnet(self):
"""
- Tests the neutron_utils.add_interface_router() function for an Exception when the subnet value is None
+ Tests the neutron_utils.add_interface_router() function for an
+ Exception when the subnet value is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.router = neutron_utils.create_router(self.neutron, self.os_creds, self.net_config.router_settings)
- validate_router(self.neutron, self.net_config.router_settings.name, True)
+ self.router = neutron_utils.create_router(
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(self.neutron, self.net_config.router_settings.name,
+ True)
with self.assertRaises(Exception):
- self.interface_router = neutron_utils.add_interface_router(self.neutron, self.router, self.subnet)
+ self.interface_router = neutron_utils.add_interface_router(
+ self.neutron, self.router, self.subnet)
def test_create_port(self):
"""
Tests the neutron_utils.create_port() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ validate_subnet(self.neutron, subnet_setting.name,
+ subnet_setting.cidr, True)
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
name=self.port_name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}],
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}],
network_name=self.net_config.network_settings.name))
validate_port(self.neutron, self.port, self.port_name)
@@ -360,111 +464,172 @@ class NeutronUtilsRouterTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_port() function
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting, self.os_creds, self.network)
+ validate_subnet(self.neutron, subnet_setting.name, subnet_setting.cidr,
+ True)
self.port = neutron_utils.create_port(
self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings[0].name, 'ip': ip_1}]))
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
validate_port(self.neutron, self.port, self.port_name)
def test_create_port_null_name(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the port name value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the port name value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': ip_1}]))
def test_create_port_null_network_object(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the network object is None
+ Tests the neutron_utils.create_port() function for an Exception when
+ the network object is None
"""
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
-
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- self.neutron, self.port_name, self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': ip_1}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name':
+ self.net_config.network_settings.subnet_settings[
+ 0].name,
+ 'ip': ip_1}]))
def test_create_port_null_ip(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron, subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(
+ self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': None}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': None}]))
def test_create_port_invalid_ip(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron,
+ subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name, 'ip': 'foo'}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': 'foo'}]))
def test_create_port_invalid_ip_to_subnet(self):
"""
- Tests the neutron_utils.create_port() function for an Exception when the IP value is None
- """
- self.network = neutron_utils.create_network(self.neutron, self.os_creds, self.net_config.network_settings)
- self.assertEqual(self.net_config.network_settings.name, self.network['network']['name'])
- self.assertTrue(validate_network(self.neutron, self.net_config.network_settings.name, True))
-
- self.subnet = neutron_utils.create_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0],
- self.os_creds, self.network)
- validate_subnet(self.neutron, self.net_config.network_settings.subnet_settings[0].name,
- self.net_config.network_settings.subnet_settings[0].cidr, True)
+ Tests the neutron_utils.create_port() function for an Exception when
+ the IP value is None
+ """
+ self.network = neutron_utils.create_network(
+ self.neutron, self.os_creds, self.net_config.network_settings)
+ self.assertEqual(self.net_config.network_settings.name,
+ self.network['network']['name'])
+ self.assertTrue(validate_network(self.neutron,
+ self.net_config.network_settings.name,
+ True))
+
+ subnet_setting = self.net_config.network_settings.subnet_settings[0]
+ self.subnet = neutron_utils.create_subnet(
+ self.neutron,
+ subnet_setting,
+ self.os_creds, self.network)
+ validate_subnet(self.neutron,
+ subnet_setting.name,
+ subnet_setting.cidr, True)
with self.assertRaises(Exception):
- self.port = neutron_utils.create_port(self.neutron, self.os_creds, PortSettings(
- name=self.port_name, network_name=self.net_config.network_settings.name,
- ip_addrs=[{'subnet_name': self.net_config.network_settings.subnet_settings.name,
- 'ip': '10.197.123.100'}]))
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds,
+ PortSettings(
+ name=self.port_name,
+ network_name=self.net_config.network_settings.name,
+ ip_addrs=[{
+ 'subnet_name': subnet_setting.name,
+ 'ip': '10.197.123.100'}]))
class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
@@ -490,7 +655,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
for security_group in self.security_groups:
try:
- neutron_utils.delete_security_group(self.neutron, security_group)
+ neutron_utils.delete_security_group(self.neutron,
+ security_group)
except:
pass
@@ -499,72 +665,102 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
Tests the neutron_utils.create_security_group() function
"""
sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
- security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)
+ security_group = neutron_utils.create_security_group(self.neutron,
+ self.keystone,
+ sec_grp_settings)
- self.assertTrue(sec_grp_settings.name, security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.name,
+ security_group['security_group']['name'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group['security_group'], sec_grp_get['security_group']))
neutron_utils.delete_security_group(self.neutron, security_group)
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
"""
- Tests the SecurityGroupSettings constructor and neutron_utils.create_security_group() function to ensure
- that attempting to create a security group without a name will raise an exception
+ Tests the SecurityGroupSettings constructor and
+ neutron_utils.create_security_group() function to ensure that
+ attempting to create a security group without a name will raise an
+ exception
"""
with self.assertRaises(Exception):
sec_grp_settings = SecurityGroupSettings()
self.security_groups.append(
- neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
+ neutron_utils.create_security_group(self.neutron,
+ self.keystone,
+ sec_grp_settings))
def test_create_sec_grp_no_rules(self):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
+ description='hello group')
+ self.security_groups.append(
+ neutron_utils.create_security_group(self.neutron, self.keystone,
+ sec_grp_settings))
- self.assertTrue(sec_grp_settings.name, self.security_groups[0]['security_group']['name'])
- self.assertTrue(sec_grp_settings.description, self.security_groups[0]['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name,
+ self.security_groups[0]['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description,
+ self.security_groups[0]['security_group'][
+ 'description'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
- self.security_groups[0]['security_group'], sec_grp_get['security_group']))
+ self.security_groups[0]['security_group'],
+ sec_grp_get['security_group']))
def test_create_sec_grp_one_rule(self):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_rule_settings = SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=[sec_grp_rule_settings])
+ sec_grp_rule_settings = SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=[sec_grp_rule_settings])
- self.security_groups.append(neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings))
- free_rules = neutron_utils.get_rules_by_security_group(self.neutron, self.security_groups[0])
+ self.security_groups.append(
+ neutron_utils.create_security_group(self.neutron, self.keystone,
+ sec_grp_settings))
+ free_rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.security_groups[0])
for free_rule in free_rules:
self.security_group_rules.append(free_rule)
self.security_group_rules.append(
- neutron_utils.create_security_group_rule(self.neutron, sec_grp_settings.rule_settings[0]))
+ neutron_utils.create_security_group_rule(
+ self.neutron, sec_grp_settings.rule_settings[0]))
# Refresh object so it is populated with the newly added rule
- security_group = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ security_group = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings.name)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, security_group)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ security_group)
- self.assertTrue(validation_utils.objects_equivalent(self.security_group_rules, rules))
+ self.assertTrue(
+ validation_utils.objects_equivalent(self.security_group_rules,
+ rules))
- self.assertTrue(sec_grp_settings.name, security_group['security_group']['name'])
- self.assertTrue(sec_grp_settings.description, security_group['security_group']['description'])
+ self.assertTrue(sec_grp_settings.name,
+ security_group['security_group']['name'])
+ self.assertTrue(sec_grp_settings.description,
+ security_group['security_group']['description'])
- sec_grp_get = neutron_utils.get_security_group(self.neutron, sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(self.neutron,
+ sec_grp_settings.name)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group['security_group'], sec_grp_get['security_group']))
@@ -576,18 +772,59 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-1', description='hello group')))
+ SecurityGroupSettings(name=self.sec_grp_name + '-1',
+ description='hello group')))
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-2', description='hello group')))
+ SecurityGroupSettings(name=self.sec_grp_name + '-2',
+ description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0]['security_group']['id'])
sec_grp_2b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[1]['security_group']['id'])
- self.assertEqual(self.security_groups[0]['security_group']['id'], sec_grp_1b['security_group']['id'])
- self.assertEqual(self.security_groups[1]['security_group']['id'], sec_grp_2b['security_group']['id'])
+ self.assertEqual(self.security_groups[0]['security_group']['id'],
+ sec_grp_1b['security_group']['id'])
+ self.assertEqual(self.security_groups[1]['security_group']['id'],
+ sec_grp_2b['security_group']['id'])
+
+
+class NeutronUtilsFloatingIpTests(OSComponentTestCase):
+ """
+ Test basic nova keypair functionality
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
+ """
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.floating_ip = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.floating_ip:
+ neutron_utils.delete_floating_ip(self.neutron, self.floating_ip)
+
+ def test_floating_ips(self):
+ """
+ Tests the creation of a floating IP
+ :return:
+ """
+ initial_fips = neutron_utils.get_floating_ips(self.neutron)
+
+ self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
+ self.ext_net_name)
+ all_fips = neutron_utils.get_floating_ips(self.neutron)
+ self.assertEqual(len(initial_fips) + 1, len(all_fips))
+ returned = neutron_utils.get_floating_ip(self.neutron,
+ self.floating_ip)
+ self.assertEqual(self.floating_ip.id, returned.id)
+ self.assertEqual(self.floating_ip.ip, returned.ip)
"""
@@ -597,8 +834,9 @@ Validation routines
def validate_network(neutron, name, exists):
"""
- Returns true if a network for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a network for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a network for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a network for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected network name
:param exists: Whether or not the network name should exist or not
@@ -614,8 +852,9 @@ def validate_network(neutron, name, exists):
def validate_subnet(neutron, name, cidr, exists):
"""
- Returns true if a subnet for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a subnet for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a subnet for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a subnet for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected subnet name
:param cidr: The expected CIDR value
@@ -632,8 +871,9 @@ def validate_subnet(neutron, name, cidr, exists):
def validate_router(neutron, name, exists):
"""
- Returns true if a router for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a router for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a router for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a router for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param name: The expected router name
:param exists: Whether or not the network name should exist or not
@@ -647,7 +887,8 @@ def validate_router(neutron, name, exists):
def validate_interface_router(interface_router, router, subnet):
"""
- Returns true if the router ID & subnet ID have been properly included into the interface router object
+ Returns true if the router ID & subnet ID have been properly included into
+ the interface router object
:param interface_router: the object to validate
:param router: to validate against the interface_router
:param subnet: to validate against the interface_router
@@ -661,8 +902,9 @@ def validate_interface_router(interface_router, router, subnet):
def validate_port(neutron, port_obj, this_port_name):
"""
- Returns true if a port for a given name DOES NOT exist if the exists parameter is false conversely true.
- Returns false if a port for a given name DOES exist if the exists parameter is true conversely false.
+ Returns true if a port for a given name DOES NOT exist if the exists
+ parameter is false conversely true. Returns false if a port for a given
+ name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
:param port_obj: The port object to lookup
:param this_port_name: The expected router name
diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py
index 98aa889..552ffc7 100644
--- a/snaps/openstack/utils/tests/nova_utils_tests.py
+++ b/snaps/openstack/utils/tests/nova_utils_tests.py
@@ -13,12 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import os
import uuid
-from snaps.openstack.utils import nova_utils
+import os
+from snaps.openstack.create_flavor import FlavorSettings, OpenStackFlavor
+from snaps.openstack.create_image import OpenStackImage
+from snaps.openstack.create_instance import VmInstanceSettings
+from snaps.openstack.create_network import OpenStackNetwork, PortSettings
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
-from snaps.openstack.create_flavor import FlavorSettings
+from snaps.openstack.utils import nova_utils, neutron_utils, glance_utils
__author__ = 'spisarski'
@@ -46,8 +50,10 @@ class NovaSmokeTests(OSComponentTestCase):
from snaps.openstack.os_credentials import OSCreds
nova = nova_utils.nova_client(
- OSCreds(username='user', password='pass', auth_url=self.os_creds.auth_url,
- project_name=self.os_creds.project_name, proxy_settings=self.os_creds.proxy_settings))
+ OSCreds(username='user', password='pass',
+ auth_url=self.os_creds.auth_url,
+ project_name=self.os_creds.project_name,
+ proxy_settings=self.os_creds.proxy_settings))
# This should throw an exception
with self.assertRaises(Exception):
@@ -61,8 +67,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
def setUp(self):
"""
- Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
"""
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.priv_key_file_path = 'tmp/' + guid
@@ -73,7 +79,6 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
self.public_key = nova_utils.public_key_openssh(self.keys)
self.keypair_name = guid
self.keypair = None
- self.floating_ip = None
def tearDown(self):
"""
@@ -97,14 +102,12 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
except:
pass
- if self.floating_ip:
- nova_utils.delete_floating_ip(self.nova, self.floating_ip)
-
def test_create_keypair(self):
"""
Tests the creation of an OpenStack keypair that does not exist.
"""
- self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
+ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
+ self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
self.assertEqual(self.keypair, result)
keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
@@ -114,7 +117,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
"""
Tests the creation of an OpenStack keypair that does not exist.
"""
- self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
+ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name,
+ self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
self.assertEqual(self.keypair, result)
nova_utils.delete_keypair(self.nova, self.keypair)
@@ -126,25 +130,16 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
Tests that the generated RSA keys are properly saved to files
:return:
"""
- nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path)
- self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path)
+ nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path,
+ self.priv_key_file_path)
+ self.keypair = nova_utils.upload_keypair_file(self.nova,
+ self.keypair_name,
+ self.pub_key_file_path)
pub_key_file = open(os.path.expanduser(self.pub_key_file_path))
pub_key = pub_key_file.read()
pub_key_file.close()
self.assertEqual(self.keypair.public_key, pub_key)
- def test_floating_ips(self):
- """
- Tests the creation of a floating IP
- :return:
- """
- ips = nova_utils.get_floating_ips(self.nova)
- self.assertIsNotNone(ips)
-
- self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name)
- returned = nova_utils.get_floating_ip(self.nova, self.floating_ip)
- self.assertEqual(self.floating_ip, returned)
-
class NovaUtilsFlavorTests(OSComponentTestCase):
"""
@@ -153,12 +148,15 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
def setUp(self):
"""
- Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
"""
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
- self.flavor_settings = FlavorSettings(name=guid + '-name', flavor_id=guid + '-id', ram=1, disk=1, vcpus=1,
- ephemeral=1, swap=2, rxtx_factor=3.0, is_public=False)
+ self.flavor_settings = FlavorSettings(name=guid + '-name',
+ flavor_id=guid + '-id', ram=1,
+ disk=1, vcpus=1,
+ ephemeral=1, swap=2,
+ rxtx_factor=3.0, is_public=False)
self.nova = nova_utils.nova_client(self.os_creds)
self.flavor = None
@@ -186,7 +184,8 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
self.flavor = nova_utils.create_flavor(self.nova, self.flavor_settings)
self.validate_flavor()
nova_utils.delete_flavor(self.nova, self.flavor)
- flavor = nova_utils.get_flavor_by_name(self.nova, self.flavor_settings.name)
+ flavor = nova_utils.get_flavor_by_name(self.nova,
+ self.flavor_settings.name)
self.assertIsNone(flavor)
def validate_flavor(self):
@@ -206,5 +205,110 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
else:
self.assertEqual(self.flavor_settings.swap, self.flavor.swap)
- self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
+ self.assertEqual(self.flavor_settings.rxtx_factor,
+ self.flavor.rxtx_factor)
self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)
+
+
+class NovaUtilsInstanceTests(OSComponentTestCase):
+ """
+ Tests the creation of VM instances via nova_utils.py
+ """
+
+ def setUp(self):
+ """
+ Setup objects required by VM instances
+ :return:
+ """
+
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+
+ self.nova = nova_utils.nova_client(self.os_creds)
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.glance = glance_utils.glance_client(self.os_creds)
+
+ self.image_creator = None
+ self.network_creator = None
+ self.flavor_creator = None
+ self.port = None
+ self.vm_inst = None
+
+ try:
+ image_settings = openstack_tests.cirros_image_settings(
+ name=guid + '-image', image_metadata=self.image_metadata)
+ self.image_creator = OpenStackImage(
+ self.os_creds, image_settings=image_settings)
+ self.image_creator.create()
+
+ network_settings = openstack_tests.get_priv_net_config(
+ guid + '-net', guid + '-subnet').network_settings
+ self.network_creator = OpenStackNetwork(
+ self.os_creds, network_settings)
+ self.network_creator.create()
+
+ self.flavor_creator = OpenStackFlavor(
+ self.os_creds,
+ FlavorSettings(
+ name=guid + '-flavor-name', ram=128, disk=10, vcpus=1))
+ self.flavor_creator.create()
+
+ port_settings = PortSettings(name=guid + '-port',
+ network_name=network_settings.name)
+ self.port = neutron_utils.create_port(
+ self.neutron, self.os_creds, port_settings)
+
+ self.instance_settings = VmInstanceSettings(
+ name=guid + '-vm_inst',
+ flavor=self.flavor_creator.flavor_settings.name,
+ port_settings=[port_settings])
+ except:
+ self.tearDown()
+ raise
+
+ def tearDown(self):
+ """
+ Cleanup deployed resources
+ :return:
+ """
+ if self.vm_inst:
+ try:
+ nova_utils.delete_vm_instance(self.nova, self.vm_inst)
+ except:
+ pass
+ if self.port:
+ try:
+ neutron_utils.delete_port(self.neutron, self.port)
+ except:
+ pass
+ if self.flavor_creator:
+ try:
+ self.flavor_creator.clean()
+ except:
+ pass
+ if self.network_creator:
+ try:
+ self.network_creator.clean()
+ except:
+ pass
+ if self.image_creator:
+ try:
+ self.image_creator.clean()
+ except:
+ pass
+
+ def test_create_instance(self):
+ """
+ Tests the nova_utils.create_server() method
+ :return:
+ """
+
+ self.vm_inst = nova_utils.create_server(
+ self.nova, self.neutron, self.glance, self.instance_settings,
+ self.image_creator.image_settings)
+
+ self.assertIsNotNone(self.vm_inst)
+
+ vm_inst = nova_utils.get_latest_server_object(self.nova, self.vm_inst)
+
+ self.assertEqual(self.vm_inst.name, vm_inst.name)
+ self.assertEqual(self.vm_inst.id, vm_inst.id)
diff --git a/snaps/provisioning/tests/ansible_utils_tests.py b/snaps/provisioning/tests/ansible_utils_tests.py
index cddedcd..733068f 100644
--- a/snaps/provisioning/tests/ansible_utils_tests.py
+++ b/snaps/provisioning/tests/ansible_utils_tests.py
@@ -13,21 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import uuid
+
import os
import pkg_resources
-import uuid
from scp import SCPClient
-from snaps.openstack.create_security_group import SecurityGroupRuleSettings, Direction, Protocol, \
- OpenStackSecurityGroup, SecurityGroupSettings
-
from snaps.openstack import create_flavor
-from snaps.openstack import create_instance
from snaps.openstack import create_image
+from snaps.openstack import create_instance
from snaps.openstack import create_keypairs
from snaps.openstack import create_network
from snaps.openstack import create_router
+from snaps.openstack.create_security_group import (
+ SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup,
+ SecurityGroupSettings)
from snaps.openstack.tests import openstack_tests, create_instance_tests
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
+from snaps.openstack.utils import nova_utils
from snaps.provisioning import ansible_utils
VM_BOOT_TIMEOUT = 600
@@ -38,16 +40,19 @@ ip_2 = '10.0.1.200'
class AnsibleProvisioningTests(OSIntegrationTestCase):
"""
- Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP and eth1 w/o
+ Test for the CreateInstance class with two NIC/Ports, eth0 with floating IP
+ and eth1 w/o
"""
def setUp(self):
"""
- Instantiates the CreateImage object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateImage object that is responsible for downloading
+ and creating an OS image file within OpenStack
"""
super(self.__class__, self).__start__()
+ self.nova = nova_utils.nova_client(self.os_creds)
+
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.keypair_priv_filepath = 'tmp/' + guid
self.keypair_pub_filepath = self.keypair_priv_filepath + '.pub'
@@ -69,62 +74,78 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
try:
# Create Image
- os_image_settings = openstack_tests.ubuntu_image_settings(name=guid + '-' + '-image',
- image_metadata=self.image_metadata)
- self.image_creator = create_image.OpenStackImage(self.os_creds, os_image_settings)
+ os_image_settings = openstack_tests.ubuntu_image_settings(
+ name=guid + '-' + '-image',
+ image_metadata=self.image_metadata)
+ self.image_creator = create_image.OpenStackImage(self.os_creds,
+ os_image_settings)
self.image_creator.create()
# First network is public
self.pub_net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
- router_name=guid + '-pub-router', external_net=self.ext_net_name)
+ router_name=guid + '-pub-router',
+ external_net=self.ext_net_name)
- self.network_creator = create_network.OpenStackNetwork(self.os_creds, self.pub_net_config.network_settings)
+ self.network_creator = create_network.OpenStackNetwork(
+ self.os_creds, self.pub_net_config.network_settings)
self.network_creator.create()
# Create routers
- self.router_creator = create_router.OpenStackRouter(self.os_creds, self.pub_net_config.router_settings)
+ self.router_creator = create_router.OpenStackRouter(
+ self.os_creds, self.pub_net_config.router_settings)
self.router_creator.create()
# Create Flavor
self.flavor_creator = create_flavor.OpenStackFlavor(
self.admin_os_creds,
- create_flavor.FlavorSettings(name=guid + '-flavor-name', ram=2048, disk=10, vcpus=2,
+ create_flavor.FlavorSettings(name=guid + '-flavor-name',
+ ram=2048, disk=10, vcpus=2,
metadata=self.flavor_metadata))
self.flavor_creator.create()
# Create Key/Pair
self.keypair_creator = create_keypairs.OpenStackKeypair(
self.os_creds, create_keypairs.KeypairSettings(
- name=self.keypair_name, public_filepath=self.keypair_pub_filepath,
+ name=self.keypair_name,
+ public_filepath=self.keypair_pub_filepath,
private_filepath=self.keypair_priv_filepath))
self.keypair_creator.create()
# Create Security Group
sec_grp_name = guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
+ direction=Direction.ingress,
protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, direction=Direction.ingress,
- protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
+ rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
+ direction=Direction.ingress,
+ protocol=Protocol.tcp,
+ port_range_min=22,
+ port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name, rule_settings=[rule1, rule2]))
+ SecurityGroupSettings(name=sec_grp_name,
+ rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
# Create instance
ports_settings = list()
ports_settings.append(
- create_network.PortSettings(name=self.port_1_name,
- network_name=self.pub_net_config.network_settings.name))
+ create_network.PortSettings(
+ name=self.port_1_name,
+ network_name=self.pub_net_config.network_settings.name))
instance_settings = create_instance.VmInstanceSettings(
- name=self.vm_inst_name, flavor=self.flavor_creator.flavor_settings.name, port_settings=ports_settings,
+ name=self.vm_inst_name,
+ flavor=self.flavor_creator.flavor_settings.name,
+ port_settings=ports_settings,
floating_ip_settings=[create_instance.FloatingIpSettings(
name=self.floating_ip_name, port_name=self.port_1_name,
router_name=self.pub_net_config.router_settings.name)])
self.inst_creator = create_instance.OpenStackVmInstance(
- self.os_creds, instance_settings, self.image_creator.image_settings,
+ self.os_creds, instance_settings,
+ self.image_creator.image_settings,
keypair_settings=self.keypair_creator.keypair_settings)
except:
self.tearDown()
@@ -165,10 +186,13 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
def test_apply_simple_playbook(self):
"""
- Tests application of an Ansible playbook that simply copies over a file:
- 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False
- 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False
- Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail.
+ Tests application of an Ansible playbook that simply copies over a file
+ 1. Have a ~/.ansible.cfg (or alternate means) to
+ set host_key_checking = False
+ 2. Set the following environment variable in your executing shell:
+ ANSIBLE_HOST_KEY_CHECKING=False
+ Should this not be performed, the creation of the host ssh key will
+ cause your ansible calls to fail.
"""
vm = self.inst_creator.create(block=True)
@@ -176,10 +200,12 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
priv_ip = self.inst_creator.get_port_ip(self.port_1_name)
- self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip))
+ self.assertTrue(create_instance_tests.check_dhcp_lease(
+ self.nova, vm, priv_ip))
# Apply Security Group
- self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
+ self.inst_creator.add_security_group(
+ self.sec_grp_creator.get_security_group())
ssh_client = self.inst_creator.ssh_client()
self.assertIsNotNone(ssh_client)
@@ -187,16 +213,19 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
self.assertIsNotNone(out)
self.assertGreater(len(out), 1)
- # Need to use the first floating IP as subsequent ones are currently broken with Apex CO
+ # Need to use the first floating IP as subsequent ones are currently
+ # broken with Apex CO
ip = self.inst_creator.get_floating_ip().ip
user = self.inst_creator.get_image_user()
priv_key = self.inst_creator.keypair_settings.private_filepath
- relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks', 'simple_playbook.yml')
+ relative_pb_path = pkg_resources.resource_filename(
+ 'snaps.provisioning.tests.playbooks', 'simple_playbook.yml')
retval = self.inst_creator.apply_ansible_playbook(relative_pb_path)
self.assertEqual(0, retval)
- ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings)
+ ssh = ansible_utils.ssh_client(ip, user, priv_key,
+ self.os_creds.proxy_settings)
self.assertIsNotNone(ssh)
scp = SCPClient(ssh.get_transport())
scp.get('~/hello.txt', self.test_file_local_path)
@@ -209,10 +238,14 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
def test_apply_template_playbook(self):
"""
- Tests application of an Ansible playbook that applies a template to a file:
- 1. Have a ~/.ansible.cfg (or alternate means) to set host_key_checking = False
- 2. Set the following environment variable in your executing shell: ANSIBLE_HOST_KEY_CHECKING=False
- Should this not be performed, the creation of the host ssh key will cause your ansible calls to fail.
+ Tests application of an Ansible playbook that applies a template to a
+ file:
+ 1. Have a ~/.ansible.cfg (or alternate means) to set
+ host_key_checking = False
+ 2. Set the following environment variable in your executing shell:
+ ANSIBLE_HOST_KEY_CHECKING=False
+ Should this not be performed, the creation of the host ssh key will
+ cause your ansible calls to fail.
"""
vm = self.inst_creator.create(block=True)
@@ -220,22 +253,29 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
priv_ip = self.inst_creator.get_port_ip(self.port_1_name)
- self.assertTrue(create_instance_tests.check_dhcp_lease(vm, priv_ip))
+ self.assertTrue(create_instance_tests.check_dhcp_lease(
+ self.nova, vm, priv_ip))
# Apply Security Group
- self.inst_creator.add_security_group(self.sec_grp_creator.get_security_group())
+ self.inst_creator.add_security_group(
+ self.sec_grp_creator.get_security_group())
- # Need to use the first floating IP as subsequent ones are currently broken with Apex CO
+ # Need to use the first floating IP as subsequent ones are currently
+ # broken with Apex CO
ip = self.inst_creator.get_floating_ip().ip
user = self.inst_creator.get_image_user()
priv_key = self.inst_creator.keypair_settings.private_filepath
- relative_pb_path = pkg_resources.resource_filename('snaps.provisioning.tests.playbooks',
- 'template_playbook.yml')
- retval = self.inst_creator.apply_ansible_playbook(relative_pb_path, variables={'name': 'Foo'})
+ relative_pb_path = pkg_resources.resource_filename(
+ 'snaps.provisioning.tests.playbooks',
+ 'template_playbook.yml')
+ retval = self.inst_creator.apply_ansible_playbook(relative_pb_path,
+ variables={
+ 'name': 'Foo'})
self.assertEqual(0, retval)
- ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings)
+ ssh = ansible_utils.ssh_client(ip, user, priv_key,
+ self.os_creds.proxy_settings)
self.assertIsNotNone(ssh)
scp = SCPClient(ssh.get_transport())
scp.get('/tmp/hello.txt', self.test_file_local_path)
diff --git a/snaps/test_suite_builder.py b/snaps/test_suite_builder.py
index e4f5685..d87ae5a 100644
--- a/snaps/test_suite_builder.py
+++ b/snaps/test_suite_builder.py
@@ -18,34 +18,54 @@ import unittest
from snaps.domain.test.image_tests import ImageDomainObjectTests
from snaps.domain.test.stack_tests import StackDomainObjectTests
-from snaps.openstack.tests.create_stack_tests import StackSettingsUnitTests, CreateStackSuccessTests, \
- CreateStackNegativeTests
-from snaps.openstack.utils.tests.glance_utils_tests import GlanceSmokeTests, GlanceUtilsTests
+from snaps.domain.test.vm_inst_tests import (VmInstDomainObjectTests,
+ FloatingIpDomainObjectTests)
from snaps.openstack.tests.create_flavor_tests import CreateFlavorTests
-from snaps.openstack.utils.tests.heat_utils_tests import HeatUtilsCreateStackTests, HeatSmokeTests
+from snaps.openstack.tests.create_image_tests import (
+ CreateImageSuccessTests, CreateImageNegativeTests, ImageSettingsUnitTests,
+ CreateMultiPartImageTests)
+from snaps.openstack.tests.create_instance_tests import (
+ CreateInstanceSingleNetworkTests, CreateInstancePubPrivNetTests,
+ CreateInstanceOnComputeHost, CreateInstanceSimpleTests,
+ FloatingIpSettingsUnitTests, InstanceSecurityGroupTests,
+ VmInstanceSettingsUnitTests, CreateInstancePortManipulationTests,
+ SimpleHealthCheck, CreateInstanceFromThreePartImage,
+ CreateInstanceMockOfflineTests)
+from snaps.openstack.tests.create_keypairs_tests import (
+ CreateKeypairsTests, KeypairSettingsUnitTests)
+from snaps.openstack.tests.create_network_tests import (
+ CreateNetworkSuccessTests, NetworkSettingsUnitTests, PortSettingsUnitTests,
+ SubnetSettingsUnitTests, CreateNetworkTypeTests)
+from snaps.openstack.tests.create_project_tests import (
+ CreateProjectSuccessTests, ProjectSettingsUnitTests,
+ CreateProjectUserTests)
+from snaps.openstack.tests.create_router_tests import (
+ CreateRouterSuccessTests, CreateRouterNegativeTests)
+from snaps.openstack.tests.create_security_group_tests import (
+ CreateSecurityGroupTests, SecurityGroupRuleSettingsUnitTests,
+ SecurityGroupSettingsUnitTests)
+from snaps.openstack.tests.create_stack_tests import (
+ StackSettingsUnitTests, CreateStackSuccessTests, CreateStackNegativeTests)
+from snaps.openstack.tests.create_user_tests import (
+ UserSettingsUnitTests, CreateUserSuccessTests)
+from snaps.openstack.tests.os_source_file_test import (
+ OSComponentTestCase, OSIntegrationTestCase)
+from snaps.openstack.utils.tests.glance_utils_tests import (
+ GlanceSmokeTests, GlanceUtilsTests)
+from snaps.openstack.utils.tests.heat_utils_tests import (
+ HeatUtilsCreateStackTests, HeatSmokeTests)
+from snaps.openstack.utils.tests.keystone_utils_tests import (
+ KeystoneSmokeTests, KeystoneUtilsTests)
+from snaps.openstack.utils.tests.neutron_utils_tests import (
+ NeutronSmokeTests, NeutronUtilsNetworkTests, NeutronUtilsSubnetTests,
+ NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests,
+ NeutronUtilsFloatingIpTests)
+from snaps.openstack.utils.tests.nova_utils_tests import (
+ NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests,
+ NovaUtilsInstanceTests)
+from snaps.provisioning.tests.ansible_utils_tests import (
+ AnsibleProvisioningTests)
from snaps.tests.file_utils_tests import FileUtilsTests
-from snaps.openstack.tests.create_security_group_tests import CreateSecurityGroupTests, \
- SecurityGroupRuleSettingsUnitTests, SecurityGroupSettingsUnitTests
-from snaps.openstack.tests.create_project_tests import CreateProjectSuccessTests, ProjectSettingsUnitTests, \
- CreateProjectUserTests
-from snaps.openstack.tests.create_user_tests import UserSettingsUnitTests, CreateUserSuccessTests
-from snaps.openstack.utils.tests.keystone_utils_tests import KeystoneSmokeTests, KeystoneUtilsTests
-from snaps.openstack.utils.tests.neutron_utils_tests import NeutronSmokeTests, NeutronUtilsNetworkTests, \
- NeutronUtilsSubnetTests, NeutronUtilsRouterTests, NeutronUtilsSecurityGroupTests
-from snaps.openstack.tests.create_image_tests import CreateImageSuccessTests, CreateImageNegativeTests, \
- ImageSettingsUnitTests, CreateMultiPartImageTests
-from snaps.openstack.tests.create_keypairs_tests import CreateKeypairsTests, KeypairSettingsUnitTests
-from snaps.openstack.tests.create_network_tests import CreateNetworkSuccessTests, NetworkSettingsUnitTests, \
- PortSettingsUnitTests, SubnetSettingsUnitTests, CreateNetworkTypeTests
-from snaps.openstack.tests.create_router_tests import CreateRouterSuccessTests, CreateRouterNegativeTests
-from snaps.openstack.tests.create_instance_tests import CreateInstanceSingleNetworkTests, \
- CreateInstancePubPrivNetTests, CreateInstanceOnComputeHost, CreateInstanceSimpleTests, \
- FloatingIpSettingsUnitTests, InstanceSecurityGroupTests, VmInstanceSettingsUnitTests, \
- CreateInstancePortManipulationTests, SimpleHealthCheck, CreateInstanceFromThreePartImage, \
- CreateInstanceMockOfflineTests
-from snaps.provisioning.tests.ansible_utils_tests import AnsibleProvisioningTests
-from snaps.openstack.tests.os_source_file_test import OSComponentTestCase, OSIntegrationTestCase
-from snaps.openstack.utils.tests.nova_utils_tests import NovaSmokeTests, NovaUtilsKeypairTests, NovaUtilsFlavorTests
__author__ = 'spisarski'
@@ -57,59 +77,95 @@ def add_unit_tests(suite):
:return: None as the tests will be adding to the 'suite' parameter object
"""
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FileUtilsTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupRuleSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SecurityGroupSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FloatingIpSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(VmInstanceSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests))
-
-
-def add_openstack_client_tests(suite, os_creds, ext_net_name, use_keystone=True, log_level=logging.INFO):
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ SecurityGroupRuleSettingsUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ SecurityGroupSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(ImageSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(ImageDomainObjectTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(KeypairSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(UserSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(ProjectSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(NetworkSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(SubnetSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(PortSettingsUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ FloatingIpSettingsUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ VmInstanceSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(StackDomainObjectTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(StackSettingsUnitTests))
+ suite.addTest(
+ unittest.TestLoader().loadTestsFromTestCase(VmInstDomainObjectTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ FloatingIpDomainObjectTests))
+
+
+def add_openstack_client_tests(suite, os_creds, ext_net_name,
+ use_keystone=True, log_level=logging.INFO):
"""
Adds tests written to exercise OpenStack client retrieval
:param suite: the unittest.TestSuite object to which to add the tests
- :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack
- :param ext_net_name: the name of an external network on the cloud under test
- :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on
- a host that has access to the cloud's private network
+ :param os_creds: and instance of OSCreds that holds the credentials
+ required by OpenStack
+ :param ext_net_name: the name of an external network on the cloud under
+ test
+ :param use_keystone: when True, tests requiring direct access to Keystone
+ are added as these need to be running on a host that
+ has access to the cloud's private network
:param log_level: the logging level
:return: None as the tests will be adding to the 'suite' parameter object
"""
# Basic connection tests
- suite.addTest(OSComponentTestCase.parameterize(GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
- log_level=log_level))
+ suite.addTest(
+ OSComponentTestCase.parameterize(
+ GlanceSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
if use_keystone:
- suite.addTest(OSComponentTestCase.parameterize(KeystoneSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
- log_level=log_level))
+ suite.addTest(
+ OSComponentTestCase.parameterize(
+ KeystoneSmokeTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
- suite.addTest(OSComponentTestCase.parameterize(NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
- log_level=log_level))
- suite.addTest(OSComponentTestCase.parameterize(NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
- log_level=log_level))
- suite.addTest(OSComponentTestCase.parameterize(HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
- log_level=log_level))
+ suite.addTest(
+ OSComponentTestCase.parameterize(
+ NeutronSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
+ suite.addTest(
+ OSComponentTestCase.parameterize(
+ NovaSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
+ suite.addTest(
+ OSComponentTestCase.parameterize(
+ HeatSmokeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
-def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, image_metadata=None,
- log_level=logging.INFO):
+def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True,
+ image_metadata=None, log_level=logging.INFO):
"""
Adds tests written to exercise all existing OpenStack APIs
:param suite: the unittest.TestSuite object to which to add the tests
- :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack
- :param ext_net_name: the name of an external network on the cloud under test
- :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on
- a host that has access to the cloud's private network
- :param image_metadata: dict() object containing metadata for creating an image with custom config
+ :param os_creds: Instance of OSCreds that holds the credentials
+ required by OpenStack
+ :param ext_net_name: the name of an external network on the cloud under
+ test
+ :param use_keystone: when True, tests requiring direct access to Keystone
+ are added as these need to be running on a host that
+ has access to the cloud's private network
+ :param image_metadata: dict() object containing metadata for creating an
+ image with custom config
(see YAML files in examples/image-metadata)
:param log_level: the logging level
:return: None as the tests will be adding to the 'suite' parameter object
@@ -117,131 +173,203 @@ def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True, im
# Tests the OpenStack API calls
if use_keystone:
suite.addTest(OSComponentTestCase.parameterize(
- KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ KeystoneUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- CreateUserSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateUserSuccessTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- CreateProjectSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateProjectSuccessTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- CreateProjectUserTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateProjectUserTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name, image_metadata=image_metadata,
+ GlanceUtilsTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ image_metadata=image_metadata,
+ log_level=log_level))
+ suite.addTest(OSComponentTestCase.parameterize(
+ NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name,
log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NeutronUtilsNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
+ suite.addTest(OSComponentTestCase.parameterize(
+ NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NeutronUtilsSubnetTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NeutronUtilsSecurityGroupTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NeutronUtilsRouterTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NeutronUtilsFloatingIpTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NeutronUtilsSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NovaUtilsKeypairTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- NovaUtilsFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ NovaUtilsInstanceTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateFlavorTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- HeatUtilsCreateStackTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level,
+ HeatUtilsCreateStackTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level,
image_metadata=image_metadata))
-def add_openstack_integration_tests(suite, os_creds, ext_net_name, use_keystone=True, flavor_metadata=None,
- image_metadata=None, use_floating_ips=True, log_level=logging.INFO):
+def add_openstack_integration_tests(suite, os_creds, ext_net_name,
+ use_keystone=True, flavor_metadata=None,
+ image_metadata=None, use_floating_ips=True,
+ log_level=logging.INFO):
"""
- Adds tests written to exercise all long-running OpenStack integration tests meaning they will be creating VM
- instances and potentially performing some SSH functions through floating IPs
+ Adds tests written to exercise all long-running OpenStack integration tests
+ meaning they will be creating VM instances and potentially performing some
+ SSH functions through floatingIPs
:param suite: the unittest.TestSuite object to which to add the tests
- :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack
- :param ext_net_name: the name of an external network on the cloud under test
- :param use_keystone: when True, tests requiring direct access to Keystone are added as these need to be running on
- a host that has access to the cloud's private network
- :param image_metadata: dict() object containing metadata for creating an image with custom config
+ :param os_creds: and instance of OSCreds that holds the credentials
+ required by OpenStack
+ :param ext_net_name: the name of an external network on the cloud under
+ test
+ :param use_keystone: when True, tests requiring direct access to Keystone
+ are added as these need to be running on a host that
+ has access to the cloud's private network
+ :param image_metadata: dict() object containing metadata for creating an
+ image with custom config
(see YAML files in examples/image-metadata)
- :param flavor_metadata: dict() object containing the metadata required by your flavor based on your configuration:
+ :param flavor_metadata: dict() object containing the metadata required by
+ your flavor based on your configuration:
(i.e. {'hw:mem_page_size': 'large'})
- :param use_floating_ips: when true, all tests requiring Floating IPs will be added to the suite
+ :param use_floating_ips: when true, all tests requiring Floating IPs will
+ be added to the suite
:param log_level: the logging level
:return: None as the tests will be adding to the 'suite' parameter object
"""
- # Tests the OpenStack API calls via a creator. If use_keystone, objects will be created with a custom user
- # and project
+ # Tests the OpenStack API calls via a creator. If use_keystone, objects
+ # will be created with a custom user and project
# Creator Object tests
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateImageSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateImageNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateMultiPartImageTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateMultiPartImageTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateKeypairsTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateNetworkSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateNetworkSuccessTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateRouterSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateRouterNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateRouterNegativeTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
# VM Instances
suite.addTest(OSIntegrationTestCase.parameterize(
- SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ SimpleHealthCheck, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstanceSimpleTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstanceSimpleTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstancePortManipulationTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstancePortManipulationTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- InstanceSecurityGroupTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ InstanceSecurityGroupTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstanceOnComputeHost, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstanceOnComputeHost, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstanceFromThreePartImage, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstanceFromThreePartImage, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateStackSuccessTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateStackNegativeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
if use_floating_ips:
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstanceSingleNetworkTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstanceSingleNetworkTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstancePubPrivNetTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ CreateInstancePubPrivNetTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- AnsibleProvisioningTests, os_creds=os_creds, ext_net_name=ext_net_name, use_keystone=use_keystone,
- flavor_metadata=flavor_metadata, image_metadata=image_metadata, log_level=log_level))
+ AnsibleProvisioningTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
-def add_openstack_staging_tests(suite, os_creds, ext_net_name, log_level=logging.INFO):
+def add_openstack_staging_tests(suite, os_creds, ext_net_name,
+ log_level=logging.INFO):
"""
- Adds tests that are still in development have not been designed to run successfully against all OpenStack pods
+ Adds tests that are still in development have not been designed to run
+ successfully against all OpenStack pods
:param suite: the unittest.TestSuite object to which to add the tests
- :param os_creds: and instance of OSCreds that holds the credentials required by OpenStack
- :param ext_net_name: the name of an external network on the cloud under test
+ :param os_creds: Instance of OSCreds that holds the credentials
+ required by OpenStack
+ :param ext_net_name: the name of an external network on the cloud under
+ test
:param log_level: the logging level
:return: None as the tests will be adding to the 'suite' parameter object
"""
suite.addTest(OSComponentTestCase.parameterize(
- CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateNetworkTypeTests, os_creds=os_creds, ext_net_name=ext_net_name,
+ log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- CreateInstanceMockOfflineTests, os_creds=os_creds, ext_net_name=ext_net_name, log_level=log_level))
+ CreateInstanceMockOfflineTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level))