summaryrefslogtreecommitdiffstats
path: root/snaps/openstack
diff options
context:
space:
mode:
authorspisarski <s.pisarski@cablelabs.com>2017-05-04 12:43:53 -0600
committerspisarski <s.pisarski@cablelabs.com>2017-05-08 09:31:16 -0600
commitd1c4675e5c35257b84cd69969c6e67575faf19db (patch)
tree63ff3b41ff445b8c192ec33c0aefb913a112b613 /snaps/openstack
parent40c96233ca4c27de1b3d8b53187c302ddd6208a2 (diff)
Modified code to support both Python 2.7 and 3.x
* Tested on Python 2.7.10 and 3.4.4 * Updated installation documentation JIRA: SNAPS-30 Change-Id: I94a37d218be8ea47bbbcfb560197737430fcb3ba Signed-off-by: spisarski <s.pisarski@cablelabs.com>
Diffstat (limited to 'snaps/openstack')
-rw-r--r--snaps/openstack/create_image.py2
-rw-r--r--snaps/openstack/create_instance.py30
-rw-r--r--snaps/openstack/create_keypairs.py42
-rw-r--r--snaps/openstack/create_network.py4
-rw-r--r--snaps/openstack/create_project.py23
-rw-r--r--snaps/openstack/create_router.py74
-rw-r--r--snaps/openstack/create_security_group.py55
-rw-r--r--snaps/openstack/tests/create_flavor_tests.py87
-rw-r--r--snaps/openstack/tests/create_image_tests.py192
-rw-r--r--snaps/openstack/tests/create_instance_tests.py233
-rw-r--r--snaps/openstack/tests/create_keypairs_tests.py52
-rw-r--r--snaps/openstack/tests/create_network_tests.py184
-rw-r--r--snaps/openstack/tests/create_project_tests.py36
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py90
-rw-r--r--snaps/openstack/tests/create_user_tests.py33
-rw-r--r--snaps/openstack/tests/validation_utils.py4
-rw-r--r--snaps/openstack/utils/glance_utils.py4
-rw-r--r--snaps/openstack/utils/neutron_utils.py16
-rw-r--r--snaps/openstack/utils/nova_utils.py44
-rw-r--r--snaps/openstack/utils/tests/keystone_utils_tests.py6
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py10
-rw-r--r--snaps/openstack/utils/tests/nova_utils_tests.py38
22 files changed, 634 insertions, 625 deletions
diff --git a/snaps/openstack/create_image.py b/snaps/openstack/create_image.py
index 4f2bd7e..1e2e4dc 100644
--- a/snaps/openstack/create_image.py
+++ b/snaps/openstack/create_image.py
@@ -176,7 +176,7 @@ class OpenStackImage:
# TODO - Place this API call into glance_utils.
status = glance_utils.get_image_status(self.__glance, self.__image)
if not status:
- logger.warn('Cannot image status for image with ID - ' + self.__image.id)
+ logger.warning('Cannot image status for image with ID - ' + self.__image.id)
return False
if status == 'ERROR':
diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py
index dda68fd..9fa3232 100644
--- a/snaps/openstack/create_instance.py
+++ b/snaps/openstack/create_instance.py
@@ -210,7 +210,7 @@ class OpenStackVmInstance:
logger.info('Deleting Floating IP - ' + floating_ip.ip)
nova_utils.delete_floating_ip(self.__nova, floating_ip)
except Exception as e:
- logger.error('Error deleting Floating IP - ' + e.message)
+ logger.error('Error deleting Floating IP - ' + str(e))
self.__floating_ips = list()
self.__floating_ip_dict = dict()
@@ -220,7 +220,7 @@ class OpenStackVmInstance:
try:
neutron_utils.delete_port(self.__neutron, port)
except PortNotFoundClient as e:
- logger.warn('Unexpected error deleting port - ' + e.message)
+ logger.warning('Unexpected error deleting port - ' + str(e))
pass
self.__ports = list()
@@ -243,7 +243,7 @@ class OpenStackVmInstance:
logger.error('VM not deleted within the timeout period of ' +
str(self.instance_settings.vm_delete_timeout) + ' seconds')
except Exception as e:
- logger.error('Unexpected error while checking VM instance status - ' + e.message)
+ logger.error('Unexpected error while checking VM instance status - ' + str(e))
def __setup_ports(self, port_settings, cleanup):
"""
@@ -303,7 +303,7 @@ class OpenStackVmInstance:
' on instance - ' + self.instance_settings.name)
return
except Exception as e:
- logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + e.message)
+ logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + str(e))
time.sleep(poll_interval)
count -= 1
pass
@@ -341,7 +341,7 @@ class OpenStackVmInstance:
if subnet_name:
subnet = neutron_utils.get_subnet_by_name(self.__neutron, subnet_name)
if not subnet:
- logger.warn('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
+ logger.warning('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
return None
for fixed_ip in port_dict['fixed_ips']:
if fixed_ip['subnet_id'] == subnet['subnet']['id']:
@@ -374,7 +374,7 @@ class OpenStackVmInstance:
for key, port in self.__ports:
if key == port_name:
return port
- logger.warn('Cannot find port with name - ' + port_name)
+ logger.warning('Cannot find port with name - ' + port_name)
return None
def config_nics(self):
@@ -427,7 +427,7 @@ class OpenStackVmInstance:
[floating_ip], self.get_image_user(), self.keypair_settings.private_filepath,
variables, self.__os_creds.proxy_settings)
else:
- logger.warn('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
+ logger.warning('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
'No playbook or keypairs found.')
def get_image_user(self):
@@ -452,7 +452,7 @@ class OpenStackVmInstance:
return self.__vm_status_check(STATUS_DELETED, block, self.instance_settings.vm_delete_timeout,
poll_interval)
except NotFound as e:
- logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + e.message)
+ logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + str(e))
return True
def vm_active(self, block=False, poll_interval=POLL_INTERVAL):
@@ -500,7 +500,7 @@ class OpenStackVmInstance:
"""
instance = self.__nova.servers.get(self.__vm.id)
if not instance:
- logger.warn('Cannot find instance with id - ' + self.__vm.id)
+ logger.warning('Cannot find instance with id - ' + self.__vm.id)
return False
if instance.status == 'ERROR':
@@ -575,7 +575,7 @@ class OpenStackVmInstance:
self.keypair_settings.private_filepath,
proxy_settings=self.__os_creds.proxy_settings)
else:
- logger.warn('Cannot return an SSH client. No Floating IP configured')
+ logger.warning('Cannot return an SSH client. No Floating IP configured')
def add_security_group(self, security_group):
"""
@@ -586,14 +586,14 @@ class OpenStackVmInstance:
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot add')
+ logger.warning('Security group object is None, cannot add')
return False
try:
nova_utils.add_security_group(self.__nova, self.get_vm_inst(), security_group['security_group']['name'])
return True
except NotFound as e:
- logger.warn('Security group not added - ' + e.message)
+ logger.warning('Security group not added - ' + str(e))
return False
def remove_security_group(self, security_group):
@@ -605,14 +605,14 @@ class OpenStackVmInstance:
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot remove')
+ logger.warning('Security group object is None, cannot remove')
return False
try:
nova_utils.remove_security_group(self.__nova, self.get_vm_inst(), security_group)
return True
except NotFound as e:
- logger.warn('Security group not removed - ' + e.message)
+ logger.warning('Security group not removed - ' + str(e))
return False
@@ -659,7 +659,7 @@ class VmInstanceSettings:
self.security_group_names = set(config['security_group_names'])
elif isinstance(config['security_group_names'], set):
self.security_group_names = config['security_group_names']
- elif isinstance(config['security_group_names'], basestring):
+ elif isinstance(config['security_group_names'], str):
self.security_group_names = [config['security_group_names']]
else:
raise Exception('Invalid data type for security_group_names attribute')
diff --git a/snaps/openstack/create_keypairs.py b/snaps/openstack/create_keypairs.py
index ea7c811..6af40c6 100644
--- a/snaps/openstack/create_keypairs.py
+++ b/snaps/openstack/create_keypairs.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +15,6 @@
import logging
import os
-from Crypto.PublicKey import RSA
from novaclient.exceptions import NotFound
from snaps.openstack.utils import nova_utils
@@ -50,28 +49,23 @@ class OpenStackKeypair:
"""
logger.info('Creating keypair %s...' % self.keypair_settings.name)
- try:
- self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
-
- if not self.__keypair and not cleanup:
- if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
- logger.info("Uploading existing keypair")
- self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
- self.keypair_settings.public_filepath)
- else:
- logger.info("Creating new keypair")
- # TODO - Make this value configurable
- keys = RSA.generate(1024)
- self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
- keys.publickey().exportKey('OpenSSH'))
- nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
- self.keypair_settings.private_filepath)
-
- return self.__keypair
- except Exception as e:
- logger.error('Unexpected error creating keypair named - ' + self.keypair_settings.name)
- self.clean()
- raise Exception(e.message)
+ self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
+
+ if not self.__keypair and not cleanup:
+ if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
+ logger.info("Uploading existing keypair")
+ self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
+ self.keypair_settings.public_filepath)
+ else:
+ logger.info("Creating new keypair")
+ # TODO - Make this value configurable
+ keys = nova_utils.create_keys(1024)
+ self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
+ nova_utils.public_key_openssh(keys))
+ nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
+ self.keypair_settings.private_filepath)
+
+ return self.__keypair
def clean(self):
"""
diff --git a/snaps/openstack/create_network.py b/snaps/openstack/create_network.py
index a214ba1..210c2bf 100644
--- a/snaps/openstack/create_network.py
+++ b/snaps/openstack/create_network.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -89,7 +89,7 @@ class OpenStackNetwork:
logger.info('Deleting subnet with name ' + subnet['subnet']['name'])
neutron_utils.delete_subnet(self.__neutron, subnet)
except NotFound as e:
- logger.warn('Error deleting subnet with message - ' + e.message)
+ logger.warning('Error deleting subnet with message - ' + str(e))
pass
self.__subnets = list()
diff --git a/snaps/openstack/create_project.py b/snaps/openstack/create_project.py
index 60f9ed0..2658d13 100644
--- a/snaps/openstack/create_project.py
+++ b/snaps/openstack/create_project.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,19 +47,14 @@ class OpenStackProject:
:param cleanup: Denotes whether or not this is being called for cleanup or not
:return: The OpenStack Image object
"""
- try:
- self.__project = keystone_utils.get_project(keystone=self.__keystone,
- project_name=self.project_settings.name)
- if self.__project:
- logger.info('Found project with name - ' + self.project_settings.name)
- elif not cleanup:
- self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
- else:
- logger.info('Did not create image due to cleanup mode')
- except Exception as e:
- logger.error('Unexpected error. Rolling back')
- self.clean()
- raise Exception(e.message)
+ self.__project = keystone_utils.get_project(keystone=self.__keystone,
+ project_name=self.project_settings.name)
+ if self.__project:
+ logger.info('Found project with name - ' + self.project_settings.name)
+ elif not cleanup:
+ self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
+ else:
+ logger.info('Did not create image due to cleanup mode')
return self.__project
diff --git a/snaps/openstack/create_router.py b/snaps/openstack/create_router.py
index 70c6b76..5c461cc 100644
--- a/snaps/openstack/create_router.py
+++ b/snaps/openstack/create_router.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -59,46 +59,42 @@ class OpenStackRouter:
:return: the router object
"""
logger.debug('Creating Router with name - ' + self.router_settings.name)
- try:
- existing = False
- router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
- if router_inst:
- self.__router = router_inst
- existing = True
+ existing = False
+ router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
+ if router_inst:
+ self.__router = router_inst
+ existing = True
+ else:
+ if not cleanup:
+ self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
+
+ for internal_subnet_name in self.router_settings.internal_subnets:
+ internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
+ if internal_subnet:
+ self.__internal_subnets.append(internal_subnet)
+ if internal_subnet and not cleanup and not existing:
+ logger.debug('Adding router to subnet...')
+ self.__internal_router_interface = neutron_utils.add_interface_router(
+ self.__neutron, self.__router, subnet=internal_subnet)
else:
- if not cleanup:
- self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
-
- for internal_subnet_name in self.router_settings.internal_subnets:
- internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
- if internal_subnet:
- self.__internal_subnets.append(internal_subnet)
- if internal_subnet and not cleanup and not existing:
- logger.debug('Adding router to subnet...')
- self.__internal_router_interface = neutron_utils.add_interface_router(
- self.__neutron, self.__router, subnet=internal_subnet)
- else:
- raise Exception('Subnet not found with name ' + internal_subnet_name)
+ raise Exception('Subnet not found with name ' + internal_subnet_name)
+
+ for port_setting in self.router_settings.port_settings:
+ port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
+ logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if port:
+ self.__ports.append(port)
- for port_setting in self.router_settings.port_settings:
- port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
- logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if not port and not cleanup and not existing:
+ port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
if port:
+ logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
self.__ports.append(port)
+ neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
+ else:
+ raise Exception('Error creating port with name - ' + port_setting.name)
- if not port and not cleanup and not existing:
- port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
- if port:
- logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
- self.__ports.append(port)
- neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
- else:
- raise Exception('Error creating port with name - ' + port_setting.name)
-
- return self.__router
- except Exception as e:
- self.clean()
- raise Exception(e.message)
+ return self.__router
def clean(self):
"""
@@ -163,8 +159,6 @@ class RouterSettings:
policies.
:param external_gateway: Name of the external network to which to route
:param admin_state_up: The administrative status of the router. True = up / False = down (default True)
- :param enable_snat: Boolean value. Enable Source NAT (SNAT) attribute. Default is True. To persist this
- attribute value, set the enable_snat_by_default option in the neutron.conf file.
:param external_fixed_ips: Dictionary containing the IP address parameters.
:param internal_subnets: List of subnet names to which to connect this router for Floating IP purposes
:param port_settings: List of PortSettings objects
@@ -238,7 +232,7 @@ class RouterSettings:
else:
raise Exception('Could not find the external network named - ' + self.external_gateway)
- #TODO: Enable SNAT option for Router
- #TODO: Add external_fixed_ips Tests
+ # TODO: Enable SNAT option for Router
+ # TODO: Add external_fixed_ips Tests
return {'router': out}
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py
index fc1ee98..57f7284 100644
--- a/snaps/openstack/create_security_group.py
+++ b/snaps/openstack/create_security_group.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -58,7 +58,7 @@ class OpenStackSecurityGroup:
if not self.__security_group and not cleanup:
# Create the security group
self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone,
- self.sec_grp_settings)
+ self.sec_grp_settings)
# Get the rules added for free
auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group)
@@ -118,11 +118,11 @@ class OpenStackSecurityGroup:
"""
Removes and deletes the rules then the security group.
"""
- for setting, rule in self.__rules.iteritems():
+ for setting, rule in self.__rules.items():
try:
neutron_utils.delete_security_group_rule(self.__neutron, rule)
except NotFound as e:
- logger.warn('Rule not found, cannot delete - ' + e.message)
+ logger.warning('Rule not found, cannot delete - ' + str(e))
pass
self.__rules = dict()
@@ -130,7 +130,7 @@ class OpenStackSecurityGroup:
try:
neutron_utils.delete_security_group(self.__neutron, self.__security_group)
except NotFound as e:
- logger.warn('Security Group not found, cannot delete - ' + e.message)
+ logger.warning('Security Group not found, cannot delete - ' + str(e))
self.__security_group = None
@@ -177,7 +177,7 @@ class OpenStackSecurityGroup:
if rule_setting:
self.__rules.pop(rule_setting)
else:
- logger.warn('Rule setting is None, cannot remove rule')
+ logger.warning('Rule setting is None, cannot remove rule')
def __get_setting_from_rule(self, rule):
"""
@@ -460,17 +460,16 @@ def map_direction(direction):
"""
if not direction:
return None
- if type(direction) is Direction:
+ if isinstance(direction, Direction):
return direction
- elif isinstance(direction, basestring):
- if direction == 'egress':
+ else:
+ dir_str = str(direction)
+ if dir_str == 'egress':
return Direction.egress
- elif direction == 'ingress':
+ elif dir_str == 'ingress':
return Direction.ingress
else:
- raise Exception('Invalid Direction - ' + direction)
- else:
- raise Exception('Invalid Direction object - ' + str(direction))
+ raise Exception('Invalid Direction - ' + dir_str)
def map_protocol(protocol):
@@ -482,21 +481,20 @@ def map_protocol(protocol):
"""
if not protocol:
return None
- elif type(protocol) is Protocol:
+ elif isinstance(protocol, Protocol):
return protocol
- elif isinstance(protocol, basestring):
- if protocol == 'icmp':
+ else:
+ proto_str = str(protocol)
+ if proto_str == 'icmp':
return Protocol.icmp
- elif protocol == 'tcp':
+ elif proto_str == 'tcp':
return Protocol.tcp
- elif protocol == 'udp':
+ elif proto_str == 'udp':
return Protocol.udp
- elif protocol == 'null':
+ elif proto_str == 'null':
return Protocol.null
else:
- raise Exception('Invalid Protocol - ' + protocol)
- else:
- raise Exception('Invalid Protocol object - ' + str(protocol))
+ raise Exception('Invalid Protocol - ' + proto_str)
def map_ethertype(ethertype):
@@ -508,14 +506,13 @@ def map_ethertype(ethertype):
"""
if not ethertype:
return None
- elif type(ethertype) is Ethertype:
+ elif isinstance(ethertype, Ethertype):
return ethertype
- elif isinstance(ethertype, basestring):
- if ethertype == 'IPv6':
+ else:
+ eth_str = str(ethertype)
+ if eth_str == 'IPv6':
return Ethertype.IPv6
- elif ethertype == 'IPv4':
+ elif eth_str == 'IPv4':
return Ethertype.IPv4
else:
- raise Exception('Invalid Ethertype - ' + ethertype)
- else:
- raise Exception('Invalid Ethertype object - ' + str(ethertype))
+ raise Exception('Invalid Ethertype - ' + eth_str)
diff --git a/snaps/openstack/tests/create_flavor_tests.py b/snaps/openstack/tests/create_flavor_tests.py
index ad135ee..be7ac64 100644
--- a/snaps/openstack/tests/create_flavor_tests.py
+++ b/snaps/openstack/tests/create_flavor_tests.py
@@ -1,3 +1,8 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
@@ -160,60 +165,60 @@ class FlavorSettingsUnitTests(unittest.TestCase):
def test_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(name='foo', ram=1, disk=2, vcpus=3)
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_config_with_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(config={'name': 'foo', 'ram': 1, 'disk': 2, 'vcpus': 3})
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(name='foo', flavor_id='bar', ram=1, disk=2, vcpus=3, ephemeral=4, swap=5,
rxtx_factor=6.0, is_public=False, metadata=metadata)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
def test_config_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(config={'name': 'foo', 'flavor_id': 'bar', 'ram': 1, 'disk': 2, 'vcpus': 3,
'ephemeral': 4, 'swap': 5, 'rxtx_factor': 6.0, 'is_public': False,
'metadata': metadata})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
class CreateFlavorTests(OSComponentTestCase):
@@ -264,7 +269,7 @@ class CreateFlavorTests(OSComponentTestCase):
flavor_creator_2 = OpenStackFlavor(self.os_creds, flavor_settings)
flavor2 = flavor_creator_2.create()
- self.assertEquals(flavor.id, flavor2.id)
+ self.assertEqual(flavor.id, flavor2.id)
def test_create_clean_flavor(self):
"""
diff --git a/snaps/openstack/tests/create_image_tests.py b/snaps/openstack/tests/create_image_tests.py
index 6b8c1c0..87e7bdf 100644
--- a/snaps/openstack/tests/create_image_tests.py
+++ b/snaps/openstack/tests/create_image_tests.py
@@ -20,7 +20,7 @@ import unittest
from snaps import file_utils
from snaps.openstack.create_image import ImageSettings
-import openstack_tests
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.utils import glance_utils
from snaps.openstack import create_image
from snaps.openstack import os_credentials
@@ -78,10 +78,10 @@ class ImageSettingsUnitTests(unittest.TestCase):
def test_name_user_format_url_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
@@ -89,41 +89,41 @@ class ImageSettingsUnitTests(unittest.TestCase):
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_url_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'download_url': 'http://foo.com'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_name_user_format_file_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_file_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_all_url(self):
@@ -133,21 +133,21 @@ class ImageSettingsUnitTests(unittest.TestCase):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties, nic_config_pb_loc='/foo/bar',
kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('bar', settings.kernel_image_settings.image_user)
- self.assertEquals('qcow2', settings.kernel_image_settings.format)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
- self.assertEquals('bar', settings.ramdisk_image_settings.image_user)
- self.assertEquals('qcow2', settings.ramdisk_image_settings.format)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('bar', settings.kernel_image_settings.image_user)
+ self.assertEqual('qcow2', settings.kernel_image_settings.format)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
+ self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
def test_config_all_url(self):
settings = ImageSettings(
@@ -159,42 +159,42 @@ class ImageSettingsUnitTests(unittest.TestCase):
'image_user': 'bar', 'format': 'qcow2'},
'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com',
'image_user': 'bar', 'format': 'qcow2'}})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
def test_all_file(self):
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
extra_properties=properties, nic_config_pb_loc='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals(properties, settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual(properties, settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
def test_config_all_file(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow',
'extra_properties': '{\'hw_video_model\' : \'vga\'}',
'nic_config_pb_loc': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
class CreateImageSuccessTests(OSIntegrationTestCase):
@@ -246,10 +246,10 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(created_image.size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_image_clean_url_properties(self):
"""
@@ -263,11 +263,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_image_clean_file(self):
"""
@@ -288,11 +288,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_delete_image(self):
"""
@@ -305,8 +305,8 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
# Delete Image manually
glance_utils.delete_image(self.glance, created_image)
@@ -327,16 +327,16 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(image1.name, retrieved_image.name)
- self.assertEquals(image1.id, retrieved_image.id)
- self.assertEquals(image1.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(image1.name, retrieved_image.name)
+ self.assertEqual(image1.id, retrieved_image.id)
+ self.assertEqual(image1.properties, retrieved_image.properties)
# Should be retrieving the instance data
os_image_2 = create_image.OpenStackImage(self.os_creds, self.image_settings)
image2 = os_image_2.create()
- self.assertEquals(image1.id, image2.id)
+ self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
@@ -490,17 +490,17 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
main_image = glance_utils.get_image(self.glance, image_settings.name)
self.assertIsNotNone(main_image)
self.assertIsNotNone(image_creator.get_image())
- self.assertEquals(image_creator.get_image().id, main_image.id)
+ self.assertEqual(image_creator.get_image().id, main_image.id)
kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name)
self.assertIsNotNone(kernel_image)
self.assertIsNotNone(image_creator.get_kernel_image())
- self.assertEquals(kernel_image.id, image_creator.get_kernel_image().id)
+ self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id)
ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name)
self.assertIsNotNone(ramdisk_image)
self.assertIsNotNone(image_creator.get_ramdisk_image())
- self.assertEquals(ramdisk_image.id, image_creator.get_ramdisk_image().id)
+ self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id)
def test_create_three_part_image_from_file_3_creators(self):
"""
@@ -522,7 +522,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_file_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
@@ -535,7 +535,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
# Create the main image
image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
@@ -552,11 +552,11 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_three_part_image_from_url_3_creators(self):
"""
@@ -578,7 +578,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_image_settings = openstack_tests.cirros_image_settings(
@@ -590,7 +590,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)
# Create the main image
os_image_settings = openstack_tests.cirros_image_settings(
@@ -612,12 +612,12 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py
index 977d33b..e0dca17 100644
--- a/snaps/openstack/tests/create_instance_tests.py
+++ b/snaps/openstack/tests/create_instance_tests.py
@@ -69,33 +69,33 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
def test_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(name='foo', flavor='bar', port_settings=[port_settings])
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_config_with_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(config={'name': 'foo', 'flavor': 'bar', 'ports': [port_settings]})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_all(self):
@@ -106,22 +106,22 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
security_group_names=['sec_grp_1'], floating_ip_settings=[fip_settings],
sudo_user='joe', vm_boot_timeout=999, vm_delete_timeout=333,
ssh_connect_timeout=111, availability_zone='server name')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals('sec_grp_1', settings.security_group_names[0])
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual('sec_grp_1', settings.security_group_names[0])
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
def test_config_all(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
@@ -132,21 +132,21 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
'floating_ips': [fip_settings], 'sudo_user': 'joe',
'vm_boot_timeout': 999, 'vm_delete_timeout': 333,
'ssh_connect_timeout': 111, 'availability_zone': 'server name'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
class FloatingIpSettingsUnitTests(unittest.TestCase):
@@ -188,36 +188,36 @@ class FloatingIpSettingsUnitTests(unittest.TestCase):
def test_name_port_router_only(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router')
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_config_with_name_port_router_only(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_all(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router',
subnet_name='bar-subnet', provisioning=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
def test_config_all(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router',
'subnet_name': 'bar-subnet', 'provisioning': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
@@ -283,7 +283,7 @@ class SimpleHealthCheck(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -295,19 +295,19 @@ class SimpleHealthCheck(OSIntegrationTestCase):
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -408,25 +408,25 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -441,13 +441,13 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase):
self.os_creds, instance_settings, self.image_creator.image_settings)
vm_inst = self.inst_creator.create()
- self.assertEquals(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Delete instance
nova_utils.delete_vm_instance(self.nova, vm_inst)
self.assertTrue(self.inst_creator.vm_deleted(block=True))
- self.assertEquals(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Exception should not be thrown
self.inst_creator.clean()
@@ -523,13 +523,13 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -541,25 +541,25 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.router_creator:
try:
self.router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -585,9 +585,9 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.inst_creators.append(inst_creator)
vm_inst = inst_creator.create()
- self.assertEquals(ip_1, inst_creator.get_port_ip(self.port_1_name))
+ self.assertEqual(ip_1, inst_creator.get_port_ip(self.port_1_name))
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
def test_ssh_client_fip_before_active(self):
"""
@@ -610,7 +610,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
@@ -637,7 +637,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
@@ -698,25 +698,25 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -736,7 +736,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
def test_set_custom_invalid_ip_one_subnet(self):
@@ -772,7 +772,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_custom_invalid_mac(self):
"""
@@ -808,9 +808,9 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_allowed_address_pairs(self):
"""
@@ -832,7 +832,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
port = self.inst_creator.get_port_by_name(port_settings.name)
self.assertIsNotNone(port)
self.assertIsNotNone(port['port'].get('allowed_address_pairs'))
- self.assertEquals(1, len(port['port']['allowed_address_pairs']))
+ self.assertEqual(1, len(port['port']['allowed_address_pairs']))
validation_utils.objects_equivalent(pair, port['port']['allowed_address_pairs'][0])
def test_set_allowed_address_pairs_bad_mac(self):
@@ -933,25 +933,25 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase):
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -987,7 +987,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase):
vm = creator.get_vm_inst()
deployed_zone = vm._info['OS-EXT-AZ:availability_zone']
deployed_host = vm._info['OS-EXT-SRV-ATTR:host']
- self.assertEquals(zone, deployed_zone + ':' + deployed_host)
+ self.assertEqual(zone, deployed_zone + ':' + deployed_host)
index += 1
@@ -1065,7 +1065,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
self.keypair_creator.create()
except Exception as e:
self.tearDown()
- raise Exception(e.message)
+ raise Exception(str(e))
def tearDown(self):
"""
@@ -1075,13 +1075,13 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -1093,25 +1093,25 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
for router_creator in self.router_creators:
try:
router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
for network_creator in self.network_creators:
try:
network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -1146,7 +1146,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
vm_inst = self.inst_creator.create(block=True)
- self.assertEquals(vm_inst, self.inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, self.inst_creator.get_vm_inst())
# Effectively blocks until VM has been properly activated
self.assertTrue(self.inst_creator.vm_active(block=True))
@@ -1154,6 +1154,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
# Effectively blocks until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
+ # TODO - Refactor config_nics() to return a status that can be validated here.
self.inst_creator.config_nics()
# TODO - *** ADD VALIDATION HERE ***
@@ -1226,31 +1227,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
for sec_grp_creator in self.sec_grp_creators:
try:
sec_grp_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning security group with message - ' + e.message)
+ logger.error('Unexpected exception cleaning security group with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -1495,25 +1496,25 @@ class CreateInstanceFromThreePartImage(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
diff --git a/snaps/openstack/tests/create_keypairs_tests.py b/snaps/openstack/tests/create_keypairs_tests.py
index e4409a9..b587a50 100644
--- a/snaps/openstack/tests/create_keypairs_tests.py
+++ b/snaps/openstack/tests/create_keypairs_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,8 +16,6 @@ import os
import uuid
import unittest
-from Crypto.PublicKey import RSA
-
from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
@@ -40,52 +38,52 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = KeypairSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_only(self):
settings = KeypairSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_pub_only(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_pub_only(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_priv_only(self):
settings = KeypairSettings(name='foo', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_with_name_priv_only(self):
settings = KeypairSettings(config={'name': 'foo', 'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_all(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_all(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub',
'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
class CreateKeypairsTests(OSIntegrationTestCase):
@@ -132,7 +130,7 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
def test_create_delete_keypair(self):
"""
@@ -162,10 +160,10 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
def test_create_keypair_save_both(self):
"""
@@ -178,10 +176,10 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
self.assertTrue(os.path.isfile(self.priv_file_path))
@@ -190,14 +188,14 @@ class CreateKeypairsTests(OSIntegrationTestCase):
Tests the creation of an existing public keypair from a file
:return:
"""
- keys = RSA.generate(1024)
+ keys = nova_utils.create_keys()
nova_utils.save_keys_to_files(keys=keys, pub_file_path=self.pub_file_path)
self.keypair_creator = OpenStackKeypair(
self.os_creds, KeypairSettings(name=self.keypair_name, public_filepath=self.pub_file_path))
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
diff --git a/snaps/openstack/tests/create_network_tests.py b/snaps/openstack/tests/create_network_tests.py
index a2b17f8..a7f21ad 100644
--- a/snaps/openstack/tests/create_network_tests.py
+++ b/snaps/openstack/tests/create_network_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -40,37 +40,37 @@ class NetworkSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = NetworkSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_config_with_name_only(self):
settings = NetworkSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_all(self):
sub_settings = SubnetSettings(name='foo-subnet', cidr='10.0.0.0/24')
settings = NetworkSettings(name='foo', admin_state_up=False, shared=True, project_name='bar', external=True,
network_type='flat', physical_network='phy', subnet_settings=[sub_settings])
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
def test_config_all(self):
settings = NetworkSettings(config={'name': 'foo', 'admin_state_up': False, 'shared': True,
@@ -78,15 +78,15 @@ class NetworkSettingsUnitTests(unittest.TestCase):
'physical_network': 'phy',
'subnets':
[{'subnet': {'name': 'foo-subnet', 'cidr': '10.0.0.0/24'}}]})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
class SubnetSettingsUnitTests(unittest.TestCase):
@@ -112,15 +112,15 @@ class SubnetSettingsUnitTests(unittest.TestCase):
def test_name_cidr_only(self):
settings = SubnetSettings(name='foo', cidr='10.0.0.0/24')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
@@ -129,16 +129,16 @@ class SubnetSettingsUnitTests(unittest.TestCase):
def test_config_with_name_cidr_only(self):
settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.gateway_ip)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
@@ -151,22 +151,22 @@ class SubnetSettingsUnitTests(unittest.TestCase):
start='10.0.0.2', end='10.0.0.101', gateway_ip='10.0.0.1', enable_dhcp=False,
dns_nameservers=['8.8.8.8'], host_routes=[host_routes], destination='dest',
nexthop='hop', ipv6_ra_mode='dhcpv6-stateful', ipv6_address_mode='slaac')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
def test_config_all(self):
host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'}
@@ -176,22 +176,22 @@ class SubnetSettingsUnitTests(unittest.TestCase):
'dns_nameservers': ['8.8.8.8'], 'host_routes': [host_routes],
'destination': 'dest', 'nexthop': 'hop', 'ipv6_ra_mode': 'dhcpv6-stateful',
'ipv6_address_mode': 'slaac'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
class PortSettingsUnitTests(unittest.TestCase):
@@ -217,8 +217,8 @@ class PortSettingsUnitTests(unittest.TestCase):
def test_name_netname_only(self):
settings = PortSettings(name='foo', network_name='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
@@ -233,8 +233,8 @@ class PortSettingsUnitTests(unittest.TestCase):
def test_config_with_name_netname_only(self):
settings = PortSettings(config={'name': 'foo', 'network_name': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
@@ -257,20 +257,20 @@ class PortSettingsUnitTests(unittest.TestCase):
security_groups=['foo_grp_id'], allowed_address_pairs=allowed_address_pairs,
opt_value='opt value', opt_name='opt name', device_owner='owner',
device_id='device number')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
def test_config_all(self):
ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}]
@@ -282,20 +282,20 @@ class PortSettingsUnitTests(unittest.TestCase):
'fixed_ips': fixed_ips, 'security_groups': ['foo_grp_id'],
'allowed_address_pairs': allowed_address_pairs, 'opt_value': 'opt value',
'opt_name': 'opt name', 'device_owner': 'owner', 'device_id': 'device number'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
class CreateNetworkSuccessTests(OSIntegrationTestCase):
@@ -415,8 +415,8 @@ class CreateNetworkSuccessTests(OSIntegrationTestCase):
self.net_creator2 = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
self.net_creator2.create()
- self.assertEquals(self.net_creator.get_network()['network']['id'],
- self.net_creator2.get_network()['network']['id'])
+ self.assertEqual(self.net_creator.get_network()['network']['id'],
+ self.net_creator2.get_network()['network']['id'])
class CreateNetworkTypeTests(OSComponentTestCase):
@@ -492,7 +492,7 @@ class CreateNetworkTypeTests(OSComponentTestCase):
# Validate network was created
neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
- self.assertEquals(network_type, network['network']['provider:network_type'])
+ self.assertEqual(network_type, network['network']['provider:network_type'])
# TODO - determine what value we need to place into physical_network
# - Do not know what vaule to place into the 'physical_network' setting.
diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py
index 4e1d254..ede8b4b 100644
--- a/snaps/openstack/tests/create_project_tests.py
+++ b/snaps/openstack/tests/create_project_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -41,30 +41,30 @@ class ProjectSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = ProjectSettings(name='foo')
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_config_with_name_only(self):
settings = ProjectSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_all(self):
settings = ProjectSettings(name='foo', domain='bar', description='foobar', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = ProjectSettings(config={'name': 'foo', 'domain': 'bar', 'description': 'foobar', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
@@ -104,7 +104,7 @@ class CreateProjectSuccessTests(OSComponentTestCase):
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
def test_create_project_2x(self):
"""
@@ -116,10 +116,10 @@ class CreateProjectSuccessTests(OSComponentTestCase):
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
project2 = OpenStackProject(self.os_creds, self.project_settings).create()
- self.assertEquals(retrieved_project, project2)
+ self.assertEqual(retrieved_project, project2)
def test_create_delete_project(self):
"""
@@ -194,9 +194,9 @@ class CreateProjectUserTests(OSComponentTestCase):
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
@@ -229,8 +229,8 @@ class CreateProjectUserTests(OSComponentTestCase):
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 079be0c..6a3c0ef 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -48,28 +48,28 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
def test_name_and_direction(self):
settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_config_name_and_direction(self):
settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_all(self):
settings = SecurityGroupRuleSettings(
sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
remote_ip_prefix='prfx')
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.icmp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
def test_config_all(self):
settings = SecurityGroupRuleSettings(
@@ -82,15 +82,15 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
'port_range_min': 1,
'port_range_max': 2,
'remote_ip_prefix': 'prfx'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.tcp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
@@ -108,11 +108,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = SecurityGroupSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_config_with_name_only(self):
settings = SecurityGroupSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
@@ -126,11 +126,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(rule_settings[0], settings.rule_settings[0])
- self.assertEquals(rule_settings[1], settings.rule_settings[1])
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
def test_config_all(self):
settings = SecurityGroupSettings(
@@ -139,12 +139,12 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
'project_name': 'foo',
'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(1, len(settings.rule_settings))
- self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
@@ -189,7 +189,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_delete_group(self):
@@ -224,7 +224,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_group_with_several_rules(self):
@@ -253,7 +253,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_add_rule(self):
@@ -273,13 +273,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
direction=Direction.egress, protocol=Protocol.icmp))
rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) + 1, len(rules2))
+ self.assertEqual(len(rules) + 1, len(rules2))
def test_remove_rule_by_id(self):
"""
@@ -308,13 +308,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
def test_remove_rule_by_setting(self):
"""
@@ -344,12 +344,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex
diff --git a/snaps/openstack/tests/create_user_tests.py b/snaps/openstack/tests/create_user_tests.py
index 1f7a163..007f7f0 100644
--- a/snaps/openstack/tests/create_user_tests.py
+++ b/snaps/openstack/tests/create_user_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -52,35 +52,35 @@ class UserSettingsUnitTests(unittest.TestCase):
def test_name_pass_only(self):
settings = UserSettings(name='foo', password='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_config_with_name_pass_only(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_all(self):
settings = UserSettings(name='foo', password='bar', project_name='proj-foo', email='foo@bar.com', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar', 'project_name': 'proj-foo',
'email': 'foo@bar.com', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
@@ -120,7 +120,7 @@ class CreateUserSuccessTests(OSComponentTestCase):
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
def test_create_user_2x(self):
"""
@@ -132,11 +132,11 @@ class CreateUserSuccessTests(OSComponentTestCase):
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
# Create user for the second time to ensure it is the same
user2 = OpenStackUser(self.os_creds, self.user_settings).create()
- self.assertEquals(retrieved_user, user2)
+ self.assertEqual(retrieved_user, user2)
def test_create_delete_user(self):
"""
@@ -152,4 +152,3 @@ class CreateUserSuccessTests(OSComponentTestCase):
# Delete user
self.user_creator.clean()
self.assertIsNone(self.user_creator.get_user())
-
diff --git a/snaps/openstack/tests/validation_utils.py b/snaps/openstack/tests/validation_utils.py
index 7c9bd7f..b6d5856 100644
--- a/snaps/openstack/tests/validation_utils.py
+++ b/snaps/openstack/tests/validation_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,7 +42,7 @@ def dicts_equivalent(dict1, dict2):
:return: T/F
"""
if (type(dict1) is dict or type(dict1) is _DictWithMeta) and (type(dict2) is dict or type(dict2) is _DictWithMeta):
- for key, value1 in dict1.iteritems():
+ for key, value1 in dict1.items():
if not objects_equivalent(value1, dict2.get(key)):
return False
return True
diff --git a/snaps/openstack/utils/glance_utils.py b/snaps/openstack/utils/glance_utils.py
index d859257..ca9c490 100644
--- a/snaps/openstack/utils/glance_utils.py
+++ b/snaps/openstack/utils/glance_utils.py
@@ -151,10 +151,8 @@ def __create_image_v2(glance, image_settings):
kwargs['name'] = image_settings.name
kwargs['disk_format'] = image_settings.format
kwargs['container_format'] = 'bare'
-
if image_settings.extra_properties:
- for key, value in image_settings.extra_properties.iteritems():
- kwargs[key] = value
+ kwargs.update(image_settings.extra_properties)
created_image = glance.images.create(**kwargs)
image_file = file_utils.get_file(image_filename)
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 6c92d2e..a65a42f 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,7 +16,7 @@ import logging
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
@@ -80,7 +80,7 @@ def get_network(neutron, network_name, project_id=None):
net_filter['project_id'] = project_id
networks = neutron.list_networks(**net_filter)
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('name') == network_name:
if project_id and inst.get('project_id') == project_id:
@@ -98,7 +98,7 @@ def get_network_by_id(neutron, network_id):
:return:
"""
networks = neutron.list_networks(**{'id': network_id})
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('id') == network_id:
return {'network': inst}
@@ -144,7 +144,7 @@ def get_subnet_by_name(neutron, subnet_name):
:return:
"""
subnets = neutron.list_subnets(**{'name': subnet_name})
- for subnet, subnetInst in subnets.iteritems():
+ for subnet, subnetInst in subnets.items():
for inst in subnetInst:
if inst.get('name') == subnet_name:
return {'subnet': inst}
@@ -189,7 +189,7 @@ def get_router_by_name(neutron, router_name):
:return:
"""
routers = neutron.list_routers(**{'name': router_name})
- for router, routerInst in routers.iteritems():
+ for router, routerInst in routers.items():
for inst in routerInst:
if inst.get('name') == router_name:
return {'router': inst}
@@ -228,10 +228,10 @@ def remove_interface_router(neutron, router, subnet=None, port=None):
logger.info('Removing router interface from router named ' + router['router']['name'])
neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
except NotFound as e:
- logger.warn('Could not remove router interface. NotFound - ' + e.message)
+ logger.warning('Could not remove router interface. NotFound - ' + str(e))
pass
else:
- logger.warn('Could not remove router interface, No router object')
+ logger.warning('Could not remove router interface, No router object')
def __create_port_json_body(subnet=None, port=None):
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index e7428ed..419f451 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,9 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.backends import default_backend
+
import os
import logging
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
from novaclient.client import Client
from novaclient.exceptions import NotFound
@@ -58,10 +63,29 @@ def get_latest_server_object(nova, server):
return nova.servers.get(server)
+def create_keys(key_size=2048):
+ """
+ Generates public and private keys
+ :param key_size: the number of bytes for the key size
+ :return: the cryptography keys
+ """
+ return rsa.generate_private_key(backend=default_backend(), public_exponent=65537,
+ key_size=key_size)
+
+
+def public_key_openssh(keys):
+ """
+ Returns the public key for OpenSSH
+ :param keys: the keys generated by create_keys() from cryptography
+ :return: the OpenSSH public key
+ """
+ return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+
+
def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
"""
Saves the generated RSA generated keys to the filesystem
- :param keys: the keys to save
+ :param keys: the keys to save generated by cryptography
:param pub_file_path: the path to the public keys
:param priv_file_path: the path to the private keys
:return: None
@@ -72,7 +96,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
if not os.path.isdir(pub_dir):
os.mkdir(pub_dir)
public_handle = open(pub_file_path, 'wb')
- public_handle.write(keys.publickey().exportKey('OpenSSH'))
+ public_bytes = keys.public_key().public_bytes(
+ serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+ public_handle.write(public_bytes)
public_handle.close()
os.chmod(pub_file_path, 0o400)
logger.info("Saved public key to - " + pub_file_path)
@@ -81,7 +107,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
if not os.path.isdir(priv_dir):
os.mkdir(priv_dir)
private_handle = open(priv_file_path, 'wb')
- private_handle.write(keys.exportKey())
+ private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()))
private_handle.close()
os.chmod(priv_file_path, 0o400)
logger.info("Saved private key to - " + priv_file_path)
@@ -95,7 +123,7 @@ def upload_keypair_file(nova, name, file_path):
:param file_path: the path to the public key file
:return: the keypair object
"""
- with open(os.path.expanduser(file_path)) as fpubkey:
+ with open(os.path.expanduser(file_path), 'rb') as fpubkey:
logger.info('Saving keypair to - ' + file_path)
return upload_keypair(nova, name, fpubkey.read())
@@ -109,7 +137,7 @@ def upload_keypair(nova, name, key):
:return: the keypair object
"""
logger.info('Creating keypair with name - ' + name)
- return nova.keypairs.create(name=name, public_key=key)
+ return nova.keypairs.create(name=name, public_key=key.decode('utf-8'))
def keypair_exists(nova, keypair_obj):
@@ -212,7 +240,7 @@ def get_nova_availability_zones(nova):
zones = nova.availability_zones.list()
for zone in zones:
if zone.zoneName == 'nova':
- for key, host in zone.hosts.iteritems():
+ for key, host in zone.hosts.items():
out.append(zone.zoneName + ':' + key)
return out
diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py
index 76a43ef..c072fd3 100644
--- a/snaps/openstack/utils/tests/keystone_utils_tests.py
+++ b/snaps/openstack/utils/tests/keystone_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -93,8 +93,8 @@ class KeystoneUtilsTests(OSComponentTestCase):
"""
project_settings = ProjectSettings(name=self.project_name)
self.project = keystone_utils.create_project(self.keystone, project_settings)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name)
self.assertIsNotNone(project)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 5f95fc9..9a043e3 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -109,14 +109,16 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
Tests the neutron_utils.create_neutron_net() function with an empty network name
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name=''))
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings(name=''))
def test_create_network_null_name(self):
"""
Tests the neutron_utils.create_neutron_net() function when the network name is None
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings())
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
@@ -644,7 +646,7 @@ def validate_port(neutron, port_obj, this_port_name):
:return: True/False
"""
ports = neutron.list_ports()
- for port, port_insts in ports.iteritems():
+ for port, port_insts in ports.items():
for inst in port_insts:
if inst['id'] == port_obj['port']['id']:
return inst['name'] == this_port_name
diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py
index f6c9156..0a2b24b 100644
--- a/snaps/openstack/utils/tests/nova_utils_tests.py
+++ b/snaps/openstack/utils/tests/nova_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,8 +16,6 @@ import logging
import os
import uuid
-from Crypto.PublicKey import RSA
-
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.create_flavor import FlavorSettings
@@ -71,8 +69,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
self.pub_key_file_path = self.priv_key_file_path + '.pub'
self.nova = nova_utils.nova_client(self.os_creds)
- self.keys = RSA.generate(1024)
- self.public_key = self.keys.publickey().exportKey('OpenSSH')
+ self.keys = nova_utils.create_keys()
+ self.public_key = nova_utils.public_key_openssh(self.keys)
self.keypair_name = guid
self.keypair = None
self.floating_ip = None
@@ -106,9 +104,9 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
- self.assertEquals(self.keypair, keypair)
+ self.assertEqual(self.keypair, keypair)
def test_create_delete_keypair(self):
"""
@@ -116,7 +114,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
nova_utils.delete_keypair(self.nova, self.keypair)
result2 = nova_utils.keypair_exists(self.nova, self.keypair)
self.assertIsNone(result2)
@@ -129,7 +127,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path)
self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path)
pub_key = open(os.path.expanduser(self.pub_key_file_path)).read()
- self.assertEquals(self.keypair.public_key, pub_key)
+ self.assertEqual(self.keypair.public_key, pub_key)
def test_floating_ips(self):
"""
@@ -141,7 +139,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name)
returned = nova_utils.get_floating_ip(self.nova, self.floating_ip)
- self.assertEquals(self.floating_ip, returned)
+ self.assertEqual(self.floating_ip, returned)
class NovaUtilsFlavorTests(OSComponentTestCase):
@@ -192,17 +190,17 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
Validates the flavor_settings against the OpenStack flavor object
"""
self.assertIsNotNone(self.flavor)
- self.assertEquals(self.flavor_settings.name, self.flavor.name)
- self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id)
- self.assertEquals(self.flavor_settings.ram, self.flavor.ram)
- self.assertEquals(self.flavor_settings.disk, self.flavor.disk)
- self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus)
- self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral)
+ self.assertEqual(self.flavor_settings.name, self.flavor.name)
+ self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id)
+ self.assertEqual(self.flavor_settings.ram, self.flavor.ram)
+ self.assertEqual(self.flavor_settings.disk, self.flavor.disk)
+ self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus)
+ self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral)
if self.flavor_settings.swap == 0:
- self.assertEquals('', self.flavor.swap)
+ self.assertEqual('', self.flavor.swap)
else:
- self.assertEquals(self.flavor_settings.swap, self.flavor.swap)
+ self.assertEqual(self.flavor_settings.swap, self.flavor.swap)
- self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
- self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public)
+ self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
+ self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)