diff options
Diffstat (limited to 'snaps/config')
-rw-r--r-- | snaps/config/security_group.py | 388 | ||||
-rw-r--r-- | snaps/config/tests/security_group_tests.py | 187 |
2 files changed, 575 insertions, 0 deletions
diff --git a/snaps/config/security_group.py b/snaps/config/security_group.py new file mode 100644 index 0000000..32a1e95 --- /dev/null +++ b/snaps/config/security_group.py @@ -0,0 +1,388 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import enum + +from snaps.openstack.utils import keystone_utils, neutron_utils + + +class SecurityGroupConfig(object): + """ + Class representing a keypair configuration + """ + + def __init__(self, **kwargs): + """ + Constructor + :param name: The security group's name (required) + :param description: The security group's description (optional) + :param project_name: The name of the project under which the security + group will be created + :param rule_settings: a list of SecurityGroupRuleConfig objects + :return: + """ + self.name = kwargs.get('name') + self.description = kwargs.get('description') + self.project_name = kwargs.get('project_name') + self.rule_settings = list() + + rule_settings = kwargs.get('rules') + if not rule_settings: + rule_settings = kwargs.get('rule_settings') + + if rule_settings: + for rule_setting in rule_settings: + if isinstance(rule_setting, SecurityGroupRuleConfig): + self.rule_settings.append(rule_setting) + else: + rule_setting['sec_grp_name'] = self.name + self.rule_settings.append(SecurityGroupRuleConfig( + **rule_setting)) + + if not self.name: + raise SecurityGroupConfigError('The attribute name is required') + + for rule_setting in self.rule_settings: + if rule_setting.sec_grp_name is not self.name: + raise SecurityGroupConfigError( + 'Rule settings must correspond with the name of this ' + 'security group') + + def dict_for_neutron(self, keystone): + """ + Returns a dictionary object representing this object. + This is meant to be converted into JSON designed for use by the Neutron + API + + TODO - expand automated testing to exercise all parameters + :param keystone: the Keystone client + :return: the dictionary object + """ + out = dict() + + if self.name: + out['name'] = self.name + if self.description: + out['description'] = self.description + if self.project_name: + project = keystone_utils.get_project( + keystone=keystone, project_name=self.project_name) + project_id = None + if project: + project_id = project.id + if project_id: + out['tenant_id'] = project_id + else: + raise SecurityGroupConfigError( + 'Could not find project ID for project named - ' + + self.project_name) + + return {'security_group': out} + + +class Direction(enum.Enum): + """ + A rule's direction + """ + ingress = 'ingress' + egress = 'egress' + + +class Protocol(enum.Enum): + """ + A rule's protocol + """ + ah = 51 + dccp = 33 + egp = 8 + esp = 50 + gre = 47 + icmp = 1 + icmpv6 = 58 + igmp = 2 + ipv6_encap = 41 + ipv6_frag = 44 + ipv6_icmp = 58 + ipv6_nonxt = 59 + ipv6_opts = 60 + ipv6_route = 43 + ospf = 89 + pgm = 113 + rsvp = 46 + sctp = 132 + tcp = 6 + udp = 17 + udplite = 136 + vrrp = 112 + any = 'any' + null = 'null' + + +class Ethertype(enum.Enum): + """ + A rule's ethertype + """ + IPv4 = 4 + IPv6 = 6 + + +class SecurityGroupConfigError(Exception): + """ + Exception to be thrown when security group settings attributes are + invalid + """ + + +class SecurityGroupRuleConfig(object): + """ + Class representing a keypair configuration + """ + + def __init__(self, **kwargs): + """ + Constructor - all parameters are optional + :param sec_grp_name: The security group's name on which to add the + rule. (required) + :param description: The rule's description + :param direction: An enumeration of type + create_security_group.RULE_DIRECTION (required) + :param remote_group_id: The group ID to associate with this rule + (this should be changed to group name once + snaps support Groups) (optional) + :param protocol: An enumeration of type + create_security_group.RULE_PROTOCOL or a string value + that will be mapped accordingly (optional) + :param ethertype: An enumeration of type + create_security_group.RULE_ETHERTYPE (optional) + :param port_range_min: The minimum port number in the range that is + matched by the security group rule. When the + protocol is TCP or UDP, this value must be <= + port_range_max. + :param port_range_max: The maximum port number in the range that is + matched by the security group rule. When the + protocol is TCP or UDP, this value must be <= + port_range_max. + :param remote_ip_prefix: The remote IP prefix to associate with this + metering rule packet (optional) + + TODO - Need to support the tenant... + """ + + self.description = kwargs.get('description') + self.sec_grp_name = kwargs.get('sec_grp_name') + self.remote_group_id = kwargs.get('remote_group_id') + self.direction = None + if kwargs.get('direction'): + self.direction = map_direction(kwargs['direction']) + + self.protocol = None + if kwargs.get('protocol'): + self.protocol = map_protocol(kwargs['protocol']) + else: + self.protocol = Protocol.null + + self.ethertype = None + if kwargs.get('ethertype'): + self.ethertype = map_ethertype(kwargs['ethertype']) + + self.port_range_min = kwargs.get('port_range_min') + self.port_range_max = kwargs.get('port_range_max') + self.remote_ip_prefix = kwargs.get('remote_ip_prefix') + + if not self.direction or not self.sec_grp_name: + raise SecurityGroupRuleConfigError( + 'direction and sec_grp_name are required') + + def dict_for_neutron(self, neutron): + """ + Returns a dictionary object representing this object. + This is meant to be converted into JSON designed for use by the Neutron + API + + :param neutron: the neutron client for performing lookups + :return: the dictionary object + """ + out = dict() + + if self.description: + out['description'] = self.description + if self.direction: + out['direction'] = self.direction.name + if self.port_range_min: + out['port_range_min'] = self.port_range_min + if self.port_range_max: + out['port_range_max'] = self.port_range_max + if self.ethertype: + out['ethertype'] = self.ethertype.name + if self.protocol and self.protocol.value != 'null': + out['protocol'] = self.protocol.value + if self.sec_grp_name: + sec_grp = neutron_utils.get_security_group( + neutron, sec_grp_name=self.sec_grp_name) + if sec_grp: + out['security_group_id'] = sec_grp.id + else: + raise SecurityGroupRuleConfigError( + 'Cannot locate security group with name - ' + + self.sec_grp_name) + if self.remote_group_id: + out['remote_group_id'] = self.remote_group_id + if self.remote_ip_prefix: + out['remote_ip_prefix'] = self.remote_ip_prefix + + return {'security_group_rule': out} + + def rule_eq(self, rule): + """ + Returns True if this setting created the rule + :param rule: the rule to evaluate + :return: T/F + """ + if self.description is not None: + if rule.description is not None and rule.description != '': + return False + elif self.description != rule.description: + if rule.description != '': + return False + + if self.direction.name != rule.direction: + return False + + if self.ethertype and rule.ethertype: + if self.ethertype.name != rule.ethertype: + return False + + if self.port_range_min and rule.port_range_min: + if self.port_range_min != rule.port_range_min: + return False + + if self.port_range_max and rule.port_range_max: + if self.port_range_max != rule.port_range_max: + return False + + if self.protocol and rule.protocol: + if self.protocol.name != rule.protocol: + return False + + if self.remote_group_id and rule.remote_group_id: + if self.remote_group_id != rule.remote_group_id: + return False + + if self.remote_ip_prefix and rule.remote_ip_prefix: + if self.remote_ip_prefix != rule.remote_ip_prefix: + return False + + return True + + def __eq__(self, other): + return ( + self.description == other.description and + self.direction == other.direction and + self.port_range_min == other.port_range_min and + self.port_range_max == other.port_range_max and + self.ethertype == other.ethertype and + self.protocol == other.protocol and + self.sec_grp_name == other.sec_grp_name and + self.remote_group_id == other.remote_group_id and + self.remote_ip_prefix == other.remote_ip_prefix) + + def __hash__(self): + return hash((self.sec_grp_name, self.description, self.direction, + self.remote_group_id, + self.protocol, self.ethertype, self.port_range_min, + self.port_range_max, self.remote_ip_prefix)) + + +def map_direction(direction): + """ + Takes a the direction value maps it to the Direction enum. When None return + None + :param direction: the direction value + :return: the Direction enum object + :raise: Exception if value is invalid + """ + if not direction: + return None + if isinstance(direction, Direction): + return direction + elif (isinstance(direction, str) or isinstance(direction, unicode) + or isinstance(direction, unicode)): + dir_str = str(direction) + if dir_str == 'egress': + return Direction.egress + elif dir_str == 'ingress': + return Direction.ingress + else: + raise SecurityGroupRuleConfigError( + 'Invalid Direction - ' + dir_str) + else: + return map_direction(direction.value) + + +def map_protocol(protocol): + """ + Takes a the protocol value maps it to the Protocol enum. When None return + None + :param protocol: the protocol value + :return: the Protocol enum object + :raise: Exception if value is invalid + """ + if not protocol: + return None + elif isinstance(protocol, Protocol): + return protocol + elif (isinstance(protocol, str) or isinstance(protocol, unicode) + or isinstance(protocol, int)): + for proto_enum in Protocol: + if proto_enum.name == protocol or proto_enum.value == protocol: + if proto_enum == Protocol.any: + return Protocol.null + return proto_enum + raise SecurityGroupRuleConfigError( + 'Invalid Protocol - ' + protocol) + else: + return map_protocol(protocol.value) + + +def map_ethertype(ethertype): + """ + Takes a the ethertype value maps it to the Ethertype enum. When None return + None + :param ethertype: the ethertype value + :return: the Ethertype enum object + :raise: Exception if value is invalid + """ + if not ethertype: + return None + elif isinstance(ethertype, Ethertype): + return ethertype + elif (isinstance(ethertype, str) or isinstance(ethertype, unicode) + or isinstance(ethertype, int)): + eth_str = str(ethertype) + if eth_str == 'IPv6' or eth_str == '6': + return Ethertype.IPv6 + elif eth_str == 'IPv4' or eth_str == '4': + return Ethertype.IPv4 + else: + raise SecurityGroupRuleConfigError( + 'Invalid Ethertype - ' + eth_str) + else: + return map_ethertype(ethertype.value) + + +class SecurityGroupRuleConfigError(Exception): + """ + Exception to be thrown when security group rule settings attributes are + invalid + """ diff --git a/snaps/config/tests/security_group_tests.py b/snaps/config/tests/security_group_tests.py new file mode 100644 index 0000000..8834836 --- /dev/null +++ b/snaps/config/tests/security_group_tests.py @@ -0,0 +1,187 @@ +# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") +# and others. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import unittest + +from snaps.config.security_group import ( + Direction, SecurityGroupConfig, SecurityGroupRuleConfig, + SecurityGroupConfigError, Protocol, Ethertype, + SecurityGroupRuleConfigError) + + +class SecurityGroupRuleConfigUnitTests(unittest.TestCase): + """ + Tests the construction of the SecurityGroupRuleConfig class + """ + + def test_no_params(self): + with self.assertRaises(SecurityGroupRuleConfigError): + SecurityGroupRuleConfig() + + def test_empty_config(self): + with self.assertRaises(SecurityGroupRuleConfigError): + SecurityGroupRuleConfig(**dict()) + + def test_name_only(self): + with self.assertRaises(SecurityGroupRuleConfigError): + SecurityGroupRuleConfig(sec_grp_name='foo') + + def test_config_with_name_only(self): + with self.assertRaises(SecurityGroupRuleConfigError): + SecurityGroupRuleConfig(**{'sec_grp_name': 'foo'}) + + def test_name_and_direction(self): + settings = SecurityGroupRuleConfig(sec_grp_name='foo', + direction=Direction.ingress) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + + def test_config_name_and_direction(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', 'direction': 'ingress'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + + def test_proto_ah_str(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'ah'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Protocol.ah, settings.protocol) + + def test_proto_ah_value(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 51}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Protocol.ah, settings.protocol) + + def test_proto_any(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'any'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Protocol.null, settings.protocol) + + def test_proto_null(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', 'direction': 'ingress', + 'protocol': 'null'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual(Direction.ingress, settings.direction) + self.assertEqual(Protocol.null, settings.protocol) + + def test_all(self): + settings = SecurityGroupRuleConfig( + sec_grp_name='foo', description='fubar', + direction=Direction.egress, remote_group_id='rgi', + protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, + port_range_max=2, + remote_ip_prefix='prfx') + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual('fubar', settings.description) + self.assertEqual(Direction.egress, settings.direction) + self.assertEqual('rgi', settings.remote_group_id) + self.assertEqual(Protocol.icmp, settings.protocol) + self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(1, settings.port_range_min) + self.assertEqual(2, settings.port_range_max) + self.assertEqual('prfx', settings.remote_ip_prefix) + + def test_config_all(self): + settings = SecurityGroupRuleConfig( + **{'sec_grp_name': 'foo', + 'description': 'fubar', + 'direction': 'egress', + 'remote_group_id': 'rgi', + 'protocol': 'tcp', + 'ethertype': 'IPv6', + 'port_range_min': 1, + 'port_range_max': 2, + 'remote_ip_prefix': 'prfx'}) + self.assertEqual('foo', settings.sec_grp_name) + self.assertEqual('fubar', settings.description) + self.assertEqual(Direction.egress, settings.direction) + self.assertEqual('rgi', settings.remote_group_id) + self.assertEqual(Protocol.tcp, settings.protocol) + self.assertEqual(Ethertype.IPv6, settings.ethertype) + self.assertEqual(1, settings.port_range_min) + self.assertEqual(2, settings.port_range_max) + self.assertEqual('prfx', settings.remote_ip_prefix) + + +class SecurityGroupConfigUnitTests(unittest.TestCase): + """ + Tests the construction of the SecurityGroupConfig class + """ + + def test_no_params(self): + with self.assertRaises(SecurityGroupConfigError): + SecurityGroupConfig() + + def test_empty_config(self): + with self.assertRaises(SecurityGroupConfigError): + SecurityGroupConfig(**dict()) + + def test_name_only(self): + settings = SecurityGroupConfig(name='foo') + self.assertEqual('foo', settings.name) + + def test_config_with_name_only(self): + settings = SecurityGroupConfig(**{'name': 'foo'}) + self.assertEqual('foo', settings.name) + + def test_invalid_rule(self): + rule_setting = SecurityGroupRuleConfig( + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_1') + with self.assertRaises(SecurityGroupConfigError): + SecurityGroupConfig(name='foo', rule_settings=[rule_setting]) + + def test_all(self): + rule_settings = list() + rule_settings.append(SecurityGroupRuleConfig( + sec_grp_name='bar', direction=Direction.egress, + description='test_rule_1')) + rule_settings.append(SecurityGroupRuleConfig( + sec_grp_name='bar', direction=Direction.ingress, + description='test_rule_2')) + settings = SecurityGroupConfig( + name='bar', description='fubar', project_name='foo', + rule_settings=rule_settings) + + self.assertEqual('bar', settings.name) + self.assertEqual('fubar', settings.description) + self.assertEqual('foo', settings.project_name) + self.assertEqual(rule_settings[0], settings.rule_settings[0]) + self.assertEqual(rule_settings[1], settings.rule_settings[1]) + + def test_config_all(self): + settings = SecurityGroupConfig( + **{'name': 'bar', + 'description': 'fubar', + 'project_name': 'foo', + 'rules': [ + {'sec_grp_name': 'bar', 'direction': 'ingress'}]}) + + self.assertEqual('bar', settings.name) + self.assertEqual('fubar', settings.description) + self.assertEqual('foo', settings.project_name) + self.assertEqual(1, len(settings.rule_settings)) + self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) + self.assertEqual(Direction.ingress, + settings.rule_settings[0].direction) |