summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteven Pisarski <s.pisarski@cablelabs.com>2017-07-27 14:38:50 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-07-27 14:38:50 +0000
commit80b70bdcc7a73e0050dae87e4d86159f2fd9b785 (patch)
treeb27ba9408a0f89e2222b8918fe578aa91f10970d
parentd72efa76ecdff6a0d575dfe92754dc2990c8b712 (diff)
parentcd6bc05b6e1998993cc646004eae53f2e6c4e4f4 (diff)
Merge "Expand OpenStackSecurityGroup class tests."
-rw-r--r--docs/how-to-use/IntegrationTests.rst5
-rw-r--r--snaps/domain/network.py1
-rw-r--r--snaps/domain/test/network_tests.py10
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py248
-rw-r--r--snaps/openstack/utils/neutron_utils.py14
5 files changed, 216 insertions, 62 deletions
diff --git a/docs/how-to-use/IntegrationTests.rst b/docs/how-to-use/IntegrationTests.rst
index 7414110..4cdd94f 100644
--- a/docs/how-to-use/IntegrationTests.rst
+++ b/docs/how-to-use/IntegrationTests.rst
@@ -27,7 +27,10 @@ create_security_group_tests.py - CreateSecurityGroupTests
| | | some other process |
+---------------------------------------+---------------+-----------------------------------------------------------+
| test_create_group_with_one_simple_rule| Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a |
-| | Neutron 2 | security group with a single rule |
+| | Neutron 2 | security group with a single simple rule |
++---------------------------------------+---------------+-----------------------------------------------------------+
+| test_create_group_with_one_complex | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a |
+| _rule | Neutron 2 | security group with a single complex rule |
+---------------------------------------+---------------+-----------------------------------------------------------+
| test_create_group_with_several_rules | Keysone 2 & 3 | Ensures the OpenStackSecurityGroup class can create a |
| | Neutron 2 | security group with several rules |
diff --git a/snaps/domain/network.py b/snaps/domain/network.py
index 68889f1..2c71db8 100644
--- a/snaps/domain/network.py
+++ b/snaps/domain/network.py
@@ -130,6 +130,7 @@ class SecurityGroup:
"""
self.name = kwargs.get('name')
self.id = kwargs.get('id')
+ self.description = kwargs.get('description')
self.project_id = kwargs.get('project_id', kwargs.get('tenant_id'))
def __eq__(self, other):
diff --git a/snaps/domain/test/network_tests.py b/snaps/domain/test/network_tests.py
index 592090b..4fd20d4 100644
--- a/snaps/domain/test/network_tests.py
+++ b/snaps/domain/test/network_tests.py
@@ -133,10 +133,11 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
def test_construction_proj_id_kwargs(self):
sec_grp = SecurityGroup(
- **{'name': 'name', 'id': 'id',
- 'project_id': 'foo'})
+ **{'name': 'name', 'id': 'id', 'project_id': 'foo',
+ 'description': 'test desc'})
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
+ self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
def test_construction_tenant_id_kwargs(self):
@@ -146,11 +147,14 @@ class SecurityGroupDomainObjectTests(unittest.TestCase):
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
self.assertEqual('foo', sec_grp.project_id)
+ self.assertIsNone(sec_grp.description)
def test_construction_named(self):
- sec_grp = SecurityGroup(tenant_id='foo', id='id', name='name')
+ sec_grp = SecurityGroup(description='test desc', tenant_id='foo',
+ id='id', name='name')
self.assertEqual('name', sec_grp.name)
self.assertEqual('id', sec_grp.id)
+ self.assertEqual('test desc', sec_grp.description)
self.assertEqual('foo', sec_grp.project_id)
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 7cae62b..a0392ea 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -120,17 +120,20 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
- direction=Direction.ingress)
+ rule_setting = SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
with self.assertRaises(SecurityGroupSettingsError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.egress))
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
rule_settings.append(SecurityGroupRuleSettings(
- sec_grp_name='bar', direction=Direction.ingress))
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo',
rule_settings=rule_settings)
@@ -209,6 +212,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -233,6 +241,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -257,6 +270,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
@@ -269,6 +287,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group()))
+
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
self.neutron, self.sec_grp_creator.sec_grp_settings.name))
@@ -283,8 +306,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -302,6 +326,46 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
+ def test_create_group_with_one_complex_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_1'))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_create_group_with_several_rules(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -310,20 +374,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -341,6 +405,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
def test_add_rule(self):
"""
Tests the creation of an OpenStack Security Group with one simple
@@ -349,8 +418,9 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -362,6 +432,15 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
+
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
@@ -370,7 +449,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
+ direction=Direction.egress, protocol=Protocol.icmp,
+ description='test_rule_2'))
rules2 = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) + 1, len(rules2))
@@ -383,20 +463,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -414,6 +494,11 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(
rule_id=rules[0].id)
rules_after_del = neutron_utils.get_rules_by_security_group(
@@ -429,20 +514,20 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
+ description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv6,
+ description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
+ SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_name, direction=Direction.egress,
+ protocol=Protocol.udp, ethertype=Ethertype.IPv4,
+ port_range_min=10, port_range_max=20,
+ description='test_rule_3'))
sec_grp_settings = SecurityGroupSettings(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
@@ -454,17 +539,86 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
self.sec_grp_name)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
+
rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
+ self.assertTrue(
+ validate_sec_grp(
+ self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.sec_grp_creator.get_security_group(), rules))
+
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(
self.neutron,
self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
-# TODO - Add more tests with different rules. Rule creation parameters can be
-# somewhat complex
+
+def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+ """
+ Returns True is the settings on a security group are properly contained
+ on the SNAPS SecurityGroup domain object
+ :param neutron: the neutron client
+ :param sec_grp_settings: the security group configuration
+ :param sec_grp: the SNAPS-OO security group object
+ :param rules: collection of SNAPS-OO security group rule objects
+ :return: T/F
+ """
+ return (sec_grp.description == sec_grp_settings.description and
+ sec_grp.name == sec_grp_settings.name and
+ validate_sec_grp_rules(
+ neutron, sec_grp_settings.rule_settings, rules))
+
+
+def validate_sec_grp_rules(neutron, rule_settings, rules):
+ """
+ Returns True is the settings on a security group rule are properly
+ contained on the SNAPS SecurityGroupRule domain object.
+ This function will only operate on rules that contain a description as
+ this is the only means to tell if the rule is custom or defaulted by
+ OpenStack
+ :param neutron: the neutron client
+ :param rule_settings: collection of SecurityGroupRuleSettings objects
+ :param rules: a collection of SecurityGroupRule domain objects
+ :return: T/F
+ """
+
+ for rule_setting in rule_settings:
+ if rule_setting.description:
+ match = False
+ for rule in rules:
+ if rule_setting.protocol == Protocol.null:
+ setting_proto = None
+ else:
+ setting_proto = rule_setting.protocol.name
+
+ sec_grp = neutron_utils.get_security_group(
+ neutron, rule_setting.sec_grp_name)
+
+ setting_eth_type = create_security_group.Ethertype.IPv4
+ if rule_setting.ethertype:
+ setting_eth_type = rule_setting.ethertype
+
+ if not sec_grp:
+ return False
+
+ if (rule.description == rule_setting.description and
+ rule.direction == rule_setting.direction.name and
+ rule.ethertype == setting_eth_type.name and
+ rule.port_range_max == rule_setting.port_range_max and
+ rule.port_range_min == rule_setting.port_range_min and
+ rule.protocol == setting_proto and
+ rule.remote_group_id == rule_setting.remote_group_id and
+ rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
+ rule.security_group_id == sec_grp.id):
+ match = True
+ break
+
+ if not match:
+ return False
+
+ return True
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index bf8cb08..2f8ef7e 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -331,11 +331,7 @@ def create_security_group(neutron, keystone, sec_grp_settings):
sec_grp_settings.name)
os_group = neutron.create_security_group(
sec_grp_settings.dict_for_neutron(keystone))
- return SecurityGroup(
- id=os_group['security_group']['id'],
- name=os_group['security_group']['name'],
- project_id=os_group['security_group'].get(
- 'project_id', os_group['security_group'].get('tenant_id')))
+ return SecurityGroup(**os_group['security_group'])
def delete_security_group(neutron, sec_grp):
@@ -360,9 +356,7 @@ def get_security_group(neutron, name):
groups = neutron.list_security_groups(**{'name': name})
for group in groups['security_groups']:
if group['name'] == name:
- return SecurityGroup(
- id=group['id'], name=group['name'],
- project_id=group.get('project_id', group.get('tenant_id')))
+ return SecurityGroup(**group)
return None
@@ -378,9 +372,7 @@ def get_security_group_by_id(neutron, sec_grp_id):
groups = neutron.list_security_groups(**{'id': sec_grp_id})
for group in groups['security_groups']:
if group['id'] == sec_grp_id:
- return SecurityGroup(
- id=group['id'], name=group['name'],
- project_id=group.get('project_id', group.get('tenant_id')))
+ return SecurityGroup(**group)
return None