From 57777f3df521553a06cd01a3861b415d2905ceca Mon Sep 17 00:00:00 2001 From: spisarski Date: Wed, 15 Feb 2017 09:13:54 -0700 Subject: Initial patch with all code from CableLabs repository. Change-Id: I70a2778718c5e7f21fd14e4ad28c9269d3761cc7 Signed-off-by: spisarski --- .../openstack/tests/create_security_group_tests.py | 355 +++++++++++++++++++++ 1 file changed, 355 insertions(+) create mode 100644 snaps/openstack/tests/create_security_group_tests.py (limited to 'snaps/openstack/tests/create_security_group_tests.py') diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py new file mode 100644 index 0000000..079be0c --- /dev/null +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -0,0 +1,355 @@ +# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import uuid +import unittest + +from snaps.openstack import create_security_group +from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \ + Ethertype, Protocol +from snaps.openstack.tests import validation_utils +from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase +from snaps.openstack.utils import neutron_utils + +__author__ = 'spisarski' + + +class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): + """ + Tests the construction of the SecurityGroupRuleSettings class + """ + + def test_no_params(self): + with self.assertRaises(Exception): + SecurityGroupRuleSettings() + + def test_empty_config(self): + with self.assertRaises(Exception): + SecurityGroupRuleSettings(config=dict()) + + def test_name_only(self): + with self.assertRaises(Exception): + SecurityGroupRuleSettings(sec_grp_name='foo') + + def test_config_with_name_only(self): + with self.assertRaises(Exception): + SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'}) + + def test_name_and_direction(self): + settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) + self.assertEquals('foo', settings.sec_grp_name) + self.assertEquals(Direction.ingress, settings.direction) + + def test_config_name_and_direction(self): + settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'}) + self.assertEquals('foo', settings.sec_grp_name) + self.assertEquals(Direction.ingress, settings.direction) + + def test_all(self): + settings = SecurityGroupRuleSettings( + sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi', + protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2, + remote_ip_prefix='prfx') + self.assertEquals('foo', settings.sec_grp_name) + self.assertEquals('fubar', settings.description) + self.assertEquals(Direction.egress, settings.direction) + self.assertEquals('rgi', settings.remote_group_id) + self.assertEquals(Protocol.icmp, settings.protocol) + self.assertEquals(Ethertype.IPv6, settings.ethertype) + self.assertEquals(1, settings.port_range_min) + self.assertEquals(2, settings.port_range_max) + self.assertEquals('prfx', settings.remote_ip_prefix) + + def test_config_all(self): + settings = SecurityGroupRuleSettings( + config={'sec_grp_name': 'foo', + 'description': 'fubar', + 'direction': 'egress', + 'remote_group_id': 'rgi', + 'protocol': 'tcp', + 'ethertype': 'IPv6', + 'port_range_min': 1, + 'port_range_max': 2, + 'remote_ip_prefix': 'prfx'}) + self.assertEquals('foo', settings.sec_grp_name) + self.assertEquals('fubar', settings.description) + self.assertEquals(Direction.egress, settings.direction) + self.assertEquals('rgi', settings.remote_group_id) + self.assertEquals(Protocol.tcp, settings.protocol) + self.assertEquals(Ethertype.IPv6, settings.ethertype) + self.assertEquals(1, settings.port_range_min) + self.assertEquals(2, settings.port_range_max) + self.assertEquals('prfx', settings.remote_ip_prefix) + + +class SecurityGroupSettingsUnitTests(unittest.TestCase): + """ + Tests the construction of the SecurityGroupSettings class + """ + + def test_no_params(self): + with self.assertRaises(Exception): + SecurityGroupSettings() + + def test_empty_config(self): + with self.assertRaises(Exception): + SecurityGroupSettings(config=dict()) + + def test_name_only(self): + settings = SecurityGroupSettings(name='foo') + self.assertEquals('foo', settings.name) + + def test_config_with_name_only(self): + settings = SecurityGroupSettings(config={'name': 'foo'}) + self.assertEquals('foo', settings.name) + + def test_invalid_rule(self): + rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress) + with self.assertRaises(Exception): + SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) + + def test_all(self): + rule_settings = list() + rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress)) + rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)) + settings = SecurityGroupSettings( + name='bar', description='fubar', project_name='foo', rule_settings=rule_settings) + + self.assertEquals('bar', settings.name) + self.assertEquals('fubar', settings.description) + self.assertEquals('foo', settings.project_name) + self.assertEquals(rule_settings[0], settings.rule_settings[0]) + self.assertEquals(rule_settings[1], settings.rule_settings[1]) + + def test_config_all(self): + settings = SecurityGroupSettings( + config={'name': 'bar', + 'description': 'fubar', + 'project_name': 'foo', + 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]}) + + self.assertEquals('bar', settings.name) + self.assertEquals('fubar', settings.description) + self.assertEquals('foo', settings.project_name) + self.assertEquals(1, len(settings.rule_settings)) + self.assertEquals('bar', settings.rule_settings[0].sec_grp_name) + self.assertEquals(Direction.ingress, settings.rule_settings[0].direction) + + +class CreateSecurityGroupTests(OSIntegrationTestCase): + """ + Test for the CreateSecurityGroup class defined in create_security_group.py + """ + + def setUp(self): + """ + Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file + within OpenStack + """ + super(self.__class__, self).__start__() + + guid = self.__class__.__name__ + '-' + str(uuid.uuid4()) + self.sec_grp_name = guid + 'name' + self.neutron = neutron_utils.neutron_client(self.os_creds) + + # Initialize for cleanup + self.sec_grp_creator = None + + def tearDown(self): + """ + Cleans the image and downloaded image file + """ + if self.sec_grp_creator: + self.sec_grp_creator.clean() + + super(self.__class__, self).__clean__() + + def test_create_group_without_rules(self): + """ + Tests the creation of an OpenStack Security Group without custom rules. + """ + # Create Image + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + self.assertIsNotNone(sec_grp) + + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + def test_create_delete_group(self): + """ + Tests the creation of an OpenStack Security Group without custom rules. + """ + # Create Image + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + created_sec_grp = self.sec_grp_creator.create() + self.assertIsNotNone(created_sec_grp) + + neutron_utils.delete_security_group(self.neutron, created_sec_grp) + self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name)) + + self.sec_grp_creator.clean() + + def test_create_group_with_one_simple_rule(self): + """ + Tests the creation of an OpenStack Security Group with one simple custom rule. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + def test_create_group_with_several_rules(self): + """ + Tests the creation of an OpenStack Security Group with one simple custom rule. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + def test_add_rule(self): + """ + Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, + direction=Direction.egress, protocol=Protocol.icmp)) + rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + self.assertEquals(len(rules) + 1, len(rules2)) + + def test_remove_rule_by_id(self): + """ + Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID. + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id']) + rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(rules) - 1, len(rules_after_del)) + + def test_remove_rule_by_setting(self): + """ + Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule + setting object + """ + # Create Image + sec_grp_rule_settings = list() + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + self.sec_grp_creator.create() + + sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules)) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + + self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0]) + rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, + self.sec_grp_creator.get_security_group()) + self.assertEquals(len(rules) - 1, len(rules_after_del)) + +# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex -- cgit 1.2.3-korg