diff options
Diffstat (limited to 'snaps/openstack/tests/create_security_group_tests.py')
-rw-r--r-- | snaps/openstack/tests/create_security_group_tests.py | 109 |
1 files changed, 56 insertions, 53 deletions
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index ed62548..090d736 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -15,10 +15,13 @@ import unittest import uuid +from snaps.config.security_group import ( + SecurityGroupConfig, SecurityGroupRuleConfig, + SecurityGroupRuleConfigError, SecurityGroupConfigError) from snaps.openstack import create_security_group from snaps.openstack.create_security_group import ( SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype, - Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError) + Protocol) from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase from snaps.openstack.utils import neutron_utils @@ -32,64 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**dict()) def test_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(sec_grp_name='foo') def test_config_with_name_only(self): - with self.assertRaises(SecurityGroupRuleSettingsError): + with self.assertRaises(SecurityGroupRuleConfigError): SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'}) def test_name_and_direction(self): settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_config_name_and_direction(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Direction.ingress.value, settings.direction.value) def test_proto_ah_str(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'ah'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.ah, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) def test_proto_ah_value(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 51}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.ah, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.ah.value, settings.protocol.value) def test_proto_any(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'any'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.null, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_proto_null(self): settings = SecurityGroupRuleSettings( **{'sec_grp_name': 'foo', 'direction': 'ingress', 'protocol': 'null'}) self.assertEqual('foo', settings.sec_grp_name) - self.assertEqual(Direction.ingress, settings.direction) - self.assertEqual(Protocol.null, settings.protocol) + self.assertEqual(Direction.ingress.value, settings.direction.value) + self.assertEqual(Protocol.null.value, settings.protocol.value) def test_all(self): settings = SecurityGroupRuleSettings( @@ -100,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): remote_ip_prefix='prfx') self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.icmp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.icmp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -121,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): 'remote_ip_prefix': 'prfx'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) - self.assertEqual(Direction.egress, settings.direction) + self.assertEqual(Direction.egress.value, settings.direction.value) self.assertEqual('rgi', settings.remote_group_id) - self.assertEqual(Protocol.tcp, settings.protocol) - self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(Protocol.tcp.value, settings.protocol.value) + self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value) self.assertEqual(1, settings.port_range_min) self.assertEqual(2, settings.port_range_max) self.assertEqual('prfx', settings.remote_ip_prefix) @@ -136,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): """ def test_no_params(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings() def test_empty_config(self): - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(**dict()) def test_name_only(self): @@ -155,7 +158,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): rule_setting = SecurityGroupRuleSettings( sec_grp_name='bar', direction=Direction.ingress, description='test_rule_1') - with self.assertRaises(SecurityGroupSettingsError): + with self.assertRaises(SecurityGroupConfigError): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): @@ -189,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.project_name) self.assertEqual(1, len(settings.rule_settings)) self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) - self.assertEqual(Direction.ingress, - settings.rule_settings[0].direction) + self.assertEqual(Direction.ingress.value, + settings.rule_settings[0].direction.value) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -226,8 +229,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) self.sec_grp_creator.create() @@ -254,7 +257,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.admin_os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -283,7 +286,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', project_name=self.os_creds.project_name) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -312,8 +315,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, - description='hello group') + sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name, + description='hello group') self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( self.os_creds, sec_grp_settings) created_sec_grp = self.sec_grp_creator.create() @@ -339,10 +342,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -372,12 +375,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -407,21 +410,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -451,10 +454,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -480,7 +483,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( + self.sec_grp_creator.add_rule(SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, direction=Direction.egress, protocol=Protocol.icmp, description='test_rule_2')) @@ -496,21 +499,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -547,21 +550,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.ingress, description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv6, description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings( + SecurityGroupRuleConfig( sec_grp_name=self.sec_grp_name, direction=Direction.egress, protocol=Protocol.udp, ethertype=Ethertype.IPv4, port_range_min=10, port_range_max=20, description='test_rule_3')) - sec_grp_settings = SecurityGroupSettings( + sec_grp_settings = SecurityGroupConfig( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( @@ -615,7 +618,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): this is the only means to tell if the rule is custom or defaulted by OpenStack :param neutron: the neutron client - :param rule_settings: collection of SecurityGroupRuleSettings objects + :param rule_settings: collection of SecurityGroupRuleConfig objects :param rules: a collection of SecurityGroupRule domain objects :return: T/F """ @@ -646,7 +649,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules): proto_str == str(rule_setting.protocol.value) and rule.remote_group_id == rule_setting.remote_group_id and rule.remote_ip_prefix == rule_setting.remote_ip_prefix and - rule.security_group_id == sec_grp.id): + rule.security_group_id == sec_grp.id): match = True break |