summaryrefslogtreecommitdiffstats
path: root/snaps/config
diff options
context:
space:
mode:
Diffstat (limited to 'snaps/config')
-rw-r--r--snaps/config/security_group.py388
-rw-r--r--snaps/config/tests/security_group_tests.py187
2 files changed, 575 insertions, 0 deletions
diff --git a/snaps/config/security_group.py b/snaps/config/security_group.py
new file mode 100644
index 0000000..32a1e95
--- /dev/null
+++ b/snaps/config/security_group.py
@@ -0,0 +1,388 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import enum
+
+from snaps.openstack.utils import keystone_utils, neutron_utils
+
+
+class SecurityGroupConfig(object):
+ """
+ Class representing a keypair configuration
+ """
+
+ def __init__(self, **kwargs):
+ """
+ Constructor
+ :param name: The security group's name (required)
+ :param description: The security group's description (optional)
+ :param project_name: The name of the project under which the security
+ group will be created
+ :param rule_settings: a list of SecurityGroupRuleConfig objects
+ :return:
+ """
+ self.name = kwargs.get('name')
+ self.description = kwargs.get('description')
+ self.project_name = kwargs.get('project_name')
+ self.rule_settings = list()
+
+ rule_settings = kwargs.get('rules')
+ if not rule_settings:
+ rule_settings = kwargs.get('rule_settings')
+
+ if rule_settings:
+ for rule_setting in rule_settings:
+ if isinstance(rule_setting, SecurityGroupRuleConfig):
+ self.rule_settings.append(rule_setting)
+ else:
+ rule_setting['sec_grp_name'] = self.name
+ self.rule_settings.append(SecurityGroupRuleConfig(
+ **rule_setting))
+
+ if not self.name:
+ raise SecurityGroupConfigError('The attribute name is required')
+
+ for rule_setting in self.rule_settings:
+ if rule_setting.sec_grp_name is not self.name:
+ raise SecurityGroupConfigError(
+ 'Rule settings must correspond with the name of this '
+ 'security group')
+
+ def dict_for_neutron(self, keystone):
+ """
+ Returns a dictionary object representing this object.
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
+
+ TODO - expand automated testing to exercise all parameters
+ :param keystone: the Keystone client
+ :return: the dictionary object
+ """
+ out = dict()
+
+ if self.name:
+ out['name'] = self.name
+ if self.description:
+ out['description'] = self.description
+ if self.project_name:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ project_id = None
+ if project:
+ project_id = project.id
+ if project_id:
+ out['tenant_id'] = project_id
+ else:
+ raise SecurityGroupConfigError(
+ 'Could not find project ID for project named - ' +
+ self.project_name)
+
+ return {'security_group': out}
+
+
+class Direction(enum.Enum):
+ """
+ A rule's direction
+ """
+ ingress = 'ingress'
+ egress = 'egress'
+
+
+class Protocol(enum.Enum):
+ """
+ A rule's protocol
+ """
+ ah = 51
+ dccp = 33
+ egp = 8
+ esp = 50
+ gre = 47
+ icmp = 1
+ icmpv6 = 58
+ igmp = 2
+ ipv6_encap = 41
+ ipv6_frag = 44
+ ipv6_icmp = 58
+ ipv6_nonxt = 59
+ ipv6_opts = 60
+ ipv6_route = 43
+ ospf = 89
+ pgm = 113
+ rsvp = 46
+ sctp = 132
+ tcp = 6
+ udp = 17
+ udplite = 136
+ vrrp = 112
+ any = 'any'
+ null = 'null'
+
+
+class Ethertype(enum.Enum):
+ """
+ A rule's ethertype
+ """
+ IPv4 = 4
+ IPv6 = 6
+
+
+class SecurityGroupConfigError(Exception):
+ """
+ Exception to be thrown when security group settings attributes are
+ invalid
+ """
+
+
+class SecurityGroupRuleConfig(object):
+ """
+ Class representing a keypair configuration
+ """
+
+ def __init__(self, **kwargs):
+ """
+ Constructor - all parameters are optional
+ :param sec_grp_name: The security group's name on which to add the
+ rule. (required)
+ :param description: The rule's description
+ :param direction: An enumeration of type
+ create_security_group.RULE_DIRECTION (required)
+ :param remote_group_id: The group ID to associate with this rule
+ (this should be changed to group name once
+ snaps support Groups) (optional)
+ :param protocol: An enumeration of type
+ create_security_group.RULE_PROTOCOL or a string value
+ that will be mapped accordingly (optional)
+ :param ethertype: An enumeration of type
+ create_security_group.RULE_ETHERTYPE (optional)
+ :param port_range_min: The minimum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max.
+ :param port_range_max: The maximum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max.
+ :param remote_ip_prefix: The remote IP prefix to associate with this
+ metering rule packet (optional)
+
+ TODO - Need to support the tenant...
+ """
+
+ self.description = kwargs.get('description')
+ self.sec_grp_name = kwargs.get('sec_grp_name')
+ self.remote_group_id = kwargs.get('remote_group_id')
+ self.direction = None
+ if kwargs.get('direction'):
+ self.direction = map_direction(kwargs['direction'])
+
+ self.protocol = None
+ if kwargs.get('protocol'):
+ self.protocol = map_protocol(kwargs['protocol'])
+ else:
+ self.protocol = Protocol.null
+
+ self.ethertype = None
+ if kwargs.get('ethertype'):
+ self.ethertype = map_ethertype(kwargs['ethertype'])
+
+ self.port_range_min = kwargs.get('port_range_min')
+ self.port_range_max = kwargs.get('port_range_max')
+ self.remote_ip_prefix = kwargs.get('remote_ip_prefix')
+
+ if not self.direction or not self.sec_grp_name:
+ raise SecurityGroupRuleConfigError(
+ 'direction and sec_grp_name are required')
+
+ def dict_for_neutron(self, neutron):
+ """
+ Returns a dictionary object representing this object.
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
+
+ :param neutron: the neutron client for performing lookups
+ :return: the dictionary object
+ """
+ out = dict()
+
+ if self.description:
+ out['description'] = self.description
+ if self.direction:
+ out['direction'] = self.direction.name
+ if self.port_range_min:
+ out['port_range_min'] = self.port_range_min
+ if self.port_range_max:
+ out['port_range_max'] = self.port_range_max
+ if self.ethertype:
+ out['ethertype'] = self.ethertype.name
+ if self.protocol and self.protocol.value != 'null':
+ out['protocol'] = self.protocol.value
+ if self.sec_grp_name:
+ sec_grp = neutron_utils.get_security_group(
+ neutron, sec_grp_name=self.sec_grp_name)
+ if sec_grp:
+ out['security_group_id'] = sec_grp.id
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Cannot locate security group with name - ' +
+ self.sec_grp_name)
+ if self.remote_group_id:
+ out['remote_group_id'] = self.remote_group_id
+ if self.remote_ip_prefix:
+ out['remote_ip_prefix'] = self.remote_ip_prefix
+
+ return {'security_group_rule': out}
+
+ def rule_eq(self, rule):
+ """
+ Returns True if this setting created the rule
+ :param rule: the rule to evaluate
+ :return: T/F
+ """
+ if self.description is not None:
+ if rule.description is not None and rule.description != '':
+ return False
+ elif self.description != rule.description:
+ if rule.description != '':
+ return False
+
+ if self.direction.name != rule.direction:
+ return False
+
+ if self.ethertype and rule.ethertype:
+ if self.ethertype.name != rule.ethertype:
+ return False
+
+ if self.port_range_min and rule.port_range_min:
+ if self.port_range_min != rule.port_range_min:
+ return False
+
+ if self.port_range_max and rule.port_range_max:
+ if self.port_range_max != rule.port_range_max:
+ return False
+
+ if self.protocol and rule.protocol:
+ if self.protocol.name != rule.protocol:
+ return False
+
+ if self.remote_group_id and rule.remote_group_id:
+ if self.remote_group_id != rule.remote_group_id:
+ return False
+
+ if self.remote_ip_prefix and rule.remote_ip_prefix:
+ if self.remote_ip_prefix != rule.remote_ip_prefix:
+ return False
+
+ return True
+
+ def __eq__(self, other):
+ return (
+ self.description == other.description and
+ self.direction == other.direction and
+ self.port_range_min == other.port_range_min and
+ self.port_range_max == other.port_range_max and
+ self.ethertype == other.ethertype and
+ self.protocol == other.protocol and
+ self.sec_grp_name == other.sec_grp_name and
+ self.remote_group_id == other.remote_group_id and
+ self.remote_ip_prefix == other.remote_ip_prefix)
+
+ def __hash__(self):
+ return hash((self.sec_grp_name, self.description, self.direction,
+ self.remote_group_id,
+ self.protocol, self.ethertype, self.port_range_min,
+ self.port_range_max, self.remote_ip_prefix))
+
+
+def map_direction(direction):
+ """
+ Takes a the direction value maps it to the Direction enum. When None return
+ None
+ :param direction: the direction value
+ :return: the Direction enum object
+ :raise: Exception if value is invalid
+ """
+ if not direction:
+ return None
+ if isinstance(direction, Direction):
+ return direction
+ elif (isinstance(direction, str) or isinstance(direction, unicode)
+ or isinstance(direction, unicode)):
+ dir_str = str(direction)
+ if dir_str == 'egress':
+ return Direction.egress
+ elif dir_str == 'ingress':
+ return Direction.ingress
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Direction - ' + dir_str)
+ else:
+ return map_direction(direction.value)
+
+
+def map_protocol(protocol):
+ """
+ Takes a the protocol value maps it to the Protocol enum. When None return
+ None
+ :param protocol: the protocol value
+ :return: the Protocol enum object
+ :raise: Exception if value is invalid
+ """
+ if not protocol:
+ return None
+ elif isinstance(protocol, Protocol):
+ return protocol
+ elif (isinstance(protocol, str) or isinstance(protocol, unicode)
+ or isinstance(protocol, int)):
+ for proto_enum in Protocol:
+ if proto_enum.name == protocol or proto_enum.value == protocol:
+ if proto_enum == Protocol.any:
+ return Protocol.null
+ return proto_enum
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Protocol - ' + protocol)
+ else:
+ return map_protocol(protocol.value)
+
+
+def map_ethertype(ethertype):
+ """
+ Takes a the ethertype value maps it to the Ethertype enum. When None return
+ None
+ :param ethertype: the ethertype value
+ :return: the Ethertype enum object
+ :raise: Exception if value is invalid
+ """
+ if not ethertype:
+ return None
+ elif isinstance(ethertype, Ethertype):
+ return ethertype
+ elif (isinstance(ethertype, str) or isinstance(ethertype, unicode)
+ or isinstance(ethertype, int)):
+ eth_str = str(ethertype)
+ if eth_str == 'IPv6' or eth_str == '6':
+ return Ethertype.IPv6
+ elif eth_str == 'IPv4' or eth_str == '4':
+ return Ethertype.IPv4
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Ethertype - ' + eth_str)
+ else:
+ return map_ethertype(ethertype.value)
+
+
+class SecurityGroupRuleConfigError(Exception):
+ """
+ Exception to be thrown when security group rule settings attributes are
+ invalid
+ """
diff --git a/snaps/config/tests/security_group_tests.py b/snaps/config/tests/security_group_tests.py
new file mode 100644
index 0000000..8834836
--- /dev/null
+++ b/snaps/config/tests/security_group_tests.py
@@ -0,0 +1,187 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from snaps.config.security_group import (
+ Direction, SecurityGroupConfig, SecurityGroupRuleConfig,
+ SecurityGroupConfigError, Protocol, Ethertype,
+ SecurityGroupRuleConfigError)
+
+
+class SecurityGroupRuleConfigUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupRuleConfig class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig()
+
+ def test_empty_config(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(**dict())
+
+ def test_name_only(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(sec_grp_name='foo')
+
+ def test_config_with_name_only(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(**{'sec_grp_name': 'foo'})
+
+ def test_name_and_direction(self):
+ settings = SecurityGroupRuleConfig(sec_grp_name='foo',
+ direction=Direction.ingress)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+
+ def test_config_name_and_direction(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+
+ def test_proto_ah_str(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'ah'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.ah, settings.protocol)
+
+ def test_proto_ah_value(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 51})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.ah, settings.protocol)
+
+ def test_proto_any(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'any'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.null, settings.protocol)
+
+ def test_proto_null(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'null'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.null, settings.protocol)
+
+ def test_all(self):
+ settings = SecurityGroupRuleConfig(
+ sec_grp_name='foo', description='fubar',
+ direction=Direction.egress, remote_group_id='rgi',
+ protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
+ port_range_max=2,
+ remote_ip_prefix='prfx')
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
+
+ def test_config_all(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo',
+ 'description': 'fubar',
+ 'direction': 'egress',
+ 'remote_group_id': 'rgi',
+ 'protocol': 'tcp',
+ 'ethertype': 'IPv6',
+ 'port_range_min': 1,
+ 'port_range_max': 2,
+ 'remote_ip_prefix': 'prfx'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
+
+
+class SecurityGroupConfigUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupConfig class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig()
+
+ def test_empty_config(self):
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig(**dict())
+
+ def test_name_only(self):
+ settings = SecurityGroupConfig(name='foo')
+ self.assertEqual('foo', settings.name)
+
+ def test_config_with_name_only(self):
+ settings = SecurityGroupConfig(**{'name': 'foo'})
+ self.assertEqual('foo', settings.name)
+
+ def test_invalid_rule(self):
+ rule_setting = SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig(name='foo', rule_settings=[rule_setting])
+
+ def test_all(self):
+ rule_settings = list()
+ rule_settings.append(SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
+ rule_settings.append(SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
+ settings = SecurityGroupConfig(
+ name='bar', description='fubar', project_name='foo',
+ rule_settings=rule_settings)
+
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
+
+ def test_config_all(self):
+ settings = SecurityGroupConfig(
+ **{'name': 'bar',
+ 'description': 'fubar',
+ 'project_name': 'foo',
+ 'rules': [
+ {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
+
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress,
+ settings.rule_settings[0].direction)