diff options
Diffstat (limited to 'snaps/openstack')
22 files changed, 634 insertions, 625 deletions
diff --git a/snaps/openstack/create_image.py b/snaps/openstack/create_image.py index 4f2bd7e..1e2e4dc 100644 --- a/snaps/openstack/create_image.py +++ b/snaps/openstack/create_image.py @@ -176,7 +176,7 @@ class OpenStackImage: # TODO - Place this API call into glance_utils. status = glance_utils.get_image_status(self.__glance, self.__image) if not status: - logger.warn('Cannot image status for image with ID - ' + self.__image.id) + logger.warning('Cannot image status for image with ID - ' + self.__image.id) return False if status == 'ERROR': diff --git a/snaps/openstack/create_instance.py b/snaps/openstack/create_instance.py index dda68fd..9fa3232 100644 --- a/snaps/openstack/create_instance.py +++ b/snaps/openstack/create_instance.py @@ -210,7 +210,7 @@ class OpenStackVmInstance: logger.info('Deleting Floating IP - ' + floating_ip.ip) nova_utils.delete_floating_ip(self.__nova, floating_ip) except Exception as e: - logger.error('Error deleting Floating IP - ' + e.message) + logger.error('Error deleting Floating IP - ' + str(e)) self.__floating_ips = list() self.__floating_ip_dict = dict() @@ -220,7 +220,7 @@ class OpenStackVmInstance: try: neutron_utils.delete_port(self.__neutron, port) except PortNotFoundClient as e: - logger.warn('Unexpected error deleting port - ' + e.message) + logger.warning('Unexpected error deleting port - ' + str(e)) pass self.__ports = list() @@ -243,7 +243,7 @@ class OpenStackVmInstance: logger.error('VM not deleted within the timeout period of ' + str(self.instance_settings.vm_delete_timeout) + ' seconds') except Exception as e: - logger.error('Unexpected error while checking VM instance status - ' + e.message) + logger.error('Unexpected error while checking VM instance status - ' + str(e)) def __setup_ports(self, port_settings, cleanup): """ @@ -303,7 +303,7 @@ class OpenStackVmInstance: ' on instance - ' + self.instance_settings.name) return except Exception as e: - logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + e.message) + logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + str(e)) time.sleep(poll_interval) count -= 1 pass @@ -341,7 +341,7 @@ class OpenStackVmInstance: if subnet_name: subnet = neutron_utils.get_subnet_by_name(self.__neutron, subnet_name) if not subnet: - logger.warn('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name) + logger.warning('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name) return None for fixed_ip in port_dict['fixed_ips']: if fixed_ip['subnet_id'] == subnet['subnet']['id']: @@ -374,7 +374,7 @@ class OpenStackVmInstance: for key, port in self.__ports: if key == port_name: return port - logger.warn('Cannot find port with name - ' + port_name) + logger.warning('Cannot find port with name - ' + port_name) return None def config_nics(self): @@ -427,7 +427,7 @@ class OpenStackVmInstance: [floating_ip], self.get_image_user(), self.keypair_settings.private_filepath, variables, self.__os_creds.proxy_settings) else: - logger.warn('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' + + logger.warning('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' + 'No playbook or keypairs found.') def get_image_user(self): @@ -452,7 +452,7 @@ class OpenStackVmInstance: return self.__vm_status_check(STATUS_DELETED, block, self.instance_settings.vm_delete_timeout, poll_interval) except NotFound as e: - logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + e.message) + logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + str(e)) return True def vm_active(self, block=False, poll_interval=POLL_INTERVAL): @@ -500,7 +500,7 @@ class OpenStackVmInstance: """ instance = self.__nova.servers.get(self.__vm.id) if not instance: - logger.warn('Cannot find instance with id - ' + self.__vm.id) + logger.warning('Cannot find instance with id - ' + self.__vm.id) return False if instance.status == 'ERROR': @@ -575,7 +575,7 @@ class OpenStackVmInstance: self.keypair_settings.private_filepath, proxy_settings=self.__os_creds.proxy_settings) else: - logger.warn('Cannot return an SSH client. No Floating IP configured') + logger.warning('Cannot return an SSH client. No Floating IP configured') def add_security_group(self, security_group): """ @@ -586,14 +586,14 @@ class OpenStackVmInstance: self.vm_active(block=True) if not security_group: - logger.warn('Security group object is None, cannot add') + logger.warning('Security group object is None, cannot add') return False try: nova_utils.add_security_group(self.__nova, self.get_vm_inst(), security_group['security_group']['name']) return True except NotFound as e: - logger.warn('Security group not added - ' + e.message) + logger.warning('Security group not added - ' + str(e)) return False def remove_security_group(self, security_group): @@ -605,14 +605,14 @@ class OpenStackVmInstance: self.vm_active(block=True) if not security_group: - logger.warn('Security group object is None, cannot remove') + logger.warning('Security group object is None, cannot remove') return False try: nova_utils.remove_security_group(self.__nova, self.get_vm_inst(), security_group) return True except NotFound as e: - logger.warn('Security group not removed - ' + e.message) + logger.warning('Security group not removed - ' + str(e)) return False @@ -659,7 +659,7 @@ class VmInstanceSettings: self.security_group_names = set(config['security_group_names']) elif isinstance(config['security_group_names'], set): self.security_group_names = config['security_group_names'] - elif isinstance(config['security_group_names'], basestring): + elif isinstance(config['security_group_names'], str): self.security_group_names = [config['security_group_names']] else: raise Exception('Invalid data type for security_group_names attribute') diff --git a/snaps/openstack/create_keypairs.py b/snaps/openstack/create_keypairs.py index ea7c811..6af40c6 100644 --- a/snaps/openstack/create_keypairs.py +++ b/snaps/openstack/create_keypairs.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,7 +15,6 @@ import logging import os -from Crypto.PublicKey import RSA from novaclient.exceptions import NotFound from snaps.openstack.utils import nova_utils @@ -50,28 +49,23 @@ class OpenStackKeypair: """ logger.info('Creating keypair %s...' % self.keypair_settings.name) - try: - self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name) - - if not self.__keypair and not cleanup: - if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath): - logger.info("Uploading existing keypair") - self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name, - self.keypair_settings.public_filepath) - else: - logger.info("Creating new keypair") - # TODO - Make this value configurable - keys = RSA.generate(1024) - self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name, - keys.publickey().exportKey('OpenSSH')) - nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath, - self.keypair_settings.private_filepath) - - return self.__keypair - except Exception as e: - logger.error('Unexpected error creating keypair named - ' + self.keypair_settings.name) - self.clean() - raise Exception(e.message) + self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name) + + if not self.__keypair and not cleanup: + if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath): + logger.info("Uploading existing keypair") + self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name, + self.keypair_settings.public_filepath) + else: + logger.info("Creating new keypair") + # TODO - Make this value configurable + keys = nova_utils.create_keys(1024) + self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name, + nova_utils.public_key_openssh(keys)) + nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath, + self.keypair_settings.private_filepath) + + return self.__keypair def clean(self): """ diff --git a/snaps/openstack/create_network.py b/snaps/openstack/create_network.py index a214ba1..210c2bf 100644 --- a/snaps/openstack/create_network.py +++ b/snaps/openstack/create_network.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -89,7 +89,7 @@ class OpenStackNetwork: logger.info('Deleting subnet with name ' + subnet['subnet']['name']) neutron_utils.delete_subnet(self.__neutron, subnet) except NotFound as e: - logger.warn('Error deleting subnet with message - ' + e.message) + logger.warning('Error deleting subnet with message - ' + str(e)) pass self.__subnets = list() diff --git a/snaps/openstack/create_project.py b/snaps/openstack/create_project.py index 60f9ed0..2658d13 100644 --- a/snaps/openstack/create_project.py +++ b/snaps/openstack/create_project.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -47,19 +47,14 @@ class OpenStackProject: :param cleanup: Denotes whether or not this is being called for cleanup or not :return: The OpenStack Image object """ - try: - self.__project = keystone_utils.get_project(keystone=self.__keystone, - project_name=self.project_settings.name) - if self.__project: - logger.info('Found project with name - ' + self.project_settings.name) - elif not cleanup: - self.__project = keystone_utils.create_project(self.__keystone, self.project_settings) - else: - logger.info('Did not create image due to cleanup mode') - except Exception as e: - logger.error('Unexpected error. Rolling back') - self.clean() - raise Exception(e.message) + self.__project = keystone_utils.get_project(keystone=self.__keystone, + project_name=self.project_settings.name) + if self.__project: + logger.info('Found project with name - ' + self.project_settings.name) + elif not cleanup: + self.__project = keystone_utils.create_project(self.__keystone, self.project_settings) + else: + logger.info('Did not create image due to cleanup mode') return self.__project diff --git a/snaps/openstack/create_router.py b/snaps/openstack/create_router.py index 70c6b76..5c461cc 100644 --- a/snaps/openstack/create_router.py +++ b/snaps/openstack/create_router.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -59,46 +59,42 @@ class OpenStackRouter: :return: the router object """ logger.debug('Creating Router with name - ' + self.router_settings.name) - try: - existing = False - router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name) - if router_inst: - self.__router = router_inst - existing = True + existing = False + router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name) + if router_inst: + self.__router = router_inst + existing = True + else: + if not cleanup: + self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings) + + for internal_subnet_name in self.router_settings.internal_subnets: + internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name) + if internal_subnet: + self.__internal_subnets.append(internal_subnet) + if internal_subnet and not cleanup and not existing: + logger.debug('Adding router to subnet...') + self.__internal_router_interface = neutron_utils.add_interface_router( + self.__neutron, self.__router, subnet=internal_subnet) else: - if not cleanup: - self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings) - - for internal_subnet_name in self.router_settings.internal_subnets: - internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name) - if internal_subnet: - self.__internal_subnets.append(internal_subnet) - if internal_subnet and not cleanup and not existing: - logger.debug('Adding router to subnet...') - self.__internal_router_interface = neutron_utils.add_interface_router( - self.__neutron, self.__router, subnet=internal_subnet) - else: - raise Exception('Subnet not found with name ' + internal_subnet_name) + raise Exception('Subnet not found with name ' + internal_subnet_name) + + for port_setting in self.router_settings.port_settings: + port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name) + logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name) + if port: + self.__ports.append(port) - for port_setting in self.router_settings.port_settings: - port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name) - logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name) + if not port and not cleanup and not existing: + port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting) if port: + logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name) self.__ports.append(port) + neutron_utils.add_interface_router(self.__neutron, self.__router, port=port) + else: + raise Exception('Error creating port with name - ' + port_setting.name) - if not port and not cleanup and not existing: - port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting) - if port: - logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name) - self.__ports.append(port) - neutron_utils.add_interface_router(self.__neutron, self.__router, port=port) - else: - raise Exception('Error creating port with name - ' + port_setting.name) - - return self.__router - except Exception as e: - self.clean() - raise Exception(e.message) + return self.__router def clean(self): """ @@ -163,8 +159,6 @@ class RouterSettings: policies. :param external_gateway: Name of the external network to which to route :param admin_state_up: The administrative status of the router. True = up / False = down (default True) - :param enable_snat: Boolean value. Enable Source NAT (SNAT) attribute. Default is True. To persist this - attribute value, set the enable_snat_by_default option in the neutron.conf file. :param external_fixed_ips: Dictionary containing the IP address parameters. :param internal_subnets: List of subnet names to which to connect this router for Floating IP purposes :param port_settings: List of PortSettings objects @@ -238,7 +232,7 @@ class RouterSettings: else: raise Exception('Could not find the external network named - ' + self.external_gateway) - #TODO: Enable SNAT option for Router - #TODO: Add external_fixed_ips Tests + # TODO: Enable SNAT option for Router + # TODO: Add external_fixed_ips Tests return {'router': out} diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py index fc1ee98..57f7284 100644 --- a/snaps/openstack/create_security_group.py +++ b/snaps/openstack/create_security_group.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -58,7 +58,7 @@ class OpenStackSecurityGroup: if not self.__security_group and not cleanup: # Create the security group self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone, - self.sec_grp_settings) + self.sec_grp_settings) # Get the rules added for free auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group) @@ -118,11 +118,11 @@ class OpenStackSecurityGroup: """ Removes and deletes the rules then the security group. """ - for setting, rule in self.__rules.iteritems(): + for setting, rule in self.__rules.items(): try: neutron_utils.delete_security_group_rule(self.__neutron, rule) except NotFound as e: - logger.warn('Rule not found, cannot delete - ' + e.message) + logger.warning('Rule not found, cannot delete - ' + str(e)) pass self.__rules = dict() @@ -130,7 +130,7 @@ class OpenStackSecurityGroup: try: neutron_utils.delete_security_group(self.__neutron, self.__security_group) except NotFound as e: - logger.warn('Security Group not found, cannot delete - ' + e.message) + logger.warning('Security Group not found, cannot delete - ' + str(e)) self.__security_group = None @@ -177,7 +177,7 @@ class OpenStackSecurityGroup: if rule_setting: self.__rules.pop(rule_setting) else: - logger.warn('Rule setting is None, cannot remove rule') + logger.warning('Rule setting is None, cannot remove rule') def __get_setting_from_rule(self, rule): """ @@ -460,17 +460,16 @@ def map_direction(direction): """ if not direction: return None - if type(direction) is Direction: + if isinstance(direction, Direction): return direction - elif isinstance(direction, basestring): - if direction == 'egress': + else: + dir_str = str(direction) + if dir_str == 'egress': return Direction.egress - elif direction == 'ingress': + elif dir_str == 'ingress': return Direction.ingress else: - raise Exception('Invalid Direction - ' + direction) - else: - raise Exception('Invalid Direction object - ' + str(direction)) + raise Exception('Invalid Direction - ' + dir_str) def map_protocol(protocol): @@ -482,21 +481,20 @@ def map_protocol(protocol): """ if not protocol: return None - elif type(protocol) is Protocol: + elif isinstance(protocol, Protocol): return protocol - elif isinstance(protocol, basestring): - if protocol == 'icmp': + else: + proto_str = str(protocol) + if proto_str == 'icmp': return Protocol.icmp - elif protocol == 'tcp': + elif proto_str == 'tcp': return Protocol.tcp - elif protocol == 'udp': + elif proto_str == 'udp': return Protocol.udp - elif protocol == 'null': + elif proto_str == 'null': return Protocol.null else: - raise Exception('Invalid Protocol - ' + protocol) - else: - raise Exception('Invalid Protocol object - ' + str(protocol)) + raise Exception('Invalid Protocol - ' + proto_str) def map_ethertype(ethertype): @@ -508,14 +506,13 @@ def map_ethertype(ethertype): """ if not ethertype: return None - elif type(ethertype) is Ethertype: + elif isinstance(ethertype, Ethertype): return ethertype - elif isinstance(ethertype, basestring): - if ethertype == 'IPv6': + else: + eth_str = str(ethertype) + if eth_str == 'IPv6': return Ethertype.IPv6 - elif ethertype == 'IPv4': + elif eth_str == 'IPv4': return Ethertype.IPv4 else: - raise Exception('Invalid Ethertype - ' + ethertype) - else: - raise Exception('Invalid Ethertype object - ' + str(ethertype)) + raise Exception('Invalid Ethertype - ' + eth_str) diff --git a/snaps/openstack/tests/create_flavor_tests.py b/snaps/openstack/tests/create_flavor_tests.py index ad135ee..be7ac64 100644 --- a/snaps/openstack/tests/create_flavor_tests.py +++ b/snaps/openstack/tests/create_flavor_tests.py @@ -1,3 +1,8 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 @@ -160,60 +165,60 @@ class FlavorSettingsUnitTests(unittest.TestCase): def test_name_ram_disk_vcpus_only(self): settings = FlavorSettings(name='foo', ram=1, disk=2, vcpus=3) - self.assertEquals('foo', settings.name) - self.assertEquals('auto', settings.flavor_id) - self.assertEquals(1, settings.ram) - self.assertEquals(2, settings.disk) - self.assertEquals(3, settings.vcpus) - self.assertEquals(0, settings.ephemeral) - self.assertEquals(0, settings.swap) - self.assertEquals(1.0, settings.rxtx_factor) - self.assertEquals(True, settings.is_public) - self.assertEquals(None, settings.metadata) + self.assertEqual('foo', settings.name) + self.assertEqual('auto', settings.flavor_id) + self.assertEqual(1, settings.ram) + self.assertEqual(2, settings.disk) + self.assertEqual(3, settings.vcpus) + self.assertEqual(0, settings.ephemeral) + self.assertEqual(0, settings.swap) + self.assertEqual(1.0, settings.rxtx_factor) + self.assertEqual(True, settings.is_public) + self.assertEqual(None, settings.metadata) def test_config_with_name_ram_disk_vcpus_only(self): settings = FlavorSettings(config={'name': 'foo', 'ram': 1, 'disk': 2, 'vcpus': 3}) - self.assertEquals('foo', settings.name) - self.assertEquals('auto', settings.flavor_id) - self.assertEquals(1, settings.ram) - self.assertEquals(2, settings.disk) - self.assertEquals(3, settings.vcpus) - self.assertEquals(0, settings.ephemeral) - self.assertEquals(0, settings.swap) - self.assertEquals(1.0, settings.rxtx_factor) - self.assertEquals(True, settings.is_public) - self.assertEquals(None, settings.metadata) + self.assertEqual('foo', settings.name) + self.assertEqual('auto', settings.flavor_id) + self.assertEqual(1, settings.ram) + self.assertEqual(2, settings.disk) + self.assertEqual(3, settings.vcpus) + self.assertEqual(0, settings.ephemeral) + self.assertEqual(0, settings.swap) + self.assertEqual(1.0, settings.rxtx_factor) + self.assertEqual(True, settings.is_public) + self.assertEqual(None, settings.metadata) def test_all(self): metadata = {'foo': 'bar'} settings = FlavorSettings(name='foo', flavor_id='bar', ram=1, disk=2, vcpus=3, ephemeral=4, swap=5, rxtx_factor=6.0, is_public=False, metadata=metadata) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor_id) - self.assertEquals(1, settings.ram) - self.assertEquals(2, settings.disk) - self.assertEquals(3, settings.vcpus) - self.assertEquals(4, settings.ephemeral) - self.assertEquals(5, settings.swap) - self.assertEquals(6.0, settings.rxtx_factor) - self.assertEquals(False, settings.is_public) - self.assertEquals(metadata, settings.metadata) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor_id) + self.assertEqual(1, settings.ram) + self.assertEqual(2, settings.disk) + self.assertEqual(3, settings.vcpus) + self.assertEqual(4, settings.ephemeral) + self.assertEqual(5, settings.swap) + self.assertEqual(6.0, settings.rxtx_factor) + self.assertEqual(False, settings.is_public) + self.assertEqual(metadata, settings.metadata) def test_config_all(self): metadata = {'foo': 'bar'} settings = FlavorSettings(config={'name': 'foo', 'flavor_id': 'bar', 'ram': 1, 'disk': 2, 'vcpus': 3, 'ephemeral': 4, 'swap': 5, 'rxtx_factor': 6.0, 'is_public': False, 'metadata': metadata}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor_id) - self.assertEquals(1, settings.ram) - self.assertEquals(2, settings.disk) - self.assertEquals(3, settings.vcpus) - self.assertEquals(4, settings.ephemeral) - self.assertEquals(5, settings.swap) - self.assertEquals(6.0, settings.rxtx_factor) - self.assertEquals(False, settings.is_public) - self.assertEquals(metadata, settings.metadata) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor_id) + self.assertEqual(1, settings.ram) + self.assertEqual(2, settings.disk) + self.assertEqual(3, settings.vcpus) + self.assertEqual(4, settings.ephemeral) + self.assertEqual(5, settings.swap) + self.assertEqual(6.0, settings.rxtx_factor) + self.assertEqual(False, settings.is_public) + self.assertEqual(metadata, settings.metadata) class CreateFlavorTests(OSComponentTestCase): @@ -264,7 +269,7 @@ class CreateFlavorTests(OSComponentTestCase): flavor_creator_2 = OpenStackFlavor(self.os_creds, flavor_settings) flavor2 = flavor_creator_2.create() - self.assertEquals(flavor.id, flavor2.id) + self.assertEqual(flavor.id, flavor2.id) def test_create_clean_flavor(self): """ diff --git a/snaps/openstack/tests/create_image_tests.py b/snaps/openstack/tests/create_image_tests.py index 6b8c1c0..87e7bdf 100644 --- a/snaps/openstack/tests/create_image_tests.py +++ b/snaps/openstack/tests/create_image_tests.py @@ -20,7 +20,7 @@ import unittest from snaps import file_utils from snaps.openstack.create_image import ImageSettings -import openstack_tests +from snaps.openstack.tests import openstack_tests from snaps.openstack.utils import glance_utils from snaps.openstack import create_image from snaps.openstack import os_credentials @@ -78,10 +78,10 @@ class ImageSettingsUnitTests(unittest.TestCase): def test_name_user_format_url_only(self): settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) - self.assertEquals('http://foo.com', settings.url) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) + self.assertEqual('http://foo.com', settings.url) self.assertIsNone(settings.image_file) self.assertIsNone(settings.nic_config_pb_loc) @@ -89,41 +89,41 @@ class ImageSettingsUnitTests(unittest.TestCase): properties = {'hw_video_model': 'vga'} settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com', extra_properties=properties) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) - self.assertEquals('http://foo.com', settings.url) - self.assertEquals(properties, settings.extra_properties) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) + self.assertEqual('http://foo.com', settings.url) + self.assertEqual(properties, settings.extra_properties) self.assertIsNone(settings.image_file) self.assertIsNone(settings.nic_config_pb_loc) def test_config_with_name_user_format_url_only(self): settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2', 'download_url': 'http://foo.com'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) - self.assertEquals('http://foo.com', settings.url) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) + self.assertEqual('http://foo.com', settings.url) self.assertIsNone(settings.image_file) self.assertIsNone(settings.nic_config_pb_loc) def test_name_user_format_file_only(self): settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) self.assertIsNone(settings.url) - self.assertEquals('/foo/bar.qcow', settings.image_file) + self.assertEqual('/foo/bar.qcow', settings.image_file) self.assertIsNone(settings.nic_config_pb_loc) def test_config_with_name_user_format_file_only(self): settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2', 'image_file': '/foo/bar.qcow'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) self.assertIsNone(settings.url) - self.assertEquals('/foo/bar.qcow', settings.image_file) + self.assertEqual('/foo/bar.qcow', settings.image_file) self.assertIsNone(settings.nic_config_pb_loc) def test_all_url(self): @@ -133,21 +133,21 @@ class ImageSettingsUnitTests(unittest.TestCase): settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com', extra_properties=properties, nic_config_pb_loc='/foo/bar', kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) - self.assertEquals('http://foo.com', settings.url) - self.assertEquals(properties, settings.extra_properties) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) + self.assertEqual('http://foo.com', settings.url) + self.assertEqual(properties, settings.extra_properties) self.assertIsNone(settings.image_file) - self.assertEquals('/foo/bar', settings.nic_config_pb_loc) - self.assertEquals('kernel', settings.kernel_image_settings.name) - self.assertEquals('http://kernel.com', settings.kernel_image_settings.url) - self.assertEquals('bar', settings.kernel_image_settings.image_user) - self.assertEquals('qcow2', settings.kernel_image_settings.format) - self.assertEquals('ramdisk', settings.ramdisk_image_settings.name) - self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url) - self.assertEquals('bar', settings.ramdisk_image_settings.image_user) - self.assertEquals('qcow2', settings.ramdisk_image_settings.format) + self.assertEqual('/foo/bar', settings.nic_config_pb_loc) + self.assertEqual('kernel', settings.kernel_image_settings.name) + self.assertEqual('http://kernel.com', settings.kernel_image_settings.url) + self.assertEqual('bar', settings.kernel_image_settings.image_user) + self.assertEqual('qcow2', settings.kernel_image_settings.format) + self.assertEqual('ramdisk', settings.ramdisk_image_settings.name) + self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url) + self.assertEqual('bar', settings.ramdisk_image_settings.image_user) + self.assertEqual('qcow2', settings.ramdisk_image_settings.format) def test_config_all_url(self): settings = ImageSettings( @@ -159,42 +159,42 @@ class ImageSettingsUnitTests(unittest.TestCase): 'image_user': 'bar', 'format': 'qcow2'}, 'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com', 'image_user': 'bar', 'format': 'qcow2'}}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) - self.assertEquals('http://foo.com', settings.url) - self.assertEquals('{\'hw_video_model\': \'vga\'}', settings.extra_properties) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) + self.assertEqual('http://foo.com', settings.url) + self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties) self.assertIsNone(settings.image_file) - self.assertEquals('/foo/bar', settings.nic_config_pb_loc) - self.assertEquals('kernel', settings.kernel_image_settings.name) - self.assertEquals('http://kernel.com', settings.kernel_image_settings.url) - self.assertEquals('ramdisk', settings.ramdisk_image_settings.name) - self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url) + self.assertEqual('/foo/bar', settings.nic_config_pb_loc) + self.assertEqual('kernel', settings.kernel_image_settings.name) + self.assertEqual('http://kernel.com', settings.kernel_image_settings.url) + self.assertEqual('ramdisk', settings.ramdisk_image_settings.name) + self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url) def test_all_file(self): properties = {'hw_video_model': 'vga'} settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow', extra_properties=properties, nic_config_pb_loc='/foo/bar') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) self.assertIsNone(settings.url) - self.assertEquals('/foo/bar.qcow', settings.image_file) - self.assertEquals(properties, settings.extra_properties) - self.assertEquals('/foo/bar', settings.nic_config_pb_loc) + self.assertEqual('/foo/bar.qcow', settings.image_file) + self.assertEqual(properties, settings.extra_properties) + self.assertEqual('/foo/bar', settings.nic_config_pb_loc) def test_config_all_file(self): settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2', 'image_file': '/foo/bar.qcow', 'extra_properties': '{\'hw_video_model\' : \'vga\'}', 'nic_config_pb_loc': '/foo/bar'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.image_user) - self.assertEquals('qcow2', settings.format) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.image_user) + self.assertEqual('qcow2', settings.format) self.assertIsNone(settings.url) - self.assertEquals('/foo/bar.qcow', settings.image_file) - self.assertEquals('{\'hw_video_model\' : \'vga\'}', settings.extra_properties) - self.assertEquals('/foo/bar', settings.nic_config_pb_loc) + self.assertEqual('/foo/bar.qcow', settings.image_file) + self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties) + self.assertEqual('/foo/bar', settings.nic_config_pb_loc) class CreateImageSuccessTests(OSIntegrationTestCase): @@ -246,10 +246,10 @@ class CreateImageSuccessTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(created_image.size, retrieved_image.size) - self.assertEquals(get_image_size(self.image_settings), retrieved_image.size) - self.assertEquals(created_image.name, retrieved_image.name) - self.assertEquals(created_image.id, retrieved_image.id) + self.assertEqual(created_image.size, retrieved_image.size) + self.assertEqual(get_image_size(self.image_settings), retrieved_image.size) + self.assertEqual(created_image.name, retrieved_image.name) + self.assertEqual(created_image.id, retrieved_image.id) def test_create_image_clean_url_properties(self): """ @@ -263,11 +263,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creator.get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(self.image_settings), retrieved_image.size) - self.assertEquals(created_image.name, retrieved_image.name) - self.assertEquals(created_image.id, retrieved_image.id) - self.assertEquals(created_image.properties, retrieved_image.properties) + self.assertEqual(self.image_creator.get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(self.image_settings), retrieved_image.size) + self.assertEqual(created_image.name, retrieved_image.name) + self.assertEqual(created_image.id, retrieved_image.id) + self.assertEqual(created_image.properties, retrieved_image.properties) def test_create_image_clean_file(self): """ @@ -288,11 +288,11 @@ class CreateImageSuccessTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creator.get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(file_image_settings), retrieved_image.size) + self.assertEqual(self.image_creator.get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(file_image_settings), retrieved_image.size) - self.assertEquals(created_image.name, retrieved_image.name) - self.assertEquals(created_image.id, retrieved_image.id) + self.assertEqual(created_image.name, retrieved_image.name) + self.assertEqual(created_image.id, retrieved_image.id) def test_create_delete_image(self): """ @@ -305,8 +305,8 @@ class CreateImageSuccessTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creator.get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(self.image_settings), retrieved_image.size) + self.assertEqual(self.image_creator.get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(self.image_settings), retrieved_image.size) # Delete Image manually glance_utils.delete_image(self.glance, created_image) @@ -327,16 +327,16 @@ class CreateImageSuccessTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creator.get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(self.image_settings), retrieved_image.size) - self.assertEquals(image1.name, retrieved_image.name) - self.assertEquals(image1.id, retrieved_image.id) - self.assertEquals(image1.properties, retrieved_image.properties) + self.assertEqual(self.image_creator.get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(self.image_settings), retrieved_image.size) + self.assertEqual(image1.name, retrieved_image.name) + self.assertEqual(image1.id, retrieved_image.id) + self.assertEqual(image1.properties, retrieved_image.properties) # Should be retrieving the instance data os_image_2 = create_image.OpenStackImage(self.os_creds, self.image_settings) image2 = os_image_2.create() - self.assertEquals(image1.id, image2.id) + self.assertEqual(image1.id, image2.id) class CreateImageNegativeTests(OSIntegrationTestCase): @@ -490,17 +490,17 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): main_image = glance_utils.get_image(self.glance, image_settings.name) self.assertIsNotNone(main_image) self.assertIsNotNone(image_creator.get_image()) - self.assertEquals(image_creator.get_image().id, main_image.id) + self.assertEqual(image_creator.get_image().id, main_image.id) kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name) self.assertIsNotNone(kernel_image) self.assertIsNotNone(image_creator.get_kernel_image()) - self.assertEquals(kernel_image.id, image_creator.get_kernel_image().id) + self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id) ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name) self.assertIsNotNone(ramdisk_image) self.assertIsNotNone(image_creator.get_ramdisk_image()) - self.assertEquals(ramdisk_image.id, image_creator.get_ramdisk_image().id) + self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id) def test_create_three_part_image_from_file_3_creators(self): """ @@ -522,7 +522,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings)) kernel_image = self.image_creators[-1].create() self.assertIsNotNone(kernel_image) - self.assertEquals(get_image_size(kernel_file_image_settings), kernel_image.size) + self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size) # Create the ramdisk image ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs' @@ -535,7 +535,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings)) ramdisk_image = self.image_creators[-1].create() self.assertIsNotNone(ramdisk_image) - self.assertEquals(get_image_size(ramdisk_file_image_settings), ramdisk_image.size) + self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size) # Create the main image image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img' @@ -552,11 +552,11 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(file_image_settings), retrieved_image.size) - self.assertEquals(created_image.name, retrieved_image.name) - self.assertEquals(created_image.id, retrieved_image.id) - self.assertEquals(created_image.properties, retrieved_image.properties) + self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(file_image_settings), retrieved_image.size) + self.assertEqual(created_image.name, retrieved_image.name) + self.assertEqual(created_image.id, retrieved_image.id) + self.assertEqual(created_image.properties, retrieved_image.properties) def test_create_three_part_image_from_url_3_creators(self): """ @@ -578,7 +578,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings)) kernel_image = self.image_creators[-1].create() self.assertIsNotNone(kernel_image) - self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size) + self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size) # Create the ramdisk image ramdisk_image_settings = openstack_tests.cirros_image_settings( @@ -590,7 +590,7 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings)) ramdisk_image = self.image_creators[-1].create() self.assertIsNotNone(ramdisk_image) - self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size) + self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size) # Create the main image os_image_settings = openstack_tests.cirros_image_settings( @@ -612,12 +612,12 @@ class CreateMultiPartImageTests(OSIntegrationTestCase): retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name) self.assertIsNotNone(retrieved_image) - self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size) - self.assertEquals(get_image_size(os_image_settings), retrieved_image.size) + self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size) + self.assertEqual(get_image_size(os_image_settings), retrieved_image.size) - self.assertEquals(created_image.name, retrieved_image.name) - self.assertEquals(created_image.id, retrieved_image.id) - self.assertEquals(created_image.properties, retrieved_image.properties) + self.assertEqual(created_image.name, retrieved_image.name) + self.assertEqual(created_image.id, retrieved_image.id) + self.assertEqual(created_image.properties, retrieved_image.properties) def get_image_size(image_settings): diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py index 977d33b..e0dca17 100644 --- a/snaps/openstack/tests/create_instance_tests.py +++ b/snaps/openstack/tests/create_instance_tests.py @@ -69,33 +69,33 @@ class VmInstanceSettingsUnitTests(unittest.TestCase): def test_name_flavor_port_only(self): port_settings = PortSettings(name='foo-port', network_name='bar-net') settings = VmInstanceSettings(name='foo', flavor='bar', port_settings=[port_settings]) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor) - self.assertEquals(1, len(settings.port_settings)) - self.assertEquals('foo-port', settings.port_settings[0].name) - self.assertEquals('bar-net', settings.port_settings[0].network_name) - self.assertEquals(0, len(settings.security_group_names)) - self.assertEquals(0, len(settings.floating_ip_settings)) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor) + self.assertEqual(1, len(settings.port_settings)) + self.assertEqual('foo-port', settings.port_settings[0].name) + self.assertEqual('bar-net', settings.port_settings[0].network_name) + self.assertEqual(0, len(settings.security_group_names)) + self.assertEqual(0, len(settings.floating_ip_settings)) self.assertIsNone(settings.sudo_user) - self.assertEquals(900, settings.vm_boot_timeout) - self.assertEquals(300, settings.vm_delete_timeout) - self.assertEquals(180, settings.ssh_connect_timeout) + self.assertEqual(900, settings.vm_boot_timeout) + self.assertEqual(300, settings.vm_delete_timeout) + self.assertEqual(180, settings.ssh_connect_timeout) self.assertIsNone(settings.availability_zone) def test_config_with_name_flavor_port_only(self): port_settings = PortSettings(name='foo-port', network_name='bar-net') settings = VmInstanceSettings(config={'name': 'foo', 'flavor': 'bar', 'ports': [port_settings]}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor) - self.assertEquals(1, len(settings.port_settings)) - self.assertEquals('foo-port', settings.port_settings[0].name) - self.assertEquals('bar-net', settings.port_settings[0].network_name) - self.assertEquals(0, len(settings.security_group_names)) - self.assertEquals(0, len(settings.floating_ip_settings)) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor) + self.assertEqual(1, len(settings.port_settings)) + self.assertEqual('foo-port', settings.port_settings[0].name) + self.assertEqual('bar-net', settings.port_settings[0].network_name) + self.assertEqual(0, len(settings.security_group_names)) + self.assertEqual(0, len(settings.floating_ip_settings)) self.assertIsNone(settings.sudo_user) - self.assertEquals(900, settings.vm_boot_timeout) - self.assertEquals(300, settings.vm_delete_timeout) - self.assertEquals(180, settings.ssh_connect_timeout) + self.assertEqual(900, settings.vm_boot_timeout) + self.assertEqual(300, settings.vm_delete_timeout) + self.assertEqual(180, settings.ssh_connect_timeout) self.assertIsNone(settings.availability_zone) def test_all(self): @@ -106,22 +106,22 @@ class VmInstanceSettingsUnitTests(unittest.TestCase): security_group_names=['sec_grp_1'], floating_ip_settings=[fip_settings], sudo_user='joe', vm_boot_timeout=999, vm_delete_timeout=333, ssh_connect_timeout=111, availability_zone='server name') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor) - self.assertEquals(1, len(settings.port_settings)) - self.assertEquals('foo-port', settings.port_settings[0].name) - self.assertEquals('bar-net', settings.port_settings[0].network_name) - self.assertEquals(1, len(settings.security_group_names)) - self.assertEquals('sec_grp_1', settings.security_group_names[0]) - self.assertEquals(1, len(settings.floating_ip_settings)) - self.assertEquals('foo-fip', settings.floating_ip_settings[0].name) - self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name) - self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name) - self.assertEquals('joe', settings.sudo_user) - self.assertEquals(999, settings.vm_boot_timeout) - self.assertEquals(333, settings.vm_delete_timeout) - self.assertEquals(111, settings.ssh_connect_timeout) - self.assertEquals('server name', settings.availability_zone) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor) + self.assertEqual(1, len(settings.port_settings)) + self.assertEqual('foo-port', settings.port_settings[0].name) + self.assertEqual('bar-net', settings.port_settings[0].network_name) + self.assertEqual(1, len(settings.security_group_names)) + self.assertEqual('sec_grp_1', settings.security_group_names[0]) + self.assertEqual(1, len(settings.floating_ip_settings)) + self.assertEqual('foo-fip', settings.floating_ip_settings[0].name) + self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name) + self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name) + self.assertEqual('joe', settings.sudo_user) + self.assertEqual(999, settings.vm_boot_timeout) + self.assertEqual(333, settings.vm_delete_timeout) + self.assertEqual(111, settings.ssh_connect_timeout) + self.assertEqual('server name', settings.availability_zone) def test_config_all(self): port_settings = PortSettings(name='foo-port', network_name='bar-net') @@ -132,21 +132,21 @@ class VmInstanceSettingsUnitTests(unittest.TestCase): 'floating_ips': [fip_settings], 'sudo_user': 'joe', 'vm_boot_timeout': 999, 'vm_delete_timeout': 333, 'ssh_connect_timeout': 111, 'availability_zone': 'server name'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.flavor) - self.assertEquals(1, len(settings.port_settings)) - self.assertEquals('foo-port', settings.port_settings[0].name) - self.assertEquals('bar-net', settings.port_settings[0].network_name) - self.assertEquals(1, len(settings.security_group_names)) - self.assertEquals(1, len(settings.floating_ip_settings)) - self.assertEquals('foo-fip', settings.floating_ip_settings[0].name) - self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name) - self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name) - self.assertEquals('joe', settings.sudo_user) - self.assertEquals(999, settings.vm_boot_timeout) - self.assertEquals(333, settings.vm_delete_timeout) - self.assertEquals(111, settings.ssh_connect_timeout) - self.assertEquals('server name', settings.availability_zone) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.flavor) + self.assertEqual(1, len(settings.port_settings)) + self.assertEqual('foo-port', settings.port_settings[0].name) + self.assertEqual('bar-net', settings.port_settings[0].network_name) + self.assertEqual(1, len(settings.security_group_names)) + self.assertEqual(1, len(settings.floating_ip_settings)) + self.assertEqual('foo-fip', settings.floating_ip_settings[0].name) + self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name) + self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name) + self.assertEqual('joe', settings.sudo_user) + self.assertEqual(999, settings.vm_boot_timeout) + self.assertEqual(333, settings.vm_delete_timeout) + self.assertEqual(111, settings.ssh_connect_timeout) + self.assertEqual('server name', settings.availability_zone) class FloatingIpSettingsUnitTests(unittest.TestCase): @@ -188,36 +188,36 @@ class FloatingIpSettingsUnitTests(unittest.TestCase): def test_name_port_router_only(self): settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router') - self.assertEquals('foo', settings.name) - self.assertEquals('foo-port', settings.port_name) - self.assertEquals('bar-router', settings.router_name) + self.assertEqual('foo', settings.name) + self.assertEqual('foo-port', settings.port_name) + self.assertEqual('bar-router', settings.router_name) self.assertIsNone(settings.subnet_name) self.assertTrue(settings.provisioning) def test_config_with_name_port_router_only(self): settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router'}) - self.assertEquals('foo', settings.name) - self.assertEquals('foo-port', settings.port_name) - self.assertEquals('bar-router', settings.router_name) + self.assertEqual('foo', settings.name) + self.assertEqual('foo-port', settings.port_name) + self.assertEqual('bar-router', settings.router_name) self.assertIsNone(settings.subnet_name) self.assertTrue(settings.provisioning) def test_all(self): settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router', subnet_name='bar-subnet', provisioning=False) - self.assertEquals('foo', settings.name) - self.assertEquals('foo-port', settings.port_name) - self.assertEquals('bar-router', settings.router_name) - self.assertEquals('bar-subnet', settings.subnet_name) + self.assertEqual('foo', settings.name) + self.assertEqual('foo-port', settings.port_name) + self.assertEqual('bar-router', settings.router_name) + self.assertEqual('bar-subnet', settings.subnet_name) self.assertFalse(settings.provisioning) def test_config_all(self): settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router', 'subnet_name': 'bar-subnet', 'provisioning': False}) - self.assertEquals('foo', settings.name) - self.assertEquals('foo-port', settings.port_name) - self.assertEquals('bar-router', settings.router_name) - self.assertEquals('bar-subnet', settings.subnet_name) + self.assertEqual('foo', settings.name) + self.assertEqual('foo-port', settings.port_name) + self.assertEqual('bar-router', settings.router_name) + self.assertEqual('bar-subnet', settings.subnet_name) self.assertFalse(settings.provisioning) @@ -283,7 +283,7 @@ class SimpleHealthCheck(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if os.path.isfile(self.keypair_pub_filepath): os.remove(self.keypair_pub_filepath) @@ -295,19 +295,19 @@ class SimpleHealthCheck(OSIntegrationTestCase): try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -408,25 +408,25 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -441,13 +441,13 @@ class CreateInstanceSimpleTests(OSIntegrationTestCase): self.os_creds, instance_settings, self.image_creator.image_settings) vm_inst = self.inst_creator.create() - self.assertEquals(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name))) + self.assertEqual(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name))) # Delete instance nova_utils.delete_vm_instance(self.nova, vm_inst) self.assertTrue(self.inst_creator.vm_deleted(block=True)) - self.assertEquals(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name))) + self.assertEqual(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name))) # Exception should not be thrown self.inst_creator.clean() @@ -523,13 +523,13 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): try: inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.keypair_creator: try: self.keypair_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning keypair with message - ' + e.message) + logger.error('Unexpected exception cleaning keypair with message - ' + str(e)) if os.path.isfile(self.keypair_pub_filepath): os.remove(self.keypair_pub_filepath) @@ -541,25 +541,25 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.router_creator: try: self.router_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning router with message - ' + e.message) + logger.error('Unexpected exception cleaning router with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -585,9 +585,9 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.inst_creators.append(inst_creator) vm_inst = inst_creator.create() - self.assertEquals(ip_1, inst_creator.get_port_ip(self.port_1_name)) + self.assertEqual(ip_1, inst_creator.get_port_ip(self.port_1_name)) self.assertTrue(inst_creator.vm_active(block=True)) - self.assertEquals(vm_inst, inst_creator.get_vm_inst()) + self.assertEqual(vm_inst, inst_creator.get_vm_inst()) def test_ssh_client_fip_before_active(self): """ @@ -610,7 +610,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) self.assertTrue(inst_creator.vm_active(block=True)) - self.assertEquals(vm_inst, inst_creator.get_vm_inst()) + self.assertEqual(vm_inst, inst_creator.get_vm_inst()) self.assertTrue(validate_ssh_client(inst_creator)) @@ -637,7 +637,7 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) self.assertTrue(inst_creator.vm_active(block=True)) - self.assertEquals(vm_inst, inst_creator.get_vm_inst()) + self.assertEqual(vm_inst, inst_creator.get_vm_inst()) self.assertTrue(validate_ssh_client(inst_creator)) @@ -698,25 +698,25 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -736,7 +736,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase): self.image_creator.image_settings) self.inst_creator.create(block=True) - self.assertEquals(ip, self.inst_creator.get_port_ip( + self.assertEqual(ip, self.inst_creator.get_port_ip( self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name)) def test_set_custom_invalid_ip_one_subnet(self): @@ -772,7 +772,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase): self.image_creator.image_settings) self.inst_creator.create(block=True) - self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name)) + self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name)) def test_set_custom_invalid_mac(self): """ @@ -808,9 +808,9 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase): self.image_creator.image_settings) self.inst_creator.create(block=True) - self.assertEquals(ip, self.inst_creator.get_port_ip( + self.assertEqual(ip, self.inst_creator.get_port_ip( self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name)) - self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name)) + self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name)) def test_set_allowed_address_pairs(self): """ @@ -832,7 +832,7 @@ class CreateInstancePortManipulationTests(OSIntegrationTestCase): port = self.inst_creator.get_port_by_name(port_settings.name) self.assertIsNotNone(port) self.assertIsNotNone(port['port'].get('allowed_address_pairs')) - self.assertEquals(1, len(port['port']['allowed_address_pairs'])) + self.assertEqual(1, len(port['port']['allowed_address_pairs'])) validation_utils.objects_equivalent(pair, port['port']['allowed_address_pairs'][0]) def test_set_allowed_address_pairs_bad_mac(self): @@ -933,25 +933,25 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase): try: inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -987,7 +987,7 @@ class CreateInstanceOnComputeHost(OSIntegrationTestCase): vm = creator.get_vm_inst() deployed_zone = vm._info['OS-EXT-AZ:availability_zone'] deployed_host = vm._info['OS-EXT-SRV-ATTR:host'] - self.assertEquals(zone, deployed_zone + ':' + deployed_host) + self.assertEqual(zone, deployed_zone + ':' + deployed_host) index += 1 @@ -1065,7 +1065,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): self.keypair_creator.create() except Exception as e: self.tearDown() - raise Exception(e.message) + raise Exception(str(e)) def tearDown(self): """ @@ -1075,13 +1075,13 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.keypair_creator: try: self.keypair_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning keypair with message - ' + e.message) + logger.error('Unexpected exception cleaning keypair with message - ' + str(e)) if os.path.isfile(self.keypair_pub_filepath): os.remove(self.keypair_pub_filepath) @@ -1093,25 +1093,25 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) for router_creator in self.router_creators: try: router_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning router with message - ' + e.message) + logger.error('Unexpected exception cleaning router with message - ' + str(e)) for network_creator in self.network_creators: try: network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -1146,7 +1146,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): vm_inst = self.inst_creator.create(block=True) - self.assertEquals(vm_inst, self.inst_creator.get_vm_inst()) + self.assertEqual(vm_inst, self.inst_creator.get_vm_inst()) # Effectively blocks until VM has been properly activated self.assertTrue(self.inst_creator.vm_active(block=True)) @@ -1154,6 +1154,7 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): # Effectively blocks until VM's ssh port has been opened self.assertTrue(self.inst_creator.vm_ssh_active(block=True)) + # TODO - Refactor config_nics() to return a status that can be validated here. self.inst_creator.config_nics() # TODO - *** ADD VALIDATION HERE *** @@ -1226,31 +1227,31 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) for sec_grp_creator in self.sec_grp_creators: try: sec_grp_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning security group with message - ' + e.message) + logger.error('Unexpected exception cleaning security group with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() @@ -1495,25 +1496,25 @@ class CreateInstanceFromThreePartImage(OSIntegrationTestCase): try: self.inst_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning VM instance with message - ' + e.message) + logger.error('Unexpected exception cleaning VM instance with message - ' + str(e)) if self.flavor_creator: try: self.flavor_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning flavor with message - ' + e.message) + logger.error('Unexpected exception cleaning flavor with message - ' + str(e)) if self.network_creator: try: self.network_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning network with message - ' + e.message) + logger.error('Unexpected exception cleaning network with message - ' + str(e)) if self.image_creator: try: self.image_creator.clean() except Exception as e: - logger.error('Unexpected exception cleaning image with message - ' + e.message) + logger.error('Unexpected exception cleaning image with message - ' + str(e)) super(self.__class__, self).__clean__() diff --git a/snaps/openstack/tests/create_keypairs_tests.py b/snaps/openstack/tests/create_keypairs_tests.py index e4409a9..b587a50 100644 --- a/snaps/openstack/tests/create_keypairs_tests.py +++ b/snaps/openstack/tests/create_keypairs_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,8 +16,6 @@ import os import uuid import unittest -from Crypto.PublicKey import RSA - from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair from snaps.openstack.utils import nova_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase @@ -40,52 +38,52 @@ class KeypairSettingsUnitTests(unittest.TestCase): def test_name_only(self): settings = KeypairSettings(name='foo') - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertIsNone(settings.public_filepath) self.assertIsNone(settings.private_filepath) def test_config_with_name_only(self): settings = KeypairSettings(config={'name': 'foo'}) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertIsNone(settings.public_filepath) self.assertIsNone(settings.private_filepath) def test_name_pub_only(self): settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub') - self.assertEquals('foo', settings.name) - self.assertEquals('/foo/bar.pub', settings.public_filepath) + self.assertEqual('foo', settings.name) + self.assertEqual('/foo/bar.pub', settings.public_filepath) self.assertIsNone(settings.private_filepath) def test_config_with_name_pub_only(self): settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub'}) - self.assertEquals('foo', settings.name) - self.assertEquals('/foo/bar.pub', settings.public_filepath) + self.assertEqual('foo', settings.name) + self.assertEqual('/foo/bar.pub', settings.public_filepath) self.assertIsNone(settings.private_filepath) def test_name_priv_only(self): settings = KeypairSettings(name='foo', private_filepath='/foo/bar') - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertIsNone(settings.public_filepath) - self.assertEquals('/foo/bar', settings.private_filepath) + self.assertEqual('/foo/bar', settings.private_filepath) def test_config_with_name_priv_only(self): settings = KeypairSettings(config={'name': 'foo', 'private_filepath': '/foo/bar'}) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertIsNone(settings.public_filepath) - self.assertEquals('/foo/bar', settings.private_filepath) + self.assertEqual('/foo/bar', settings.private_filepath) def test_all(self): settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub', private_filepath='/foo/bar') - self.assertEquals('foo', settings.name) - self.assertEquals('/foo/bar.pub', settings.public_filepath) - self.assertEquals('/foo/bar', settings.private_filepath) + self.assertEqual('foo', settings.name) + self.assertEqual('/foo/bar.pub', settings.public_filepath) + self.assertEqual('/foo/bar', settings.private_filepath) def test_config_all(self): settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub', 'private_filepath': '/foo/bar'}) - self.assertEquals('foo', settings.name) - self.assertEquals('/foo/bar.pub', settings.public_filepath) - self.assertEquals('/foo/bar', settings.private_filepath) + self.assertEqual('foo', settings.name) + self.assertEqual('/foo/bar.pub', settings.public_filepath) + self.assertEqual('/foo/bar', settings.private_filepath) class CreateKeypairsTests(OSIntegrationTestCase): @@ -132,7 +130,7 @@ class CreateKeypairsTests(OSIntegrationTestCase): self.keypair_creator.create() keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair()) - self.assertEquals(self.keypair_creator.get_keypair(), keypair) + self.assertEqual(self.keypair_creator.get_keypair(), keypair) def test_create_delete_keypair(self): """ @@ -162,10 +160,10 @@ class CreateKeypairsTests(OSIntegrationTestCase): self.keypair_creator.create() keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair()) - self.assertEquals(self.keypair_creator.get_keypair(), keypair) + self.assertEqual(self.keypair_creator.get_keypair(), keypair) file_key = open(os.path.expanduser(self.pub_file_path)).read() - self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key) + self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key) def test_create_keypair_save_both(self): """ @@ -178,10 +176,10 @@ class CreateKeypairsTests(OSIntegrationTestCase): self.keypair_creator.create() keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair()) - self.assertEquals(self.keypair_creator.get_keypair(), keypair) + self.assertEqual(self.keypair_creator.get_keypair(), keypair) file_key = open(os.path.expanduser(self.pub_file_path)).read() - self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key) + self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key) self.assertTrue(os.path.isfile(self.priv_file_path)) @@ -190,14 +188,14 @@ class CreateKeypairsTests(OSIntegrationTestCase): Tests the creation of an existing public keypair from a file :return: """ - keys = RSA.generate(1024) + keys = nova_utils.create_keys() nova_utils.save_keys_to_files(keys=keys, pub_file_path=self.pub_file_path) self.keypair_creator = OpenStackKeypair( self.os_creds, KeypairSettings(name=self.keypair_name, public_filepath=self.pub_file_path)) self.keypair_creator.create() keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair()) - self.assertEquals(self.keypair_creator.get_keypair(), keypair) + self.assertEqual(self.keypair_creator.get_keypair(), keypair) file_key = open(os.path.expanduser(self.pub_file_path)).read() - self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key) + self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key) diff --git a/snaps/openstack/tests/create_network_tests.py b/snaps/openstack/tests/create_network_tests.py index a2b17f8..a7f21ad 100644 --- a/snaps/openstack/tests/create_network_tests.py +++ b/snaps/openstack/tests/create_network_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -40,37 +40,37 @@ class NetworkSettingsUnitTests(unittest.TestCase): def test_name_only(self): settings = NetworkSettings(name='foo') - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertTrue(settings.admin_state_up) self.assertIsNone(settings.shared) self.assertIsNone(settings.project_name) self.assertFalse(settings.external) self.assertIsNone(settings.network_type) - self.assertEquals(0, len(settings.subnet_settings)) + self.assertEqual(0, len(settings.subnet_settings)) def test_config_with_name_only(self): settings = NetworkSettings(config={'name': 'foo'}) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertTrue(settings.admin_state_up) self.assertIsNone(settings.shared) self.assertIsNone(settings.project_name) self.assertFalse(settings.external) self.assertIsNone(settings.network_type) - self.assertEquals(0, len(settings.subnet_settings)) + self.assertEqual(0, len(settings.subnet_settings)) def test_all(self): sub_settings = SubnetSettings(name='foo-subnet', cidr='10.0.0.0/24') settings = NetworkSettings(name='foo', admin_state_up=False, shared=True, project_name='bar', external=True, network_type='flat', physical_network='phy', subnet_settings=[sub_settings]) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertFalse(settings.admin_state_up) self.assertTrue(settings.shared) - self.assertEquals('bar', settings.project_name) + self.assertEqual('bar', settings.project_name) self.assertTrue(settings.external) - self.assertEquals('flat', settings.network_type) - self.assertEquals('phy', settings.physical_network) - self.assertEquals(1, len(settings.subnet_settings)) - self.assertEquals('foo-subnet', settings.subnet_settings[0].name) + self.assertEqual('flat', settings.network_type) + self.assertEqual('phy', settings.physical_network) + self.assertEqual(1, len(settings.subnet_settings)) + self.assertEqual('foo-subnet', settings.subnet_settings[0].name) def test_config_all(self): settings = NetworkSettings(config={'name': 'foo', 'admin_state_up': False, 'shared': True, @@ -78,15 +78,15 @@ class NetworkSettingsUnitTests(unittest.TestCase): 'physical_network': 'phy', 'subnets': [{'subnet': {'name': 'foo-subnet', 'cidr': '10.0.0.0/24'}}]}) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) self.assertFalse(settings.admin_state_up) self.assertTrue(settings.shared) - self.assertEquals('bar', settings.project_name) + self.assertEqual('bar', settings.project_name) self.assertTrue(settings.external) - self.assertEquals('flat', settings.network_type) - self.assertEquals('phy', settings.physical_network) - self.assertEquals(1, len(settings.subnet_settings)) - self.assertEquals('foo-subnet', settings.subnet_settings[0].name) + self.assertEqual('flat', settings.network_type) + self.assertEqual('phy', settings.physical_network) + self.assertEqual(1, len(settings.subnet_settings)) + self.assertEqual('foo-subnet', settings.subnet_settings[0].name) class SubnetSettingsUnitTests(unittest.TestCase): @@ -112,15 +112,15 @@ class SubnetSettingsUnitTests(unittest.TestCase): def test_name_cidr_only(self): settings = SubnetSettings(name='foo', cidr='10.0.0.0/24') - self.assertEquals('foo', settings.name) - self.assertEquals('10.0.0.0/24', settings.cidr) - self.assertEquals(4, settings.ip_version) + self.assertEqual('foo', settings.name) + self.assertEqual('10.0.0.0/24', settings.cidr) + self.assertEqual(4, settings.ip_version) self.assertIsNone(settings.project_name) self.assertIsNone(settings.start) self.assertIsNone(settings.end) self.assertIsNone(settings.enable_dhcp) - self.assertEquals(1, len(settings.dns_nameservers)) - self.assertEquals('8.8.8.8', settings.dns_nameservers[0]) + self.assertEqual(1, len(settings.dns_nameservers)) + self.assertEqual('8.8.8.8', settings.dns_nameservers[0]) self.assertIsNone(settings.host_routes) self.assertIsNone(settings.destination) self.assertIsNone(settings.nexthop) @@ -129,16 +129,16 @@ class SubnetSettingsUnitTests(unittest.TestCase): def test_config_with_name_cidr_only(self): settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24'}) - self.assertEquals('foo', settings.name) - self.assertEquals('10.0.0.0/24', settings.cidr) - self.assertEquals(4, settings.ip_version) + self.assertEqual('foo', settings.name) + self.assertEqual('10.0.0.0/24', settings.cidr) + self.assertEqual(4, settings.ip_version) self.assertIsNone(settings.project_name) self.assertIsNone(settings.start) self.assertIsNone(settings.end) self.assertIsNone(settings.gateway_ip) self.assertIsNone(settings.enable_dhcp) - self.assertEquals(1, len(settings.dns_nameservers)) - self.assertEquals('8.8.8.8', settings.dns_nameservers[0]) + self.assertEqual(1, len(settings.dns_nameservers)) + self.assertEqual('8.8.8.8', settings.dns_nameservers[0]) self.assertIsNone(settings.host_routes) self.assertIsNone(settings.destination) self.assertIsNone(settings.nexthop) @@ -151,22 +151,22 @@ class SubnetSettingsUnitTests(unittest.TestCase): start='10.0.0.2', end='10.0.0.101', gateway_ip='10.0.0.1', enable_dhcp=False, dns_nameservers=['8.8.8.8'], host_routes=[host_routes], destination='dest', nexthop='hop', ipv6_ra_mode='dhcpv6-stateful', ipv6_address_mode='slaac') - self.assertEquals('foo', settings.name) - self.assertEquals('10.0.0.0/24', settings.cidr) - self.assertEquals(6, settings.ip_version) - self.assertEquals('bar-project', settings.project_name) - self.assertEquals('10.0.0.2', settings.start) - self.assertEquals('10.0.0.101', settings.end) - self.assertEquals('10.0.0.1', settings.gateway_ip) - self.assertEquals(False, settings.enable_dhcp) - self.assertEquals(1, len(settings.dns_nameservers)) - self.assertEquals('8.8.8.8', settings.dns_nameservers[0]) - self.assertEquals(1, len(settings.host_routes)) - self.assertEquals(host_routes, settings.host_routes[0]) - self.assertEquals('dest', settings.destination) - self.assertEquals('hop', settings.nexthop) - self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode) - self.assertEquals('slaac', settings.ipv6_address_mode) + self.assertEqual('foo', settings.name) + self.assertEqual('10.0.0.0/24', settings.cidr) + self.assertEqual(6, settings.ip_version) + self.assertEqual('bar-project', settings.project_name) + self.assertEqual('10.0.0.2', settings.start) + self.assertEqual('10.0.0.101', settings.end) + self.assertEqual('10.0.0.1', settings.gateway_ip) + self.assertEqual(False, settings.enable_dhcp) + self.assertEqual(1, len(settings.dns_nameservers)) + self.assertEqual('8.8.8.8', settings.dns_nameservers[0]) + self.assertEqual(1, len(settings.host_routes)) + self.assertEqual(host_routes, settings.host_routes[0]) + self.assertEqual('dest', settings.destination) + self.assertEqual('hop', settings.nexthop) + self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode) + self.assertEqual('slaac', settings.ipv6_address_mode) def test_config_all(self): host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'} @@ -176,22 +176,22 @@ class SubnetSettingsUnitTests(unittest.TestCase): 'dns_nameservers': ['8.8.8.8'], 'host_routes': [host_routes], 'destination': 'dest', 'nexthop': 'hop', 'ipv6_ra_mode': 'dhcpv6-stateful', 'ipv6_address_mode': 'slaac'}) - self.assertEquals('foo', settings.name) - self.assertEquals('10.0.0.0/24', settings.cidr) - self.assertEquals(6, settings.ip_version) - self.assertEquals('bar-project', settings.project_name) - self.assertEquals('10.0.0.2', settings.start) - self.assertEquals('10.0.0.101', settings.end) - self.assertEquals('10.0.0.1', settings.gateway_ip) - self.assertEquals(False, settings.enable_dhcp) - self.assertEquals(1, len(settings.dns_nameservers)) - self.assertEquals('8.8.8.8', settings.dns_nameservers[0]) - self.assertEquals(1, len(settings.host_routes)) - self.assertEquals(host_routes, settings.host_routes[0]) - self.assertEquals('dest', settings.destination) - self.assertEquals('hop', settings.nexthop) - self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode) - self.assertEquals('slaac', settings.ipv6_address_mode) + self.assertEqual('foo', settings.name) + self.assertEqual('10.0.0.0/24', settings.cidr) + self.assertEqual(6, settings.ip_version) + self.assertEqual('bar-project', settings.project_name) + self.assertEqual('10.0.0.2', settings.start) + self.assertEqual('10.0.0.101', settings.end) + self.assertEqual('10.0.0.1', settings.gateway_ip) + self.assertEqual(False, settings.enable_dhcp) + self.assertEqual(1, len(settings.dns_nameservers)) + self.assertEqual('8.8.8.8', settings.dns_nameservers[0]) + self.assertEqual(1, len(settings.host_routes)) + self.assertEqual(host_routes, settings.host_routes[0]) + self.assertEqual('dest', settings.destination) + self.assertEqual('hop', settings.nexthop) + self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode) + self.assertEqual('slaac', settings.ipv6_address_mode) class PortSettingsUnitTests(unittest.TestCase): @@ -217,8 +217,8 @@ class PortSettingsUnitTests(unittest.TestCase): def test_name_netname_only(self): settings = PortSettings(name='foo', network_name='bar') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.network_name) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.network_name) self.assertTrue(settings.admin_state_up) self.assertIsNone(settings.project_name) self.assertIsNone(settings.mac_address) @@ -233,8 +233,8 @@ class PortSettingsUnitTests(unittest.TestCase): def test_config_with_name_netname_only(self): settings = PortSettings(config={'name': 'foo', 'network_name': 'bar'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.network_name) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.network_name) self.assertTrue(settings.admin_state_up) self.assertIsNone(settings.project_name) self.assertIsNone(settings.mac_address) @@ -257,20 +257,20 @@ class PortSettingsUnitTests(unittest.TestCase): security_groups=['foo_grp_id'], allowed_address_pairs=allowed_address_pairs, opt_value='opt value', opt_name='opt name', device_owner='owner', device_id='device number') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.network_name) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.network_name) self.assertFalse(settings.admin_state_up) - self.assertEquals('foo-project', settings.project_name) - self.assertEquals('1234', settings.mac_address) - self.assertEquals(ip_addrs, settings.ip_addrs) - self.assertEquals(fixed_ips, settings.fixed_ips) - self.assertEquals(1, len(settings.security_groups)) - self.assertEquals('foo_grp_id', settings.security_groups[0]) - self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs) - self.assertEquals('opt value', settings.opt_value) - self.assertEquals('opt name', settings.opt_name) - self.assertEquals('owner', settings.device_owner) - self.assertEquals('device number', settings.device_id) + self.assertEqual('foo-project', settings.project_name) + self.assertEqual('1234', settings.mac_address) + self.assertEqual(ip_addrs, settings.ip_addrs) + self.assertEqual(fixed_ips, settings.fixed_ips) + self.assertEqual(1, len(settings.security_groups)) + self.assertEqual('foo_grp_id', settings.security_groups[0]) + self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs) + self.assertEqual('opt value', settings.opt_value) + self.assertEqual('opt name', settings.opt_name) + self.assertEqual('owner', settings.device_owner) + self.assertEqual('device number', settings.device_id) def test_config_all(self): ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}] @@ -282,20 +282,20 @@ class PortSettingsUnitTests(unittest.TestCase): 'fixed_ips': fixed_ips, 'security_groups': ['foo_grp_id'], 'allowed_address_pairs': allowed_address_pairs, 'opt_value': 'opt value', 'opt_name': 'opt name', 'device_owner': 'owner', 'device_id': 'device number'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.network_name) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.network_name) self.assertFalse(settings.admin_state_up) - self.assertEquals('foo-project', settings.project_name) - self.assertEquals('1234', settings.mac_address) - self.assertEquals(ip_addrs, settings.ip_addrs) - self.assertEquals(fixed_ips, settings.fixed_ips) - self.assertEquals(1, len(settings.security_groups)) - self.assertEquals('foo_grp_id', settings.security_groups[0]) - self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs) - self.assertEquals('opt value', settings.opt_value) - self.assertEquals('opt name', settings.opt_name) - self.assertEquals('owner', settings.device_owner) - self.assertEquals('device number', settings.device_id) + self.assertEqual('foo-project', settings.project_name) + self.assertEqual('1234', settings.mac_address) + self.assertEqual(ip_addrs, settings.ip_addrs) + self.assertEqual(fixed_ips, settings.fixed_ips) + self.assertEqual(1, len(settings.security_groups)) + self.assertEqual('foo_grp_id', settings.security_groups[0]) + self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs) + self.assertEqual('opt value', settings.opt_value) + self.assertEqual('opt name', settings.opt_name) + self.assertEqual('owner', settings.device_owner) + self.assertEqual('device number', settings.device_id) class CreateNetworkSuccessTests(OSIntegrationTestCase): @@ -415,8 +415,8 @@ class CreateNetworkSuccessTests(OSIntegrationTestCase): self.net_creator2 = OpenStackNetwork(self.os_creds, self.net_config.network_settings) self.net_creator2.create() - self.assertEquals(self.net_creator.get_network()['network']['id'], - self.net_creator2.get_network()['network']['id']) + self.assertEqual(self.net_creator.get_network()['network']['id'], + self.net_creator2.get_network()['network']['id']) class CreateNetworkTypeTests(OSComponentTestCase): @@ -492,7 +492,7 @@ class CreateNetworkTypeTests(OSComponentTestCase): # Validate network was created neutron_utils_tests.validate_network(self.neutron, net_settings.name, True) - self.assertEquals(network_type, network['network']['provider:network_type']) + self.assertEqual(network_type, network['network']['provider:network_type']) # TODO - determine what value we need to place into physical_network # - Do not know what vaule to place into the 'physical_network' setting. diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py index 4e1d254..ede8b4b 100644 --- a/snaps/openstack/tests/create_project_tests.py +++ b/snaps/openstack/tests/create_project_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -41,30 +41,30 @@ class ProjectSettingsUnitTests(unittest.TestCase): def test_name_only(self): settings = ProjectSettings(name='foo') - self.assertEquals('foo', settings.name) - self.assertEquals('default', settings.domain) + self.assertEqual('foo', settings.name) + self.assertEqual('default', settings.domain) self.assertIsNone(settings.description) self.assertTrue(settings.enabled) def test_config_with_name_only(self): settings = ProjectSettings(config={'name': 'foo'}) - self.assertEquals('foo', settings.name) - self.assertEquals('default', settings.domain) + self.assertEqual('foo', settings.name) + self.assertEqual('default', settings.domain) self.assertIsNone(settings.description) self.assertTrue(settings.enabled) def test_all(self): settings = ProjectSettings(name='foo', domain='bar', description='foobar', enabled=False) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.domain) - self.assertEquals('foobar', settings.description) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.domain) + self.assertEqual('foobar', settings.description) self.assertFalse(settings.enabled) def test_config_all(self): settings = ProjectSettings(config={'name': 'foo', 'domain': 'bar', 'description': 'foobar', 'enabled': False}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.domain) - self.assertEquals('foobar', settings.description) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.domain) + self.assertEqual('foobar', settings.description) self.assertFalse(settings.enabled) @@ -104,7 +104,7 @@ class CreateProjectSuccessTests(OSComponentTestCase): retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name) self.assertIsNotNone(retrieved_project) - self.assertEquals(created_project, retrieved_project) + self.assertEqual(created_project, retrieved_project) def test_create_project_2x(self): """ @@ -116,10 +116,10 @@ class CreateProjectSuccessTests(OSComponentTestCase): retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name) self.assertIsNotNone(retrieved_project) - self.assertEquals(created_project, retrieved_project) + self.assertEqual(created_project, retrieved_project) project2 = OpenStackProject(self.os_creds, self.project_settings).create() - self.assertEquals(retrieved_project, project2) + self.assertEqual(retrieved_project, project2) def test_create_delete_project(self): """ @@ -194,9 +194,9 @@ class CreateProjectUserTests(OSComponentTestCase): self.sec_grp_creators.append(sec_grp_creator) if 'tenant_id' in sec_grp['security_group']: - self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id']) + self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id']) elif 'project_id' in sec_grp['security_group']: - self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id']) + self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id']) else: self.fail('Cannot locate the project or tenant ID') @@ -229,8 +229,8 @@ class CreateProjectUserTests(OSComponentTestCase): self.sec_grp_creators.append(sec_grp_creator) if 'tenant_id' in sec_grp['security_group']: - self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id']) + self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id']) elif 'project_id' in sec_grp['security_group']: - self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id']) + self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id']) else: self.fail('Cannot locate the project or tenant ID') diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index 079be0c..6a3c0ef 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -48,28 +48,28 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): def test_name_and_direction(self): settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) - self.assertEquals('foo', settings.sec_grp_name) - self.assertEquals(Direction.ingress, settings.direction) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) def test_config_name_and_direction(self): settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'}) - self.assertEquals('foo', settings.sec_grp_name) - self.assertEquals(Direction.ingress, settings.direction) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) def test_all(self): settings = SecurityGroupRuleSettings( sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi', protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2, remote_ip_prefix='prfx') - self.assertEquals('foo', settings.sec_grp_name) - self.assertEquals('fubar', settings.description) - self.assertEquals(Direction.egress, settings.direction) - self.assertEquals('rgi', settings.remote_group_id) - self.assertEquals(Protocol.icmp, settings.protocol) - self.assertEquals(Ethertype.IPv6, settings.ethertype) - self.assertEquals(1, settings.port_range_min) - self.assertEquals(2, settings.port_range_max) - self.assertEquals('prfx', settings.remote_ip_prefix) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual('fubar', settings.description) + self.assertEqual(Direction.egress, settings.direction) + self.assertEqual('rgi', settings.remote_group_id) + self.assertEqual(Protocol.icmp, settings.protocol) + self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(1, settings.port_range_min) + self.assertEqual(2, settings.port_range_max) + self.assertEqual('prfx', settings.remote_ip_prefix) def test_config_all(self): settings = SecurityGroupRuleSettings( @@ -82,15 +82,15 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): 'port_range_min': 1, 'port_range_max': 2, 'remote_ip_prefix': 'prfx'}) - self.assertEquals('foo', settings.sec_grp_name) - self.assertEquals('fubar', settings.description) - self.assertEquals(Direction.egress, settings.direction) - self.assertEquals('rgi', settings.remote_group_id) - self.assertEquals(Protocol.tcp, settings.protocol) - self.assertEquals(Ethertype.IPv6, settings.ethertype) - self.assertEquals(1, settings.port_range_min) - self.assertEquals(2, settings.port_range_max) - self.assertEquals('prfx', settings.remote_ip_prefix) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual('fubar', settings.description) + self.assertEqual(Direction.egress, settings.direction) + self.assertEqual('rgi', settings.remote_group_id) + self.assertEqual(Protocol.tcp, settings.protocol) + self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(1, settings.port_range_min) + self.assertEqual(2, settings.port_range_max) + self.assertEqual('prfx', settings.remote_ip_prefix) class SecurityGroupSettingsUnitTests(unittest.TestCase): @@ -108,11 +108,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): def test_name_only(self): settings = SecurityGroupSettings(name='foo') - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) def test_config_with_name_only(self): settings = SecurityGroupSettings(config={'name': 'foo'}) - self.assertEquals('foo', settings.name) + self.assertEqual('foo', settings.name) def test_invalid_rule(self): rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress) @@ -126,11 +126,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): settings = SecurityGroupSettings( name='bar', description='fubar', project_name='foo', rule_settings=rule_settings) - self.assertEquals('bar', settings.name) - self.assertEquals('fubar', settings.description) - self.assertEquals('foo', settings.project_name) - self.assertEquals(rule_settings[0], settings.rule_settings[0]) - self.assertEquals(rule_settings[1], settings.rule_settings[1]) + self.assertEqual('bar', settings.name) + self.assertEqual('fubar', settings.description) + self.assertEqual('foo', settings.project_name) + self.assertEqual(rule_settings[0], settings.rule_settings[0]) + self.assertEqual(rule_settings[1], settings.rule_settings[1]) def test_config_all(self): settings = SecurityGroupSettings( @@ -139,12 +139,12 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): 'project_name': 'foo', 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]}) - self.assertEquals('bar', settings.name) - self.assertEquals('fubar', settings.description) - self.assertEquals('foo', settings.project_name) - self.assertEquals(1, len(settings.rule_settings)) - self.assertEquals('bar', settings.rule_settings[0].sec_grp_name) - self.assertEquals(Direction.ingress, settings.rule_settings[0].direction) + self.assertEqual('bar', settings.name) + self.assertEqual('fubar', settings.description) + self.assertEqual('foo', settings.project_name) + self.assertEqual(1, len(settings.rule_settings)) + self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) + self.assertEqual(Direction.ingress, settings.rule_settings[0].direction) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -189,7 +189,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) def test_create_delete_group(self): @@ -224,7 +224,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) def test_create_group_with_several_rules(self): @@ -253,7 +253,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) def test_add_rule(self): @@ -273,13 +273,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, direction=Direction.egress, protocol=Protocol.icmp)) rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(rules) + 1, len(rules2)) + self.assertEqual(len(rules) + 1, len(rules2)) def test_remove_rule_by_id(self): """ @@ -308,13 +308,13 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id']) rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(rules) - 1, len(rules_after_del)) + self.assertEqual(len(rules) - 1, len(rules_after_del)) def test_remove_rule_by_setting(self): """ @@ -344,12 +344,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0]) rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) - self.assertEquals(len(rules) - 1, len(rules_after_del)) + self.assertEqual(len(rules) - 1, len(rules_after_del)) # TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex diff --git a/snaps/openstack/tests/create_user_tests.py b/snaps/openstack/tests/create_user_tests.py index 1f7a163..007f7f0 100644 --- a/snaps/openstack/tests/create_user_tests.py +++ b/snaps/openstack/tests/create_user_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -52,35 +52,35 @@ class UserSettingsUnitTests(unittest.TestCase): def test_name_pass_only(self): settings = UserSettings(name='foo', password='bar') - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.password) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.password) self.assertIsNone(settings.project_name) self.assertIsNone(settings.email) self.assertTrue(settings.enabled) def test_config_with_name_pass_only(self): settings = UserSettings(config={'name': 'foo', 'password': 'bar'}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.password) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.password) self.assertIsNone(settings.project_name) self.assertIsNone(settings.email) self.assertTrue(settings.enabled) def test_all(self): settings = UserSettings(name='foo', password='bar', project_name='proj-foo', email='foo@bar.com', enabled=False) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.password) - self.assertEquals('proj-foo', settings.project_name) - self.assertEquals('foo@bar.com', settings.email) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.password) + self.assertEqual('proj-foo', settings.project_name) + self.assertEqual('foo@bar.com', settings.email) self.assertFalse(settings.enabled) def test_config_all(self): settings = UserSettings(config={'name': 'foo', 'password': 'bar', 'project_name': 'proj-foo', 'email': 'foo@bar.com', 'enabled': False}) - self.assertEquals('foo', settings.name) - self.assertEquals('bar', settings.password) - self.assertEquals('proj-foo', settings.project_name) - self.assertEquals('foo@bar.com', settings.email) + self.assertEqual('foo', settings.name) + self.assertEqual('bar', settings.password) + self.assertEqual('proj-foo', settings.project_name) + self.assertEqual('foo@bar.com', settings.email) self.assertFalse(settings.enabled) @@ -120,7 +120,7 @@ class CreateUserSuccessTests(OSComponentTestCase): retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name) self.assertIsNotNone(retrieved_user) - self.assertEquals(created_user, retrieved_user) + self.assertEqual(created_user, retrieved_user) def test_create_user_2x(self): """ @@ -132,11 +132,11 @@ class CreateUserSuccessTests(OSComponentTestCase): retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name) self.assertIsNotNone(retrieved_user) - self.assertEquals(created_user, retrieved_user) + self.assertEqual(created_user, retrieved_user) # Create user for the second time to ensure it is the same user2 = OpenStackUser(self.os_creds, self.user_settings).create() - self.assertEquals(retrieved_user, user2) + self.assertEqual(retrieved_user, user2) def test_create_delete_user(self): """ @@ -152,4 +152,3 @@ class CreateUserSuccessTests(OSComponentTestCase): # Delete user self.user_creator.clean() self.assertIsNone(self.user_creator.get_user()) - diff --git a/snaps/openstack/tests/validation_utils.py b/snaps/openstack/tests/validation_utils.py index 7c9bd7f..b6d5856 100644 --- a/snaps/openstack/tests/validation_utils.py +++ b/snaps/openstack/tests/validation_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -42,7 +42,7 @@ def dicts_equivalent(dict1, dict2): :return: T/F """ if (type(dict1) is dict or type(dict1) is _DictWithMeta) and (type(dict2) is dict or type(dict2) is _DictWithMeta): - for key, value1 in dict1.iteritems(): + for key, value1 in dict1.items(): if not objects_equivalent(value1, dict2.get(key)): return False return True diff --git a/snaps/openstack/utils/glance_utils.py b/snaps/openstack/utils/glance_utils.py index d859257..ca9c490 100644 --- a/snaps/openstack/utils/glance_utils.py +++ b/snaps/openstack/utils/glance_utils.py @@ -151,10 +151,8 @@ def __create_image_v2(glance, image_settings): kwargs['name'] = image_settings.name kwargs['disk_format'] = image_settings.format kwargs['container_format'] = 'bare' - if image_settings.extra_properties: - for key, value in image_settings.extra_properties.iteritems(): - kwargs[key] = value + kwargs.update(image_settings.extra_properties) created_image = glance.images.create(**kwargs) image_file = file_utils.get_file(image_filename) diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py index 6c92d2e..a65a42f 100644 --- a/snaps/openstack/utils/neutron_utils.py +++ b/snaps/openstack/utils/neutron_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,7 +16,7 @@ import logging from neutronclient.common.exceptions import NotFound from neutronclient.neutron.client import Client -import keystone_utils +from snaps.openstack.utils import keystone_utils __author__ = 'spisarski' @@ -80,7 +80,7 @@ def get_network(neutron, network_name, project_id=None): net_filter['project_id'] = project_id networks = neutron.list_networks(**net_filter) - for network, netInsts in networks.iteritems(): + for network, netInsts in networks.items(): for inst in netInsts: if inst.get('name') == network_name: if project_id and inst.get('project_id') == project_id: @@ -98,7 +98,7 @@ def get_network_by_id(neutron, network_id): :return: """ networks = neutron.list_networks(**{'id': network_id}) - for network, netInsts in networks.iteritems(): + for network, netInsts in networks.items(): for inst in netInsts: if inst.get('id') == network_id: return {'network': inst} @@ -144,7 +144,7 @@ def get_subnet_by_name(neutron, subnet_name): :return: """ subnets = neutron.list_subnets(**{'name': subnet_name}) - for subnet, subnetInst in subnets.iteritems(): + for subnet, subnetInst in subnets.items(): for inst in subnetInst: if inst.get('name') == subnet_name: return {'subnet': inst} @@ -189,7 +189,7 @@ def get_router_by_name(neutron, router_name): :return: """ routers = neutron.list_routers(**{'name': router_name}) - for router, routerInst in routers.iteritems(): + for router, routerInst in routers.items(): for inst in routerInst: if inst.get('name') == router_name: return {'router': inst} @@ -228,10 +228,10 @@ def remove_interface_router(neutron, router, subnet=None, port=None): logger.info('Removing router interface from router named ' + router['router']['name']) neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port)) except NotFound as e: - logger.warn('Could not remove router interface. NotFound - ' + e.message) + logger.warning('Could not remove router interface. NotFound - ' + str(e)) pass else: - logger.warn('Could not remove router interface, No router object') + logger.warning('Could not remove router interface, No router object') def __create_port_json_body(subnet=None, port=None): diff --git a/snaps/openstack/utils/nova_utils.py b/snaps/openstack/utils/nova_utils.py index e7428ed..419f451 100644 --- a/snaps/openstack/utils/nova_utils.py +++ b/snaps/openstack/utils/nova_utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -12,9 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.hazmat.backends import default_backend + import os import logging -import keystone_utils +from snaps.openstack.utils import keystone_utils from novaclient.client import Client from novaclient.exceptions import NotFound @@ -58,10 +63,29 @@ def get_latest_server_object(nova, server): return nova.servers.get(server) +def create_keys(key_size=2048): + """ + Generates public and private keys + :param key_size: the number of bytes for the key size + :return: the cryptography keys + """ + return rsa.generate_private_key(backend=default_backend(), public_exponent=65537, + key_size=key_size) + + +def public_key_openssh(keys): + """ + Returns the public key for OpenSSH + :param keys: the keys generated by create_keys() from cryptography + :return: the OpenSSH public key + """ + return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + + def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): """ Saves the generated RSA generated keys to the filesystem - :param keys: the keys to save + :param keys: the keys to save generated by cryptography :param pub_file_path: the path to the public keys :param priv_file_path: the path to the private keys :return: None @@ -72,7 +96,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): if not os.path.isdir(pub_dir): os.mkdir(pub_dir) public_handle = open(pub_file_path, 'wb') - public_handle.write(keys.publickey().exportKey('OpenSSH')) + public_bytes = keys.public_key().public_bytes( + serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH) + public_handle.write(public_bytes) public_handle.close() os.chmod(pub_file_path, 0o400) logger.info("Saved public key to - " + pub_file_path) @@ -81,7 +107,9 @@ def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None): if not os.path.isdir(priv_dir): os.mkdir(priv_dir) private_handle = open(priv_file_path, 'wb') - private_handle.write(keys.exportKey()) + private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption())) private_handle.close() os.chmod(priv_file_path, 0o400) logger.info("Saved private key to - " + priv_file_path) @@ -95,7 +123,7 @@ def upload_keypair_file(nova, name, file_path): :param file_path: the path to the public key file :return: the keypair object """ - with open(os.path.expanduser(file_path)) as fpubkey: + with open(os.path.expanduser(file_path), 'rb') as fpubkey: logger.info('Saving keypair to - ' + file_path) return upload_keypair(nova, name, fpubkey.read()) @@ -109,7 +137,7 @@ def upload_keypair(nova, name, key): :return: the keypair object """ logger.info('Creating keypair with name - ' + name) - return nova.keypairs.create(name=name, public_key=key) + return nova.keypairs.create(name=name, public_key=key.decode('utf-8')) def keypair_exists(nova, keypair_obj): @@ -212,7 +240,7 @@ def get_nova_availability_zones(nova): zones = nova.availability_zones.list() for zone in zones: if zone.zoneName == 'nova': - for key, host in zone.hosts.iteritems(): + for key, host in zone.hosts.items(): out.append(zone.zoneName + ':' + key) return out diff --git a/snaps/openstack/utils/tests/keystone_utils_tests.py b/snaps/openstack/utils/tests/keystone_utils_tests.py index 76a43ef..c072fd3 100644 --- a/snaps/openstack/utils/tests/keystone_utils_tests.py +++ b/snaps/openstack/utils/tests/keystone_utils_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -93,8 +93,8 @@ class KeystoneUtilsTests(OSComponentTestCase): """ project_settings = ProjectSettings(name=self.project_name) self.project = keystone_utils.create_project(self.keystone, project_settings) - self.assertEquals(self.project_name, self.project.name) + self.assertEqual(self.project_name, self.project.name) project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name) self.assertIsNotNone(project) - self.assertEquals(self.project_name, self.project.name) + self.assertEqual(self.project_name, self.project.name) diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py index 5f95fc9..9a043e3 100644 --- a/snaps/openstack/utils/tests/neutron_utils_tests.py +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -109,14 +109,16 @@ class NeutronUtilsNetworkTests(OSComponentTestCase): Tests the neutron_utils.create_neutron_net() function with an empty network name """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name='')) + self.network = neutron_utils.create_network(self.neutron, self.os_creds, + network_settings=NetworkSettings(name='')) def test_create_network_null_name(self): """ Tests the neutron_utils.create_neutron_net() function when the network name is None """ with self.assertRaises(Exception): - self.network = neutron_utils.create_network(self.neutron, NetworkSettings()) + self.network = neutron_utils.create_network(self.neutron, self.os_creds, + network_settings=NetworkSettings()) class NeutronUtilsSubnetTests(OSComponentTestCase): @@ -644,7 +646,7 @@ def validate_port(neutron, port_obj, this_port_name): :return: True/False """ ports = neutron.list_ports() - for port, port_insts in ports.iteritems(): + for port, port_insts in ports.items(): for inst in port_insts: if inst['id'] == port_obj['port']['id']: return inst['name'] == this_port_name diff --git a/snaps/openstack/utils/tests/nova_utils_tests.py b/snaps/openstack/utils/tests/nova_utils_tests.py index f6c9156..0a2b24b 100644 --- a/snaps/openstack/utils/tests/nova_utils_tests.py +++ b/snaps/openstack/utils/tests/nova_utils_tests.py @@ -1,4 +1,4 @@ -# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -16,8 +16,6 @@ import logging import os import uuid -from Crypto.PublicKey import RSA - from snaps.openstack.utils import nova_utils from snaps.openstack.tests.os_source_file_test import OSComponentTestCase from snaps.openstack.create_flavor import FlavorSettings @@ -71,8 +69,8 @@ class NovaUtilsKeypairTests(OSComponentTestCase): self.pub_key_file_path = self.priv_key_file_path + '.pub' self.nova = nova_utils.nova_client(self.os_creds) - self.keys = RSA.generate(1024) - self.public_key = self.keys.publickey().exportKey('OpenSSH') + self.keys = nova_utils.create_keys() + self.public_key = nova_utils.public_key_openssh(self.keys) self.keypair_name = guid self.keypair = None self.floating_ip = None @@ -106,9 +104,9 @@ class NovaUtilsKeypairTests(OSComponentTestCase): """ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) - self.assertEquals(self.keypair, result) + self.assertEqual(self.keypair, result) keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name) - self.assertEquals(self.keypair, keypair) + self.assertEqual(self.keypair, keypair) def test_create_delete_keypair(self): """ @@ -116,7 +114,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase): """ self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key) result = nova_utils.keypair_exists(self.nova, self.keypair) - self.assertEquals(self.keypair, result) + self.assertEqual(self.keypair, result) nova_utils.delete_keypair(self.nova, self.keypair) result2 = nova_utils.keypair_exists(self.nova, self.keypair) self.assertIsNone(result2) @@ -129,7 +127,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase): nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path) self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path) pub_key = open(os.path.expanduser(self.pub_key_file_path)).read() - self.assertEquals(self.keypair.public_key, pub_key) + self.assertEqual(self.keypair.public_key, pub_key) def test_floating_ips(self): """ @@ -141,7 +139,7 @@ class NovaUtilsKeypairTests(OSComponentTestCase): self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name) returned = nova_utils.get_floating_ip(self.nova, self.floating_ip) - self.assertEquals(self.floating_ip, returned) + self.assertEqual(self.floating_ip, returned) class NovaUtilsFlavorTests(OSComponentTestCase): @@ -192,17 +190,17 @@ class NovaUtilsFlavorTests(OSComponentTestCase): Validates the flavor_settings against the OpenStack flavor object """ self.assertIsNotNone(self.flavor) - self.assertEquals(self.flavor_settings.name, self.flavor.name) - self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id) - self.assertEquals(self.flavor_settings.ram, self.flavor.ram) - self.assertEquals(self.flavor_settings.disk, self.flavor.disk) - self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus) - self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral) + self.assertEqual(self.flavor_settings.name, self.flavor.name) + self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id) + self.assertEqual(self.flavor_settings.ram, self.flavor.ram) + self.assertEqual(self.flavor_settings.disk, self.flavor.disk) + self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus) + self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral) if self.flavor_settings.swap == 0: - self.assertEquals('', self.flavor.swap) + self.assertEqual('', self.flavor.swap) else: - self.assertEquals(self.flavor_settings.swap, self.flavor.swap) + self.assertEqual(self.flavor_settings.swap, self.flavor.swap) - self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor) - self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public) + self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor) + self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public) |