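# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import uuid
import unittest

from snaps.openstack import create_security_group
from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
    Ethertype, Protocol
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils

__author__ = 'spisarski'


class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupRuleSettings class
    """

    def test_no_params(self):
        """Construction without any argument must raise."""
        with self.assertRaises(Exception):
            SecurityGroupRuleSettings()

    def test_empty_config(self):
        """Construction from an empty config dict must raise."""
        with self.assertRaises(Exception):
            SecurityGroupRuleSettings(config=dict())

    def test_name_only(self):
        """A group name without a direction must raise."""
        with self.assertRaises(Exception):
            SecurityGroupRuleSettings(sec_grp_name='foo')

    def test_config_with_name_only(self):
        """A config dict holding only the group name must raise."""
        with self.assertRaises(Exception):
            SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})

    def test_name_and_direction(self):
        """Group name plus direction is the smallest accepted keyword form."""
        settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_config_name_and_direction(self):
        """The config form maps the 'ingress' string onto Direction.ingress."""
        settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual(Direction.ingress, settings.direction)

    def test_all(self):
        """Every keyword argument is stored on the settings object unchanged."""
        settings = SecurityGroupRuleSettings(
            sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
            protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
            remote_ip_prefix='prfx')
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.icmp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)

    def test_config_all(self):
        """Every config key is stored, with strings mapped onto the enum members."""
        settings = SecurityGroupRuleSettings(
            config={'sec_grp_name': 'foo',
                    'description': 'fubar',
                    'direction': 'egress',
                    'remote_group_id': 'rgi',
                    'protocol': 'tcp',
                    'ethertype': 'IPv6',
                    'port_range_min': 1,
                    'port_range_max': 2,
                    'remote_ip_prefix': 'prfx'})
        self.assertEqual('foo', settings.sec_grp_name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual(Direction.egress, settings.direction)
        self.assertEqual('rgi', settings.remote_group_id)
        self.assertEqual(Protocol.tcp, settings.protocol)
        self.assertEqual(Ethertype.IPv6, settings.ethertype)
        self.assertEqual(1, settings.port_range_min)
        self.assertEqual(2, settings.port_range_max)
        self.assertEqual('prfx', settings.remote_ip_prefix)


class SecurityGroupSettingsUnitTests(unittest.TestCase):
    """
    Tests the construction of the SecurityGroupSettings class
    """

    def test_no_params(self):
        """Construction without any argument must raise."""
        with self.assertRaises(Exception):
            SecurityGroupSettings()

    def test_empty_config(self):
        """Construction from an empty config dict must raise."""
        with self.assertRaises(Exception):
            SecurityGroupSettings(config=dict())

    def test_name_only(self):
        """A name alone is enough to build group settings."""
        settings = SecurityGroupSettings(name='foo')
        self.assertEqual('foo', settings.name)

    def test_config_with_name_only(self):
        """A config dict holding only the name is accepted."""
        settings = SecurityGroupSettings(config={'name': 'foo'})
        self.assertEqual('foo', settings.name)

    def test_invalid_rule(self):
        """A rule whose sec_grp_name ('bar') differs from the group name ('foo') must be rejected."""
        rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
        with self.assertRaises(Exception):
            SecurityGroupSettings(name='foo', rule_settings=[rule_setting])

    def test_all(self):
        """Every keyword argument, including the rule list, is stored unchanged."""
        rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress),
            SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress),
        ]
        settings = SecurityGroupSettings(
            name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(rule_settings[0], settings.rule_settings[0])
        self.assertEqual(rule_settings[1], settings.rule_settings[1])

    def test_config_all(self):
        """The 'rules' config key is turned into SecurityGroupRuleSettings objects."""
        settings = SecurityGroupSettings(
            config={'name': 'bar',
                    'description': 'fubar',
                    'project_name': 'foo',
                    'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})

        self.assertEqual('bar', settings.name)
        self.assertEqual('fubar', settings.description)
        self.assertEqual('foo', settings.project_name)
        self.assertEqual(1, len(settings.rule_settings))
        self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
        self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)


class CreateSecurityGroupTests(OSIntegrationTestCase):
    """
    Test for the CreateSecurityGroup class defined in create_security_group.py

    NOTE(review): the rule tests below compare rule *counts* only. A rule created with the wrong
    direction, protocol or port range would still pass, so these tests do not prove that the
    filtering policy requested is the one Neutron enforces.
    """

    def setUp(self):
        """
        Builds a unique security group name and a Neutron client for the test
        """
        # Name the class explicitly: super(self.__class__, self) recurses forever once this
        # test case is subclassed.
        super(CreateSecurityGroupTests, self).__start__()

        guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
        self.sec_grp_name = guid + 'name'
        self.neutron = neutron_utils.neutron_client(self.os_creds)

        # Initialize for cleanup
        self.sec_grp_creator = None

    def tearDown(self):
        """
        Removes the security group created by the test, if any
        """
        if self.sec_grp_creator:
            self.sec_grp_creator.clean()

        super(CreateSecurityGroupTests, self).__clean__()

    def _several_rule_settings(self):
        """Returns the three rule settings shared by the multi-rule tests."""
        return [
            # Direction-only rule: no protocol, port range or remote prefix is given, which in
            # Neutron's API leaves the rule unrestricted for that direction (defaults applied by
            # SecurityGroupRuleSettings are not visible here). Acceptable only because the group
            # is uniquely named and removed in tearDown.
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                                      protocol=Protocol.udp, ethertype=Ethertype.IPv6),
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.egress,
                                      protocol=Protocol.udp, ethertype=Ethertype.IPv4,
                                      port_range_min=10, port_range_max=20),
        ]

    def _create_group(self, rule_settings=None):
        """
        Creates the security group under test and returns what create() returned.

        :param rule_settings: optional list of SecurityGroupRuleSettings; when None the
                              rule_settings argument is not passed at all
        """
        kwargs = {'name': self.sec_grp_name, 'description': 'hello group'}
        if rule_settings is not None:
            kwargs['rule_settings'] = rule_settings
        sec_grp_settings = SecurityGroupSettings(**kwargs)

        # Stored on self before create() so tearDown can clean up a partially created group.
        self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
        return self.sec_grp_creator.create()

    def _validate_group_and_rules(self):
        """
        Looks the group up by name in Neutron, compares it and its rules with what the creator
        holds and returns the rules Neutron reports.
        """
        sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
        # Fail here when the lookup finds nothing instead of comparing against None below.
        self.assertIsNotNone(sec_grp)

        # NOTE(review): the result of objects_equivalent() is discarded. If it returns a boolean
        # rather than raising, these two comparisons can never fail the test - confirm and wrap
        # them in assertTrue().
        validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)

        rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
        validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
        return rules

    def test_create_group_without_rules(self):
        """
        Tests the creation of an OpenStack Security Group without custom rules.
        """
        self._create_group()

        # The count is compared with the creator's own view, not with zero: Neutron normally
        # attaches default rules to a new group (presumably why get_rules() is the reference).
        self._validate_group_and_rules()

    def test_create_delete_group(self):
        """
        Tests that a Security Group deleted outside of the creator is gone from Neutron and that
        clean() can still be called afterwards.
        """
        created_sec_grp = self._create_group()
        self.assertIsNotNone(created_sec_grp)

        neutron_utils.delete_security_group(self.neutron, created_sec_grp)
        self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))

        # The group no longer exists; clean() is expected to cope with that.
        self.sec_grp_creator.clean()

    def test_create_group_with_one_simple_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple custom rule.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)]
        self._create_group(sec_grp_rule_settings)

        self._validate_group_and_rules()

    def test_create_group_with_several_rules(self):
        """
        Tests the creation of an OpenStack Security Group with several custom rules.
        """
        self._create_group(self._several_rule_settings())

        self._validate_group_and_rules()

    def test_add_rule(self):
        """
        Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
        """
        sec_grp_rule_settings = [
            SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, direction=Direction.ingress)]
        self._create_group(sec_grp_rule_settings)

        rules = self._validate_group_and_rules()

        self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
                                                                direction=Direction.egress, protocol=Protocol.icmp))
        rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) + 1, len(rules2))

    def test_remove_rule_by_id(self):
        """
        Tests the creation of an OpenStack Security Group with three custom rules then removes one by the rule ID.
        """
        self._create_group(self._several_rule_settings())

        rules = self._validate_group_and_rules()

        self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
        rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
                                                                    self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

    def test_remove_rule_by_setting(self):
        """
        Tests the creation of an OpenStack Security Group with three custom rules then removes one by the rule
        setting object
        """
        sec_grp_rule_settings = self._several_rule_settings()
        self._create_group(sec_grp_rule_settings)

        rules = self._validate_group_and_rules()

        self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
        rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
                                                                    self.sec_grp_creator.get_security_group())
        self.assertEqual(len(rules) - 1, len(rules_after_del))

# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex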