summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteven Pisarski <s.pisarski@cablelabs.com>2017-06-21 13:35:49 +0000
committerGerrit Code Review <gerrit@opnfv.org>2017-06-21 13:35:49 +0000
commit2e0e2c683e2329084f3e7eea6ed5d9ab5ed56661 (patch)
tree14588ed3bdef56e1d677eeb519bb86665402458e
parentbc69f7d3a13d111f537aab754598baf256b25839 (diff)
parentce3c297aedc17e30ccdc61b96619d1f80ee0b231 (diff)
Merge "Changes SecurityGroupSettings & SecurityGroupRuleSettings constructor to use kwargs."
-rw-r--r--snaps/openstack/create_security_group.py249
-rw-r--r--snaps/openstack/tests/create_security_group_tests.py340
2 files changed, 341 insertions, 248 deletions
diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py
index 4e98e82..49cc67b 100644
--- a/snaps/openstack/create_security_group.py
+++ b/snaps/openstack/create_security_group.py
@@ -16,8 +16,8 @@ import logging
import enum
from neutronclient.common.exceptions import NotFound
-from snaps.openstack.utils import neutron_utils
from snaps.openstack.utils import keystone_utils
+from snaps.openstack.utils import neutron_utils
__author__ = 'spisarski'
@@ -49,22 +49,26 @@ class OpenStackSecurityGroup:
def create(self, cleanup=False):
"""
Responsible for creating the security group.
- :param cleanup: Denotes whether or not this is being called for cleanup or not
+ :param cleanup: Denotes whether or not this is being called for cleanup
:return: the OpenStack security group object
"""
self.__neutron = neutron_utils.neutron_client(self.__os_creds)
self.__keystone = keystone_utils.keystone_client(self.__os_creds)
- logger.info('Creating security group %s...' % self.sec_grp_settings.name)
+ logger.info(
+ 'Creating security group %s...' % self.sec_grp_settings.name)
- self.__security_group = neutron_utils.get_security_group(self.__neutron, self.sec_grp_settings.name)
+ self.__security_group = neutron_utils.get_security_group(
+ self.__neutron, self.sec_grp_settings.name)
if not self.__security_group and not cleanup:
# Create the security group
- self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone,
- self.sec_grp_settings)
+ self.__security_group = neutron_utils.create_security_group(
+ self.__neutron, self.__keystone,
+ self.sec_grp_settings)
# Get the rules added for free
- auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group)
+ auto_rules = neutron_utils.get_rules_by_security_group(
+ self.__neutron, self.__security_group)
ctr = 0
for auto_rule in auto_rules:
@@ -74,14 +78,17 @@ class OpenStackSecurityGroup:
# Create the custom rules
for sec_grp_rule_setting in self.sec_grp_settings.rule_settings:
- custom_rule = neutron_utils.create_security_group_rule(self.__neutron, sec_grp_rule_setting)
+ custom_rule = neutron_utils.create_security_group_rule(
+ self.__neutron, sec_grp_rule_setting)
self.__rules[sec_grp_rule_setting] = custom_rule
- # Refresh security group object to reflect the new rules added to it
- self.__security_group = neutron_utils.get_security_group(self.__neutron, self.sec_grp_settings.name)
+ # Refresh security group object to reflect the new rules added
+ self.__security_group = neutron_utils.get_security_group(
+ self.__neutron, self.sec_grp_settings.name)
else:
# Populate rules
- existing_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group)
+ existing_rules = neutron_utils.get_rules_by_security_group(
+ self.__neutron, self.__security_group)
for existing_rule in existing_rules:
# For Custom Rules
@@ -99,22 +106,27 @@ class OpenStackSecurityGroup:
def __generate_rule_setting(self, rule):
"""
Creates a SecurityGroupRuleSettings object for a given rule
- :param rule: the rule from which to create the SecurityGroupRuleSettings object
+ :param rule: the rule from which to create the
+ SecurityGroupRuleSettings object
:return: the newly instantiated SecurityGroupRuleSettings object
"""
rule_dict = rule['security_group_rule']
sec_grp_name = None
if rule_dict['security_group_id']:
- sec_grp = neutron_utils.get_security_group_by_id(self.__neutron, rule_dict['security_group_id'])
+ sec_grp = neutron_utils.get_security_group_by_id(
+ self.__neutron, rule_dict['security_group_id'])
if sec_grp:
sec_grp_name = sec_grp['security_group']['name']
- setting = SecurityGroupRuleSettings(description=rule_dict['description'],
- direction=rule_dict['direction'], ethertype=rule_dict['ethertype'],
- port_range_min=rule_dict['port_range_min'],
- port_range_max=rule_dict['port_range_max'], protocol=rule_dict['protocol'],
- remote_group_id=rule_dict['remote_group_id'],
- remote_ip_prefix=rule_dict['remote_ip_prefix'], sec_grp_name=sec_grp_name)
+ setting = SecurityGroupRuleSettings(
+ description=rule_dict['description'],
+ direction=rule_dict['direction'], ethertype=rule_dict['ethertype'],
+ port_range_min=rule_dict['port_range_min'],
+ port_range_max=rule_dict['port_range_max'],
+ protocol=rule_dict['protocol'],
+ remote_group_id=rule_dict['remote_group_id'],
+ remote_ip_prefix=rule_dict['remote_ip_prefix'],
+ sec_grp_name=sec_grp_name)
return setting
def clean(self):
@@ -131,9 +143,11 @@ class OpenStackSecurityGroup:
if self.__security_group:
try:
- neutron_utils.delete_security_group(self.__neutron, self.__security_group)
+ neutron_utils.delete_security_group(self.__neutron,
+ self.__security_group)
except NotFound as e:
- logger.warning('Security Group not found, cannot delete - ' + str(e))
+ logger.warning(
+ 'Security Group not found, cannot delete - ' + str(e))
self.__security_group = None
@@ -157,25 +171,29 @@ class OpenStackSecurityGroup:
:param rule_setting: the rule configuration
"""
rule_setting.sec_grp_name = self.sec_grp_settings.name
- new_rule = neutron_utils.create_security_group_rule(self.__neutron, rule_setting)
+ new_rule = neutron_utils.create_security_group_rule(self.__neutron,
+ rule_setting)
self.__rules[rule_setting] = new_rule
self.sec_grp_settings.rule_settings.append(rule_setting)
def remove_rule(self, rule_id=None, rule_setting=None):
"""
- Removes a rule to this security group by id, name, or rule_setting object
+ Removes a rule to this security group by id, name, or rule_setting
+ object
:param rule_id: the rule's id
:param rule_setting: the rule's setting object
"""
rule_to_remove = None
if rule_id or rule_setting:
if rule_id:
- rule_to_remove = neutron_utils.get_rule_by_id(self.__neutron, self.__security_group, rule_id)
+ rule_to_remove = neutron_utils.get_rule_by_id(
+ self.__neutron, self.__security_group, rule_id)
elif rule_setting:
rule_to_remove = self.__rules.get(rule_setting)
if rule_to_remove:
- neutron_utils.delete_security_group_rule(self.__neutron, rule_to_remove)
+ neutron_utils.delete_security_group_rule(self.__neutron,
+ rule_to_remove)
rule_setting = self.__get_setting_from_rule(rule_to_remove)
if rule_setting:
self.__rules.pop(rule_setting)
@@ -199,42 +217,46 @@ class SecurityGroupSettings:
Class representing a keypair configuration
"""
- def __init__(self, config=None, name=None, description=None, project_name=None,
- rule_settings=list()):
+ def __init__(self, **kwargs):
"""
Constructor - all parameters are optional
- :param config: Should be a dict object containing the configuration settings using the attribute names below
- as each member's the key and overrides any of the other parameters.
:param name: The keypair name.
:param description: The security group's description
- :param project_name: The name of the project under which the security group will be created
+ :param project_name: The name of the project under which the security
+ group will be created
:return:
"""
- if config:
- self.name = config.get('name')
- self.description = config.get('description')
- self.project_name = config.get('project_name')
- self.rule_settings = list()
- if config.get('rules') and type(config['rules']) is list:
- for config_rule in config['rules']:
- self.rule_settings.append(SecurityGroupRuleSettings(config=config_rule))
- else:
- self.name = name
- self.description = description
- self.project_name = project_name
- self.rule_settings = rule_settings
+ self.name = kwargs.get('name')
+ self.description = kwargs.get('description')
+ self.project_name = kwargs.get('project_name')
+ self.rule_settings = list()
+
+ rule_settings = kwargs.get('rules')
+ if not rule_settings:
+ rule_settings = kwargs.get('rule_settings')
+
+ if rule_settings:
+ for rule_setting in rule_settings:
+ if isinstance(rule_setting, SecurityGroupRuleSettings):
+ self.rule_settings.append(rule_setting)
+ else:
+ self.rule_settings.append(SecurityGroupRuleSettings(
+ **rule_setting))
if not self.name:
raise Exception('The attribute name is required')
for rule_setting in self.rule_settings:
if rule_setting.sec_grp_name is not self.name:
- raise Exception('Rule settings must correspond with the name of this security group')
+ raise Exception(
+ 'Rule settings must correspond with the name of this '
+ 'security group')
def dict_for_neutron(self, keystone):
"""
Returns a dictionary object representing this object.
- This is meant to be converted into JSON designed for use by the Neutron API
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
TODO - expand automated testing to exercise all parameters
:param keystone: the Keystone client
@@ -254,7 +276,9 @@ class SecurityGroupSettings:
if project_id:
out['project_id'] = project_id
else:
- raise Exception('Could not find project ID for project named - ' + self.project_name)
+ raise Exception(
+ 'Could not find project ID for project named - ' +
+ self.project_name)
return {'security_group': out}
@@ -290,67 +314,62 @@ class SecurityGroupRuleSettings:
Class representing a keypair configuration
"""
- def __init__(self, config=None, sec_grp_name=None, description=None, direction=None,
- remote_group_id=None, protocol=None, ethertype=None, port_range_min=None, port_range_max=None,
- sec_grp_rule=None, remote_ip_prefix=None):
+ def __init__(self, **kwargs):
"""
Constructor - all parameters are optional
- :param config: Should be a dict object containing the configuration settings using the attribute names below
- as each member's the key and overrides any of the other parameters.
- :param sec_grp_name: The security group's name on which to add the rule. (required)
+ :param sec_grp_name: The security group's name on which to add the
+ rule. (required)
:param description: The rule's description
- :param direction: An enumeration of type create_security_group.RULE_DIRECTION (required)
- :param remote_group_id: The group ID to associate with this rule (this should be changed to group name
- once snaps support Groups) (optional)
- :param protocol: An enumeration of type create_security_group.RULE_PROTOCOL or a string value that will be
- mapped accordingly (optional)
- :param ethertype: An enumeration of type create_security_group.RULE_ETHERTYPE (optional)
- :param port_range_min: The minimum port number in the range that is matched by the security group rule. When
- the protocol is TCP or UDP, this value must be <= port_range_max. When the protocol is
- ICMP, this value must be an ICMP type.
- :param port_range_max: The maximum port number in the range that is matched by the security group rule. When
- the protocol is TCP or UDP, this value must be <= port_range_max. When the protocol is
- ICMP, this value must be an ICMP type.
- :param sec_grp_rule: The OpenStack rule object to a security group rule object to associate
- (note: Cannot be set using the config object nor can I see any real uses for this
- parameter)
- :param remote_ip_prefix: The remote IP prefix to associate with this metering rule packet (optional)
+ :param direction: An enumeration of type
+ create_security_group.RULE_DIRECTION (required)
+ :param remote_group_id: The group ID to associate with this rule
+ (this should be changed to group name once
+ snaps support Groups) (optional)
+ :param protocol: An enumeration of type
+ create_security_group.RULE_PROTOCOL or a string value
+ that will be mapped accordingly (optional)
+ :param ethertype: An enumeration of type
+ create_security_group.RULE_ETHERTYPE (optional)
+ :param port_range_min: The minimum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max. When the protocol is ICMP, this
+ value must be an ICMP type.
+ :param port_range_max: The maximum port number in the range that is
+ matched by the security group rule. When the
+ protocol is TCP or UDP, this value must be <=
+ port_range_max. When the protocol is ICMP, this
+ value must be an ICMP type.
+ :param sec_grp_rule: The OpenStack rule object to a security group rule
+ object to associate
+ (note: Cannot be set using the config object nor
+ can I see any real uses for this parameter)
+ :param remote_ip_prefix: The remote IP prefix to associate with this
+ metering rule packet (optional)
TODO - Need to support the tenant...
"""
- if config:
- self.description = config.get('description')
- self.sec_grp_name = config.get('sec_grp_name')
- self.remote_group_id = config.get('remote_group_id')
- self.direction = None
- if config.get('direction'):
- self.direction = map_direction(config['direction'])
-
- self.protocol = None
- if config.get('protocol'):
- self.protocol = map_protocol(config['protocol'])
- else:
- self.protocol = Protocol.null
+ self.description = kwargs.get('description')
+ self.sec_grp_name = kwargs.get('sec_grp_name')
+ self.remote_group_id = kwargs.get('remote_group_id')
+ self.direction = None
+ if kwargs.get('direction'):
+ self.direction = map_direction(kwargs['direction'])
- self.ethertype = None
- if config.get('ethertype'):
- self.ethertype = map_ethertype(config['ethertype'])
-
- self.port_range_min = config.get('port_range_min')
- self.port_range_max = config.get('port_range_max')
- self.remote_ip_prefix = config.get('remote_ip_prefix')
+ self.protocol = None
+ if kwargs.get('protocol'):
+ self.protocol = map_protocol(kwargs['protocol'])
else:
- self.description = description
- self.sec_grp_name = sec_grp_name
- self.remote_group_id = remote_group_id
- self.direction = map_direction(direction)
- self.protocol = map_protocol(protocol)
- self.ethertype = map_ethertype(ethertype)
- self.port_range_min = port_range_min
- self.port_range_max = port_range_max
- self.sec_grp_rule = sec_grp_rule
- self.remote_ip_prefix = remote_ip_prefix
+ self.protocol = Protocol.null
+
+ self.ethertype = None
+ if kwargs.get('ethertype'):
+ self.ethertype = map_ethertype(kwargs['ethertype'])
+
+ self.port_range_min = kwargs.get('port_range_min')
+ self.port_range_max = kwargs.get('port_range_max')
+ self.remote_ip_prefix = kwargs.get('remote_ip_prefix')
if not self.direction or not self.sec_grp_name:
raise Exception('direction and sec_grp_name are required')
@@ -358,7 +377,8 @@ class SecurityGroupRuleSettings:
def dict_for_neutron(self, neutron):
"""
Returns a dictionary object representing this object.
- This is meant to be converted into JSON designed for use by the Neutron API
+ This is meant to be converted into JSON designed for use by the Neutron
+ API
:param neutron: the neutron client for performing lookups
:return: the dictionary object
@@ -375,18 +395,19 @@ class SecurityGroupRuleSettings:
out['port_range_max'] = self.port_range_max
if self.ethertype:
out['ethertype'] = self.ethertype.name
- if self.protocol:
+ if self.protocol and self.protocol.name != 'null':
out['protocol'] = self.protocol.name
if self.sec_grp_name:
- sec_grp = neutron_utils.get_security_group(neutron, self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ neutron, self.sec_grp_name)
if sec_grp:
out['security_group_id'] = sec_grp['security_group']['id']
else:
- raise Exception('Cannot locate security group with name - ' + self.sec_grp_name)
+ raise Exception(
+ 'Cannot locate security group with name - ' +
+ self.sec_grp_name)
if self.remote_group_id:
out['remote_group_id'] = self.remote_group_id
- if self.sec_grp_rule:
- out['security_group_rule'] = self.sec_grp_rule
if self.remote_ip_prefix:
out['remote_ip_prefix'] = self.remote_ip_prefix
@@ -401,7 +422,8 @@ class SecurityGroupRuleSettings:
rule_dict = rule['security_group_rule']
if self.description is not None:
- if rule_dict['description'] is not None and rule_dict['description'] != '':
+ if rule_dict['description'] is not None and rule_dict[
+ 'description'] != '':
return False
elif self.description != rule_dict['description']:
if rule_dict['description'] != '':
@@ -445,18 +467,19 @@ class SecurityGroupRuleSettings:
self.protocol == other.protocol and \
self.sec_grp_name == other.sec_grp_name and \
self.remote_group_id == other.remote_group_id and \
- self.sec_grp_rule == other.sec_grp_rule and \
self.remote_ip_prefix == other.remote_ip_prefix
def __hash__(self):
- return hash((self.sec_grp_name, self.description, self.direction, self.remote_group_id,
- self.protocol, self.ethertype, self.port_range_min, self.port_range_max, self.sec_grp_rule,
- self.remote_ip_prefix))
+ return hash((self.sec_grp_name, self.description, self.direction,
+ self.remote_group_id,
+ self.protocol, self.ethertype, self.port_range_min,
+ self.port_range_max, self.remote_ip_prefix))
def map_direction(direction):
"""
- Takes a the direction value maps it to the Direction enum. When None return None
+ Takes a the direction value maps it to the Direction enum. When None return
+ None
:param direction: the direction value
:return: the Direction enum object
:raise: Exception if value is invalid
@@ -477,7 +500,8 @@ def map_direction(direction):
def map_protocol(protocol):
"""
- Takes a the protocol value maps it to the Protocol enum. When None return None
+ Takes a the protocol value maps it to the Protocol enum. When None return
+ None
:param protocol: the protocol value
:return: the Protocol enum object
:raise: Exception if value is invalid
@@ -502,7 +526,8 @@ def map_protocol(protocol):
def map_ethertype(ethertype):
"""
- Takes a the ethertype value maps it to the Ethertype enum. When None return None
+ Takes a the ethertype value maps it to the Ethertype enum. When None return
+ None
:param ethertype: the ethertype value
:return: the Ethertype enum object
:raise: Exception if value is invalid
diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py
index 6a3c0ef..8fd8a6a 100644
--- a/snaps/openstack/tests/create_security_group_tests.py
+++ b/snaps/openstack/tests/create_security_group_tests.py
@@ -12,12 +12,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import uuid
import unittest
+import uuid
from snaps.openstack import create_security_group
-from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \
- Ethertype, Protocol
+from snaps.openstack.create_security_group import (SecurityGroupSettings,
+ SecurityGroupRuleSettings,
+ Direction, Ethertype,
+ Protocol)
from snaps.openstack.tests import validation_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import neutron_utils
@@ -36,7 +38,7 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
def test_empty_config(self):
with self.assertRaises(Exception):
- SecurityGroupRuleSettings(config=dict())
+ SecurityGroupRuleSettings(**dict())
def test_name_only(self):
with self.assertRaises(Exception):
@@ -44,22 +46,26 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
def test_config_with_name_only(self):
with self.assertRaises(Exception):
- SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'})
+ SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'})
def test_name_and_direction(self):
- settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
+ settings = SecurityGroupRuleSettings(sec_grp_name='foo',
+ direction=Direction.ingress)
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual(Direction.ingress, settings.direction)
def test_config_name_and_direction(self):
- settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
+ settings = SecurityGroupRuleSettings(
+ **{'sec_grp_name': 'foo', 'direction': 'ingress'})
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual(Direction.ingress, settings.direction)
def test_all(self):
settings = SecurityGroupRuleSettings(
- sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
- protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
+ sec_grp_name='foo', description='fubar',
+ direction=Direction.egress, remote_group_id='rgi',
+ protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1,
+ port_range_max=2,
remote_ip_prefix='prfx')
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
@@ -73,15 +79,15 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase):
def test_config_all(self):
settings = SecurityGroupRuleSettings(
- config={'sec_grp_name': 'foo',
- 'description': 'fubar',
- 'direction': 'egress',
- 'remote_group_id': 'rgi',
- 'protocol': 'tcp',
- 'ethertype': 'IPv6',
- 'port_range_min': 1,
- 'port_range_max': 2,
- 'remote_ip_prefix': 'prfx'})
+ **{'sec_grp_name': 'foo',
+ 'description': 'fubar',
+ 'direction': 'egress',
+ 'remote_group_id': 'rgi',
+ 'protocol': 'tcp',
+ 'ethertype': 'IPv6',
+ 'port_range_min': 1,
+ 'port_range_max': 2,
+ 'remote_ip_prefix': 'prfx'})
self.assertEqual('foo', settings.sec_grp_name)
self.assertEqual('fubar', settings.description)
self.assertEqual(Direction.egress, settings.direction)
@@ -104,27 +110,31 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_empty_config(self):
with self.assertRaises(Exception):
- SecurityGroupSettings(config=dict())
+ SecurityGroupSettings(**dict())
def test_name_only(self):
settings = SecurityGroupSettings(name='foo')
self.assertEqual('foo', settings.name)
def test_config_with_name_only(self):
- settings = SecurityGroupSettings(config={'name': 'foo'})
+ settings = SecurityGroupSettings(**{'name': 'foo'})
self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
- rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
+ rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar',
+ direction=Direction.ingress)
with self.assertRaises(Exception):
SecurityGroupSettings(name='foo', rule_settings=[rule_setting])
def test_all(self):
rule_settings = list()
- rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress))
- rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress))
+ rule_settings.append(SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.egress))
+ rule_settings.append(SecurityGroupRuleSettings(
+ sec_grp_name='bar', direction=Direction.ingress))
settings = SecurityGroupSettings(
- name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
+ name='bar', description='fubar', project_name='foo',
+ rule_settings=rule_settings)
self.assertEqual('bar', settings.name)
self.assertEqual('fubar', settings.description)
@@ -134,17 +144,19 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_config_all(self):
settings = SecurityGroupSettings(
- config={'name': 'bar',
- 'description': 'fubar',
- 'project_name': 'foo',
- 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
+ **{'name': 'bar',
+ 'description': 'fubar',
+ 'project_name': 'foo',
+ 'rules': [
+ {'sec_grp_name': 'bar', 'direction': 'ingress'}]})
self.assertEqual('bar', settings.name)
self.assertEqual('fubar', settings.description)
self.assertEqual('foo', settings.project_name)
self.assertEqual(1, len(settings.rule_settings))
self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)
+ self.assertEqual(Direction.ingress,
+ settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
@@ -154,8 +166,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
def setUp(self):
"""
- Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file
- within OpenStack
+ Instantiates the CreateSecurityGroup object that is responsible for
+ downloading and creating an OS image file within OpenStack
"""
super(self.__class__, self).__start__()
@@ -180,176 +192,232 @@ class CreateSecurityGroupTests(OSIntegrationTestCase):
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
+ description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
self.assertIsNotNone(sec_grp)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
# Create Image
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group')
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name,
+ description='hello group')
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
created_sec_grp = self.sec_grp_creator.create()
self.assertIsNotNone(created_sec_grp)
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
- self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name))
+ self.assertIsNone(neutron_utils.get_security_group(
+ self.neutron, self.sec_grp_creator.sec_grp_settings.name))
self.sec_grp_creator.clean()
def test_create_group_with_one_simple_rule(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
def test_create_group_with_several_rules(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
def test_add_rule(self):
"""
- Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation.
+ Tests the creation of an OpenStack Security Group with one simple
+ custom rule then adds one after creation.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
-
- self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
- direction=Direction.egress, protocol=Protocol.icmp))
- rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(
+ sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
+ direction=Direction.egress, protocol=Protocol.icmp))
+ rules2 = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) + 1, len(rules2))
def test_remove_rule_by_id(self):
"""
- Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID.
+ Tests the creation of an OpenStack Security Group with two simple
+ custom rules then removes one by the rule ID.
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
-
- self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
- rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
+
+ self.sec_grp_creator.remove_rule(
+ rule_id=rules[0]['security_group_rule']['id'])
+ rules_after_del = neutron_utils.get_rules_by_security_group(
+ self.neutron,
+ self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
def test_remove_rule_by_setting(self):
"""
- Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule
- setting object
+ Tests the creation of an OpenStack Security Group with two simple
+ custom rules then removes one by the rule setting object
"""
# Create Image
sec_grp_rule_settings = list()
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.ingress))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv6))
- sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
- direction=Direction.egress,
- protocol=Protocol.udp,
- ethertype=Ethertype.IPv4,
- port_range_min=10,
- port_range_max=20))
- sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group',
- rule_settings=sec_grp_rule_settings)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings)
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.ingress))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv6))
+ sec_grp_rule_settings.append(
+ SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name,
+ direction=Direction.egress,
+ protocol=Protocol.udp,
+ ethertype=Ethertype.IPv4,
+ port_range_min=10,
+ port_range_max=20))
+ sec_grp_settings = SecurityGroupSettings(
+ name=self.sec_grp_name, description='hello group',
+ rule_settings=sec_grp_rule_settings)
+ self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
+ self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
- validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ sec_grp = neutron_utils.get_security_group(self.neutron,
+ self.sec_grp_name)
+ validation_utils.objects_equivalent(
+ self.sec_grp_creator.get_security_group(), sec_grp)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, self.sec_grp_creator.get_security_group())
self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
- validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
+ validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(),
+ rules)
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
- rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
- self.sec_grp_creator.get_security_group())
+ rules_after_del = neutron_utils.get_rules_by_security_group(
+ self.neutron,
+ self.sec_grp_creator.get_security_group())
self.assertEqual(len(rules) - 1, len(rules_after_del))
-# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex
+# TODO - Add more tests with different rules. Rule creation parameters can be
+# somewhat complex