summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/tests/create_security_group_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/openstack/tests/create_security_group_tests.py')
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py248
1 files changed, 201 insertions, 47 deletions
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 7cae62b..a0392ea 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -120,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
- direction=Direction.ingress)
+ rule_setting = SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.egress))
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.ingress))
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo',
rule_settings=rule_settings)
@@ -209,6 +212,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -233,6 +241,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -257,6 +270,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -269,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
self.neutron, self.sec_grp_creator.sec_grp_settings.name))
@@ -283,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -302,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_with_one_complex_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_with_several_rules(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -310,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -341,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_add_rule(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -349,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -362,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
@@ -370,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
+ direction=Direction.egress, protocol=Protocol.icmp,
+ description='test_rule_2'))
rules2 = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) + 1, len(rules2))
@@ -383,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -414,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(
rule_id=rules[0].id)
rules_after_del = neutron_utils.get_rules_by_security_group(
@@ -429,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -454,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(
self.neutron,
self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+ """
+ Returns True is the settings on a security group are properly contained
+ on the SNAPS SecurityGroup domain object
+ :param neutron: the neutron client
+ :param sec_grp_settings: the security group configuration
+ :param sec_grp: the SNAPS-OO security group object
+ :param rules: collection of SNAPS-OO security group rule objects
+ :return: T/F
+ """
+ return (sec_grp.description == sec_grp_settings.description and
+ sec_grp.name == sec_grp_settings.name and
+ validate_sec_grp_rules(
+ neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+ """
+ Returns True is the settings on a security group rule are properly
+ contained on the SNAPS SecurityGroupRule domain object.
+ This function will only operate on rules that contain a description as
+ this is the only means to tell if the rule is custom or defaulted by
+ OpenStack
+ :param neutron: the neutron client
+ :param rule_settings: collection of SecurityGroupRuleSettings objects
+ :param rules: a collection of SecurityGroupRule domain objects
+ :return: T/F
+ """
+
+ for rule_setting in rule_settings:
+ if rule_setting.description:
+ match = False
+ for rule in rules:
+ if rule_setting.protocol == Protocol.null:
+ setting_proto = None
+ else:
+ setting_proto = rule_setting.protocol.name
+
+ sec_grp = neutron_utils.get_security_group(
+ neutron, rule_setting.sec_grp_name)
+
+ setting_eth_type = create_security_group.Ethertype.IPv4
+ if rule_setting.ethertype:
+ setting_eth_type = rule_setting.ethertype
+
+ if not sec_grp:
+ return False
+
+ if (rule.description == rule_setting.description and
+ rule.direction == rule_setting.direction.name and
+ rule.ethertype == setting_eth_type.name and
+ rule.port_range_max == rule_setting.port_range_max and
+ rule.port_range_min == rule_setting.port_range_min and
+ rule.protocol == setting_proto and
+ rule.remote_group_id == rule_setting.remote_group_id and
+ rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+ rule.security_group_id == sec_grp.id):
+ match = True
+ break
+
+ if not match:
+ return False
+
+ return True