From cd6bc05b6e1998993cc646004eae53f2e6c4e4f4 Mon Sep 17 00:00:00 2001 From: spisarski Date: Wed, 26 Jul 2017 15:12:16 -0600 Subject: Expand OpenStackSecurityGroup class tests. Improved validation and added another test case. JIRA: SNAPS-148 Change-Id: I249ac0fcce502dd91c82cc2bfb54ca22da5e33bb Signed-off-by: spisarski --- docs/how-to-use/IntegrationTests.rst | 7 +- snaps/domain/network.py | 1 + snaps/domain/test/network_tests.py | 10 +- .../openstack/tests/create_security_group_tests.py | 248 +++++++++++++++++---- snaps/openstack/utils/neutron_utils.py | 14 +- 5 files changed, 217 insertions(+), 63 deletions(-) diff --git a/docs/how-to-use/IntegrationTests.rst b/docs/how-to-use/IntegrationTests.rst index 70e51b9..92a5301 100644 --- a/docs/how-to-use/IntegrationTests.rst +++ b/docs/how-to-use/IntegrationTests.rst @@ -27,7 +27,10 @@ create_security_group_tests.py - CreateSecurityGroupTests | | | some other process | +---------------------------------------+---------------+-----------------------------------------------------------+ | test_create_group_with_one_simple_rule| Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a | -| | Neutron 2 | security group with a single rule | +| | Neutron 2 | security group with a single simple rule | ++---------------------------------------+---------------+-----------------------------------------------------------+ +| test_create_group_with_one_complex | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a | +| _rule | Neutron 2 | security group with a single complex rule | +---------------------------------------+---------------+-----------------------------------------------------------+ | test_create_group_with_several_rules | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a | | | Neutron 2 | security group with several rules | @@ -243,7 +246,7 @@ create_stack_tests.py - CreateStackSuccessTests +---------------------------------------+---------------+-----------------------------------------------------------+ create_stack_tests.py - CreateStackNegativeTests --------------------------------------------------- +------------------------------------------------ +----------------------------------------+---------------+-----------------------------------------------------------+ | Test Name | Neutron API | Description | diff --git a/snaps/domain/network.py b/snaps/domain/network.py index 68889f1..2c71db8 100644 --- a/snaps/domain/network.py +++ b/snaps/domain/network.py @@ -130,6 +130,7 @@ class SecurityGroup: """ self.name = kwargs.get('name') self.id = kwargs.get('id') + self.description = kwargs.get('description') self.project_id = kwargs.get('project_id', kwargs.get('tenant_id')) def __eq__(self, other): diff --git a/snaps/domain/test/network_tests.py b/snaps/domain/test/network_tests.py index 592090b..4fd20d4 100644 --- a/snaps/domain/test/network_tests.py +++ b/snaps/domain/test/network_tests.py @@ -133,10 +133,11 @@ class SecurityGroupDomainObjectTests(unittest.TestCase): def test_construction_proj_id_kwargs(self): sec_grp = SecurityGroup( - **{'name': 'name', 'id': 'id', - 'project_id': 'foo'}) + **{'name': 'name', 'id': 'id', 'project_id': 'foo', + 'description': 'test desc'}) self.assertEqual('name', sec_grp.name) self.assertEqual('id', sec_grp.id) + self.assertEqual('test desc', sec_grp.description) self.assertEqual('foo', sec_grp.project_id) def test_construction_tenant_id_kwargs(self): @@ -146,11 +147,14 @@ class SecurityGroupDomainObjectTests(unittest.TestCase): self.assertEqual('name', sec_grp.name) self.assertEqual('id', sec_grp.id) self.assertEqual('foo', sec_grp.project_id) + self.assertIsNone(sec_grp.description) def test_construction_named(self): - sec_grp = SecurityGroup(tenant_id='foo', id='id', name='name') + sec_grp = SecurityGroup(description='test desc', tenant_id='foo', + id='id', name='name') self.assertEqual('name', sec_grp.name) self.assertEqual('id', sec_grp.id) + self.assertEqual('test desc', sec_grp.description) self.assertEqual('foo', sec_grp.project_id) diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index 7cae62b..a0392ea 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -120,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): self.assertEqual('foo', settings.name) def test_invalid_rule(self): - rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', - direction=Direction.ingress) + rule_setting = SecurityGroupRuleSettings( + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_1') with self.assertRaises(SecurityGroupSettingsError): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): rule_settings = list() rule_settings.append(SecurityGroupRuleSettings( - sec_grp_name='bar', direction=Direction.egress)) + sec_grp_name='bar', direction=Direction.egress, + description='test_rule_1')) rule_settings.append(SecurityGroupRuleSettings( - sec_grp_name='bar', direction=Direction.ingress)) + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_2')) settings = SecurityGroupSettings( name='bar', description='fubar', project_name='foo', rule_settings=rule_settings) @@ -209,6 +212,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group())) + def test_create_group_admin_user_to_new_project(self): """ Tests the creation of an OpenStack Security Group without custom rules. @@ -233,6 +241,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_create_group_new_user_to_admin_project(self): """ Tests the creation of an OpenStack Security Group without custom rules. @@ -257,6 +270,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_create_delete_group(self): """ Tests the creation of an OpenStack Security Group without custom rules. @@ -269,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): created_sec_grp = self.sec_grp_creator.create() self.assertIsNotNone(created_sec_grp) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group())) + neutron_utils.delete_security_group(self.neutron, created_sec_grp) self.assertIsNone(neutron_utils.get_security_group( self.neutron, self.sec_grp_creator.sec_grp_settings.name)) @@ -283,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_settings = SecurityGroupSettings( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) @@ -302,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + + def test_create_group_with_one_complex_rule(self): + """ + Tests the creation of an OpenStack Security Group with one simple + custom rule. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append( + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_1')) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_create_group_with_several_rules(self): """ Tests the creation of an OpenStack Security Group with one simple @@ -310,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) sec_grp_settings = SecurityGroupSettings( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) @@ -341,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + def test_add_rule(self): """ Tests the creation of an OpenStack Security Group with one simple @@ -349,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_settings = SecurityGroupSettings( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) @@ -362,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_name) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) + + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) + + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + rules = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) @@ -370,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, - direction=Direction.egress, protocol=Protocol.icmp)) + direction=Direction.egress, protocol=Protocol.icmp, + description='test_rule_2')) rules2 = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) + 1, len(rules2)) @@ -383,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) sec_grp_settings = SecurityGroupSettings( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) @@ -414,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + self.sec_grp_creator.remove_rule( rule_id=rules[0].id) rules_after_del = neutron_utils.get_rules_by_security_group( @@ -429,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): # Create Image sec_grp_rule_settings = list() sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.ingress, + description='test_rule_1')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv6, + description='test_rule_2')) sec_grp_rule_settings.append( - SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) + SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_name, direction=Direction.egress, + protocol=Protocol.udp, ethertype=Ethertype.IPv4, + port_range_min=10, port_range_max=20, + description='test_rule_3')) sec_grp_settings = SecurityGroupSettings( name=self.sec_grp_name, description='hello group', rule_settings=sec_grp_rule_settings) @@ -454,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): self.sec_grp_name) validation_utils.objects_equivalent( self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + self.assertTrue( + validate_sec_grp( + self.neutron, self.sec_grp_creator.sec_grp_settings, + self.sec_grp_creator.get_security_group(), rules)) + self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0]) rules_after_del = neutron_utils.get_rules_by_security_group( self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) - 1, len(rules_after_del)) -# TODO - Add more tests with different rules. Rule creation parameters can be -# somewhat complex + +def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()): + """ + Returns True is the settings on a security group are properly contained + on the SNAPS SecurityGroup domain object + :param neutron: the neutron client + :param sec_grp_settings: the security group configuration + :param sec_grp: the SNAPS-OO security group object + :param rules: collection of SNAPS-OO security group rule objects + :return: T/F + """ + return (sec_grp.description == sec_grp_settings.description and + sec_grp.name == sec_grp_settings.name and + validate_sec_grp_rules( + neutron, sec_grp_settings.rule_settings, rules)) + + +def validate_sec_grp_rules(neutron, rule_settings, rules): + """ + Returns True is the settings on a security group rule are properly + contained on the SNAPS SecurityGroupRule domain object. + This function will only operate on rules that contain a description as + this is the only means to tell if the rule is custom or defaulted by + OpenStack + :param neutron: the neutron client + :param rule_settings: collection of SecurityGroupRuleSettings objects + :param rules: a collection of SecurityGroupRule domain objects + :return: T/F + """ + + for rule_setting in rule_settings: + if rule_setting.description: + match = False + for rule in rules: + if rule_setting.protocol == Protocol.null: + setting_proto = None + else: + setting_proto = rule_setting.protocol.name + + sec_grp = neutron_utils.get_security_group( + neutron, rule_setting.sec_grp_name) + + setting_eth_type = create_security_group.Ethertype.IPv4 + if rule_setting.ethertype: + setting_eth_type = rule_setting.ethertype + + if not sec_grp: + return False + + if (rule.description == rule_setting.description and + rule.direction == rule_setting.direction.name and + rule.ethertype == setting_eth_type.name and + rule.port_range_max == rule_setting.port_range_max and + rule.port_range_min == rule_setting.port_range_min and + rule.protocol == setting_proto and + rule.remote_group_id == rule_setting.remote_group_id and + rule.remote_ip_prefix == rule_setting.remote_ip_prefix and + rule.security_group_id == sec_grp.id): + match = True + break + + if not match: + return False + + return True diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py index bf8cb08..2f8ef7e 100644 --- a/snaps/openstack/utils/neutron_utils.py +++ b/snaps/openstack/utils/neutron_utils.py @@ -331,11 +331,7 @@ def create_security_group(neutron, keystone, sec_grp_settings): sec_grp_settings.name) os_group = neutron.create_security_group( sec_grp_settings.dict_for_neutron(keystone)) - return SecurityGroup( - id=os_group['security_group']['id'], - name=os_group['security_group']['name'], - project_id=os_group['security_group'].get( - 'project_id', os_group['security_group'].get('tenant_id'))) + return SecurityGroup(**os_group['security_group']) def delete_security_group(neutron, sec_grp): @@ -360,9 +356,7 @@ def get_security_group(neutron, name): groups = neutron.list_security_groups(**{'name': name}) for group in groups['security_groups']: if group['name'] == name: - return SecurityGroup( - id=group['id'], name=group['name'], - project_id=group.get('project_id', group.get('tenant_id'))) + return SecurityGroup(**group) return None @@ -378,9 +372,7 @@ def get_security_group_by_id(neutron, sec_grp_id): groups = neutron.list_security_groups(**{'id': sec_grp_id}) for group in groups['security_groups']: if group['id'] == sec_grp_id: - return SecurityGroup( - id=group['id'], name=group['name'], - project_id=group.get('project_id', group.get('tenant_id'))) + return SecurityGroup(**group) return None -- cgit 1.2.3-korg