summaryrefslogtreecommitdiffstats
path: root/snaps/openstack/tests/create_security_group_tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/openstack/tests/create_security_group_tests.py')
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py355
1 files changed, 355 insertions, 0 deletions
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
new file mode 100644
index 0000000..079be0c
--- /dev/null
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -0,0 +1,355 @@
+# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import uuid
+import unittest
+
+from snaps.openstack import create_security_group
+from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
+ Ethertype, Protocol
+from snaps.openstack.tests import validation_utils
+from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
+from snaps.openstack.utils import neutron_utils
+
+__author__ = 'spisarski'
+
+
+class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupRuleSettings class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(Exception):
+ SecurityGroupRuleSettings()
+
+ def test_empty_config(self):
+ with self.assertRaises(Exception):
+ SecurityGroupRuleSettings(config=dict())
+
+ def test_name_only(self):
+ with self.assertRaises(Exception):
+ SecurityGroupRuleSettings(sec_grp_name='foo')
+
+ def test_config_with_name_only(self):
+ with self.assertRaises(Exception):
+ SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})
+
+ def test_name_and_direction(self):
+ settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
+ self.assertEquals('foo', settings.sec_grp_name)
+ self.assertEquals(Direction.ingress, settings.direction)
+
+ def test_config_name_and_direction(self):
+ settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
+ self.assertEquals('foo', settings.sec_grp_name)
+ self.assertEquals(Direction.ingress, settings.direction)
+
+ def test_all(self):
+ settings = SecurityGroupRuleSettings(
+ sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
+ protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
+ remote_ip_prefix='prfx')
+ self.assertEquals('foo', settings.sec_grp_name)
+ self.assertEquals('fubar', settings.description)
+ self.assertEquals(Direction.egress, settings.direction)
+ self.assertEquals('rgi', settings.remote_group_id)
+ self.assertEquals(Protocol.icmp, settings.protocol)
+ self.assertEquals(Ethertype.IPv6, settings.ethertype)
+ self.assertEquals(1, settings.port_range_min)
+ self.assertEquals(2, settings.port_range_max)
+ self.assertEquals('prfx', settings.remote_ip_prefix)
+
+ def test_config_all(self):
+ settings = SecurityGroupRuleSettings(
+ config={'sec_grp_name': 'foo',
+ 'description': 'fubar',
+ 'direction': 'egress',
+ 'remote_group_id': 'rgi',
+ 'protocol': 'tcp',
+ 'ethertype': 'IPv6',
+ 'port_range_min': 1,
+ 'port_range_max': 2,
+ 'remote_ip_prefix': 'prfx'})
+ self.assertEquals('foo', settings.sec_grp_name)
+ self.assertEquals('fubar', settings.description)
+ self.assertEquals(Direction.egress, settings.direction)
+ self.assertEquals('rgi', settings.remote_group_id)
+ self.assertEquals(Protocol.tcp, settings.protocol)
+ self.assertEquals(Ethertype.IPv6, settings.ethertype)
+ self.assertEquals(1, settings.port_range_min)
+ self.assertEquals(2, settings.port_range_max)
+ self.assertEquals('prfx', settings.remote_ip_prefix)
+
+
+class SecurityGroupSettingsUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupSettings class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(Exception):
+ SecurityGroupSettings()
+
+ def test_empty_config(self):
+ with self.assertRaises(Exception):
+ SecurityGroupSettings(config=dict())
+
+ def test_name_only(self):
+ settings = SecurityGroupSettings(name='foo')
+ self.assertEquals('foo', settings.name)
+
+ def test_config_with_name_only(self):
+ settings = SecurityGroupSettings(config={'name': 'foo'})
+ self.assertEquals('foo', settings.name)
+
+ def test_invalid_rule(self):
+ rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
+ with self.assertRaises(Exception):
+ SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
+
+ def test_all(self):
+ rule_settings = list()
+ rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress))
+ rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress))
+ settings = SecurityGroupSettings(
+ name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
+
+ self.assertEquals('bar', settings.name)
+ self.assertEquals('fubar', settings.description)
+ self.assertEquals('foo', settings.project_name)
+ self.assertEquals(rule_settings[0], settings.rule_settings[0])
+ self.assertEquals(rule_settings[1], settings.rule_settings[1])
+
+ def test_config_all(self):
+ settings = SecurityGroupSettings(
+ config={'name': 'bar',
+ 'description': 'fubar',
+ 'project_name': 'foo',
+ 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
+
+ self.assertEquals('bar', settings.name)
+ self.assertEquals('fubar', settings.description)
+ self.assertEquals('foo', settings.project_name)
+ self.assertEquals(1, len(settings.rule_settings))
+ self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
+
+
+class CreateSecurityGroupTests(OSIntegrationTestCase):
+ """
+ Test for the CreateSecurityGroup class defined in create_security_group.py
+ """
+
+ def setUp(self):
+ """
+ Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file
+ within OpenStack
+ """
+ super(self.__class__, self).__start__()
+
+ guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
+ self.sec_grp_name = guid + 'name'
+ self.neutron = neutron_utils.neutron_client(self.os_creds)
+
+ # Initialize for cleanup
+ self.sec_grp_creator = None
+
+ def tearDown(self):
+ """
+ Cleans the image and downloaded image file
+ """
+ if self.sec_grp_creator:
+ self.sec_grp_creator.clean()
+
+ super(self.__class__, self).__clean__()
+
+ def test_create_group_without_rules(self):
+ """
+ Tests the creation of an OpenStack Security Group without custom rules.
+ """
+ # Create Image
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ self.assertIsNotNone(sec_grp)
+
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ def test_create_delete_group(self):
+ """
+ Tests the creation of an OpenStack Security Group without custom rules.
+ """
+ # Create Image
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ created_sec_grp = self.sec_grp_creator.create()
+ self.assertIsNotNone(created_sec_grp)
+
+ neutron_utils.delete_security_group(self.neutron, created_sec_grp)
+ self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))
+
+ self.sec_grp_creator.clean()
+
+ def test_create_group_with_one_simple_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ def test_create_group_with_several_rules(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple custom rule.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ def test_add_rule(self):
+ """
+ Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
+ direction=Direction.egress, protocol=Protocol.icmp))
+ rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(rules) + 1, len(rules2))
+
+ def test_remove_rule_by_id(self):
+ """
+ Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID.
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
+ rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(rules) - 1, len(rules_after_del))
+
+ def test_remove_rule_by_setting(self):
+ """
+ Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule
+ setting object
+ """
+ # Create Image
+ sec_grp_rule_settings = list()
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ self.sec_grp_creator.create()
+
+ sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+
+ self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
+ rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
+ self.sec_grp_creator.get_security_group())
+ self.assertEquals(len(rules) - 1, len(rules_after_del))
+
+# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex