summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/how-to-use/InstallSnaps.rst4
-rw-r--r--examples/launch.py49
-rw-r--r--snaps/domain/test/image_tests.py16
-rw-r--r--snaps/file_utils.py37
-rw-r--r--snaps/openstack/create_image.py2
-rw-r--r--snaps/openstack/create_instance.py30
-rw-r--r--snaps/openstack/create_keypairs.py42
-rw-r--r--snaps/openstack/create_network.py4
-rw-r--r--snaps/openstack/create_project.py23
-rw-r--r--snaps/openstack/create_router.py74
-rw-r--r--snaps/openstack/create_security_group.py55
-rw-r--r--snaps/openstack/tests/create_flavor_tests.py87
-rw-r--r--snaps/openstack/tests/create_image_tests.py192
-rw-r--r--snaps/openstack/tests/create_instance_tests.py233
-rw-r--r--snaps/openstack/tests/create_keypairs_tests.py52
-rw-r--r--snaps/openstack/tests/create_network_tests.py184
-rw-r--r--snaps/openstack/tests/create_project_tests.py36
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py90
-rw-r--r--snaps/openstack/tests/create_user_tests.py33
-rw-r--r--snaps/openstack/tests/validation_utils.py4
-rw-r--r--snaps/openstack/utils/glance_utils.py4
-rw-r--r--snaps/openstack/utils/neutron_utils.py16
-rw-r--r--snaps/openstack/utils/nova_utils.py44
-rw-r--r--snaps/openstack/utils/tests/keystone_utils_tests.py6
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py10
-rw-r--r--snaps/openstack/utils/tests/nova_utils_tests.py38
-rw-r--r--snaps/provisioning/ansible_utils.py4
-rw-r--r--snaps/provisioning/tests/ansible_utils_tests.py10
-rw-r--r--snaps/tests/file_utils_tests.py10
29 files changed, 704 insertions, 685 deletions
diff --git a/docs/how-to-use/InstallSnaps.rst b/docs/how-to-use/InstallSnaps.rst
index cce06a0..df46743 100644
--- a/docs/how-to-use/InstallSnaps.rst
+++ b/docs/how-to-use/InstallSnaps.rst
@@ -12,6 +12,10 @@ Git is used to download the snaps source from the OPNFV Gerrit repository.
Python, GCC and additional libraries are required to compile and install the packages used by SNAPS. These
dependencies need to be installed whether or not a virtual Python environment is used.
+Note: SNAPS-OO works best under Python 2.7; however, all of the code except in snaps.openstack.provisioning.ansible
+should work properly within a Python 3.4.4 runtime. Ansible support will not work in any Python 3.x when the Ansible
+version is 2.3.0 or prior. No indications when this support will be added as of 5 May 2017.
+
CentOS 7
--------
diff --git a/examples/launch.py b/examples/launch.py
index 6ddfe3b..c13d05f 100644
--- a/examples/launch.py
+++ b/examples/launch.py
@@ -1,6 +1,6 @@
#!/usr/bin/python
#
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -63,7 +63,6 @@ def __parse_ports_config(config):
"""
Parses the "ports" configuration
:param config: The dictionary to parse
- :param os_creds: The OpenStack credentials object
:return: a list of PortConfig objects
"""
out = list()
@@ -92,7 +91,7 @@ def __create_flavors(os_conn_config, flavors_config, cleanup=False):
flavor_creator.create(cleanup=cleanup)
flavors[flavor_config['name']] = flavor_creator
except Exception as e:
- for key, flavor_creator in flavors.iteritems():
+ for key, flavor_creator in flavors.items():
flavor_creator.clean()
raise e
logger.info('Created configured flavors')
@@ -118,7 +117,7 @@ def __create_images(os_conn_config, images_config, cleanup=False):
images[image_config['name']] = deploy_utils.create_image(__get_os_credentials(os_conn_config),
ImageSettings(image_config), cleanup)
except Exception as e:
- for key, image_creator in images.iteritems():
+ for key, image_creator in images.items():
image_creator.clean()
raise e
logger.info('Created configured images')
@@ -144,7 +143,7 @@ def __create_networks(os_conn_config, network_confs, cleanup=False):
network_dict[net_name] = deploy_utils.create_network(
os_creds, NetworkSettings(config=network_conf['network']), cleanup)
except Exception as e:
- for key, net_creator in network_dict.iteritems():
+ for key, net_creator in network_dict.items():
net_creator.clean()
raise e
@@ -171,7 +170,7 @@ def __create_routers(os_conn_config, router_confs, cleanup=False):
router_dict[router_name] = deploy_utils.create_router(
os_creds, RouterSettings(config=router_conf['router']), cleanup)
except Exception as e:
- for key, router_creator in router_dict.iteritems():
+ for key, router_creator in router_dict.items():
router_creator.clean()
raise e
@@ -197,7 +196,7 @@ def __create_keypairs(os_conn_config, keypair_confs, cleanup=False):
keypairs_dict[keypair_config['name']] = deploy_utils.create_keypair(
__get_os_credentials(os_conn_config), kp_settings, cleanup)
except Exception as e:
- for key, keypair_creator in keypairs_dict.iteritems():
+ for key, keypair_creator in keypairs_dict.items():
keypair_creator.clean()
raise e
@@ -240,8 +239,8 @@ def __create_instances(os_conn_config, instances_config, image_dict, keypairs_di
else:
raise Exception('Instance configuration is None. Cannot instantiate')
except Exception as e:
- logger.error('Unexpected error creating instances. Attempting to cleanup environment - ' + e.message)
- for key, inst_creator in vm_dict.iteritems():
+ logger.error('Unexpected error creating instances. Attempting to cleanup environment - ' + str(e))
+ for key, inst_creator in vm_dict.items():
inst_creator.clean()
raise e
@@ -265,9 +264,9 @@ def __apply_ansible_playbooks(ansible_configs, os_conn_config, vm_dict, image_di
logger.info("Applying Ansible Playbooks")
if ansible_configs:
# Ensure all hosts are accepting SSH session requests
- for vm_inst in vm_dict.values():
+ for vm_inst in list(vm_dict.values()):
if not vm_inst.vm_ssh_active(block=True):
- logger.warn("Timeout waiting for instance to respond to SSH requests")
+ logger.warning("Timeout waiting for instance to respond to SSH requests")
return False
# Set CWD so the deployment file's playbook location can leverage relative paths
@@ -304,7 +303,7 @@ def __apply_ansible_playbook(ansible_config, os_creds, vm_dict, image_dict, flav
proxy_setting=proxy_settings)
if retval != 0:
# Not a fatal type of event
- logger.warn('Unable to apply playbook found at location - ' + ansible_config('playbook_location'))
+ logger.warning('Unable to apply playbook found at location - ' + ansible_config('playbook_location'))
def __get_connection_info(ansible_config, vm_dict):
@@ -357,13 +356,13 @@ def __get_variables(var_config, os_creds, vm_dict, image_dict, flavor_dict):
"""
if var_config and vm_dict and len(vm_dict) > 0:
variables = dict()
- for key, value in var_config.iteritems():
+ for key, value in var_config.items():
value = __get_variable_value(value, os_creds, vm_dict, image_dict, flavor_dict)
if key and value:
variables[key] = value
logger.info("Set Jinga2 variable with key [" + key + "] the value [" + value + ']')
else:
- logger.warn('Key [' + str(key) + '] or Value [' + str(value) + '] must not be None')
+ logger.warning('Key [' + str(key) + '] or Value [' + str(value) + '] must not be None')
return variables
return None
@@ -530,6 +529,8 @@ def main(arguments):
if config:
os_config = config.get('openstack')
+ os_conn_config = None
+ flavor_dict = {}
image_dict = {}
network_dict = {}
router_dict = {}
@@ -565,7 +566,7 @@ def main(arguments):
arguments.clean is not ARG_NOT_SET)
logger.info('Completed creating/retrieving all configured instances')
except Exception as e:
- logger.error('Unexpected error deploying environment. Rolling back due to - ' + e.message)
+ logger.error('Unexpected error deploying environment. Rolling back due to - ' + str(e))
__cleanup(vm_dict, keypairs_dict, router_dict, network_dict, image_dict, flavor_dict, True)
raise e
@@ -576,7 +577,7 @@ def main(arguments):
arguments.clean_image is not ARG_NOT_SET)
elif arguments.deploy is not ARG_NOT_SET:
logger.info('Configuring NICs where required')
- for vm in vm_dict.itervalues():
+ for vm in vm_dict.values():
vm.config_nics()
logger.info('Completed NIC configuration')
@@ -594,24 +595,24 @@ def main(arguments):
def __cleanup(vm_dict, keypairs_dict, router_dict, network_dict, image_dict, flavor_dict, clean_image=False):
- for key, vm_inst in vm_dict.iteritems():
+ for key, vm_inst in vm_dict.items():
vm_inst.clean()
- for key, kp_inst in keypairs_dict.iteritems():
+ for key, kp_inst in keypairs_dict.items():
kp_inst.clean()
- for key, router_inst in router_dict.iteritems():
+ for key, router_inst in router_dict.items():
try:
router_inst.clean()
except Exception:
logger.warning("Router not found continuing to next component")
- for key, net_inst in network_dict.iteritems():
+ for key, net_inst in network_dict.items():
try:
net_inst.clean()
except Exception:
logger.warning("Network not found continuing to next component")
if clean_image:
- for key, image_inst in image_dict.iteritems():
+ for key, image_inst in image_dict.items():
image_inst.clean()
- for key, flavor_inst in flavor_dict.iteritems():
+ for key, flavor_inst in flavor_dict.items():
flavor_inst.clean()
@@ -632,9 +633,9 @@ if __name__ == '__main__':
args = parser.parse_args()
if args.deploy is ARG_NOT_SET and args.clean is ARG_NOT_SET:
- print 'Must enter either -d for deploy or -c for cleaning up and environment'
+ print('Must enter either -d for deploy or -c for cleaning up and environment')
exit(1)
if args.deploy is not ARG_NOT_SET and args.clean is not ARG_NOT_SET:
- print 'Cannot enter both options -d/--deploy and -c/--clean'
+ print('Cannot enter both options -d/--deploy and -c/--clean')
exit(1)
main(args)
diff --git a/snaps/domain/test/image_tests.py b/snaps/domain/test/image_tests.py
index de517eb..53d9526 100644
--- a/snaps/domain/test/image_tests.py
+++ b/snaps/domain/test/image_tests.py
@@ -25,15 +25,15 @@ class ImageDomainObjectTests(unittest.TestCase):
def test_construction_positional(self):
props = {'foo': 'bar'}
image = Image('name', 'id', 100, props)
- self.assertEquals('name', image.name)
- self.assertEquals('id', image.id)
- self.assertEquals(100, image.size)
- self.assertEquals(props, image.properties)
+ self.assertEqual('name', image.name)
+ self.assertEqual('id', image.id)
+ self.assertEqual(100, image.size)
+ self.assertEqual(props, image.properties)
def test_construction_named(self):
props = {'foo': 'bar'}
image = Image(image_id='id', properties=props, name='name', size=101)
- self.assertEquals('name', image.name)
- self.assertEquals('id', image.id)
- self.assertEquals(101, image.size)
- self.assertEquals(props, image.properties)
+ self.assertEqual('name', image.name)
+ self.assertEqual('id', image.id)
+ self.assertEqual(101, image.size)
+ self.assertEqual(props, image.properties)
diff --git a/snaps/file_utils.py b/snaps/file_utils.py
index 34eb30c..a321cc2 100644
--- a/snaps/file_utils.py
+++ b/snaps/file_utils.py
@@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
-import urllib2
import logging
+try:
+ import urllib.request as urllib
+except ImportError:
+ import urllib2 as urllib
import yaml
@@ -46,7 +49,7 @@ def get_file(file_path):
:raise Exception when file cannot be found
"""
if file_exists(file_path):
- return open(file_path, 'r')
+ return open(file_path, 'rb')
else:
raise Exception('File with path cannot be found - ' + file_path)
@@ -59,18 +62,11 @@ def download(url, dest_path, name=None):
if not name:
name = url.rsplit('/')[-1]
dest = dest_path + '/' + name
- try:
- logger.debug('Downloading file from - ' + url)
- # Override proxy settings to use localhost to download file
- proxy_handler = urllib2.ProxyHandler({})
- opener = urllib2.build_opener(proxy_handler)
- urllib2.install_opener(opener)
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
- raise Exception
-
+ logger.debug('Downloading file from - ' + url)
+ # Override proxy settings to use localhost to download file
with open(dest, 'wb') as f:
logger.debug('Saving file to - ' + dest)
+ response = __get_url_response(url)
f.write(response.read())
return f
@@ -81,13 +77,22 @@ def get_content_length(url):
:param url: the URL to inspect
:return: the number of bytes
"""
- proxy_handler = urllib2.ProxyHandler({})
- opener = urllib2.build_opener(proxy_handler)
- urllib2.install_opener(opener)
- response = urllib2.urlopen(url)
+ response = __get_url_response(url)
return response.headers['Content-Length']
+def __get_url_response(url):
+ """
+ Returns a response object for a given URL
+ :param url: the URL
+ :return: the response
+ """
+ proxy_handler = urllib.ProxyHandler({})
+ opener = urllib.build_opener(proxy_handler)
+ urllib.install_opener(opener)
+ return urllib.urlopen(url)
+
+
def read_yaml(config_file_path):
"""
Reads the yaml file and returns a dictionary object representation
diff --git a/snaps/openstack/create_image.py b/snaps/openstack/create_image.py
index 4f2bd7e..1e2e4dc 100644
--- a/snaps/openstack/create_image.py
+++ b/snaps/openstack/create_image.py
@@ -176,7 +176,7 @@ class OpenStackImage:
# TODO - Place this API call into glance_utils.
status = glance_utils.get_image_status(self.__glance, self.__image)
if not status:
- logger.warn('Cannot image status for image with ID - ' + self.__image.id)
+ logger.warning('Cannot image status for image with ID - ' + self.__image.id)
return False
if status == 'ERROR':
diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py
index dda68fd..9fa3232 100644
--- a/snaps/openstack/create_instance.py
+++ b/snaps/openstack/create_instance.py
@@ -210,7 +210,7 @@ class OpenStackVmInstance:
logger.info('Deleting Floating IP - ' + floating_ip.ip)
nova_utils.delete_floating_ip(self.__nova, floating_ip)
except Exception as e:
- logger.error('Error deleting Floating IP - ' + e.message)
+ logger.error('Error deleting Floating IP - ' + str(e))
self.__floating_ips = list()
self.__floating_ip_dict = dict()
@@ -220,7 +220,7 @@ class OpenStackVmInstance:
try:
neutron_utils.delete_port(self.__neutron, port)
except PortNotFoundClient as e:
- logger.warn('Unexpected error deleting port - ' + e.message)
+ logger.warning('Unexpected error deleting port - ' + str(e))
pass
self.__ports = list()
@@ -243,7 +243,7 @@ class OpenStackVmInstance:
logger.error('VM not deleted within the timeout period of ' +
str(self.instance_settings.vm_delete_timeout) + ' seconds')
except Exception as e:
- logger.error('Unexpected error while checking VM instance status - ' + e.message)
+ logger.error('Unexpected error while checking VM instance status - ' + str(e))
def __setup_ports(self, port_settings, cleanup):
"""
@@ -303,7 +303,7 @@ class OpenStackVmInstance:
' on instance - ' + self.instance_settings.name)
return
except Exception as e:
- logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + e.message)
+ logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + str(e))
time.sleep(poll_interval)
count -= 1
pass
@@ -341,7 +341,7 @@ class OpenStackVmInstance:
if subnet_name:
subnet = neutron_utils.get_subnet_by_name(self.__neutron, subnet_name)
if not subnet:
- logger.warn('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
+ logger.warning('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
return None
for fixed_ip in port_dict['fixed_ips']:
if fixed_ip['subnet_id'] == subnet['subnet']['id']:
@@ -374,7 +374,7 @@ class OpenStackVmInstance:
for key, port in self.__ports:
if key == port_name:
return port
- logger.warn('Cannot find port with name - ' + port_name)
+ logger.warning('Cannot find port with name - ' + port_name)
return None
def config_nics(self):
@@ -427,7 +427,7 @@ class OpenStackVmInstance:
[floating_ip], self.get_image_user(), self.keypair_settings.private_filepath,
variables, self.__os_creds.proxy_settings)
else:
- logger.warn('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
+ logger.warning('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
'No playbook or keypairs found.')
def get_image_user(self):
@@ -452,7 +452,7 @@ class OpenStackVmInstance:
return self.__vm_status_check(STATUS_DELETED, block, self.instance_settings.vm_delete_timeout,
poll_interval)
except NotFound as e:
- logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + e.message)
+ logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + str(e))
return True
def vm_active(self, block=False, poll_interval=POLL_INTERVAL):
@@ -500,7 +500,7 @@ class OpenStackVmInstance:
"""
instance = self.__nova.servers.get(self.__vm.id)
if not instance:
- logger.warn('Cannot find instance with id - ' + self.__vm.id)
+ logger.warning('Cannot find instance with id - ' + self.__vm.id)
return False
if instance.status == 'ERROR':
@@ -575,7 +575,7 @@ class OpenStackVmInstance:
self.keypair_settings.private_filepath,
proxy_settings=self.__os_creds.proxy_settings)
else:
- logger.warn('Cannot return an SSH client. No Floating IP configured')
+ logger.warning('Cannot return an SSH client. No Floating IP configured')
def add_security_group(self, security_group):
"""
@@ -586,14 +586,14 @@ class OpenStackVmInstance:
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot add')
+ logger.warning('Security group object is None, cannot add')
return False
try:
nova_utils.add_security_group(self.__nova, self.get_vm_inst(), security_group['security_group']['name'])
return True
except NotFound as e:
- logger.warn('Security group not added - ' + e.message)
+ logger.warning('Security group not added - ' + str(e))
return False
def remove_security_group(self, security_group):
@@ -605,14 +605,14 @@ class OpenStackVmInstance:
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot remove')
+ logger.warning('Security group object is None, cannot remove')
return False
try:
nova_utils.remove_security_group(self.__nova, self.get_vm_inst(), security_group)
return True
except NotFound as e:
- logger.warn('Security group not removed - ' + e.message)
+ logger.warning('Security group not removed - ' + str(e))
return False
@@ -659,7 +659,7 @@ class VmInstanceSettings:
self.security_group_names = set(config['security_group_names'])
elif isinstance(config['security_group_names'], set):
self.security_group_names = config['security_group_names']
- elif isinstance(config['security_group_names'], basestring):
+ elif isinstance(config['security_group_names'], str):
self.security_group_names = [config['security_group_names']]
else:
raise Exception('Invalid data type for security_group_names attribute')
diff --git a/snaps/openstack/create_keypairs.py b/snaps/openstack/create_keypairs.py
index ea7c811..6af40c6 100644
--- a/snaps/openstack/create_keypairs.py
+++ b/snaps/openstack/create_keypairs.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +15,6 @@
import logging
import os
-from Crypto.PublicKey import RSA
from novaclient.exceptions import NotFound
from snaps.openstack.utils import nova_utils
@@ -50,28 +49,23 @@ class OpenStackKeypair:
"""
logger.info('Creating keypair %s...' % self.keypair_settings.name)
- try:
- self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
-
- if not self.__keypair and not cleanup:
- if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
- logger.info("Uploading existing keypair")
- self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
- self.keypair_settings.public_filepath)
- else:
- logger.info("Creating new keypair")
- # TODO - Make this value configurable
- keys = RSA.generate(1024)
- self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
- keys.publickey().exportKey('OpenSSH'))
- nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
- self.keypair_settings.private_filepath)
-
- return self.__keypair
- except Exception as e:
- logger.error('Unexpected error creating keypair named - ' + self.keypair_settings.name)
- self.clean()
- raise Exception(e.message)
+ self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
+
+ if not self.__keypair and not cleanup:
+ if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
+ logger.info("Uploading existing keypair")
+ self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
+ self.keypair_settings.public_filepath)
+ else:
+ logger.info("Creating new keypair")
+ # TODO - Make this value configurable
+ keys = nova_utils.create_keys(1024)
+ self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
+ nova_utils.public_key_openssh(keys))
+ nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
+ self.keypair_settings.private_filepath)
+
+ return self.__keypair
def clean(self):
"""
diff --git a/snaps/openstack/create_network.py b/snaps/openstack/create_network.py
index a214ba1..210c2bf 100644
--- a/snaps/openstack/create_network.py
+++ b/snaps/openstack/create_network.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -89,7 +89,7 @@ class OpenStackNetwork:
logger.info('Deleting subnet with name ' + subnet['subnet']['name'])
neutron_utils.delete_subnet(self.__neutron, subnet)
except NotFound as e:
- logger.warn('Error deleting subnet with message - ' + e.message)
+ logger.warning('Error deleting subnet with message - ' + str(e))
pass
self.__subnets = list()
diff --git a/snaps/openstack/create_project.py b/snaps/openstack/create_project.py
index 60f9ed0..2658d13 100644
--- a/snaps/openstack/create_project.py
+++ b/snaps/openstack/create_project.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -47,19 +47,14 @@ class OpenStackProject:
:param cleanup: Denotes whether or not this is being called for cleanup or not
:return: The OpenStack Image object
"""
- try:
- self.__project = keystone_utils.get_project(keystone=self.__keystone,
- project_name=self.project_settings.name)
- if self.__project:
- logger.info('Found project with name - ' + self.project_settings.name)
- elif not cleanup:
- self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
- else:
- logger.info('Did not create image due to cleanup mode')
- except Exception as e:
- logger.error('Unexpected error. Rolling back')
- self.clean()
- raise Exception(e.message)
+ self.__project = keystone_utils.get_project(keystone=self.__keystone,
+ project_name=self.project_settings.name)
+ if self.__project:
+ logger.info('Found project with name - ' + self.project_settings.name)
+ elif not cleanup:
+ self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
+ else:
+ logger.info('Did not create image due to cleanup mode')
return self.__project
diff --git a/snaps/openstack/create_router.py b/snaps/openstack/create_router.py
index 70c6b76..5c461cc 100644
--- a/snaps/openstack/create_router.py
+++ b/snaps/openstack/create_router.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -59,46 +59,42 @@ class OpenStackRouter:
:return: the router object
"""
logger.debug('Creating Router with name - ' + self.router_settings.name)
- try:
- existing = False
- router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
- if router_inst:
- self.__router = router_inst
- existing = True
+ existing = False
+ router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
+ if router_inst:
+ self.__router = router_inst
+ existing = True
+ else:
+ if not cleanup:
+ self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
+
+ for internal_subnet_name in self.router_settings.internal_subnets:
+ internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
+ if internal_subnet:
+ self.__internal_subnets.append(internal_subnet)
+ if internal_subnet and not cleanup and not existing:
+ logger.debug('Adding router to subnet...')
+ self.__internal_router_interface = neutron_utils.add_interface_router(
+ self.__neutron, self.__router, subnet=internal_subnet)
else:
- if not cleanup:
- self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
-
- for internal_subnet_name in self.router_settings.internal_subnets:
- internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
- if internal_subnet:
- self.__internal_subnets.append(internal_subnet)
- if internal_subnet and not cleanup and not existing:
- logger.debug('Adding router to subnet...')
- self.__internal_router_interface = neutron_utils.add_interface_router(
- self.__neutron, self.__router, subnet=internal_subnet)
- else:
- raise Exception('Subnet not found with name ' + internal_subnet_name)
+ raise Exception('Subnet not found with name ' + internal_subnet_name)
+
+ for port_setting in self.router_settings.port_settings:
+ port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
+ logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if port:
+ self.__ports.append(port)
- for port_setting in self.router_settings.port_settings:
- port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
- logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if not port and not cleanup and not existing:
+ port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
if port:
+ logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
self.__ports.append(port)
+ neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
+ else:
+ raise Exception('Error creating port with name - ' + port_setting.name)
- if not port and not cleanup and not existing:
- port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
- if port:
- logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
- self.__ports.append(port)
- neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
- else:
- raise Exception('Error creating port with name - ' + port_setting.name)
-
- return self.__router
- except Exception as e:
- self.clean()
- raise Exception(e.message)
+ return self.__router
def clean(self):
"""
@@ -163,8 +159,6 @@ class RouterSettings:
policies.
:param external_gateway: Name of the external network to which to route
:param admin_state_up: The administrative status of the router. True = up / False = down (default True)
- :param enable_snat: Boolean value. Enable Source NAT (SNAT) attribute. Default is True. To persist this
- attribute value, set the enable_snat_by_default option in the neutron.conf file.
:param external_fixed_ips: Dictionary containing the IP address parameters.
:param internal_subnets: List of subnet names to which to connect this router for Floating IP purposes
:param port_settings: List of PortSettings objects
@@ -238,7 +232,7 @@ class RouterSettings:
else:
raise Exception('Could not find the external network named - ' + self.external_gateway)
- #TODO: Enable SNAT option for Router
- #TODO: Add external_fixed_ips Tests
+ # TODO: Enable SNAT option for Router
+ # TODO: Add external_fixed_ips Tests
return {'router': out}
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py
index fc1ee98..57f7284 100644
--- a/snaps/openstack/create_security_group.py
+++ b/snaps/openstack/create_security_group.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -58,7 +58,7 @@ class OpenStackSecurityGroup:
if not self.__security_group and not cleanup:
# Create the security group
self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone,
- self.sec_grp_settings)
+ self.sec_grp_settings)
# Get the rules added for free
auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group)
@@ -118,11 +118,11 @@ class OpenStackSecurityGroup:
"""
Removes and deletes the rules then the security group.
"""
- for setting, rule in self.__rules.iteritems():
+ for setting, rule in self.__rules.items():
try:
neutron_utils.delete_security_group_rule(self.__neutron, rule)
except NotFound as e:
- logger.warn('Rule not found, cannot delete - ' + e.message)
+ logger.warning('Rule not found, cannot delete - ' + str(e))
pass
self.__rules = dict()
@@ -130,7 +130,7 @@ class OpenStackSecurityGroup:
try:
neutron_utils.delete_security_group(self.__neutron, self.__security_group)
except NotFound as e:
- logger.warn('Security Group not found, cannot delete - ' + e.message)
+ logger.warning('Security Group not found, cannot delete - ' + str(e))
self.__security_group = None
@@ -177,7 +177,7 @@ class OpenStackSecurityGroup:
if rule_setting:
self.__rules.pop(rule_setting)
else:
- logger.warn('Rule setting is None, cannot remove rule')
+ logger.warning('Rule setting is None, cannot remove rule')
def __get_setting_from_rule(self, rule):
"""
@@ -460,17 +460,16 @@ def map_direction(direction):
"""
if not direction:
return None
- if type(direction) is Direction:
+ if isinstance(direction, Direction):
return direction
- elif isinstance(direction, basestring):
- if direction == 'egress':
+ else:
+ dir_str = str(direction)
+ if dir_str == 'egress':
return Direction.egress
- elif direction == 'ingress':
+ elif dir_str == 'ingress':
return Direction.ingress
else:
- raise Exception('Invalid Direction - ' + direction)
- else:
- raise Exception('Invalid Direction object - ' + str(direction))
+ raise Exception('Invalid Direction - ' + dir_str)
def map_protocol(protocol):
@@ -482,21 +481,20 @@ def map_protocol(protocol):
"""
if not protocol:
return None
- elif type(protocol) is Protocol:
+ elif isinstance(protocol, Protocol):
return protocol
- elif isinstance(protocol, basestring):
- if protocol == 'icmp':
+ else:
+ proto_str = str(protocol)
+ if proto_str == 'icmp':
return Protocol.icmp
- elif protocol == 'tcp':
+ elif proto_str == 'tcp':
return Protocol.tcp
- elif protocol == 'udp':
+ elif proto_str == 'udp':
return Protocol.udp
- elif protocol == 'null':
+ elif proto_str == 'null':
return Protocol.null
else:
- raise Exception('Invalid Protocol - ' + protocol)
- else:
- raise Exception('Invalid Protocol object - ' + str(protocol))
+ raise Exception('Invalid Protocol - ' + proto_str)
def map_ethertype(ethertype):
@@ -508,14 +506,13 @@ def map_ethertype(ethertype):
"""
if not ethertype:
return None
- elif type(ethertype) is Ethertype:
+ elif isinstance(ethertype, Ethertype):
return ethertype
- elif isinstance(ethertype, basestring):
- if ethertype == 'IPv6':
+ else:
+ eth_str = str(ethertype)
+ if eth_str == 'IPv6':
return Ethertype.IPv6
- elif ethertype == 'IPv4':
+ elif eth_str == 'IPv4':
return Ethertype.IPv4
else:
- raise Exception('Invalid Ethertype - ' + ethertype)
- else:
- raise Exception('Invalid Ethertype object - ' + str(ethertype))
+ raise Exception('Invalid Ethertype - ' + eth_str)
diff --git a/snaps/openstack/tests/create_flavor_tests.py b/snaps/openstack/tests/create_flavor_tests.py
index ad135ee..be7ac64 100644
--- a/snaps/openstack/tests/create_flavor_tests.py
+++ b/snaps/openstack/tests/create_flavor_tests.py
@@ -1,3 +1,8 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
@@ -160,60 +165,60 @@ class FlavorSettingsUnitTests(unittest.TestCase):
def test_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(name='foo', ram=1, disk=2, vcpus=3)
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_config_with_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(config={'name': 'foo', 'ram': 1, 'disk': 2, 'vcpus': 3})
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(name='foo', flavor_id='bar', ram=1, disk=2, vcpus=3, ephemeral=4, swap=5,
rxtx_factor=6.0, is_public=False, metadata=metadata)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
def test_config_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(config={'name': 'foo', 'flavor_id': 'bar', 'ram': 1, 'disk': 2, 'vcpus': 3,
'ephemeral': 4, 'swap': 5, 'rxtx_factor': 6.0, 'is_public': False,
'metadata': metadata})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
class CreateFlavorTests(OSComponentTestCase):
@@ -264,7 +269,7 @@ class CreateFlavorTests(OSComponentTestCase):
flavor_creator_2 = OpenStackFlavor(self.os_creds, flavor_settings)
flavor2 = flavor_creator_2.create()
- self.assertEquals(flavor.id, flavor2.id)
+ self.assertEqual(flavor.id, flavor2.id)
def test_create_clean_flavor(self):
"""
diff --git a/snaps/openstack/tests/create_image_tests.py b/snaps/openstack/tests/create_image_tests.py
index 1c213bb..36e4271 100644
--- a/snaps/openstack/tests/create_image_tests.py
+++ b/snaps/openstack/tests/create_image_tests.py
@@ -20,7 +20,7 @@ import unittest
from snaps import file_utils
from snaps.openstack.create_image import ImageSettings
-import openstack_tests
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.utils import glance_utils
from snaps.openstack import create_image
from snaps.openstack import os_credentials
@@ -78,10 +78,10 @@ class ImageSettingsUnitTests(unittest.TestCase):
def test_name_user_format_url_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
@@ -89,41 +89,41 @@ class ImageSettingsUnitTests(unittest.TestCase):
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_url_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'download_url': 'http://foo.com'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_name_user_format_file_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_file_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_all_url(self):
@@ -133,21 +133,21 @@ class ImageSettingsUnitTests(unittest.TestCase):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties, nic_config_pb_loc='/foo/bar',
kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('bar', settings.kernel_image_settings.image_user)
- self.assertEquals('qcow2', settings.kernel_image_settings.format)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
- self.assertEquals('bar', settings.ramdisk_image_settings.image_user)
- self.assertEquals('qcow2', settings.ramdisk_image_settings.format)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('bar', settings.kernel_image_settings.image_user)
+ self.assertEqual('qcow2', settings.kernel_image_settings.format)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
+ self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
def test_config_all_url(self):
settings = ImageSettings(
@@ -159,42 +159,42 @@ class ImageSettingsUnitTests(unittest.TestCase):
'image_user': 'bar', 'format': 'qcow2'},
'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com',
'image_user': 'bar', 'format': 'qcow2'}})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
def test_all_file(self):
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
extra_properties=properties, nic_config_pb_loc='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals(properties, settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual(properties, settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
def test_config_all_file(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow',
'extra_properties': '{\'hw_video_model\' : \'vga\'}',
'nic_config_pb_loc': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
class CreateImageSuccessTests(OSIntegrationTestCase):
@@ -246,10 +246,10 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(created_image.size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_image_clean_url_properties(self):
"""
@@ -263,11 +263,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_image_clean_file(self):
"""
@@ -288,11 +288,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_delete_image(self):
"""
@@ -305,8 +305,8 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
# Delete Image manually
glance_utils.delete_image(self.glance, created_image)
@@ -327,16 +327,16 @@ class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(image1.name, retrieved_image.name)
- self.assertEquals(image1.id, retrieved_image.id)
- self.assertEquals(image1.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(image1.name, retrieved_image.name)
+ self.assertEqual(image1.id, retrieved_image.id)
+ self.assertEqual(image1.properties, retrieved_image.properties)
# Should be retrieving the instance data
os_image_2 = create_image.OpenStackImage(self.os_creds, self.image_settings)
image2 = os_image_2.create()
- self.assertEquals(image1.id, image2.id)
+ self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
@@ -490,17 +490,17 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
main_image = glance_utils.get_image(self.glance, image_settings.name)
self.assertIsNotNone(main_image)
self.assertIsNotNone(image_creator.get_image())
- self.assertEquals(image_creator.get_image().id, main_image.id)
+ self.assertEqual(image_creator.get_image().id, main_image.id)
kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name)
self.assertIsNotNone(kernel_image)
self.assertIsNotNone(image_creator.get_kernel_image())
- self.assertEquals(kernel_image.id, image_creator.get_kernel_image().id)
+ self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id)
ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name)
self.assertIsNotNone(ramdisk_image)
self.assertIsNotNone(image_creator.get_ramdisk_image())
- self.assertEquals(ramdisk_image.id, image_creator.get_ramdisk_image().id)
+ self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id)
def test_create_three_part_image_from_file_3_creators(self):
"""
@@ -522,7 +522,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_file_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
@@ -535,7 +535,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
# Create the main image
image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
@@ -552,11 +552,11 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_three_part_image_from_url_3_creators(self):
"""
@@ -578,7 +578,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_image_settings = openstack_tests.cirros_image_settings(
@@ -590,7 +590,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)
# Create the main image
os_image_settings = openstack_tests.cirros_image_settings(
@@ -612,12 +612,12 @@ class CreateMultiPartImageTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py
index 977d33b..e0dca17 100644
--- a/snaps/openstack/tests/create_instance_tests.py
+++ b/snaps/openstack/tests/create_instance_tests.py
@@ -69,33 +69,33 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
def test_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(name='foo', flavor='bar', port_settings=[port_settings])
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_config_with_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(config={'name': 'foo', 'flavor': 'bar', 'ports': [port_settings]})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_all(self):
@@ -106,22 +106,22 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
security_group_names=['sec_grp_1'], floating_ip_settings=[fip_settings],
sudo_user='joe', vm_boot_timeout=999, vm_delete_timeout=333,
ssh_connect_timeout=111, availability_zone='server name')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals('sec_grp_1', settings.security_group_names[0])
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual('sec_grp_1', settings.security_group_names[0])
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
def test_config_all(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
@@ -132,21 +132,21 @@ class VmInstanceSettingsUnitTests(unittest.TestCase):
'floating_ips': [fip_settings], 'sudo_user': 'joe',
'vm_boot_timeout': 999, 'vm_delete_timeout': 333,
'ssh_connect_timeout': 111, 'availability_zone': 'server name'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
class FloatingIpSettingsUnitTests(unittest.TestCase):
@@ -188,36 +188,36 @@ class FloatingIpSettingsUnitTests(unittest.TestCase):
def test_name_port_router_only(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router')
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_config_with_name_port_router_only(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_all(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router',
subnet_name='bar-subnet', provisioning=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
def test_config_all(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router',
'subnet_name': 'bar-subnet', 'provisioning': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
@@ -283,7 +283,7 @@ class SimpleHealthCheck(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -295,19 +295,19 @@ class SimpleHealthCheck(OSIntegrationTestCase):
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -408,25 +408,25 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -441,13 +441,13 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase):
self.os_creds, instance_settings, self.image_creator.image_settings)
vm_inst = self.inst_creator.create()
- self.assertEquals(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Delete instance
nova_utils.delete_vm_instance(self.nova, vm_inst)
self.assertTrue(self.inst_creator.vm_deleted(block=True))
- self.assertEquals(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Exception should not be thrown
self.inst_creator.clean()
@@ -523,13 +523,13 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -541,25 +541,25 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.router_creator:
try:
self.router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -585,9 +585,9 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.inst_creators.append(inst_creator)
vm_inst = inst_creator.create()
- self.assertEquals(ip_1, inst_creator.get_port_ip(self.port_1_name))
+ self.assertEqual(ip_1, inst_creator.get_port_ip(self.port_1_name))
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
def test_ssh_client_fip_before_active(self):
"""
@@ -610,7 +610,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
@@ -637,7 +637,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
@@ -698,25 +698,25 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -736,7 +736,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
def test_set_custom_invalid_ip_one_subnet(self):
@@ -772,7 +772,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_custom_invalid_mac(self):
"""
@@ -808,9 +808,9 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_allowed_address_pairs(self):
"""
@@ -832,7 +832,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase):
port = self.inst_creator.get_port_by_name(port_settings.name)
self.assertIsNotNone(port)
self.assertIsNotNone(port['port'].get('allowed_address_pairs'))
- self.assertEquals(1, len(port['port']['allowed_address_pairs']))
+ self.assertEqual(1, len(port['port']['allowed_address_pairs']))
validation_utils.objects_equivalent(pair, port['port']['allowed_address_pairs'][0])
def test_set_allowed_address_pairs_bad_mac(self):
@@ -933,25 +933,25 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase):
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -987,7 +987,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase):
vm = creator.get_vm_inst()
deployed_zone = vm._info['OS-EXT-AZ:availability_zone']
deployed_host = vm._info['OS-EXT-SRV-ATTR:host']
- self.assertEquals(zone, deployed_zone + ':' + deployed_host)
+ self.assertEqual(zone, deployed_zone + ':' + deployed_host)
index += 1
@@ -1065,7 +1065,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
self.keypair_creator.create()
except Exception as e:
self.tearDown()
- raise Exception(e.message)
+ raise Exception(str(e))
def tearDown(self):
"""
@@ -1075,13 +1075,13 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
@@ -1093,25 +1093,25 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
for router_creator in self.router_creators:
try:
router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
for network_creator in self.network_creators:
try:
network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -1146,7 +1146,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
vm_inst = self.inst_creator.create(block=True)
- self.assertEquals(vm_inst, self.inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, self.inst_creator.get_vm_inst())
# Effectively blocks until VM has been properly activated
self.assertTrue(self.inst_creator.vm_active(block=True))
@@ -1154,6 +1154,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
# Effectively blocks until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
+ # TODO - Refactor config_nics() to return a status that can be validated here.
self.inst_creator.config_nics()
# TODO - *** ADD VALIDATION HERE ***
@@ -1226,31 +1227,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
for sec_grp_creator in self.sec_grp_creators:
try:
sec_grp_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning security group with message - ' + e.message)
+ logger.error('Unexpected exception cleaning security group with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
@@ -1495,25 +1496,25 @@ class CreateInstanceFromThreePartImage(OSIntegrationTestCase):
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
diff --git a/snaps/openstack/tests/create_keypairs_tests.py b/snaps/openstack/tests/create_keypairs_tests.py
index e4409a9..b587a50 100644
--- a/snaps/openstack/tests/create_keypairs_tests.py
+++ b/snaps/openstack/tests/create_keypairs_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,8 +16,6 @@ import os
import uuid
import unittest
-from Crypto.PublicKey import RSA
-
from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
@@ -40,52 +38,52 @@ class KeypairSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = KeypairSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_only(self):
settings = KeypairSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_pub_only(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_pub_only(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_priv_only(self):
settings = KeypairSettings(name='foo', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_with_name_priv_only(self):
settings = KeypairSettings(config={'name': 'foo', 'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_all(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_all(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub',
'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
class CreateKeypairsTests(OSIntegrationTestCase):
@@ -132,7 +130,7 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
def test_create_delete_keypair(self):
"""
@@ -162,10 +160,10 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
def test_create_keypair_save_both(self):
"""
@@ -178,10 +176,10 @@ class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
self.assertTrue(os.path.isfile(self.priv_file_path))
@@ -190,14 +188,14 @@ class CreateKeypairsTests(OSIntegrationTestCase):
Tests the creation of an existing public keypair from a file
:return:
"""
- keys = RSA.generate(1024)
+ keys = nova_utils.create_keys()
nova_utils.save_keys_to_files(keys=keys, pub_file_path=self.pub_file_path)
self.keypair_creator = OpenStackKeypair(
self.os_creds, KeypairSettings(name=self.keypair_name, public_filepath=self.pub_file_path))
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
diff --git a/snaps/openstack/tests/create_network_tests.py b/snaps/openstack/tests/create_network_tests.py
index a2b17f8..a7f21ad 100644
--- a/snaps/openstack/tests/create_network_tests.py
+++ b/snaps/openstack/tests/create_network_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -40,37 +40,37 @@ class NetworkSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = NetworkSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_config_with_name_only(self):
settings = NetworkSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_all(self):
sub_settings = SubnetSettings(name='foo-subnet', cidr='10.0.0.0/24')
settings = NetworkSettings(name='foo', admin_state_up=False, shared=True, project_name='bar', external=True,
network_type='flat', physical_network='phy', subnet_settings=[sub_settings])
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
def test_config_all(self):
settings = NetworkSettings(config={'name': 'foo', 'admin_state_up': False, 'shared': True,
@@ -78,15 +78,15 @@ class NetworkSettingsUnitTests(unittest.TestCase):
'physical_network': 'phy',
'subnets':
[{'subnet': {'name': 'foo-subnet', 'cidr': '10.0.0.0/24'}}]})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
class SubnetSettingsUnitTests(unittest.TestCase):
@@ -112,15 +112,15 @@ class SubnetSettingsUnitTests(unittest.TestCase):
def test_name_cidr_only(self):
settings = SubnetSettings(name='foo', cidr='10.0.0.0/24')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
@@ -129,16 +129,16 @@ class SubnetSettingsUnitTests(unittest.TestCase):
def test_config_with_name_cidr_only(self):
settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.gateway_ip)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
@@ -151,22 +151,22 @@ class SubnetSettingsUnitTests(unittest.TestCase):
start='10.0.0.2', end='10.0.0.101', gateway_ip='10.0.0.1', enable_dhcp=False,
dns_nameservers=['8.8.8.8'], host_routes=[host_routes], destination='dest',
nexthop='hop', ipv6_ra_mode='dhcpv6-stateful', ipv6_address_mode='slaac')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
def test_config_all(self):
host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'}
@@ -176,22 +176,22 @@ class SubnetSettingsUnitTests(unittest.TestCase):
'dns_nameservers': ['8.8.8.8'], 'host_routes': [host_routes],
'destination': 'dest', 'nexthop': 'hop', 'ipv6_ra_mode': 'dhcpv6-stateful',
'ipv6_address_mode': 'slaac'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
class PortSettingsUnitTests(unittest.TestCase):
@@ -217,8 +217,8 @@ class PortSettingsUnitTests(unittest.TestCase):
def test_name_netname_only(self):
settings = PortSettings(name='foo', network_name='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
@@ -233,8 +233,8 @@ class PortSettingsUnitTests(unittest.TestCase):
def test_config_with_name_netname_only(self):
settings = PortSettings(config={'name': 'foo', 'network_name': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
@@ -257,20 +257,20 @@ class PortSettingsUnitTests(unittest.TestCase):
security_groups=['foo_grp_id'], allowed_address_pairs=allowed_address_pairs,
opt_value='opt value', opt_name='opt name', device_owner='owner',
device_id='device number')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
def test_config_all(self):
ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}]
@@ -282,20 +282,20 @@ class PortSettingsUnitTests(unittest.TestCase):
'fixed_ips': fixed_ips, 'security_groups': ['foo_grp_id'],
'allowed_address_pairs': allowed_address_pairs, 'opt_value': 'opt value',
'opt_name': 'opt name', 'device_owner': 'owner', 'device_id': 'device number'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
class CreateNetworkSuccessTests(OSIntegrationTestCase):
@@ -415,8 +415,8 @@ class CreateNetworkSuccessTests(OSIntegrationTestCase):
self.net_creator2 = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
self.net_creator2.create()
- self.assertEquals(self.net_creator.get_network()['network']['id'],
- self.net_creator2.get_network()['network']['id'])
+ self.assertEqual(self.net_creator.get_network()['network']['id'],
+ self.net_creator2.get_network()['network']['id'])
class CreateNetworkTypeTests(OSComponentTestCase):
@@ -492,7 +492,7 @@ class CreateNetworkTypeTests(OSComponentTestCase):
# Validate network was created
neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
- self.assertEquals(network_type, network['network']['provider:network_type'])
+ self.assertEqual(network_type, network['network']['provider:network_type'])
# TODO - determine what value we need to place into physical_network
# - Do not know what vaule to place into the 'physical_network' setting.
diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py
index 4e1d254..ede8b4b 100644
--- a/snaps/openstack/tests/create_project_tests.py
+++ b/snaps/openstack/tests/create_project_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -41,30 +41,30 @@ class ProjectSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = ProjectSettings(name='foo')
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_config_with_name_only(self):
settings = ProjectSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_all(self):
settings = ProjectSettings(name='foo', domain='bar', description='foobar', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = ProjectSettings(config={'name': 'foo', 'domain': 'bar', 'description': 'foobar', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
@@ -104,7 +104,7 @@ class CreateProjectSuccessTests(OSComponentTestCase):
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
def test_create_project_2x(self):
"""
@@ -116,10 +116,10 @@ class CreateProjectSuccessTests(OSComponentTestCase):
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
project2 = OpenStackProject(self.os_creds, self.project_settings).create()
- self.assertEquals(retrieved_project, project2)
+ self.assertEqual(retrieved_project, project2)
def test_create_delete_project(self):
"""
@@ -194,9 +194,9 @@ class CreateProjectUserTests(OSComponentTestCase):
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
@@ -229,8 +229,8 @@ class CreateProjectUserTests(OSComponentTestCase):
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 079be0c..6a3c0ef 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -48,28 +48,28 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
def test_name_and_direction(self):
settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_config_name_and_direction(self):
settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_all(self):
settings = SecurityGroupRuleSettings(
sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
remote_ip_prefix='prfx')
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.icmp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
def test_config_all(self):
settings = SecurityGroupRuleSettings(
@@ -82,15 +82,15 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
'port_range_min': 1,
'port_range_max': 2,
'remote_ip_prefix': 'prfx'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.tcp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
@@ -108,11 +108,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = SecurityGroupSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_config_with_name_only(self):
settings = SecurityGroupSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
@@ -126,11 +126,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(rule_settings[0], settings.rule_settings[0])
- self.assertEquals(rule_settings[1], settings.rule_settings[1])
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
def test_config_all(self):
settings = SecurityGroupSettings(
@@ -139,12 +139,12 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
'project_name': 'foo',
'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(1, len(settings.rule_settings))
- self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
@@ -189,7 +189,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_delete_group(self):
@@ -224,7 +224,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_group_with_several_rules(self):
@@ -253,7 +253,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_add_rule(self):
@@ -273,13 +273,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
direction=Direction.egress, protocol=Protocol.icmp))
rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) + 1, len(rules2))
+ self.assertEqual(len(rules) + 1, len(rules2))
def test_remove_rule_by_id(self):
"""
@@ -308,13 +308,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
def test_remove_rule_by_setting(self):
"""
@@ -344,12 +344,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex
diff --git a/snaps/openstack/tests/create_user_tests.py b/snaps/openstack/tests/create_user_tests.py
index 1f7a163..007f7f0 100644
--- a/snaps/openstack/tests/create_user_tests.py
+++ b/snaps/openstack/tests/create_user_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -52,35 +52,35 @@ class UserSettingsUnitTests(unittest.TestCase):
def test_name_pass_only(self):
settings = UserSettings(name='foo', password='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_config_with_name_pass_only(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_all(self):
settings = UserSettings(name='foo', password='bar', project_name='proj-foo', email='foo@bar.com', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar', 'project_name': 'proj-foo',
'email': 'foo@bar.com', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
@@ -120,7 +120,7 @@ class CreateUserSuccessTests(OSComponentTestCase):
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
def test_create_user_2x(self):
"""
@@ -132,11 +132,11 @@ class CreateUserSuccessTests(OSComponentTestCase):
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
# Create user for the second time to ensure it is the same
user2 = OpenStackUser(self.os_creds, self.user_settings).create()
- self.assertEquals(retrieved_user, user2)
+ self.assertEqual(retrieved_user, user2)
def test_create_delete_user(self):
"""
@@ -152,4 +152,3 @@ class CreateUserSuccessTests(OSComponentTestCase):
# Delete user
self.user_creator.clean()
self.assertIsNone(self.user_creator.get_user())
-
diff --git a/snaps/openstack/tests/validation_utils.py b/snaps/openstack/tests/validation_utils.py
index 7c9bd7f..b6d5856 100644
--- a/snaps/openstack/tests/validation_utils.py
+++ b/snaps/openstack/tests/validation_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -42,7 +42,7 @@ def dicts_equivalent(dict1, dict2):
:return: T/F
"""
if (type(dict1) is dict or type(dict1) is _DictWithMeta) and (type(dict2) is dict or type(dict2) is _DictWithMeta):
- for key, value1 in dict1.iteritems():
+ for key, value1 in dict1.items():
if not objects_equivalent(value1, dict2.get(key)):
return False
return True
diff --git a/snaps/openstack/utils/glance_utils.py b/snaps/openstack/utils/glance_utils.py
index d859257..ca9c490 100644
--- a/snaps/openstack/utils/glance_utils.py
+++ b/snaps/openstack/utils/glance_utils.py
@@ -151,10 +151,8 @@ def __create_image_v2(glance, image_settings):
kwargs['name'] = image_settings.name
kwargs['disk_format'] = image_settings.format
kwargs['container_format'] = 'bare'
-
if image_settings.extra_properties:
- for key, value in image_settings.extra_properties.iteritems():
- kwargs[key] = value
+ kwargs.update(image_settings.extra_properties)
created_image = glance.images.create(**kwargs)
image_file = file_utils.get_file(image_filename)
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index 6c92d2e..a65a42f 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,7 +16,7 @@ import logging
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
@@ -80,7 +80,7 @@ def get_network(neutron, network_name, project_id=None):
net_filter['project_id'] = project_id
networks = neutron.list_networks(**net_filter)
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('name') == network_name:
if project_id and inst.get('project_id') == project_id:
@@ -98,7 +98,7 @@ def get_network_by_id(neutron, network_id):
:return:
"""
networks = neutron.list_networks(**{'id': network_id})
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('id') == network_id:
return {'network': inst}
@@ -144,7 +144,7 @@ def get_subnet_by_name(neutron, subnet_name):
:return:
"""
subnets = neutron.list_subnets(**{'name': subnet_name})
- for subnet, subnetInst in subnets.iteritems():
+ for subnet, subnetInst in subnets.items():
for inst in subnetInst:
if inst.get('name') == subnet_name:
return {'subnet': inst}
@@ -189,7 +189,7 @@ def get_router_by_name(neutron, router_name):
:return:
"""
routers = neutron.list_routers(**{'name': router_name})
- for router, routerInst in routers.iteritems():
+ for router, routerInst in routers.items():
for inst in routerInst:
if inst.get('name') == router_name:
return {'router': inst}
@@ -228,10 +228,10 @@ def remove_interface_router(neutron, router, subnet=None, port=None):
logger.info('Removing router interface from router named ' + router['router']['name'])
neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
except NotFound as e:
- logger.warn('Could not remove router interface. NotFound - ' + e.message)
+ logger.warning('Could not remove router interface. NotFound - ' + str(e))
pass
else:
- logger.warn('Could not remove router interface, No router object')
+ logger.warning('Could not remove router interface, No router object')
def __create_port_json_body(subnet=None, port=None):
diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py
index e7428ed..419f451 100644
--- a/snaps/openstack/utils/nova_utils.py
+++ b/snaps/openstack/utils/nova_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -12,9 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.backends import default_backend
+
import os
import logging
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
from novaclient.client import Client
from novaclient.exceptions import NotFound
@@ -58,10 +63,29 @@ def get_latest_server_object(nova, server):
return nova.servers.get(server)
+def create_keys(key_size=2048):
+ """
+ Generates public and private keys
+ :param key_size: the number of bytes for the key size
+ :return: the cryptography keys
+ """
+ return rsa.generate_private_key(backend=default_backend(), public_exponent=65537,
+ key_size=key_size)
+
+
+def public_key_openssh(keys):
+ """
+ Returns the public key for OpenSSH
+ :param keys: the keys generated by create_keys() from cryptography
+ :return: the OpenSSH public key
+ """
+ return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+
+
def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
"""
Saves the generated RSA generated keys to the filesystem
- :param keys: the keys to save
+ :param keys: the keys to save generated by cryptography
:param pub_file_path: the path to the public keys
:param priv_file_path: the path to the private keys
:return: None
@@ -72,7 +96,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
if not os.path.isdir(pub_dir):
os.mkdir(pub_dir)
public_handle = open(pub_file_path, 'wb')
- public_handle.write(keys.publickey().exportKey('OpenSSH'))
+ public_bytes = keys.public_key().public_bytes(
+ serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+ public_handle.write(public_bytes)
public_handle.close()
os.chmod(pub_file_path, 0o400)
logger.info("Saved public key to - " + pub_file_path)
@@ -81,7 +107,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
if not os.path.isdir(priv_dir):
os.mkdir(priv_dir)
private_handle = open(priv_file_path, 'wb')
- private_handle.write(keys.exportKey())
+ private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()))
private_handle.close()
os.chmod(priv_file_path, 0o400)
logger.info("Saved private key to - " + priv_file_path)
@@ -95,7 +123,7 @@ def upload_keypair_file(nova, name, file_path):
:param file_path: the path to the public key file
:return: the keypair object
"""
- with open(os.path.expanduser(file_path)) as fpubkey:
+ with open(os.path.expanduser(file_path), 'rb') as fpubkey:
logger.info('Saving keypair to - ' + file_path)
return upload_keypair(nova, name, fpubkey.read())
@@ -109,7 +137,7 @@ def upload_keypair(nova, name, key):
:return: the keypair object
"""
logger.info('Creating keypair with name - ' + name)
- return nova.keypairs.create(name=name, public_key=key)
+ return nova.keypairs.create(name=name, public_key=key.decode('utf-8'))
def keypair_exists(nova, keypair_obj):
@@ -212,7 +240,7 @@ def get_nova_availability_zones(nova):
zones = nova.availability_zones.list()
for zone in zones:
if zone.zoneName == 'nova':
- for key, host in zone.hosts.iteritems():
+ for key, host in zone.hosts.items():
out.append(zone.zoneName + ':' + key)
return out
diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py
index 76a43ef..c072fd3 100644
--- a/snaps/openstack/utils/tests/keystone_utils_tests.py
+++ b/snaps/openstack/utils/tests/keystone_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -93,8 +93,8 @@ class KeystoneUtilsTests(OSComponentTestCase):
"""
project_settings = ProjectSettings(name=self.project_name)
self.project = keystone_utils.create_project(self.keystone, project_settings)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name)
self.assertIsNotNone(project)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 5f95fc9..9a043e3 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -109,14 +109,16 @@ class NeutronUtilsNetworkTests(OSComponentTestCase):
Tests the neutron_utils.create_neutron_net() function with an empty network name
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name=''))
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings(name=''))
def test_create_network_null_name(self):
"""
Tests the neutron_utils.create_neutron_net() function when the network name is None
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings())
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
@@ -644,7 +646,7 @@ def validate_port(neutron, port_obj, this_port_name):
:return: True/False
"""
ports = neutron.list_ports()
- for port, port_insts in ports.iteritems():
+ for port, port_insts in ports.items():
for inst in port_insts:
if inst['id'] == port_obj['port']['id']:
return inst['name'] == this_port_name
diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py
index f6c9156..0a2b24b 100644
--- a/snaps/openstack/utils/tests/nova_utils_tests.py
+++ b/snaps/openstack/utils/tests/nova_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -16,8 +16,6 @@ import logging
import os
import uuid
-from Crypto.PublicKey import RSA
-
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.create_flavor import FlavorSettings
@@ -71,8 +69,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
self.pub_key_file_path = self.priv_key_file_path + '.pub'
self.nova = nova_utils.nova_client(self.os_creds)
- self.keys = RSA.generate(1024)
- self.public_key = self.keys.publickey().exportKey('OpenSSH')
+ self.keys = nova_utils.create_keys()
+ self.public_key = nova_utils.public_key_openssh(self.keys)
self.keypair_name = guid
self.keypair = None
self.floating_ip = None
@@ -106,9 +104,9 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
- self.assertEquals(self.keypair, keypair)
+ self.assertEqual(self.keypair, keypair)
def test_create_delete_keypair(self):
"""
@@ -116,7 +114,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
nova_utils.delete_keypair(self.nova, self.keypair)
result2 = nova_utils.keypair_exists(self.nova, self.keypair)
self.assertIsNone(result2)
@@ -129,7 +127,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path)
self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path)
pub_key = open(os.path.expanduser(self.pub_key_file_path)).read()
- self.assertEquals(self.keypair.public_key, pub_key)
+ self.assertEqual(self.keypair.public_key, pub_key)
def test_floating_ips(self):
"""
@@ -141,7 +139,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase):
self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name)
returned = nova_utils.get_floating_ip(self.nova, self.floating_ip)
- self.assertEquals(self.floating_ip, returned)
+ self.assertEqual(self.floating_ip, returned)
class NovaUtilsFlavorTests(OSComponentTestCase):
@@ -192,17 +190,17 @@ class NovaUtilsFlavorTests(OSComponentTestCase):
Validates the flavor_settings against the OpenStack flavor object
"""
self.assertIsNotNone(self.flavor)
- self.assertEquals(self.flavor_settings.name, self.flavor.name)
- self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id)
- self.assertEquals(self.flavor_settings.ram, self.flavor.ram)
- self.assertEquals(self.flavor_settings.disk, self.flavor.disk)
- self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus)
- self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral)
+ self.assertEqual(self.flavor_settings.name, self.flavor.name)
+ self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id)
+ self.assertEqual(self.flavor_settings.ram, self.flavor.ram)
+ self.assertEqual(self.flavor_settings.disk, self.flavor.disk)
+ self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus)
+ self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral)
if self.flavor_settings.swap == 0:
- self.assertEquals('', self.flavor.swap)
+ self.assertEqual('', self.flavor.swap)
else:
- self.assertEquals(self.flavor_settings.swap, self.flavor.swap)
+ self.assertEqual(self.flavor_settings.swap, self.flavor.swap)
- self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
- self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public)
+ self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
+ self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)
diff --git a/snaps/provisioning/ansible_utils.py b/snaps/provisioning/ansible_utils.py
index 39ae1e6..3fbc88f 100644
--- a/snaps/provisioning/ansible_utils.py
+++ b/snaps/provisioning/ansible_utils.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -105,4 +105,4 @@ def ssh_client(ip, user, private_key_filepath, proxy_settings=None):
ssh.connect(ip, username=user, key_filename=private_key_filepath, sock=proxy_cmd)
return ssh
except Exception as e:
- logger.warn('Unable to connect via SSH with message - ' + e.message)
+ logger.warning('Unable to connect via SSH with message - ' + str(e))
diff --git a/snaps/provisioning/tests/ansible_utils_tests.py b/snaps/provisioning/tests/ansible_utils_tests.py
index 41bc199..1f03e09 100644
--- a/snaps/provisioning/tests/ansible_utils_tests.py
+++ b/snaps/provisioning/tests/ansible_utils_tests.py
@@ -111,9 +111,9 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
self.inst_creator = create_instance.OpenStackVmInstance(
self.os_creds, instance_settings, self.image_creator.image_settings,
keypair_settings=self.keypair_creator.keypair_settings)
- except Exception as e:
+ except:
self.tearDown()
- raise Exception(e.message)
+ raise
def tearDown(self):
"""
@@ -173,7 +173,7 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
retval = ansible_utils.apply_playbook('provisioning/tests/playbooks/simple_playbook.yml', [ip], user, priv_key,
proxy_setting=self.os_creds.proxy_settings)
- self.assertEquals(0, retval)
+ self.assertEqual(0, retval)
ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings)
self.assertIsNotNone(ssh)
@@ -184,7 +184,7 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
with open(self.test_file_local_path) as f:
file_contents = f.readline()
- self.assertEquals('Hello World!', file_contents)
+ self.assertEqual('Hello World!', file_contents)
def test_apply_template_playbook(self):
"""
@@ -215,4 +215,4 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
with open(self.test_file_local_path) as f:
file_contents = f.readline()
- self.assertEquals('Hello Foo!', file_contents)
+ self.assertEqual('Hello Foo!', file_contents)
diff --git a/snaps/tests/file_utils_tests.py b/snaps/tests/file_utils_tests.py
index d517d5d..a28231b 100644
--- a/snaps/tests/file_utils_tests.py
+++ b/snaps/tests/file_utils_tests.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -96,7 +96,7 @@ class FileUtilsTests(unittest.TestCase):
:return:
"""
os_env_dict = file_utils.read_os_env_file('openstack/tests/conf/overcloudrc_test')
- self.assertEquals('test_pw', os_env_dict['OS_PASSWORD'])
- self.assertEquals('http://foo:5000/v2.0/', os_env_dict['OS_AUTH_URL'])
- self.assertEquals('admin', os_env_dict['OS_USERNAME'])
- self.assertEquals('admin', os_env_dict['OS_TENANT_NAME'])
+ self.assertEqual('test_pw', os_env_dict['OS_PASSWORD'])
+ self.assertEqual('http://foo:5000/v2.0/', os_env_dict['OS_AUTH_URL'])
+ self.assertEqual('admin', os_env_dict['OS_USERNAME'])
+ self.assertEqual('admin', os_env_dict['OS_TENANT_NAME'])