diff options
Diffstat (limited to 'snaps/openstack')
-rw-r--r-- | snaps/openstack/create_security_group.py | 335 | ||||
-rw-r--r-- | snaps/openstack/create_stack.py | 2 | ||||
-rw-r--r-- | snaps/openstack/tests/create_instance_tests.py | 90 | ||||
-rw-r--r-- | snaps/openstack/tests/create_project_tests.py | 8 | ||||
-rw-r--r-- | snaps/openstack/tests/create_security_group_tests.py | 109 | ||||
-rw-r--r-- | snaps/openstack/utils/neutron_utils.py | 2 | ||||
-rw-r--r-- | snaps/openstack/utils/settings_utils.py | 10 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/neutron_utils_tests.py | 26 | ||||
-rw-r--r-- | snaps/openstack/utils/tests/settings_utils_tests.py | 24 |
9 files changed, 159 insertions, 447 deletions
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py index fcad584..7a20fe1 100644 --- a/snaps/openstack/create_security_group.py +++ b/snaps/openstack/create_security_group.py @@ -17,6 +17,8 @@ import logging import enum from neutronclient.common.exceptions import NotFound, Conflict +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig) from snaps.openstack.openstack_creator import OpenStackNetworkObject from snaps.openstack.utils import keystone_utils from snaps.openstack.utils import neutron_utils @@ -115,15 +117,15 @@ class OpenStackSecurityGroup(OpenStackNetworkObject): def __generate_rule_setting(self, rule): """ - Creates a SecurityGroupRuleSettings object for a given rule + Creates a SecurityGroupRuleConfig object for a given rule :param rule: the rule from which to create the - SecurityGroupRuleSettings object - :return: the newly instantiated SecurityGroupRuleSettings object + SecurityGroupRuleConfig object + :return: the newly instantiated SecurityGroupRuleConfig object """ sec_grp = neutron_utils.get_security_group_by_id( self._neutron, rule.security_group_id) - setting = SecurityGroupRuleSettings( + setting = SecurityGroupRuleConfig( description=rule.description, direction=rule.direction, ethertype=rule.ethertype, @@ -218,83 +220,24 @@ class OpenStackSecurityGroup(OpenStackNetworkObject): return None -class SecurityGroupSettings: +class SecurityGroupSettings(SecurityGroupConfig): """ - Class representing a keypair configuration + Class to hold the configuration settings required for creating OpenStack + SecurityGroup objects + deprecated - use snaps.config.security_group.SecurityGroupConfig instead """ def __init__(self, **kwargs): - """ - Constructor - :param name: The security group's name (required) - :param description: The security group's description (optional) - :param project_name: The name of the project under which the security - group will be created - :param rule_settings: a list of SecurityGroupRuleSettings objects - :return: - """ - self.name = kwargs.get('name') - self.description = kwargs.get('description') - self.project_name = kwargs.get('project_name') - self.rule_settings = list() - - rule_settings = kwargs.get('rules') - if not rule_settings: - rule_settings = kwargs.get('rule_settings') - - if rule_settings: - for rule_setting in rule_settings: - if isinstance(rule_setting, SecurityGroupRuleSettings): - self.rule_settings.append(rule_setting) - else: - rule_setting['sec_grp_name'] = self.name - self.rule_settings.append(SecurityGroupRuleSettings( - **rule_setting)) - - if not self.name: - raise SecurityGroupSettingsError('The attribute name is required') - - for rule_setting in self.rule_settings: - if rule_setting.sec_grp_name is not self.name: - raise SecurityGroupSettingsError( - 'Rule settings must correspond with the name of this ' - 'security group') - - def dict_for_neutron(self, keystone): - """ - Returns a dictionary object representing this object. - This is meant to be converted into JSON designed for use by the Neutron - API - - TODO - expand automated testing to exercise all parameters - :param keystone: the Keystone client - :return: the dictionary object - """ - out = dict() - - if self.name: - out['name'] = self.name - if self.description: - out['description'] = self.description - if self.project_name: - project = keystone_utils.get_project( - keystone=keystone, project_name=self.project_name) - project_id = None - if project: - project_id = project.id - if project_id: - out['tenant_id'] = project_id - else: - raise SecurityGroupSettingsError( - 'Could not find project ID for project named - ' + - self.project_name) - - return {'security_group': out} + from warnings import warn + warn('Use snaps.config.security_group.SecurityGroupConfig instead', + DeprecationWarning) + super(self.__class__, self).__init__(**kwargs) class Direction(enum.Enum): """ A rule's direction + deprecated - use snaps.config.security_group.Direction """ ingress = 'ingress' egress = 'egress' @@ -303,6 +246,7 @@ class Direction(enum.Enum): class Protocol(enum.Enum): """ A rule's protocol + deprecated - use snaps.config.security_group.Protocol """ ah = 51 dccp = 33 @@ -333,249 +277,22 @@ class Protocol(enum.Enum): class Ethertype(enum.Enum): """ A rule's ethertype + deprecated - use snaps.config.security_group.Ethertype """ IPv4 = 4 IPv6 = 6 -class SecurityGroupSettingsError(Exception): - """ - Exception to be thrown when security group settings attributes are - invalid - """ - - -class SecurityGroupRuleSettings: +class SecurityGroupRuleSettings(SecurityGroupRuleConfig): """ - Class representing a keypair configuration + Class to hold the configuration settings required for creating OpenStack + SecurityGroupRule objects + deprecated - use snaps.config.security_group.SecurityGroupRuleConfig + instead """ def __init__(self, **kwargs): - """ - Constructor - all parameters are optional - :param sec_grp_name: The security group's name on which to add the - rule. (required) - :param description: The rule's description - :param direction: An enumeration of type - create_security_group.RULE_DIRECTION (required) - :param remote_group_id: The group ID to associate with this rule - (this should be changed to group name once - snaps support Groups) (optional) - :param protocol: An enumeration of type - create_security_group.RULE_PROTOCOL or a string value - that will be mapped accordingly (optional) - :param ethertype: An enumeration of type - create_security_group.RULE_ETHERTYPE (optional) - :param port_range_min: The minimum port number in the range that is - matched by the security group rule. When the - protocol is TCP or UDP, this value must be <= - port_range_max. - :param port_range_max: The maximum port number in the range that is - matched by the security group rule. When the - protocol is TCP or UDP, this value must be <= - port_range_max. - :param remote_ip_prefix: The remote IP prefix to associate with this - metering rule packet (optional) - - TODO - Need to support the tenant... - """ - - self.description = kwargs.get('description') - self.sec_grp_name = kwargs.get('sec_grp_name') - self.remote_group_id = kwargs.get('remote_group_id') - self.direction = None - if kwargs.get('direction'): - self.direction = map_direction(kwargs['direction']) - - self.protocol = None - if kwargs.get('protocol'): - self.protocol = map_protocol(kwargs['protocol']) - else: - self.protocol = Protocol.null - - self.ethertype = None - if kwargs.get('ethertype'): - self.ethertype = map_ethertype(kwargs['ethertype']) - - self.port_range_min = kwargs.get('port_range_min') - self.port_range_max = kwargs.get('port_range_max') - self.remote_ip_prefix = kwargs.get('remote_ip_prefix') - - if not self.direction or not self.sec_grp_name: - raise SecurityGroupRuleSettingsError( - 'direction and sec_grp_name are required') - - def dict_for_neutron(self, neutron): - """ - Returns a dictionary object representing this object. - This is meant to be converted into JSON designed for use by the Neutron - API - - :param neutron: the neutron client for performing lookups - :return: the dictionary object - """ - out = dict() - - if self.description: - out['description'] = self.description - if self.direction: - out['direction'] = self.direction.name - if self.port_range_min: - out['port_range_min'] = self.port_range_min - if self.port_range_max: - out['port_range_max'] = self.port_range_max - if self.ethertype: - out['ethertype'] = self.ethertype.name - if self.protocol and self.protocol.value != 'null': - out['protocol'] = self.protocol.value - if self.sec_grp_name: - sec_grp = neutron_utils.get_security_group( - neutron, sec_grp_name=self.sec_grp_name) - if sec_grp: - out['security_group_id'] = sec_grp.id - else: - raise SecurityGroupRuleSettingsError( - 'Cannot locate security group with name - ' + - self.sec_grp_name) - if self.remote_group_id: - out['remote_group_id'] = self.remote_group_id - if self.remote_ip_prefix: - out['remote_ip_prefix'] = self.remote_ip_prefix - - return {'security_group_rule': out} - - def rule_eq(self, rule): - """ - Returns True if this setting created the rule - :param rule: the rule to evaluate - :return: T/F - """ - if self.description is not None: - if (rule.description is not None and - rule.description != ''): - return False - elif self.description != rule.description: - if rule.description != '': - return False - - if self.direction.name != rule.direction: - return False - - if self.ethertype and rule.ethertype: - if self.ethertype.name != rule.ethertype: - return False - - if self.port_range_min and rule.port_range_min: - if self.port_range_min != rule.port_range_min: - return False - - if self.port_range_max and rule.port_range_max: - if self.port_range_max != rule.port_range_max: - return False - - if self.protocol and rule.protocol: - if self.protocol.name != rule.protocol: - return False - - if self.remote_group_id and rule.remote_group_id: - if self.remote_group_id != rule.remote_group_id: - return False - - if self.remote_ip_prefix and rule.remote_ip_prefix: - if self.remote_ip_prefix != rule.remote_ip_prefix: - return False - - return True - - def __eq__(self, other): - return ( - self.description == other.description and - self.direction == other.direction and - self.port_range_min == other.port_range_min and - self.port_range_max == other.port_range_max and - self.ethertype == other.ethertype and - self.protocol == other.protocol and - self.sec_grp_name == other.sec_grp_name and - self.remote_group_id == other.remote_group_id and - self.remote_ip_prefix == other.remote_ip_prefix) - - def __hash__(self): - return hash((self.sec_grp_name, self.description, self.direction, - self.remote_group_id, - self.protocol, self.ethertype, self.port_range_min, - self.port_range_max, self.remote_ip_prefix)) - - -def map_direction(direction): - """ - Takes a the direction value maps it to the Direction enum. When None return - None - :param direction: the direction value - :return: the Direction enum object - :raise: Exception if value is invalid - """ - if not direction: - return None - if isinstance(direction, Direction): - return direction - else: - dir_str = str(direction) - if dir_str == 'egress': - return Direction.egress - elif dir_str == 'ingress': - return Direction.ingress - else: - raise SecurityGroupRuleSettingsError( - 'Invalid Direction - ' + dir_str) - - -def map_protocol(protocol): - """ - Takes a the protocol value maps it to the Protocol enum. When None return - None - :param protocol: the protocol value - :return: the Protocol enum object - :raise: Exception if value is invalid - """ - if not protocol: - return None - elif isinstance(protocol, Protocol): - return protocol - else: - for proto_enum in Protocol: - if proto_enum.name == protocol or proto_enum.value == protocol: - if proto_enum == Protocol.any: - return Protocol.null - return proto_enum - raise SecurityGroupRuleSettingsError( - 'Invalid Protocol - ' + protocol) - - -def map_ethertype(ethertype): - """ - Takes a the ethertype value maps it to the Ethertype enum. When None return - None - :param ethertype: the ethertype value - :return: the Ethertype enum object - :raise: Exception if value is invalid - """ - if not ethertype: - return None - elif isinstance(ethertype, Ethertype): - return ethertype - else: - eth_str = str(ethertype) - if eth_str == 'IPv6': - return Ethertype.IPv6 - elif eth_str == 'IPv4': - return Ethertype.IPv4 - else: - raise SecurityGroupRuleSettingsError( - 'Invalid Ethertype - ' + eth_str) - - -class SecurityGroupRuleSettingsError(Exception): - """ - Exception to be thrown when security group rule settings attributes are - invalid - """ + from warnings import warn + warn('Use snaps.config.security_group.SecurityGroupRuleConfig instead', + DeprecationWarning) + super(self.__class__, self).__init__(**kwargs) diff --git a/snaps/openstack/create_stack.py b/snaps/openstack/create_stack.py index fa0a0c5..f0e1527 100644 --- a/snaps/openstack/create_stack.py +++ b/snaps/openstack/create_stack.py @@ -245,7 +245,7 @@ class OpenStackHeatStack(OpenStackCloudObject, object): self.__heat_cli, neutron, self.__stack) for stack_security_group in stack_security_groups: - settings = settings_utils.create_security_group_settings( + settings = settings_utils.create_security_group_config( neutron, stack_security_group) creator = OpenStackSecurityGroup(self._os_creds, settings) out.append(creator) diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py index be83879..4a516d3 100644 --- a/snaps/openstack/tests/create_instance_tests.py +++ b/snaps/openstack/tests/create_instance_tests.py @@ -29,6 +29,8 @@ from snaps.config.image import ImageConfig from snaps.config.keypair import KeypairConfig from snaps.config.network import PortConfig, NetworkConfig, SubnetConfig from snaps.config.router import RouterConfig +from snaps.config.security_group import ( + Protocol, SecurityGroupRuleConfig, Direction, SecurityGroupConfig) from snaps.config.vm_inst import ( VmInstanceConfig, FloatingIpConfig, VmInstanceConfigError, FloatingIpConfigError) @@ -41,9 +43,7 @@ from snaps.openstack.create_instance import ( from snaps.openstack.create_keypairs import OpenStackKeypair from snaps.openstack.create_network import OpenStackNetwork from snaps.openstack.create_router import OpenStackRouter -from snaps.openstack.create_security_group import ( - SecurityGroupSettings, OpenStackSecurityGroup, SecurityGroupRuleSettings, - Direction, Protocol) +from snaps.openstack.create_security_group import OpenStackSecurityGroup from snaps.openstack.create_volume import OpenStackVolume from snaps.openstack.tests import openstack_tests, validation_utils from snaps.openstack.tests.os_source_file_test import ( @@ -585,18 +585,16 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase): self.keypair_creator.create() sec_grp_name = guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.tcp, - port_range_min=22, - port_range_max=22) + rule1 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.icmp) + rule2 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.tcp, port_range_min=22, port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, - rule_settings=[rule1, rule2])) + SecurityGroupConfig( + name=sec_grp_name, rule_settings=[rule1, rule2])) self.sec_grp_creator.create() except Exception as e: self.tearDown() @@ -864,18 +862,16 @@ class CreateInstanceIPv6NetworkTests(OSIntegrationTestCase): self.keypair_creator.create() sec_grp_name = self.guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.tcp, - port_range_min=22, - port_range_max=22) + rule1 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.icmp) + rule2 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.tcp, port_range_min=22, port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, - rule_settings=[rule1, rule2])) + SecurityGroupConfig( + name=sec_grp_name, rule_settings=[rule1, rule2])) self.sec_grp_creator.create() except Exception as e: self.tearDown() @@ -1554,18 +1550,16 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase): self.keypair_creator.create() sec_grp_name = self.guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.tcp, - port_range_min=22, - port_range_max=22) + rule1 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.icmp) + rule2 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.tcp, port_range_min=22, port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, - rule_settings=[rule1, rule2])) + SecurityGroupConfig( + name=sec_grp_name, rule_settings=[rule1, rule2])) self.sec_grp_creator.create() except: self.tearDown() @@ -1809,8 +1803,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Create security group object to add to instance - sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name', - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.guid + '-name', description='hello group') sec_grp_creator = OpenStackSecurityGroup(self.os_creds, sec_grp_settings) sec_grp = sec_grp_creator.create() @@ -1843,8 +1837,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): self.assertIsNotNone(vm_inst) # Create security group object to add to instance - sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name', - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.guid + '-name', description='hello group') sec_grp_creator = OpenStackSecurityGroup(self.os_creds, sec_grp_settings) sec_grp = sec_grp_creator.create() @@ -1868,8 +1862,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): instance. """ # Create security group object to add to instance - sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name', - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.guid + '-name', description='hello group') sec_grp_creator = OpenStackSecurityGroup(self.os_creds, sec_grp_settings) sec_grp = sec_grp_creator.create() @@ -1904,8 +1898,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): place. """ # Create security group object to add to instance - sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name', - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.guid + '-name', description='hello group') sec_grp_creator = OpenStackSecurityGroup(self.os_creds, sec_grp_settings) sec_grp = sec_grp_creator.create() @@ -1939,8 +1933,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase): instance. """ # Create security group object to add to instance - sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name', - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.guid + '-name', description='hello group') sec_grp_creator = OpenStackSecurityGroup(self.os_creds, sec_grp_settings) sec_grp = sec_grp_creator.create() @@ -2739,13 +2733,13 @@ class CreateInstanceTwoNetTests(OSIntegrationTestCase): self.flavor_creator.create() sec_grp_name = self.guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.icmp) + rule1 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.icmp) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, - rule_settings=[rule1])) + SecurityGroupConfig( + name=sec_grp_name, rule_settings=[rule1])) self.sec_grp_creator.create() except: self.tearDown() diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py index d7d4dcf..2c10311 100644 --- a/snaps/openstack/tests/create_project_tests.py +++ b/snaps/openstack/tests/create_project_tests.py @@ -17,13 +17,13 @@ import uuid from keystoneclient.exceptions import BadRequest +from snaps.config.security_group import SecurityGroupConfig from snaps.config.user import UserConfig from snaps.config.project import ProjectConfigError, ProjectConfig from snaps.domain.project import ComputeQuotas, NetworkQuotas from snaps.openstack.create_project import ( OpenStackProject, ProjectSettings) from snaps.openstack.create_security_group import OpenStackSecurityGroup -from snaps.openstack.create_security_group import SecurityGroupSettings from snaps.openstack.create_user import OpenStackUser from snaps.openstack.tests.os_source_file_test import OSComponentTestCase from snaps.openstack.utils import keystone_utils, nova_utils, neutron_utils @@ -284,7 +284,7 @@ class CreateProjectUserTests(OSComponentTestCase): sec_grp_os_creds = user_creator.get_os_creds( self.project_creator.get_project().name) sec_grp_creator = OpenStackSecurityGroup( - sec_grp_os_creds, SecurityGroupSettings( + sec_grp_os_creds, SecurityGroupConfig( name=self.guid + '-name', description='hello group')) sec_grp = sec_grp_creator.create() self.assertIsNotNone(sec_grp) @@ -327,8 +327,8 @@ class CreateProjectUserTests(OSComponentTestCase): sec_grp_creator = OpenStackSecurityGroup( sec_grp_os_creds, - SecurityGroupSettings(name=self.guid + '-name', - description='hello group')) + SecurityGroupConfig( + name=self.guid + '-name', description='hello group')) sec_grp = sec_grp_creator.create() self.assertIsNotNone(sec_grp) self.sec_grp_creators.append(sec_grp_creator) diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index ed62548..090d736 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -15,10 +15,13 @@ import unittest import uuid +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig, + SecurityGroupRuleConfigError, SecurityGroupConfigError) from snaps.openstack import create_security_group from snaps.openstack.create_security_group import ( SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype, - Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError) + Protocol) from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase from snaps.openstack.utils import neutron_utils @@ -32,64 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**dict()) def test_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(sec_grp_name='foo') def test_config_with_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'}) def test_name_and_direction(self): settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_config_name_and_direction(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_proto_ah_str(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'ah'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.ah, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) def test_proto_ah_value(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 51}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.ah, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) def test_proto_any(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'any'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.null, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_proto_null(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'null'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.null, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_all(self): settings = SecurityGroupRuleSettings( @@ -100,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): remote_ip_prefix='prfx') self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.icmp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.icmp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -121,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): 'remote_ip_prefix': 'prfx'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.tcp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.tcp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -136,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(**dict()) def test_name_only(self): @@ -155,7 +158,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): rule_setting = SecurityGroupRuleSettings( sec_grp_name='bar', direction=Direction.ingress, description='test_rule_1') - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): @@ -189,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.project_name) self.assertEqual(1, len(settings.rule_settings)) self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) - self.assertEqual(Direction.ingress, - settings.rule_settings[0].direction) + self.assertEqual(Direction.ingress.value, + settings.rule_settings[0].direction.value) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -226,8 +229,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() @@ -254,7 +257,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.admin_os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -283,7 +286,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -312,8 +315,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) created_sec_grp = self.sec_grp_creator.create() @@ -339,10 +342,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -372,12 +375,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -407,21 +410,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -451,10 +454,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -480,7 +483,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( + self.sec_grp_creator.add_rule(SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, direction=Direction.egress, protocol=Protocol.icmp, description='test_rule_2')) @@ -496,21 +499,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -547,21 +550,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -615,7 +618,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): this is the only means to tell if the rule is custom or defaulted by OpenStack :param neutron: the neutron client - :param rule_settings: collection of SecurityGroupRuleSettings objects + :param rule_settings: collection of SecurityGroupRuleConfig objects :param rules: a collection of SecurityGroupRule domain objects :return: T/F """ @@ -646,7 +649,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): proto_str == str(rule_setting.protocol.value) and rule.remote_group_id == rule_setting.remote_group_id and rule.remote_ip_prefix == rule_setting.remote_ip_prefix and - rule.security_group_id == sec_grp.id): + rule.security_group_id == sec_grp.id): match = True break diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py index b59f811..eaceb37 100644 --- a/snaps/openstack/utils/neutron_utils.py +++ b/snaps/openstack/utils/neutron_utils.py @@ -579,7 +579,7 @@ def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None, the security group will be used, else if the query parameters are None then None will be returned :param neutron: the client - :param sec_grp_settings: an instance of SecurityGroupSettings config object + :param sec_grp_settings: an instance of SecurityGroupConfig object :param sec_grp_name: the name of security group object to retrieve :param project_id: the ID of the project/tentant object that owns the secuity group to retrieve diff --git a/snaps/openstack/utils/settings_utils.py b/snaps/openstack/utils/settings_utils.py index 5433570..145b481 100644 --- a/snaps/openstack/utils/settings_utils.py +++ b/snaps/openstack/utils/settings_utils.py @@ -19,12 +19,12 @@ from snaps.config.flavor import FlavorConfig from snaps.config.keypair import KeypairConfig from snaps.config.network import SubnetConfig, PortConfig, NetworkConfig from snaps.config.router import RouterConfig +from snaps.config.security_group import ( + SecurityGroupRuleConfig, SecurityGroupConfig) from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig from snaps.config.volume import VolumeConfig from snaps.config.volume_type import ( ControlLocation, VolumeTypeEncryptionConfig, VolumeTypeConfig) -from snaps.openstack.create_security_group import ( - SecurityGroupSettings, SecurityGroupRuleSettings) from snaps.openstack.utils import ( neutron_utils, nova_utils, heat_utils, glance_utils) @@ -41,7 +41,7 @@ def create_network_config(neutron, network): subnet_settings=create_subnet_config(neutron, network)) -def create_security_group_settings(neutron, security_group): +def create_security_group_config(neutron, security_group): """ Returns a NetworkConfig object :param neutron: the neutron client @@ -52,7 +52,7 @@ def create_security_group_settings(neutron, security_group): rule_settings = list() for rule in rules: - rule_settings.append(SecurityGroupRuleSettings( + rule_settings.append(SecurityGroupRuleConfig( sec_grp_name=security_group.name, description=rule.description, direction=rule.direction, ethertype=rule.ethertype, port_range_min=rule.port_range_min, @@ -60,7 +60,7 @@ def create_security_group_settings(neutron, security_group): remote_group_id=rule.remote_group_id, remote_ip_prefix=rule.remote_ip_prefix)) - return SecurityGroupSettings( + return SecurityGroupConfig( name=security_group.name, description=security_group.description, rule_settings=rule_settings) diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py index 44bc59f..38faf71 100644 --- a/snaps/openstack/utils/tests/neutron_utils_tests.py +++ b/snaps/openstack/utils/tests/neutron_utils_tests.py @@ -17,8 +17,8 @@ import uuid from neutronclient.common.exceptions import NotFound, BadRequest from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig -from snaps.openstack.create_security_group import ( - SecurityGroupSettings, SecurityGroupRuleSettings, Direction) +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig, Direction) from snaps.openstack.tests import openstack_tests from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSComponentTestCase @@ -841,7 +841,7 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name) + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name) security_group = neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings) @@ -861,13 +861,13 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): def test_create_sec_grp_no_name(self): """ - Tests the SecurityGroupSettings constructor and + Tests the SecurityGroupConfig constructor and neutron_utils.create_security_group() function to ensure that attempting to create a security group without a name will raise an exception """ with self.assertRaises(Exception): - sec_grp_settings = SecurityGroupSettings() + sec_grp_settings = SecurityGroupConfig() self.security_groups.append( neutron_utils.create_security_group(self.neutron, self.keystone, @@ -877,8 +877,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): """ Tests the neutron_utils.create_security_group() function """ - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig( + name=self.sec_grp_name, description='hello group') self.security_groups.append( neutron_utils.create_security_group(self.neutron, self.keystone, sec_grp_settings)) @@ -896,9 +896,9 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): Tests the neutron_utils.create_security_group() function """ - sec_grp_rule_settings = SecurityGroupRuleSettings( + sec_grp_rule_settings = SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=[sec_grp_rule_settings]) @@ -939,12 +939,12 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase): self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-1', - description='hello group'))) + SecurityGroupConfig( + name=self.sec_grp_name + '-1', description='hello group'))) self.security_groups.append(neutron_utils.create_security_group( self.neutron, self.keystone, - SecurityGroupSettings(name=self.sec_grp_name + '-2', - description='hello group'))) + SecurityGroupConfig( + name=self.sec_grp_name + '-2', description='hello group'))) sec_grp_1b = neutron_utils.get_security_group_by_id( self.neutron, self.security_groups[0].id) diff --git a/snaps/openstack/utils/tests/settings_utils_tests.py b/snaps/openstack/utils/tests/settings_utils_tests.py index d0390e2..06062c5 100644 --- a/snaps/openstack/utils/tests/settings_utils_tests.py +++ b/snaps/openstack/utils/tests/settings_utils_tests.py @@ -22,6 +22,8 @@ from snaps.config.network import SubnetConfig, NetworkConfig, PortConfig from snaps.config.flavor import FlavorConfig from snaps.config.keypair import KeypairConfig from snaps.config.qos import Consumer +from snaps.config.security_group import ( + SecurityGroupRuleConfig, Direction, Protocol, SecurityGroupConfig) from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig from snaps.domain.flavor import Flavor from snaps.domain.volume import ( @@ -31,9 +33,7 @@ from snaps.openstack import ( create_keypairs, create_instance) from snaps.openstack.create_qos import Consumer from snaps.openstack.create_network import OpenStackNetwork -from snaps.openstack.create_security_group import ( - SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup, - SecurityGroupSettings) +from snaps.openstack.create_security_group import OpenStackSecurityGroup from snaps.openstack.tests import openstack_tests from snaps.openstack.tests.os_source_file_test import OSComponentTestCase from snaps.openstack.utils import ( @@ -218,18 +218,16 @@ class SettingsUtilsVmInstTests(OSComponentTestCase): # Create Security Group sec_grp_name = guid + '-sec-grp' - rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.icmp) - rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name, - direction=Direction.ingress, - protocol=Protocol.tcp, - port_range_min=22, - port_range_max=22) + rule1 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.icmp) + rule2 = SecurityGroupRuleConfig( + sec_grp_name=sec_grp_name, direction=Direction.ingress, + protocol=Protocol.tcp, port_range_min=22, port_range_max=22) self.sec_grp_creator = OpenStackSecurityGroup( self.os_creds, - SecurityGroupSettings(name=sec_grp_name, - rule_settings=[rule1, rule2])) + SecurityGroupConfig( + name=sec_grp_name, rule_settings=[rule1, rule2])) self.sec_grp_creator.create() # Create instance |