From ce3c297aedc17e30ccdc61b96619d1f80ee0b231 Mon Sep 17 00:00:00 2001 From: spisarski Date: Tue, 20 Jun 2017 12:57:02 -0600 Subject: Changes SecurityGroupSettings & SecurityGroupRuleSettings constructor to use kwargs. And changed line lengths to 79 for pep8. JIRA: SNAPS-102 Change-Id: Ifa878ba01694eaa17ee05cea5dbbe9bc8bde3cd0 Signed-off-by: spisarski --- snaps/openstack/create_security_group.py | 249 ++++++++------- .../openstack/tests/create_security_group_tests.py | 340 ++++++++++++--------- 2 files changed, 341 insertions(+), 248 deletions(-) diff --git a/snaps/openstack/create_security_group.py b/snaps/openstack/create_security_group.py index 4e98e82..49cc67b 100644 --- a/snaps/openstack/create_security_group.py +++ b/snaps/openstack/create_security_group.py @@ -16,8 +16,8 @@ import logging import enum from neutronclient.common.exceptions import NotFound -from snaps.openstack.utils import neutron_utils from snaps.openstack.utils import keystone_utils +from snaps.openstack.utils import neutron_utils __author__ = 'spisarski' @@ -49,22 +49,26 @@ class OpenStackSecurityGroup: def create(self, cleanup=False): """ Responsible for creating the security group. - :param cleanup: Denotes whether or not this is being called for cleanup or not + :param cleanup: Denotes whether or not this is being called for cleanup :return: the OpenStack security group object """ self.__neutron = neutron_utils.neutron_client(self.__os_creds) self.__keystone = keystone_utils.keystone_client(self.__os_creds) - logger.info('Creating security group %s...' % self.sec_grp_settings.name) + logger.info( + 'Creating security group %s...' % self.sec_grp_settings.name) - self.__security_group = neutron_utils.get_security_group(self.__neutron, self.sec_grp_settings.name) + self.__security_group = neutron_utils.get_security_group( + self.__neutron, self.sec_grp_settings.name) if not self.__security_group and not cleanup: # Create the security group - self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone, - self.sec_grp_settings) + self.__security_group = neutron_utils.create_security_group( + self.__neutron, self.__keystone, + self.sec_grp_settings) # Get the rules added for free - auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group) + auto_rules = neutron_utils.get_rules_by_security_group( + self.__neutron, self.__security_group) ctr = 0 for auto_rule in auto_rules: @@ -74,14 +78,17 @@ class OpenStackSecurityGroup: # Create the custom rules for sec_grp_rule_setting in self.sec_grp_settings.rule_settings: - custom_rule = neutron_utils.create_security_group_rule(self.__neutron, sec_grp_rule_setting) + custom_rule = neutron_utils.create_security_group_rule( + self.__neutron, sec_grp_rule_setting) self.__rules[sec_grp_rule_setting] = custom_rule - # Refresh security group object to reflect the new rules added to it - self.__security_group = neutron_utils.get_security_group(self.__neutron, self.sec_grp_settings.name) + # Refresh security group object to reflect the new rules added + self.__security_group = neutron_utils.get_security_group( + self.__neutron, self.sec_grp_settings.name) else: # Populate rules - existing_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group) + existing_rules = neutron_utils.get_rules_by_security_group( + self.__neutron, self.__security_group) for existing_rule in existing_rules: # For Custom Rules @@ -99,22 +106,27 @@ class OpenStackSecurityGroup: def __generate_rule_setting(self, rule): """ Creates a SecurityGroupRuleSettings object for a given rule - :param rule: the rule from which to create the SecurityGroupRuleSettings object + :param rule: the rule from which to create the + SecurityGroupRuleSettings object :return: the newly instantiated SecurityGroupRuleSettings object """ rule_dict = rule['security_group_rule'] sec_grp_name = None if rule_dict['security_group_id']: - sec_grp = neutron_utils.get_security_group_by_id(self.__neutron, rule_dict['security_group_id']) + sec_grp = neutron_utils.get_security_group_by_id( + self.__neutron, rule_dict['security_group_id']) if sec_grp: sec_grp_name = sec_grp['security_group']['name'] - setting = SecurityGroupRuleSettings(description=rule_dict['description'], - direction=rule_dict['direction'], ethertype=rule_dict['ethertype'], - port_range_min=rule_dict['port_range_min'], - port_range_max=rule_dict['port_range_max'], protocol=rule_dict['protocol'], - remote_group_id=rule_dict['remote_group_id'], - remote_ip_prefix=rule_dict['remote_ip_prefix'], sec_grp_name=sec_grp_name) + setting = SecurityGroupRuleSettings( + description=rule_dict['description'], + direction=rule_dict['direction'], ethertype=rule_dict['ethertype'], + port_range_min=rule_dict['port_range_min'], + port_range_max=rule_dict['port_range_max'], + protocol=rule_dict['protocol'], + remote_group_id=rule_dict['remote_group_id'], + remote_ip_prefix=rule_dict['remote_ip_prefix'], + sec_grp_name=sec_grp_name) return setting def clean(self): @@ -131,9 +143,11 @@ class OpenStackSecurityGroup: if self.__security_group: try: - neutron_utils.delete_security_group(self.__neutron, self.__security_group) + neutron_utils.delete_security_group(self.__neutron, + self.__security_group) except NotFound as e: - logger.warning('Security Group not found, cannot delete - ' + str(e)) + logger.warning( + 'Security Group not found, cannot delete - ' + str(e)) self.__security_group = None @@ -157,25 +171,29 @@ class OpenStackSecurityGroup: :param rule_setting: the rule configuration """ rule_setting.sec_grp_name = self.sec_grp_settings.name - new_rule = neutron_utils.create_security_group_rule(self.__neutron, rule_setting) + new_rule = neutron_utils.create_security_group_rule(self.__neutron, + rule_setting) self.__rules[rule_setting] = new_rule self.sec_grp_settings.rule_settings.append(rule_setting) def remove_rule(self, rule_id=None, rule_setting=None): """ - Removes a rule to this security group by id, name, or rule_setting object + Removes a rule to this security group by id, name, or rule_setting + object :param rule_id: the rule's id :param rule_setting: the rule's setting object """ rule_to_remove = None if rule_id or rule_setting: if rule_id: - rule_to_remove = neutron_utils.get_rule_by_id(self.__neutron, self.__security_group, rule_id) + rule_to_remove = neutron_utils.get_rule_by_id( + self.__neutron, self.__security_group, rule_id) elif rule_setting: rule_to_remove = self.__rules.get(rule_setting) if rule_to_remove: - neutron_utils.delete_security_group_rule(self.__neutron, rule_to_remove) + neutron_utils.delete_security_group_rule(self.__neutron, + rule_to_remove) rule_setting = self.__get_setting_from_rule(rule_to_remove) if rule_setting: self.__rules.pop(rule_setting) @@ -199,42 +217,46 @@ class SecurityGroupSettings: Class representing a keypair configuration """ - def __init__(self, config=None, name=None, description=None, project_name=None, - rule_settings=list()): + def __init__(self, **kwargs): """ Constructor - all parameters are optional - :param config: Should be a dict object containing the configuration settings using the attribute names below - as each member's the key and overrides any of the other parameters. :param name: The keypair name. :param description: The security group's description - :param project_name: The name of the project under which the security group will be created + :param project_name: The name of the project under which the security + group will be created :return: """ - if config: - self.name = config.get('name') - self.description = config.get('description') - self.project_name = config.get('project_name') - self.rule_settings = list() - if config.get('rules') and type(config['rules']) is list: - for config_rule in config['rules']: - self.rule_settings.append(SecurityGroupRuleSettings(config=config_rule)) - else: - self.name = name - self.description = description - self.project_name = project_name - self.rule_settings = rule_settings + self.name = kwargs.get('name') + self.description = kwargs.get('description') + self.project_name = kwargs.get('project_name') + self.rule_settings = list() + + rule_settings = kwargs.get('rules') + if not rule_settings: + rule_settings = kwargs.get('rule_settings') + + if rule_settings: + for rule_setting in rule_settings: + if isinstance(rule_setting, SecurityGroupRuleSettings): + self.rule_settings.append(rule_setting) + else: + self.rule_settings.append(SecurityGroupRuleSettings( + **rule_setting)) if not self.name: raise Exception('The attribute name is required') for rule_setting in self.rule_settings: if rule_setting.sec_grp_name is not self.name: - raise Exception('Rule settings must correspond with the name of this security group') + raise Exception( + 'Rule settings must correspond with the name of this ' + 'security group') def dict_for_neutron(self, keystone): """ Returns a dictionary object representing this object. - This is meant to be converted into JSON designed for use by the Neutron API + This is meant to be converted into JSON designed for use by the Neutron + API TODO - expand automated testing to exercise all parameters :param keystone: the Keystone client @@ -254,7 +276,9 @@ class SecurityGroupSettings: if project_id: out['project_id'] = project_id else: - raise Exception('Could not find project ID for project named - ' + self.project_name) + raise Exception( + 'Could not find project ID for project named - ' + + self.project_name) return {'security_group': out} @@ -290,67 +314,62 @@ class SecurityGroupRuleSettings: Class representing a keypair configuration """ - def __init__(self, config=None, sec_grp_name=None, description=None, direction=None, - remote_group_id=None, protocol=None, ethertype=None, port_range_min=None, port_range_max=None, - sec_grp_rule=None, remote_ip_prefix=None): + def __init__(self, **kwargs): """ Constructor - all parameters are optional - :param config: Should be a dict object containing the configuration settings using the attribute names below - as each member's the key and overrides any of the other parameters. - :param sec_grp_name: The security group's name on which to add the rule. (required) + :param sec_grp_name: The security group's name on which to add the + rule. (required) :param description: The rule's description - :param direction: An enumeration of type create_security_group.RULE_DIRECTION (required) - :param remote_group_id: The group ID to associate with this rule (this should be changed to group name - once snaps support Groups) (optional) - :param protocol: An enumeration of type create_security_group.RULE_PROTOCOL or a string value that will be - mapped accordingly (optional) - :param ethertype: An enumeration of type create_security_group.RULE_ETHERTYPE (optional) - :param port_range_min: The minimum port number in the range that is matched by the security group rule. When - the protocol is TCP or UDP, this value must be <= port_range_max. When the protocol is - ICMP, this value must be an ICMP type. - :param port_range_max: The maximum port number in the range that is matched by the security group rule. When - the protocol is TCP or UDP, this value must be <= port_range_max. When the protocol is - ICMP, this value must be an ICMP type. - :param sec_grp_rule: The OpenStack rule object to a security group rule object to associate - (note: Cannot be set using the config object nor can I see any real uses for this - parameter) - :param remote_ip_prefix: The remote IP prefix to associate with this metering rule packet (optional) + :param direction: An enumeration of type + create_security_group.RULE_DIRECTION (required) + :param remote_group_id: The group ID to associate with this rule + (this should be changed to group name once + snaps support Groups) (optional) + :param protocol: An enumeration of type + create_security_group.RULE_PROTOCOL or a string value + that will be mapped accordingly (optional) + :param ethertype: An enumeration of type + create_security_group.RULE_ETHERTYPE (optional) + :param port_range_min: The minimum port number in the range that is + matched by the security group rule. When the + protocol is TCP or UDP, this value must be <= + port_range_max. When the protocol is ICMP, this + value must be an ICMP type. + :param port_range_max: The maximum port number in the range that is + matched by the security group rule. When the + protocol is TCP or UDP, this value must be <= + port_range_max. When the protocol is ICMP, this + value must be an ICMP type. + :param sec_grp_rule: The OpenStack rule object to a security group rule + object to associate + (note: Cannot be set using the config object nor + can I see any real uses for this parameter) + :param remote_ip_prefix: The remote IP prefix to associate with this + metering rule packet (optional) TODO - Need to support the tenant... """ - if config: - self.description = config.get('description') - self.sec_grp_name = config.get('sec_grp_name') - self.remote_group_id = config.get('remote_group_id') - self.direction = None - if config.get('direction'): - self.direction = map_direction(config['direction']) - - self.protocol = None - if config.get('protocol'): - self.protocol = map_protocol(config['protocol']) - else: - self.protocol = Protocol.null + self.description = kwargs.get('description') + self.sec_grp_name = kwargs.get('sec_grp_name') + self.remote_group_id = kwargs.get('remote_group_id') + self.direction = None + if kwargs.get('direction'): + self.direction = map_direction(kwargs['direction']) - self.ethertype = None - if config.get('ethertype'): - self.ethertype = map_ethertype(config['ethertype']) - - self.port_range_min = config.get('port_range_min') - self.port_range_max = config.get('port_range_max') - self.remote_ip_prefix = config.get('remote_ip_prefix') + self.protocol = None + if kwargs.get('protocol'): + self.protocol = map_protocol(kwargs['protocol']) else: - self.description = description - self.sec_grp_name = sec_grp_name - self.remote_group_id = remote_group_id - self.direction = map_direction(direction) - self.protocol = map_protocol(protocol) - self.ethertype = map_ethertype(ethertype) - self.port_range_min = port_range_min - self.port_range_max = port_range_max - self.sec_grp_rule = sec_grp_rule - self.remote_ip_prefix = remote_ip_prefix + self.protocol = Protocol.null + + self.ethertype = None + if kwargs.get('ethertype'): + self.ethertype = map_ethertype(kwargs['ethertype']) + + self.port_range_min = kwargs.get('port_range_min') + self.port_range_max = kwargs.get('port_range_max') + self.remote_ip_prefix = kwargs.get('remote_ip_prefix') if not self.direction or not self.sec_grp_name: raise Exception('direction and sec_grp_name are required') @@ -358,7 +377,8 @@ class SecurityGroupRuleSettings: def dict_for_neutron(self, neutron): """ Returns a dictionary object representing this object. - This is meant to be converted into JSON designed for use by the Neutron API + This is meant to be converted into JSON designed for use by the Neutron + API :param neutron: the neutron client for performing lookups :return: the dictionary object @@ -375,18 +395,19 @@ class SecurityGroupRuleSettings: out['port_range_max'] = self.port_range_max if self.ethertype: out['ethertype'] = self.ethertype.name - if self.protocol: + if self.protocol and self.protocol.name != 'null': out['protocol'] = self.protocol.name if self.sec_grp_name: - sec_grp = neutron_utils.get_security_group(neutron, self.sec_grp_name) + sec_grp = neutron_utils.get_security_group( + neutron, self.sec_grp_name) if sec_grp: out['security_group_id'] = sec_grp['security_group']['id'] else: - raise Exception('Cannot locate security group with name - ' + self.sec_grp_name) + raise Exception( + 'Cannot locate security group with name - ' + + self.sec_grp_name) if self.remote_group_id: out['remote_group_id'] = self.remote_group_id - if self.sec_grp_rule: - out['security_group_rule'] = self.sec_grp_rule if self.remote_ip_prefix: out['remote_ip_prefix'] = self.remote_ip_prefix @@ -401,7 +422,8 @@ class SecurityGroupRuleSettings: rule_dict = rule['security_group_rule'] if self.description is not None: - if rule_dict['description'] is not None and rule_dict['description'] != '': + if rule_dict['description'] is not None and rule_dict[ + 'description'] != '': return False elif self.description != rule_dict['description']: if rule_dict['description'] != '': @@ -445,18 +467,19 @@ class SecurityGroupRuleSettings: self.protocol == other.protocol and \ self.sec_grp_name == other.sec_grp_name and \ self.remote_group_id == other.remote_group_id and \ - self.sec_grp_rule == other.sec_grp_rule and \ self.remote_ip_prefix == other.remote_ip_prefix def __hash__(self): - return hash((self.sec_grp_name, self.description, self.direction, self.remote_group_id, - self.protocol, self.ethertype, self.port_range_min, self.port_range_max, self.sec_grp_rule, - self.remote_ip_prefix)) + return hash((self.sec_grp_name, self.description, self.direction, + self.remote_group_id, + self.protocol, self.ethertype, self.port_range_min, + self.port_range_max, self.remote_ip_prefix)) def map_direction(direction): """ - Takes a the direction value maps it to the Direction enum. When None return None + Takes a the direction value maps it to the Direction enum. When None return + None :param direction: the direction value :return: the Direction enum object :raise: Exception if value is invalid @@ -477,7 +500,8 @@ def map_direction(direction): def map_protocol(protocol): """ - Takes a the protocol value maps it to the Protocol enum. When None return None + Takes a the protocol value maps it to the Protocol enum. When None return + None :param protocol: the protocol value :return: the Protocol enum object :raise: Exception if value is invalid @@ -502,7 +526,8 @@ def map_protocol(protocol): def map_ethertype(ethertype): """ - Takes a the ethertype value maps it to the Ethertype enum. When None return None + Takes a the ethertype value maps it to the Ethertype enum. When None return + None :param ethertype: the ethertype value :return: the Ethertype enum object :raise: Exception if value is invalid diff --git a/snaps/openstack/tests/create_security_group_tests.py b/snaps/openstack/tests/create_security_group_tests.py index 6a3c0ef..8fd8a6a 100644 --- a/snaps/openstack/tests/create_security_group_tests.py +++ b/snaps/openstack/tests/create_security_group_tests.py @@ -12,12 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import uuid import unittest +import uuid from snaps.openstack import create_security_group -from snaps.openstack.create_security_group import SecurityGroupSettings, SecurityGroupRuleSettings, Direction, \ - Ethertype, Protocol +from snaps.openstack.create_security_group import (SecurityGroupSettings, + SecurityGroupRuleSettings, + Direction, Ethertype, + Protocol) from snaps.openstack.tests import validation_utils from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase from snaps.openstack.utils import neutron_utils @@ -36,7 +38,7 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): def test_empty_config(self): with self.assertRaises(Exception): - SecurityGroupRuleSettings(config=dict()) + SecurityGroupRuleSettings(**dict()) def test_name_only(self): with self.assertRaises(Exception): @@ -44,22 +46,26 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): def test_config_with_name_only(self): with self.assertRaises(Exception): - SecurityGroupRuleSettings(config={'sec_grp_name': 'foo'}) + SecurityGroupRuleSettings(**{'sec_grp_name': 'foo'}) def test_name_and_direction(self): - settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress) + settings = SecurityGroupRuleSettings(sec_grp_name='foo', + direction=Direction.ingress) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual(Direction.ingress, settings.direction) def test_config_name_and_direction(self): - settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'}) + settings = SecurityGroupRuleSettings( + **{'sec_grp_name': 'foo', 'direction': 'ingress'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual(Direction.ingress, settings.direction) def test_all(self): settings = SecurityGroupRuleSettings( - sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi', - protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2, + sec_grp_name='foo', description='fubar', + direction=Direction.egress, remote_group_id='rgi', + protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, + port_range_max=2, remote_ip_prefix='prfx') self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) @@ -73,15 +79,15 @@ class SecurityGroupRuleSettingsUnitTests(unittest.TestCase): def test_config_all(self): settings = SecurityGroupRuleSettings( - config={'sec_grp_name': 'foo', - 'description': 'fubar', - 'direction': 'egress', - 'remote_group_id': 'rgi', - 'protocol': 'tcp', - 'ethertype': 'IPv6', - 'port_range_min': 1, - 'port_range_max': 2, - 'remote_ip_prefix': 'prfx'}) + **{'sec_grp_name': 'foo', + 'description': 'fubar', + 'direction': 'egress', + 'remote_group_id': 'rgi', + 'protocol': 'tcp', + 'ethertype': 'IPv6', + 'port_range_min': 1, + 'port_range_max': 2, + 'remote_ip_prefix': 'prfx'}) self.assertEqual('foo', settings.sec_grp_name) self.assertEqual('fubar', settings.description) self.assertEqual(Direction.egress, settings.direction) @@ -104,27 +110,31 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): def test_empty_config(self): with self.assertRaises(Exception): - SecurityGroupSettings(config=dict()) + SecurityGroupSettings(**dict()) def test_name_only(self): settings = SecurityGroupSettings(name='foo') self.assertEqual('foo', settings.name) def test_config_with_name_only(self): - settings = SecurityGroupSettings(config={'name': 'foo'}) + settings = SecurityGroupSettings(**{'name': 'foo'}) self.assertEqual('foo', settings.name) def test_invalid_rule(self): - rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress) + rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', + direction=Direction.ingress) with self.assertRaises(Exception): SecurityGroupSettings(name='foo', rule_settings=[rule_setting]) def test_all(self): rule_settings = list() - rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.egress)) - rule_settings.append(SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)) + rule_settings.append(SecurityGroupRuleSettings( + sec_grp_name='bar', direction=Direction.egress)) + rule_settings.append(SecurityGroupRuleSettings( + sec_grp_name='bar', direction=Direction.ingress)) settings = SecurityGroupSettings( - name='bar', description='fubar', project_name='foo', rule_settings=rule_settings) + name='bar', description='fubar', project_name='foo', + rule_settings=rule_settings) self.assertEqual('bar', settings.name) self.assertEqual('fubar', settings.description) @@ -134,17 +144,19 @@ class SecurityGroupSettingsUnitTests(unittest.TestCase): def test_config_all(self): settings = SecurityGroupSettings( - config={'name': 'bar', - 'description': 'fubar', - 'project_name': 'foo', - 'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]}) + **{'name': 'bar', + 'description': 'fubar', + 'project_name': 'foo', + 'rules': [ + {'sec_grp_name': 'bar', 'direction': 'ingress'}]}) self.assertEqual('bar', settings.name) self.assertEqual('fubar', settings.description) self.assertEqual('foo', settings.project_name) self.assertEqual(1, len(settings.rule_settings)) self.assertEqual('bar', settings.rule_settings[0].sec_grp_name) - self.assertEqual(Direction.ingress, settings.rule_settings[0].direction) + self.assertEqual(Direction.ingress, + settings.rule_settings[0].direction) class CreateSecurityGroupTests(OSIntegrationTestCase): @@ -154,8 +166,8 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): def setUp(self): """ - Instantiates the CreateSecurityGroup object that is responsible for downloading and creating an OS image file - within OpenStack + Instantiates the CreateSecurityGroup object that is responsible for + downloading and creating an OS image file within OpenStack """ super(self.__class__, self).__start__() @@ -180,176 +192,232 @@ class CreateSecurityGroupTests(OSIntegrationTestCase): Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, + description='hello group') + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) self.assertIsNotNone(sec_grp) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) def test_create_delete_group(self): """ Tests the creation of an OpenStack Security Group without custom rules. """ # Create Image - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group') - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, + description='hello group') + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) created_sec_grp = self.sec_grp_creator.create() self.assertIsNotNone(created_sec_grp) neutron_utils.delete_security_group(self.neutron, created_sec_grp) - self.assertIsNone(neutron_utils.get_security_group(self.neutron, self.sec_grp_creator.sec_grp_settings.name)) + self.assertIsNone(neutron_utils.get_security_group( + self.neutron, self.sec_grp_creator.sec_grp_settings.name)) self.sec_grp_creator.clean() def test_create_group_with_one_simple_rule(self): """ - Tests the creation of an OpenStack Security Group with one simple custom rule. + Tests the creation of an OpenStack Security Group with one simple + custom rule. """ # Create Image sec_grp_rule_settings = list() - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=sec_grp_rule_settings) - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) def test_create_group_with_several_rules(self): """ - Tests the creation of an OpenStack Security Group with one simple custom rule. + Tests the creation of an OpenStack Security Group with one simple + custom rule. """ # Create Image sec_grp_rule_settings = list() - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=sec_grp_rule_settings) - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) def test_add_rule(self): """ - Tests the creation of an OpenStack Security Group with one simple custom rule then adds one after creation. + Tests the creation of an OpenStack Security Group with one simple + custom rule then adds one after creation. """ # Create Image sec_grp_rule_settings = list() - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=sec_grp_rule_settings) - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - - self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, - direction=Direction.egress, protocol=Protocol.icmp)) - rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group()) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.sec_grp_creator.add_rule(SecurityGroupRuleSettings( + sec_grp_name=self.sec_grp_creator.sec_grp_settings.name, + direction=Direction.egress, protocol=Protocol.icmp)) + rules2 = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) + 1, len(rules2)) def test_remove_rule_by_id(self): """ - Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule ID. + Tests the creation of an OpenStack Security Group with two simple + custom rules then removes one by the rule ID. """ # Create Image sec_grp_rule_settings = list() - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=sec_grp_rule_settings) - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) - - self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id']) - rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) + + self.sec_grp_creator.remove_rule( + rule_id=rules[0]['security_group_rule']['id']) + rules_after_del = neutron_utils.get_rules_by_security_group( + self.neutron, + self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) - 1, len(rules_after_del)) def test_remove_rule_by_setting(self): """ - Tests the creation of an OpenStack Security Group with two simple custom rules then removes one by the rule - setting object + Tests the creation of an OpenStack Security Group with two simple + custom rules then removes one by the rule setting object """ # Create Image sec_grp_rule_settings = list() - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.ingress)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv6)) - sec_grp_rule_settings.append(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, - direction=Direction.egress, - protocol=Protocol.udp, - ethertype=Ethertype.IPv4, - port_range_min=10, - port_range_max=20)) - sec_grp_settings = SecurityGroupSettings(name=self.sec_grp_name, description='hello group', - rule_settings=sec_grp_rule_settings) - self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(self.os_creds, sec_grp_settings) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.ingress)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv6)) + sec_grp_rule_settings.append( + SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_name, + direction=Direction.egress, + protocol=Protocol.udp, + ethertype=Ethertype.IPv4, + port_range_min=10, + port_range_max=20)) + sec_grp_settings = SecurityGroupSettings( + name=self.sec_grp_name, description='hello group', + rule_settings=sec_grp_rule_settings) + self.sec_grp_creator = create_security_group.OpenStackSecurityGroup( + self.os_creds, sec_grp_settings) self.sec_grp_creator.create() - sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name) - validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp) - rules = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + sec_grp = neutron_utils.get_security_group(self.neutron, + self.sec_grp_name) + validation_utils.objects_equivalent( + self.sec_grp_creator.get_security_group(), sec_grp) + rules = neutron_utils.get_rules_by_security_group( + self.neutron, self.sec_grp_creator.get_security_group()) self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules)) - validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules) + validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), + rules) self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0]) - rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron, - self.sec_grp_creator.get_security_group()) + rules_after_del = neutron_utils.get_rules_by_security_group( + self.neutron, + self.sec_grp_creator.get_security_group()) self.assertEqual(len(rules) - 1, len(rules_after_del)) -# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex +# TODO - Add more tests with different rules. Rule creation parameters can be +# somewhat complex -- cgit 1.2.3-korg