summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorspisarski <s.pisarski@cablelabs.com>2017-11-20 14:56:44 -0700
committerspisarski <s.pisarski@cablelabs.com>2017-11-21 09:31:20 -0700
commit0e377731a20617c9cdb886a597395c61ae490e38 (patch)
treeae58c1cf1cabbfcbcc0b483d62be31926a24f0e2
parent594e4d2b15dcf59fc7dc1b8380b096382a0b1cb1 (diff)
Refactoring of SecurityGroupSettings to extend SecurityGroupConfig
SecurityGroupSettings, SecurityGroupSettings and neutron_utils have a runtime cyclical dependency. This patch reduces this dependency and deprecates the SecurityGroupSettings and SecurityGroupRuleSettings class with snaps.config.security_group SecurityGroupConfig and SecurityGroupRuleConfg classes JIRA: SNAPS-224 Change-Id: I6cd82ed5be31f4a24449be767b203e86489b1962 Signed-off-by: spisarski <s.pisarski@cablelabs.com>
-rw-r--r--docs/how-to-use/UnitTests.rst16
-rw-r--r--examples/launch.py6
-rw-r--r--snaps/config/security_group.py388
-rw-r--r--snaps/config/tests/security_group_tests.py187
-rw-r--r--snaps/openstack/create_security_group.py335
-rw-r--r--snaps/openstack/create_stack.py2
-rw-r--r--snaps/openstack/tests/create_instance_tests.py90
-rw-r--r--snaps/openstack/tests/create_project_tests.py8
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py109
-rw-r--r--snaps/openstack/utils/neutron_utils.py2
-rw-r--r--snaps/openstack/utils/settings_utils.py10
-rw-r--r--snaps/openstack/utils/tests/neutron_utils_tests.py26
-rw-r--r--snaps/openstack/utils/tests/settings_utils_tests.py24
-rw-r--r--snaps/provisioning/tests/ansible_utils_tests.py24
-rw-r--r--snaps/test_suite_builder.py29
15 files changed, 781 insertions, 475 deletions
diff --git a/docs/how-to-use/UnitTests.rst b/docs/how-to-use/UnitTests.rst
index 7f7c6ae..cb0c5f3 100644
--- a/docs/how-to-use/UnitTests.rst
+++ b/docs/how-to-use/UnitTests.rst
@@ -36,11 +36,17 @@ OSCredsUnitTests
Ensures that all required members are included when constructing a
OSCreds object
+SecurityGroupRuleConfigUnitTests
+--------------------------------
+
+Ensures that all required members are included when constructing a
+SecurityGroupRuleConfig object
+
SecurityGroupRuleSettingsUnitTests
----------------------------------
Ensures that all required members are included when constructing a
-SecurityGroupRuleSettings object
+deprecated SecurityGroupRuleSettings object
SecurityGroupRuleDomainObjectTests
----------------------------------
@@ -48,11 +54,17 @@ SecurityGroupRuleDomainObjectTests
Ensures that all required members are included when constructing a
SecurityGroupRule domain object
+SecurityGroupConfigUnitTests
+----------------------------
+
+Ensures that all required members are included when constructing a
+SecuirtyGroupConfig object
+
SecurityGroupSettingsUnitTests
------------------------------
Ensures that all required members are included when constructing a
-SecuirtyGroupSettings object
+deprecated SecuirtyGroupSettings object
SecurityGroupDomainObjectTests
------------------------------
diff --git a/examples/launch.py b/examples/launch.py
index 8810c32..975d834 100644
--- a/examples/launch.py
+++ b/examples/launch.py
@@ -33,6 +33,7 @@ from snaps.config.network import PortConfig, NetworkConfig
from snaps.config.project import ProjectConfig
from snaps.config.qos import QoSConfig
from snaps.config.router import RouterConfig
+from snaps.config.security_group import SecurityGroupConfig
from snaps.config.user import UserConfig
from snaps.config.vm_inst import VmInstanceConfig
from snaps.config.volume import VolumeConfig
@@ -44,8 +45,7 @@ from snaps.openstack.create_network import OpenStackNetwork
from snaps.openstack.create_project import OpenStackProject
from snaps.openstack.create_qos import OpenStackQoS
from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_security_group import (
- OpenStackSecurityGroup, SecurityGroupSettings)
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_user import OpenStackUser
from snaps.openstack.create_volume import OpenStackVolume
from snaps.openstack.create_volume_type import OpenStackVolumeType
@@ -690,7 +690,7 @@ def main(arguments):
# Create security groups
creators.append(__create_instances(
os_creds_dict, OpenStackSecurityGroup,
- SecurityGroupSettings,
+ SecurityGroupConfig,
os_config.get('security_groups'), 'security_group', clean,
users_dict))
diff --git a/snaps/config/security_group.py b/snaps/config/security_group.py
new file mode 100644
index 0000000..32a1e95
--- /dev/null
+++ b/snaps/config/security_group.py
@@ -0,0 +1,388 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import enum
+
+from snaps.openstack.utils import keystone_utils, neutron_utils
+
+
+class SecurityGroupConfig(object):
+ """
+ Class representing a keypair configuration
+ """
+
+ def __init__(self, **kwargs):
+ """
+ Constructor
+ :param name: The security group's name (required)
+ :param description: The security group's description (optional)
+ :param project_name: The name of the project under which the security
+ group will be created
+ :param rule_settings: a list of SecurityGroupRuleConfig objects
+ :return:
+ """
+ self.name = kwargs.get('name')
+ self.description = kwargs.get('description')
+ self.project_name = kwargs.get('project_name')
+ self.rule_settings = list()
+
+ rule_settings = kwargs.get('rules')
+ if not rule_settings:
+ rule_settings = kwargs.get('rule_settings')
+
+ if rule_settings:
+ for rule_setting in rule_settings:
+ if isinstance(rule_setting, SecurityGroupRuleConfig):
+ self.rule_settings.append(rule_setting)
+ else:
+ rule_setting['sec_grp_name'] = self.name
+ self.rule_settings.append(SecurityGroupRuleConfig(
+ **rule_setting))
+
+ if not self.name:
+ raise SecurityGroupConfigError('The attribute name is required')
+
+ for rule_setting in self.rule_settings:
+ if rule_setting.sec_grp_name is not self.name:
+ raise SecurityGroupConfigError(
+ 'Rule settings must correspond with the name of this '
+ 'security group')
+
+ def dict_for_neutron(self, keystone):
+ """
+ Returns a dictionary object representing this object.
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
+
+ TODO - expand automated testing to exercise all parameters
+ :param keystone: the Keystone client
+ :return: the dictionary object
+ """
+ out = dict()
+
+ if self.name:
+ out['name'] = self.name
+ if self.description:
+ out['description'] = self.description
+ if self.project_name:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ project_id = None
+ if project:
+ project_id = project.id
+ if project_id:
+ out['tenant_id'] = project_id
+ else:
+ raise SecurityGroupConfigError(
+ 'Could not find project ID for project named - ' +
+ self.project_name)
+
+ return {'security_group': out}
+
+
+class Direction(enum.Enum):
+ """
+ A rule's direction
+ """
+ ingress = 'ingress'
+ egress = 'egress'
+
+
+class Protocol(enum.Enum):
+ """
+ A rule's protocol
+ """
+ ah = 51
+ dccp = 33
+ egp = 8
+ esp = 50
+ gre = 47
+ icmp = 1
+ icmpv6 = 58
+ igmp = 2
+ ipv6_encap = 41
+ ipv6_frag = 44
+ ipv6_icmp = 58
+ ipv6_nonxt = 59
+ ipv6_opts = 60
+ ipv6_route = 43
+ ospf = 89
+ pgm = 113
+ rsvp = 46
+ sctp = 132
+ tcp = 6
+ udp = 17
+ udplite = 136
+ vrrp = 112
+ any = 'any'
+ null = 'null'
+
+
+class Ethertype(enum.Enum):
+ """
+ A rule's ethertype
+ """
+ IPv4 = 4
+ IPv6 = 6
+
+
+class SecurityGroupConfigError(Exception):
+ """
+ Exception to be thrown when security group settings attributes are
+ invalid
+ """
+
+
+class SecurityGroupRuleConfig(object):
+ """
+ Class representing a keypair configuration
+ """
+
+ def __init__(self, **kwargs):
+ """
+ Constructor - all parameters are optional
+ :param sec_grp_name: The security group's name on which to add the
+ rule. (required)
+ :param description: The rule's description
+ :param direction: An enumeration of type
+ create_security_group.RULE_DIRECTION (required)
+ :param remote_group_id: The group ID to associate with this rule
+ (this should be changed to group name once
+ snaps support Groups) (optional)
+ :param protocol: An enumeration of type
+ create_security_group.RULE_PROTOCOL or a string value
+ that will be mapped accordingly (optional)
+ :param ethertype: An enumeration of type
+ create_security_group.RULE_ETHERTYPE (optional)
+ :param port_range_min: The minimum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max.
+ :param port_range_max: The maximum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max.
+ :param remote_ip_prefix: The remote IP prefix to associate with this
+ metering rule packet (optional)
+
+ TODO - Need to support the tenant...
+ """
+
+ self.description = kwargs.get('description')
+ self.sec_grp_name = kwargs.get('sec_grp_name')
+ self.remote_group_id = kwargs.get('remote_group_id')
+ self.direction = None
+ if kwargs.get('direction'):
+ self.direction = map_direction(kwargs['direction'])
+
+ self.protocol = None
+ if kwargs.get('protocol'):
+ self.protocol = map_protocol(kwargs['protocol'])
+ else:
+ self.protocol = Protocol.null
+
+ self.ethertype = None
+ if kwargs.get('ethertype'):
+ self.ethertype = map_ethertype(kwargs['ethertype'])
+
+ self.port_range_min = kwargs.get('port_range_min')
+ self.port_range_max = kwargs.get('port_range_max')
+ self.remote_ip_prefix = kwargs.get('remote_ip_prefix')
+
+ if not self.direction or not self.sec_grp_name:
+ raise SecurityGroupRuleConfigError(
+ 'direction and sec_grp_name are required')
+
+ def dict_for_neutron(self, neutron):
+ """
+ Returns a dictionary object representing this object.
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
+
+ :param neutron: the neutron client for performing lookups
+ :return: the dictionary object
+ """
+ out = dict()
+
+ if self.description:
+ out['description'] = self.description
+ if self.direction:
+ out['direction'] = self.direction.name
+ if self.port_range_min:
+ out['port_range_min'] = self.port_range_min
+ if self.port_range_max:
+ out['port_range_max'] = self.port_range_max
+ if self.ethertype:
+ out['ethertype'] = self.ethertype.name
+ if self.protocol and self.protocol.value != 'null':
+ out['protocol'] = self.protocol.value
+ if self.sec_grp_name:
+ sec_grp = neutron_utils.get_security_group(
+ neutron, sec_grp_name=self.sec_grp_name)
+ if sec_grp:
+ out['security_group_id'] = sec_grp.id
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Cannot locate security group with name - ' +
+ self.sec_grp_name)
+ if self.remote_group_id:
+ out['remote_group_id'] = self.remote_group_id
+ if self.remote_ip_prefix:
+ out['remote_ip_prefix'] = self.remote_ip_prefix
+
+ return {'security_group_rule': out}
+
+ def rule_eq(self, rule):
+ """
+ Returns True if this setting created the rule
+ :param rule: the rule to evaluate
+ :return: T/F
+ """
+ if self.description is not None:
+ if rule.description is not None and rule.description != '':
+ return False
+ elif self.description != rule.description:
+ if rule.description != '':
+ return False
+
+ if self.direction.name != rule.direction:
+ return False
+
+ if self.ethertype and rule.ethertype:
+ if self.ethertype.name != rule.ethertype:
+ return False
+
+ if self.port_range_min and rule.port_range_min:
+ if self.port_range_min != rule.port_range_min:
+ return False
+
+ if self.port_range_max and rule.port_range_max:
+ if self.port_range_max != rule.port_range_max:
+ return False
+
+ if self.protocol and rule.protocol:
+ if self.protocol.name != rule.protocol:
+ return False
+
+ if self.remote_group_id and rule.remote_group_id:
+ if self.remote_group_id != rule.remote_group_id:
+ return False
+
+ if self.remote_ip_prefix and rule.remote_ip_prefix:
+ if self.remote_ip_prefix != rule.remote_ip_prefix:
+ return False
+
+ return True
+
+ def __eq__(self, other):
+ return (
+ self.description == other.description and
+ self.direction == other.direction and
+ self.port_range_min == other.port_range_min and
+ self.port_range_max == other.port_range_max and
+ self.ethertype == other.ethertype and
+ self.protocol == other.protocol and
+ self.sec_grp_name == other.sec_grp_name and
+ self.remote_group_id == other.remote_group_id and
+ self.remote_ip_prefix == other.remote_ip_prefix)
+
+ def __hash__(self):
+ return hash((self.sec_grp_name, self.description, self.direction,
+ self.remote_group_id,
+ self.protocol, self.ethertype, self.port_range_min,
+ self.port_range_max, self.remote_ip_prefix))
+
+
+def map_direction(direction):
+ """
+ Takes a the direction value maps it to the Direction enum. When None return
+ None
+ :param direction: the direction value
+ :return: the Direction enum object
+ :raise: Exception if value is invalid
+ """
+ if not direction:
+ return None
+ if isinstance(direction, Direction):
+ return direction
+ elif (isinstance(direction, str) or isinstance(direction, unicode)
+ or isinstance(direction, unicode)):
+ dir_str = str(direction)
+ if dir_str == 'egress':
+ return Direction.egress
+ elif dir_str == 'ingress':
+ return Direction.ingress
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Direction - ' + dir_str)
+ else:
+ return map_direction(direction.value)
+
+
+def map_protocol(protocol):
+ """
+ Takes a the protocol value maps it to the Protocol enum. When None return
+ None
+ :param protocol: the protocol value
+ :return: the Protocol enum object
+ :raise: Exception if value is invalid
+ """
+ if not protocol:
+ return None
+ elif isinstance(protocol, Protocol):
+ return protocol
+ elif (isinstance(protocol, str) or isinstance(protocol, unicode)
+ or isinstance(protocol, int)):
+ for proto_enum in Protocol:
+ if proto_enum.name == protocol or proto_enum.value == protocol:
+ if proto_enum == Protocol.any:
+ return Protocol.null
+ return proto_enum
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Protocol - ' + protocol)
+ else:
+ return map_protocol(protocol.value)
+
+
+def map_ethertype(ethertype):
+ """
+ Takes a the ethertype value maps it to the Ethertype enum. When None return
+ None
+ :param ethertype: the ethertype value
+ :return: the Ethertype enum object
+ :raise: Exception if value is invalid
+ """
+ if not ethertype:
+ return None
+ elif isinstance(ethertype, Ethertype):
+ return ethertype
+ elif (isinstance(ethertype, str) or isinstance(ethertype, unicode)
+ or isinstance(ethertype, int)):
+ eth_str = str(ethertype)
+ if eth_str == 'IPv6' or eth_str == '6':
+ return Ethertype.IPv6
+ elif eth_str == 'IPv4' or eth_str == '4':
+ return Ethertype.IPv4
+ else:
+ raise SecurityGroupRuleConfigError(
+ 'Invalid Ethertype - ' + eth_str)
+ else:
+ return map_ethertype(ethertype.value)
+
+
+class SecurityGroupRuleConfigError(Exception):
+ """
+ Exception to be thrown when security group rule settings attributes are
+ invalid
+ """
diff --git a/snaps/config/tests/security_group_tests.py b/snaps/config/tests/security_group_tests.py
new file mode 100644
index 0000000..8834836
--- /dev/null
+++ b/snaps/config/tests/security_group_tests.py
@@ -0,0 +1,187 @@
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+from snaps.config.security_group import (
+ Direction, SecurityGroupConfig, SecurityGroupRuleConfig,
+ SecurityGroupConfigError, Protocol, Ethertype,
+ SecurityGroupRuleConfigError)
+
+
+class SecurityGroupRuleConfigUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupRuleConfig class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig()
+
+ def test_empty_config(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(**dict())
+
+ def test_name_only(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(sec_grp_name='foo')
+
+ def test_config_with_name_only(self):
+ with self.assertRaises(SecurityGroupRuleConfigError):
+ SecurityGroupRuleConfig(**{'sec_grp_name': 'foo'})
+
+ def test_name_and_direction(self):
+ settings = SecurityGroupRuleConfig(sec_grp_name='foo',
+ direction=Direction.ingress)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+
+ def test_config_name_and_direction(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+
+ def test_proto_ah_str(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'ah'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.ah, settings.protocol)
+
+ def test_proto_ah_value(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 51})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.ah, settings.protocol)
+
+ def test_proto_any(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'any'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.null, settings.protocol)
+
+ def test_proto_null(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress',
+ 'protocol': 'null'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Protocol.null, settings.protocol)
+
+ def test_all(self):
+ settings = SecurityGroupRuleConfig(
+ sec_grp_name='foo', description='fubar',
+ direction=Direction.egress, remote_group_id='rgi',
+ protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
+ port_range_max=2,
+ remote_ip_prefix='prfx')
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
+
+ def test_config_all(self):
+ settings = SecurityGroupRuleConfig(
+ **{'sec_grp_name': 'foo',
+ 'description': 'fubar',
+ 'direction': 'egress',
+ 'remote_group_id': 'rgi',
+ 'protocol': 'tcp',
+ 'ethertype': 'IPv6',
+ 'port_range_min': 1,
+ 'port_range_max': 2,
+ 'remote_ip_prefix': 'prfx'})
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
+
+
+class SecurityGroupConfigUnitTests(unittest.TestCase):
+ """
+ Tests the construction of the SecurityGroupConfig class
+ """
+
+ def test_no_params(self):
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig()
+
+ def test_empty_config(self):
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig(**dict())
+
+ def test_name_only(self):
+ settings = SecurityGroupConfig(name='foo')
+ self.assertEqual('foo', settings.name)
+
+ def test_config_with_name_only(self):
+ settings = SecurityGroupConfig(**{'name': 'foo'})
+ self.assertEqual('foo', settings.name)
+
+ def test_invalid_rule(self):
+ rule_setting = SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_1')
+ with self.assertRaises(SecurityGroupConfigError):
+ SecurityGroupConfig(name='foo', rule_settings=[rule_setting])
+
+ def test_all(self):
+ rule_settings = list()
+ rule_settings.append(SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.egress,
+ description='test_rule_1'))
+ rule_settings.append(SecurityGroupRuleConfig(
+ sec_grp_name='bar', direction=Direction.ingress,
+ description='test_rule_2'))
+ settings = SecurityGroupConfig(
+ name='bar', description='fubar', project_name='foo',
+ rule_settings=rule_settings)
+
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
+
+ def test_config_all(self):
+ settings = SecurityGroupConfig(
+ **{'name': 'bar',
+ 'description': 'fubar',
+ 'project_name': 'foo',
+ 'rules': [
+ {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
+
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress,
+ settings.rule_settings[0].direction)
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py
index fcad584..7a20fe1 100644
--- a/snaps/openstack/create_security_group.py
+++ b/snaps/openstack/create_security_group.py
@@ -17,6 +17,8 @@ import logging
import enum
from neutronclient.common.exceptions import NotFound, Conflict
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig)
from snaps.openstack.openstack_creator import OpenStackNetworkObject
from snaps.openstack.utils import keystone_utils
from snaps.openstack.utils import neutron_utils
@@ -115,15 +117,15 @@ class OpenStackSecurityGroup(OpenStackNetworkObject):
def __generate_rule_setting(self, rule):
"""
- Creates a SecurityGroupRuleSettings object for a given rule
+ Creates a SecurityGroupRuleConfig object for a given rule
:param rule: the rule from which to create the
- SecurityGroupRuleSettings object
- :return: the newly instantiated SecurityGroupRuleSettings object
+ SecurityGroupRuleConfig object
+ :return: the newly instantiated SecurityGroupRuleConfig object
"""
sec_grp = neutron_utils.get_security_group_by_id(
self._neutron, rule.security_group_id)
- setting = SecurityGroupRuleSettings(
+ setting = SecurityGroupRuleConfig(
description=rule.description,
direction=rule.direction,
ethertype=rule.ethertype,
@@ -218,83 +220,24 @@ class OpenStackSecurityGroup(OpenStackNetworkObject):
return None
-class SecurityGroupSettings:
+class SecurityGroupSettings(SecurityGroupConfig):
"""
- Class representing a keypair configuration
+ Class to hold the configuration settings required for creating OpenStack
+ SecurityGroup objects
+ deprecated - use snaps.config.security_group.SecurityGroupConfig instead
"""
def __init__(self, **kwargs):
- """
- Constructor
- :param name: The security group's name (required)
- :param description: The security group's description (optional)
- :param project_name: The name of the project under which the security
- group will be created
- :param rule_settings: a list of SecurityGroupRuleSettings objects
- :return:
- """
- self.name = kwargs.get('name')
- self.description = kwargs.get('description')
- self.project_name = kwargs.get('project_name')
- self.rule_settings = list()
-
- rule_settings = kwargs.get('rules')
- if not rule_settings:
- rule_settings = kwargs.get('rule_settings')
-
- if rule_settings:
- for rule_setting in rule_settings:
- if isinstance(rule_setting, SecurityGroupRuleSettings):
- self.rule_settings.append(rule_setting)
- else:
- rule_setting['sec_grp_name'] = self.name
- self.rule_settings.append(SecurityGroupRuleSettings(
- **rule_setting))
-
- if not self.name:
- raise SecurityGroupSettingsError('The attribute name is required')
-
- for rule_setting in self.rule_settings:
- if rule_setting.sec_grp_name is not self.name:
- raise SecurityGroupSettingsError(
- 'Rule settings must correspond with the name of this '
- 'security group')
-
- def dict_for_neutron(self, keystone):
- """
- Returns a dictionary object representing this object.
- This is meant to be converted into JSON designed for use by the Neutron
- API
-
- TODO - expand automated testing to exercise all parameters
- :param keystone: the Keystone client
- :return: the dictionary object
- """
- out = dict()
-
- if self.name:
- out['name'] = self.name
- if self.description:
- out['description'] = self.description
- if self.project_name:
- project = keystone_utils.get_project(
- keystone=keystone, project_name=self.project_name)
- project_id = None
- if project:
- project_id = project.id
- if project_id:
- out['tenant_id'] = project_id
- else:
- raise SecurityGroupSettingsError(
- 'Could not find project ID for project named - ' +
- self.project_name)
-
- return {'security_group': out}
+ from warnings import warn
+ warn('Use snaps.config.security_group.SecurityGroupConfig instead',
+ DeprecationWarning)
+ super(self.__class__, self).__init__(**kwargs)
class Direction(enum.Enum):
"""
A rule's direction
+ deprecated - use snaps.config.security_group.Direction
"""
ingress = 'ingress'
egress = 'egress'
@@ -303,6 +246,7 @@ class Direction(enum.Enum):
class Protocol(enum.Enum):
"""
A rule's protocol
+ deprecated - use snaps.config.security_group.Protocol
"""
ah = 51
dccp = 33
@@ -333,249 +277,22 @@ class Protocol(enum.Enum):
class Ethertype(enum.Enum):
"""
A rule's ethertype
+ deprecated - use snaps.config.security_group.Ethertype
"""
IPv4 = 4
IPv6 = 6
-class SecurityGroupSettingsError(Exception):
- """
- Exception to be thrown when security group settings attributes are
- invalid
- """
-
-
-class SecurityGroupRuleSettings:
+class SecurityGroupRuleSettings(SecurityGroupRuleConfig):
"""
- Class representing a keypair configuration
+ Class to hold the configuration settings required for creating OpenStack
+ SecurityGroupRule objects
+ deprecated - use snaps.config.security_group.SecurityGroupRuleConfig
+ instead
"""
def __init__(self, **kwargs):
- """
- Constructor - all parameters are optional
- :param sec_grp_name: The security group's name on which to add the
- rule. (required)
- :param description: The rule's description
- :param direction: An enumeration of type
- create_security_group.RULE_DIRECTION (required)
- :param remote_group_id: The group ID to associate with this rule
- (this should be changed to group name once
- snaps support Groups) (optional)
- :param protocol: An enumeration of type
- create_security_group.RULE_PROTOCOL or a string value
- that will be mapped accordingly (optional)
- :param ethertype: An enumeration of type
- create_security_group.RULE_ETHERTYPE (optional)
- :param port_range_min: The minimum port number in the range that is
- matched by the security group rule. When the
- protocol is TCP or UDP, this value must be <=
- port_range_max.
- :param port_range_max: The maximum port number in the range that is
- matched by the security group rule. When the
- protocol is TCP or UDP, this value must be <=
- port_range_max.
- :param remote_ip_prefix: The remote IP prefix to associate with this
- metering rule packet (optional)
-
- TODO - Need to support the tenant...
- """
-
- self.description = kwargs.get('description')
- self.sec_grp_name = kwargs.get('sec_grp_name')
- self.remote_group_id = kwargs.get('remote_group_id')
- self.direction = None
- if kwargs.get('direction'):
- self.direction = map_direction(kwargs['direction'])
-
- self.protocol = None
- if kwargs.get('protocol'):
- self.protocol = map_protocol(kwargs['protocol'])
- else:
- self.protocol = Protocol.null
-
- self.ethertype = None
- if kwargs.get('ethertype'):
- self.ethertype = map_ethertype(kwargs['ethertype'])
-
- self.port_range_min = kwargs.get('port_range_min')
- self.port_range_max = kwargs.get('port_range_max')
- self.remote_ip_prefix = kwargs.get('remote_ip_prefix')
-
- if not self.direction or not self.sec_grp_name:
- raise SecurityGroupRuleSettingsError(
- 'direction and sec_grp_name are required')
-
- def dict_for_neutron(self, neutron):
- """
- Returns a dictionary object representing this object.
- This is meant to be converted into JSON designed for use by the Neutron
- API
-
- :param neutron: the neutron client for performing lookups
- :return: the dictionary object
- """
- out = dict()
-
- if self.description:
- out['description'] = self.description
- if self.direction:
- out['direction'] = self.direction.name
- if self.port_range_min:
- out['port_range_min'] = self.port_range_min
- if self.port_range_max:
- out['port_range_max'] = self.port_range_max
- if self.ethertype:
- out['ethertype'] = self.ethertype.name
- if self.protocol and self.protocol.value != 'null':
- out['protocol'] = self.protocol.value
- if self.sec_grp_name:
- sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name=self.sec_grp_name)
- if sec_grp:
- out['security_group_id'] = sec_grp.id
- else:
- raise SecurityGroupRuleSettingsError(
- 'Cannot locate security group with name - ' +
- self.sec_grp_name)
- if self.remote_group_id:
- out['remote_group_id'] = self.remote_group_id
- if self.remote_ip_prefix:
- out['remote_ip_prefix'] = self.remote_ip_prefix
-
- return {'security_group_rule': out}
-
- def rule_eq(self, rule):
- """
- Returns True if this setting created the rule
- :param rule: the rule to evaluate
- :return: T/F
- """
- if self.description is not None:
- if (rule.description is not None and
- rule.description != ''):
- return False
- elif self.description != rule.description:
- if rule.description != '':
- return False
-
- if self.direction.name != rule.direction:
- return False
-
- if self.ethertype and rule.ethertype:
- if self.ethertype.name != rule.ethertype:
- return False
-
- if self.port_range_min and rule.port_range_min:
- if self.port_range_min != rule.port_range_min:
- return False
-
- if self.port_range_max and rule.port_range_max:
- if self.port_range_max != rule.port_range_max:
- return False
-
- if self.protocol and rule.protocol:
- if self.protocol.name != rule.protocol:
- return False
-
- if self.remote_group_id and rule.remote_group_id:
- if self.remote_group_id != rule.remote_group_id:
- return False
-
- if self.remote_ip_prefix and rule.remote_ip_prefix:
- if self.remote_ip_prefix != rule.remote_ip_prefix:
- return False
-
- return True
-
- def __eq__(self, other):
- return (
- self.description == other.description and
- self.direction == other.direction and
- self.port_range_min == other.port_range_min and
- self.port_range_max == other.port_range_max and
- self.ethertype == other.ethertype and
- self.protocol == other.protocol and
- self.sec_grp_name == other.sec_grp_name and
- self.remote_group_id == other.remote_group_id and
- self.remote_ip_prefix == other.remote_ip_prefix)
-
- def __hash__(self):
- return hash((self.sec_grp_name, self.description, self.direction,
- self.remote_group_id,
- self.protocol, self.ethertype, self.port_range_min,
- self.port_range_max, self.remote_ip_prefix))
-
-
-def map_direction(direction):
- """
- Takes a the direction value maps it to the Direction enum. When None return
- None
- :param direction: the direction value
- :return: the Direction enum object
- :raise: Exception if value is invalid
- """
- if not direction:
- return None
- if isinstance(direction, Direction):
- return direction
- else:
- dir_str = str(direction)
- if dir_str == 'egress':
- return Direction.egress
- elif dir_str == 'ingress':
- return Direction.ingress
- else:
- raise SecurityGroupRuleSettingsError(
- 'Invalid Direction - ' + dir_str)
-
-
-def map_protocol(protocol):
- """
- Takes a the protocol value maps it to the Protocol enum. When None return
- None
- :param protocol: the protocol value
- :return: the Protocol enum object
- :raise: Exception if value is invalid
- """
- if not protocol:
- return None
- elif isinstance(protocol, Protocol):
- return protocol
- else:
- for proto_enum in Protocol:
- if proto_enum.name == protocol or proto_enum.value == protocol:
- if proto_enum == Protocol.any:
- return Protocol.null
- return proto_enum
- raise SecurityGroupRuleSettingsError(
- 'Invalid Protocol - ' + protocol)
-
-
-def map_ethertype(ethertype):
- """
- Takes a the ethertype value maps it to the Ethertype enum. When None return
- None
- :param ethertype: the ethertype value
- :return: the Ethertype enum object
- :raise: Exception if value is invalid
- """
- if not ethertype:
- return None
- elif isinstance(ethertype, Ethertype):
- return ethertype
- else:
- eth_str = str(ethertype)
- if eth_str == 'IPv6':
- return Ethertype.IPv6
- elif eth_str == 'IPv4':
- return Ethertype.IPv4
- else:
- raise SecurityGroupRuleSettingsError(
- 'Invalid Ethertype - ' + eth_str)
-
-
-class SecurityGroupRuleSettingsError(Exception):
- """
- Exception to be thrown when security group rule settings attributes are
- invalid
- """
+ from warnings import warn
+ warn('Use snaps.config.security_group.SecurityGroupRuleConfig instead',
+ DeprecationWarning)
+ super(self.__class__, self).__init__(**kwargs)
diff --git a/snaps/openstack/create_stack.py b/snaps/openstack/create_stack.py
index fa0a0c5..f0e1527 100644
--- a/snaps/openstack/create_stack.py
+++ b/snaps/openstack/create_stack.py
@@ -245,7 +245,7 @@ class OpenStackHeatStack(OpenStackCloudObject, object):
self.__heat_cli, neutron, self.__stack)
for stack_security_group in stack_security_groups:
- settings = settings_utils.create_security_group_settings(
+ settings = settings_utils.create_security_group_config(
neutron, stack_security_group)
creator = OpenStackSecurityGroup(self._os_creds, settings)
out.append(creator)
diff --git a/snaps/openstack/tests/create_instance_tests.py b/snaps/openstack/tests/create_instance_tests.py
index be83879..4a516d3 100644
--- a/snaps/openstack/tests/create_instance_tests.py
+++ b/snaps/openstack/tests/create_instance_tests.py
@@ -29,6 +29,8 @@ from snaps.config.image import ImageConfig
from snaps.config.keypair import KeypairConfig
from snaps.config.network import PortConfig, NetworkConfig, SubnetConfig
from snaps.config.router import RouterConfig
+from snaps.config.security_group import (
+ Protocol, SecurityGroupRuleConfig, Direction, SecurityGroupConfig)
from snaps.config.vm_inst import (
VmInstanceConfig, FloatingIpConfig, VmInstanceConfigError,
FloatingIpConfigError)
@@ -41,9 +43,7 @@ from snaps.openstack.create_instance import (
from snaps.openstack.create_keypairs import OpenStackKeypair
from snaps.openstack.create_network import OpenStackNetwork
from snaps.openstack.create_router import OpenStackRouter
-from snaps.openstack.create_security_group import (
- SecurityGroupSettings, OpenStackSecurityGroup, SecurityGroupRuleSettings,
- Direction, Protocol)
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.create_volume import OpenStackVolume
from snaps.openstack.tests import openstack_tests, validation_utils
from snaps.openstack.tests.os_source_file_test import (
@@ -585,18 +585,16 @@ class CreateInstanceSingleNetworkTests(OSIntegrationTestCase):
self.keypair_creator.create()
sec_grp_name = guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
except Exception as e:
self.tearDown()
@@ -864,18 +862,16 @@ class CreateInstanceIPv6NetworkTests(OSIntegrationTestCase):
self.keypair_creator.create()
sec_grp_name = self.guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
except Exception as e:
self.tearDown()
@@ -1554,18 +1550,16 @@ class CreateInstancePubPrivNetTests(OSIntegrationTestCase):
self.keypair_creator.create()
sec_grp_name = self.guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
except:
self.tearDown()
@@ -1809,8 +1803,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
# Create security group object to add to instance
- sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name',
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group')
sec_grp_creator = OpenStackSecurityGroup(self.os_creds,
sec_grp_settings)
sec_grp = sec_grp_creator.create()
@@ -1843,8 +1837,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
self.assertIsNotNone(vm_inst)
# Create security group object to add to instance
- sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name',
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group')
sec_grp_creator = OpenStackSecurityGroup(self.os_creds,
sec_grp_settings)
sec_grp = sec_grp_creator.create()
@@ -1868,8 +1862,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
instance.
"""
# Create security group object to add to instance
- sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name',
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group')
sec_grp_creator = OpenStackSecurityGroup(self.os_creds,
sec_grp_settings)
sec_grp = sec_grp_creator.create()
@@ -1904,8 +1898,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
place.
"""
# Create security group object to add to instance
- sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name',
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group')
sec_grp_creator = OpenStackSecurityGroup(self.os_creds,
sec_grp_settings)
sec_grp = sec_grp_creator.create()
@@ -1939,8 +1933,8 @@ class InstanceSecurityGroupTests(OSIntegrationTestCase):
instance.
"""
# Create security group object to add to instance
- sec_grp_settings = SecurityGroupSettings(name=self.guid + '-name',
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group')
sec_grp_creator = OpenStackSecurityGroup(self.os_creds,
sec_grp_settings)
sec_grp = sec_grp_creator.create()
@@ -2739,13 +2733,13 @@ class CreateInstanceTwoNetTests(OSIntegrationTestCase):
self.flavor_creator.create()
sec_grp_name = self.guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1]))
self.sec_grp_creator.create()
except:
self.tearDown()
diff --git a/snaps/openstack/tests/create_project_tests.py b/snaps/openstack/tests/create_project_tests.py
index d7d4dcf..2c10311 100644
--- a/snaps/openstack/tests/create_project_tests.py
+++ b/snaps/openstack/tests/create_project_tests.py
@@ -17,13 +17,13 @@ import uuid
from keystoneclient.exceptions import BadRequest
+from snaps.config.security_group import SecurityGroupConfig
from snaps.config.user import UserConfig
from snaps.config.project import ProjectConfigError, ProjectConfig
from snaps.domain.project import ComputeQuotas, NetworkQuotas
from snaps.openstack.create_project import (
OpenStackProject, ProjectSettings)
from snaps.openstack.create_security_group import OpenStackSecurityGroup
-from snaps.openstack.create_security_group import SecurityGroupSettings
from snaps.openstack.create_user import OpenStackUser
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import keystone_utils, nova_utils, neutron_utils
@@ -284,7 +284,7 @@ class CreateProjectUserTests(OSComponentTestCase):
sec_grp_os_creds = user_creator.get_os_creds(
self.project_creator.get_project().name)
sec_grp_creator = OpenStackSecurityGroup(
- sec_grp_os_creds, SecurityGroupSettings(
+ sec_grp_os_creds, SecurityGroupConfig(
name=self.guid + '-name', description='hello group'))
sec_grp = sec_grp_creator.create()
self.assertIsNotNone(sec_grp)
@@ -327,8 +327,8 @@ class CreateProjectUserTests(OSComponentTestCase):
sec_grp_creator = OpenStackSecurityGroup(
sec_grp_os_creds,
- SecurityGroupSettings(name=self.guid + '-name',
- description='hello group'))
+ SecurityGroupConfig(
+ name=self.guid + '-name', description='hello group'))
sec_grp = sec_grp_creator.create()
self.assertIsNotNone(sec_grp)
self.sec_grp_creators.append(sec_grp_creator)
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index ed62548..090d736 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -15,10 +15,13 @@
import unittest
import uuid
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig,
+ SecurityGroupRuleConfigError, SecurityGroupConfigError)
from snaps.openstack import create_security_group
from snaps.openstack.create_security_group import (
SecurityGroupSettings, SecurityGroupRuleSettings, Direction, Ethertype,
- Protocol, SecurityGroupRuleSettingsError, SecurityGroupSettingsError)
+ Protocol)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
@@ -32,64 +35,64 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings()
def test_empty_config(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(**dict())
def test_name_only(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(sec_grp_name='foo')
def test_config_with_name_only(self):
- with self.assertRaises(SecurityGroupRuleSettingsError):
+ with self.assertRaises(SecurityGroupRuleConfigError):
SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
def test_name_and_direction(self):
settings = SecurityGroupRuleSettings(sec_grp_name='foo',
direction=Direction.ingress)
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
def test_config_name_and_direction(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress'})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
def test_proto_ah_str(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress',
'protocol': 'ah'})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
- self.assertEqual(Protocol.ah, settings.protocol)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
def test_proto_ah_value(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress',
'protocol': 51})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
- self.assertEqual(Protocol.ah, settings.protocol)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.ah.value, settings.protocol.value)
def test_proto_any(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress',
'protocol': 'any'})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
- self.assertEqual(Protocol.null, settings.protocol)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
def test_proto_null(self):
settings = SecurityGroupRuleSettings(
**{'sec_grp_name': 'foo', 'direction': 'ingress',
'protocol': 'null'})
self.assertEqual('foo', settings.sec_grp_name)
- self.assertEqual(Direction.ingress, settings.direction)
- self.assertEqual(Protocol.null, settings.protocol)
+ self.assertEqual(Direction.ingress.value, settings.direction.value)
+ self.assertEqual(Protocol.null.value, settings.protocol.value)
def test_all(self):
settings = SecurityGroupRuleSettings(
@@ -100,10 +103,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
remote_ip_prefix='prfx')
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
- self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
self.assertEqual('rgi', settings.remote_group_id)
- self.assertEqual(Protocol.icmp, settings.protocol)
- self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(Protocol.icmp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
self.assertEqual(1, settings.port_range_min)
self.assertEqual(2, settings.port_range_max)
self.assertEqual('prfx', settings.remote_ip_prefix)
@@ -121,10 +124,10 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
'remote_ip_prefix': 'prfx'})
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
- self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual(Direction.egress.value, settings.direction.value)
self.assertEqual('rgi', settings.remote_group_id)
- self.assertEqual(Protocol.tcp, settings.protocol)
- self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(Protocol.tcp.value, settings.protocol.value)
+ self.assertEqual(Ethertype.IPv6.value, settings.ethertype.value)
self.assertEqual(1, settings.port_range_min)
self.assertEqual(2, settings.port_range_max)
self.assertEqual('prfx', settings.remote_ip_prefix)
@@ -136,11 +139,11 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
"""
def test_no_params(self):
- with self.assertRaises(SecurityGroupSettingsError):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings()
def test_empty_config(self):
- with self.assertRaises(SecurityGroupSettingsError):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings(**dict())
def test_name_only(self):
@@ -155,7 +158,7 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
rule_setting = SecurityGroupRuleSettings(
sec_grp_name='bar', direction=Direction.ingress,
description='test_rule_1')
- with self.assertRaises(SecurityGroupSettingsError):
+ with self.assertRaises(SecurityGroupConfigError):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
@@ -189,8 +192,8 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
self.assertEqual('foo', settings.project_name)
self.assertEqual(1, len(settings.rule_settings))
self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEqual(Direction.ingress,
- settings.rule_settings[0].direction)
+ self.assertEqual(Direction.ingress.value,
+ settings.rule_settings[0].direction.value)
class CreateSecurityGroupTests(OSIntegrationTestCase):
@@ -226,8 +229,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
@@ -254,7 +257,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
project_name=self.admin_os_creds.project_name)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -283,7 +286,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
project_name=self.os_creds.project_name)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -312,8 +315,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
+ description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.os_creds, sec_grp_settings)
created_sec_grp = self.sec_grp_creator.create()
@@ -339,10 +342,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
description='test_rule_1'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -372,12 +375,12 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv4,
port_range_min=10, port_range_max=20,
description='test_rule_1'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -407,21 +410,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv6,
description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv4,
port_range_min=10, port_range_max=20,
description='test_rule_3'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -451,10 +454,10 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
description='test_rule_1'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -480,7 +483,7 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
rules)
- self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
+ self.sec_grp_creator.add_rule(SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
direction=Direction.egress, protocol=Protocol.icmp,
description='test_rule_2'))
@@ -496,21 +499,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv6,
description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv4,
port_range_min=10, port_range_max=20,
description='test_rule_3'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -547,21 +550,21 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
# Create Image
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress,
description='test_rule_1'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv6,
description='test_rule_2'))
sec_grp_rule_settings.append(
- SecurityGroupRuleSettings(
+ SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.egress,
protocol=Protocol.udp, ethertype=Ethertype.IPv4,
port_range_min=10, port_range_max=20,
description='test_rule_3'))
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=sec_grp_rule_settings)
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
@@ -615,7 +618,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
this is the only means to tell if the rule is custom or defaulted by
OpenStack
:param neutron: the neutron client
- :param rule_settings: collection of SecurityGroupRuleSettings objects
+ :param rule_settings: collection of SecurityGroupRuleConfig objects
:param rules: a collection of SecurityGroupRule domain objects
:return: T/F
"""
@@ -646,7 +649,7 @@ def validate_sec_grp_rules(neutron, rule_settings, rules):
proto_str == str(rule_setting.protocol.value) and
rule.remote_group_id == rule_setting.remote_group_id and
rule.remote_ip_prefix == rule_setting.remote_ip_prefix and
- rule.security_group_id == sec_grp.id):
+ rule.security_group_id == sec_grp.id):
match = True
break
diff --git a/snaps/openstack/utils/neutron_utils.py b/snaps/openstack/utils/neutron_utils.py
index b59f811..eaceb37 100644
--- a/snaps/openstack/utils/neutron_utils.py
+++ b/snaps/openstack/utils/neutron_utils.py
@@ -579,7 +579,7 @@ def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None,
the security group will be used, else if the query parameters are None then
None will be returned
:param neutron: the client
- :param sec_grp_settings: an instance of SecurityGroupSettings config object
+ :param sec_grp_settings: an instance of SecurityGroupConfig object
:param sec_grp_name: the name of security group object to retrieve
:param project_id: the ID of the project/tentant object that owns the
secuity group to retrieve
diff --git a/snaps/openstack/utils/settings_utils.py b/snaps/openstack/utils/settings_utils.py
index 5433570..145b481 100644
--- a/snaps/openstack/utils/settings_utils.py
+++ b/snaps/openstack/utils/settings_utils.py
@@ -19,12 +19,12 @@ from snaps.config.flavor import FlavorConfig
from snaps.config.keypair import KeypairConfig
from snaps.config.network import SubnetConfig, PortConfig, NetworkConfig
from snaps.config.router import RouterConfig
+from snaps.config.security_group import (
+ SecurityGroupRuleConfig, SecurityGroupConfig)
from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
from snaps.config.volume import VolumeConfig
from snaps.config.volume_type import (
ControlLocation, VolumeTypeEncryptionConfig, VolumeTypeConfig)
-from snaps.openstack.create_security_group import (
- SecurityGroupSettings, SecurityGroupRuleSettings)
from snaps.openstack.utils import (
neutron_utils, nova_utils, heat_utils, glance_utils)
@@ -41,7 +41,7 @@ def create_network_config(neutron, network):
subnet_settings=create_subnet_config(neutron, network))
-def create_security_group_settings(neutron, security_group):
+def create_security_group_config(neutron, security_group):
"""
Returns a NetworkConfig object
:param neutron: the neutron client
@@ -52,7 +52,7 @@ def create_security_group_settings(neutron, security_group):
rule_settings = list()
for rule in rules:
- rule_settings.append(SecurityGroupRuleSettings(
+ rule_settings.append(SecurityGroupRuleConfig(
sec_grp_name=security_group.name, description=rule.description,
direction=rule.direction, ethertype=rule.ethertype,
port_range_min=rule.port_range_min,
@@ -60,7 +60,7 @@ def create_security_group_settings(neutron, security_group):
remote_group_id=rule.remote_group_id,
remote_ip_prefix=rule.remote_ip_prefix))
- return SecurityGroupSettings(
+ return SecurityGroupConfig(
name=security_group.name, description=security_group.description,
rule_settings=rule_settings)
diff --git a/snaps/openstack/utils/tests/neutron_utils_tests.py b/snaps/openstack/utils/tests/neutron_utils_tests.py
index 44bc59f..38faf71 100644
--- a/snaps/openstack/utils/tests/neutron_utils_tests.py
+++ b/snaps/openstack/utils/tests/neutron_utils_tests.py
@@ -17,8 +17,8 @@ import uuid
from neutronclient.common.exceptions import NotFound, BadRequest
from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
-from snaps.openstack.create_security_group import (
- SecurityGroupSettings, SecurityGroupRuleSettings, Direction)
+from snaps.config.security_group import (
+ SecurityGroupConfig, SecurityGroupRuleConfig, Direction)
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
@@ -841,7 +841,7 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name)
+ sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
security_group = neutron_utils.create_security_group(self.neutron,
self.keystone,
sec_grp_settings)
@@ -861,13 +861,13 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
def test_create_sec_grp_no_name(self):
"""
- Tests the SecurityGroupSettings constructor and
+ Tests the SecurityGroupConfig constructor and
neutron_utils.create_security_group() function to ensure that
attempting to create a security group without a name will raise an
exception
"""
with self.assertRaises(Exception):
- sec_grp_settings = SecurityGroupSettings()
+ sec_grp_settings = SecurityGroupConfig()
self.security_groups.append(
neutron_utils.create_security_group(self.neutron,
self.keystone,
@@ -877,8 +877,8 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
"""
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
- description='hello group')
+ sec_grp_settings = SecurityGroupConfig(
+ name=self.sec_grp_name, description='hello group')
self.security_groups.append(
neutron_utils.create_security_group(self.neutron, self.keystone,
sec_grp_settings))
@@ -896,9 +896,9 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
Tests the neutron_utils.create_security_group() function
"""
- sec_grp_rule_settings = SecurityGroupRuleSettings(
+ sec_grp_rule_settings = SecurityGroupRuleConfig(
sec_grp_name=self.sec_grp_name, direction=Direction.ingress)
- sec_grp_settings = SecurityGroupSettings(
+ sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
rule_settings=[sec_grp_rule_settings])
@@ -939,12 +939,12 @@ class NeutronUtilsSecurityGroupTests(OSComponentTestCase):
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-1',
- description='hello group')))
+ SecurityGroupConfig(
+ name=self.sec_grp_name + '-1', description='hello group')))
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
- SecurityGroupSettings(name=self.sec_grp_name + '-2',
- description='hello group')))
+ SecurityGroupConfig(
+ name=self.sec_grp_name + '-2', description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0].id)
diff --git a/snaps/openstack/utils/tests/settings_utils_tests.py b/snaps/openstack/utils/tests/settings_utils_tests.py
index d0390e2..06062c5 100644
--- a/snaps/openstack/utils/tests/settings_utils_tests.py
+++ b/snaps/openstack/utils/tests/settings_utils_tests.py
@@ -22,6 +22,8 @@ from snaps.config.network import SubnetConfig, NetworkConfig, PortConfig
from snaps.config.flavor import FlavorConfig
from snaps.config.keypair import KeypairConfig
from snaps.config.qos import Consumer
+from snaps.config.security_group import (
+ SecurityGroupRuleConfig, Direction, Protocol, SecurityGroupConfig)
from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
from snaps.domain.flavor import Flavor
from snaps.domain.volume import (
@@ -31,9 +33,7 @@ from snaps.openstack import (
create_keypairs, create_instance)
from snaps.openstack.create_qos import Consumer
from snaps.openstack.create_network import OpenStackNetwork
-from snaps.openstack.create_security_group import (
- SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup,
- SecurityGroupSettings)
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
@@ -218,18 +218,16 @@ class SettingsUtilsVmInstTests(OSComponentTestCase):
# Create Security Group
sec_grp_name = guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
# Create instance
diff --git a/snaps/provisioning/tests/ansible_utils_tests.py b/snaps/provisioning/tests/ansible_utils_tests.py
index 0e55f1f..4f1f65e 100644
--- a/snaps/provisioning/tests/ansible_utils_tests.py
+++ b/snaps/provisioning/tests/ansible_utils_tests.py
@@ -22,6 +22,8 @@ from scp import SCPClient
from snaps.config.flavor import FlavorConfig
from snaps.config.keypair import KeypairConfig
from snaps.config.network import PortConfig
+from snaps.config.security_group import (
+ Direction, Protocol, SecurityGroupConfig, SecurityGroupRuleConfig)
from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
from snaps.openstack import create_flavor
@@ -30,9 +32,7 @@ from snaps.openstack import create_instance
from snaps.openstack import create_keypairs
from snaps.openstack import create_network
from snaps.openstack import create_router
-from snaps.openstack.create_security_group import (
- SecurityGroupRuleSettings, Direction, Protocol, OpenStackSecurityGroup,
- SecurityGroupSettings)
+from snaps.openstack.create_security_group import OpenStackSecurityGroup
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.create_instance_tests import check_dhcp_lease
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
@@ -121,18 +121,16 @@ class AnsibleProvisioningTests(OSIntegrationTestCase):
# Create Security Group
sec_grp_name = guid + '-sec-grp'
- rule1 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.icmp)
- rule2 = SecurityGroupRuleSettings(sec_grp_name=sec_grp_name,
- direction=Direction.ingress,
- protocol=Protocol.tcp,
- port_range_min=22,
- port_range_max=22)
+ rule1 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.icmp)
+ rule2 = SecurityGroupRuleConfig(
+ sec_grp_name=sec_grp_name, direction=Direction.ingress,
+ protocol=Protocol.tcp, port_range_min=22, port_range_max=22)
self.sec_grp_creator = OpenStackSecurityGroup(
self.os_creds,
- SecurityGroupSettings(name=sec_grp_name,
- rule_settings=[rule1, rule2]))
+ SecurityGroupConfig(
+ name=sec_grp_name, rule_settings=[rule1, rule2]))
self.sec_grp_creator.create()
# Create instance
diff --git a/snaps/test_suite_builder.py b/snaps/test_suite_builder.py
index 542b8ff..dba60f7 100644
--- a/snaps/test_suite_builder.py
+++ b/snaps/test_suite_builder.py
@@ -18,8 +18,10 @@ import unittest
from snaps.config.tests.network_tests import (
NetworkConfigUnitTests, SubnetConfigUnitTests, PortConfigUnitTests)
-from snaps.config.tests.vm_inst_tests import VmInstanceConfigUnitTests, \
- FloatingIpConfigUnitTests
+from snaps.config.tests.security_group_tests import (
+ SecurityGroupConfigUnitTests, SecurityGroupRuleConfigUnitTests)
+from snaps.config.tests.vm_inst_tests import (
+ VmInstanceConfigUnitTests, FloatingIpConfigUnitTests)
from snaps.config.tests.volume_tests import VolumeConfigUnitTests
from snaps.config.tests.volume_type_tests import VolumeTypeConfigUnitTests
from snaps.config.tests.qos_tests import QoSConfigUnitTests
@@ -135,14 +137,18 @@ def add_unit_tests(suite):
"""
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(FileUtilsTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
- SecurityGroupRuleSettingsUnitTests))
- suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
ProxySettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
OSCredsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ SecurityGroupConfigUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
SecurityGroupSettingsUnitTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ SecurityGroupRuleConfigUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
+ SecurityGroupRuleSettingsUnitTests))
+ suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
SecurityGroupDomainObjectTests))
suite.addTest(unittest.TestLoader().loadTestsFromTestCase(
SecurityGroupRuleDomainObjectTests))
@@ -391,6 +397,10 @@ def add_openstack_api_tests(suite, os_creds, ext_net_name, use_keystone=True,
ext_net_name=ext_net_name, log_level=log_level,
image_metadata=image_metadata))
suite.addTest(OSComponentTestCase.parameterize(
+ HeatUtilsVolumeTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, log_level=log_level,
+ image_metadata=image_metadata))
+ suite.addTest(OSComponentTestCase.parameterize(
CinderUtilsQoSTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level,
image_metadata=image_metadata))
@@ -614,6 +624,11 @@ def add_openstack_integration_tests(suite, os_creds, ext_net_name,
flavor_metadata=flavor_metadata, image_metadata=image_metadata,
log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
+ CreateInstancePubPrivNetTests, os_creds=os_creds,
+ ext_net_name=ext_net_name, use_keystone=use_keystone,
+ flavor_metadata=flavor_metadata, image_metadata=image_metadata,
+ log_level=log_level))
+ suite.addTest(OSIntegrationTestCase.parameterize(
AnsibleProvisioningTests, os_creds=os_creds,
ext_net_name=ext_net_name, use_keystone=use_keystone,
flavor_metadata=flavor_metadata, image_metadata=image_metadata,
@@ -699,14 +714,8 @@ def add_openstack_staging_tests(suite, os_creds, ext_net_name,
CreateInstanceMockOfflineTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSIntegrationTestCase.parameterize(
- CreateInstancePubPrivNetTests, os_creds=os_creds,
- ext_net_name=ext_net_name, log_level=log_level))
- suite.addTest(OSIntegrationTestCase.parameterize(
CreateInstanceIPv6NetworkTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level))
suite.addTest(OSComponentTestCase.parameterize(
- HeatUtilsVolumeTests, os_creds=os_creds,
- ext_net_name=ext_net_name, log_level=log_level))
- suite.addTest(OSComponentTestCase.parameterize(
MagnumSmokeTests, os_creds=os_creds,
ext_net_name=ext_net_name, log_level=log_level))