# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs") # and others. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at: # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import enum from neutronclient.common.exceptions import NotFound, Conflict from snaps.config.security_group import ( SecurityGroupConfig, SecurityGroupRuleConfig) from snaps.openstack.openstack_creator import OpenStackNetworkObject from snaps.openstack.utils import neutron_utils __author__ = 'spisarski' logger = logging.getLogger('OpenStackSecurityGroup') class OpenStackSecurityGroup(OpenStackNetworkObject): """ Class responsible for managing a Security Group in OpenStack """ def __init__(self, os_creds, sec_grp_settings): """ Constructor - all parameters are required :param os_creds: The credentials to connect with OpenStack :param sec_grp_settings: The settings used to create a security group """ super(self.__class__, self).__init__(os_creds) self.sec_grp_settings = sec_grp_settings # Attributes instantiated on create() self.__security_group = None # dict where the rule settings object is the key self.__rules = dict() def initialize(self): """ Loads existing security group. :return: the security group domain object """ super(self.__class__, self).initialize() self.__security_group = neutron_utils.get_security_group( self._neutron, self._keystone, sec_grp_settings=self.sec_grp_settings, project_name=self._os_creds.project_name) if self.__security_group: # Populate rules existing_rules = neutron_utils.get_rules_by_security_group( self._neutron, self.__security_group) for existing_rule in existing_rules: # For Custom Rules rule_setting = self.__get_setting_from_rule(existing_rule) self.__rules[rule_setting] = existing_rule self.__security_group = neutron_utils.get_security_group_by_id( self._neutron, self.__security_group.id) return self.__security_group def create(self): """ Responsible for creating the security group. :return: the security group domain object """ self.initialize() if not self.__security_group: logger.info( 'Creating security group %s...' % self.sec_grp_settings.name) self.__security_group = neutron_utils.create_security_group( self._neutron, self._keystone, self.sec_grp_settings) # Get the rules added for free auto_rules = neutron_utils.get_rules_by_security_group( self._neutron, self.__security_group) ctr = 0 for auto_rule in auto_rules: auto_rule_setting = self.__generate_rule_setting(auto_rule) self.__rules[auto_rule_setting] = auto_rule ctr += 1 # Create the custom rules for sec_grp_rule_setting in self.sec_grp_settings.rule_settings: try: custom_rule = neutron_utils.create_security_group_rule( self._neutron, self._keystone, sec_grp_rule_setting, self._os_creds.project_name) self.__rules[sec_grp_rule_setting] = custom_rule except Conflict as e: logger.warn('Unable to create rule due to conflict - %s', e) # Refresh security group object to reflect the new rules added self.__security_group = neutron_utils.get_security_group_by_id( self._neutron, self.__security_group.id) return self.__security_group def __generate_rule_setting(self, rule): """ Creates a SecurityGroupRuleConfig object for a given rule :param rule: the rule from which to create the SecurityGroupRuleConfig object :return: the newly instantiated SecurityGroupRuleConfig object """ sec_grp = neutron_utils.get_security_group_by_id( self._neutron, rule.security_group_id) setting = SecurityGroupRuleConfig( description=rule.description, direction=rule.direction, ethertype=rule.ethertype, port_range_min=rule.port_range_min, port_range_max=rule.port_range_max, protocol=rule.protocol, remote_group_id=rule.remote_group_id, remote_ip_prefix=rule.remote_ip_prefix, sec_grp_name=sec_grp.name) return setting def clean(self): """ Removes and deletes the rules then the security group. """ for setting, rule in self.__rules.items(): try: neutron_utils.delete_security_group_rule(self._neutron, rule) except NotFound as e: logger.warning('Rule not found, cannot delete - ' + str(e)) pass self.__rules = dict() if self.__security_group: try: neutron_utils.delete_secu