# Copyright 2015 Futurewei. All rights reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import six

from oslo_log import helpers as log_helpers
from oslo_log import log as logging
from oslo_utils import uuidutils

import sqlalchemy as sa
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy import orm
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.orm import exc

from neutron.api.v2.attributes import DESCRIPTION_MAX_LEN
from neutron.api.v2.attributes import NAME_MAX_LEN
from neutron.db import common_db_mixin
from neutron.db import model_base
from neutron.db import models_v2
from neutron.i18n import _LI

from networking_sfc.db import flowclassifier_db as fc_db
from networking_sfc.extensions import flowclassifier as ext_fc
from networking_sfc.extensions import sfc as ext_sfc


LOG = logging.getLogger(__name__)

# Column widths shared by the models below.
UUID_LEN = 36
PARAM_LEN = 255


class ChainParameter(model_base.BASEV2):
    """Represents a single chain parameter."""

    __tablename__ = 'sfc_port_chain_parameters'
    # (keyword, chain_id) is the composite primary key, so a keyword can
    # appear at most once per port chain.
    keyword = sa.Column(sa.String(PARAM_LEN), primary_key=True)
    value = sa.Column(sa.String(PARAM_LEN))
    chain_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE'),
        primary_key=True)


class ServiceFunctionParam(model_base.BASEV2):
    """Represents a service function parameter."""

    __tablename__ = 'sfc_service_function_params'
    # (keyword, pair_id) is the composite primary key, so a keyword can
    # appear at most once per port pair.
    keyword = sa.Column(sa.String(PARAM_LEN), primary_key=True)
    value = sa.Column(sa.String(PARAM_LEN))
    pair_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_pairs.id', ondelete='CASCADE'),
        primary_key=True)


class ChainClassifierAssoc(model_base.BASEV2):
    """Relation table between sfc_port_chains and flow_classifiers."""

    __tablename__ = 'sfc_chain_classifier_associations'
    # unique=True: a flow classifier can be bound to at most one port chain.
    flowclassifier_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_flow_classifiers.id', ondelete='RESTRICT'),
        primary_key=True, nullable=False, unique=True)
    portchain_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE'),
        primary_key=True)
    # The backref gives FlowClassifier a ``chain_classifier_associations``
    # attribute, which the plugin reads for its in-use checks.
    flow_classifier = orm.relationship(
        fc_db.FlowClassifier,
        backref='chain_classifier_associations'
    )


class PortPair(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
    """Represents the ingress and egress ports for a single service function.

    """

    __tablename__ = 'sfc_port_pairs'
    name = sa.Column(sa.String(NAME_MAX_LEN))
    description = sa.Column(sa.String(DESCRIPTION_MAX_LEN))
    ingress = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('ports.id', ondelete='RESTRICT'),
        nullable=False)
    egress = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('ports.id', ondelete='RESTRICT'),
        nullable=False)

    # Nullable: a port pair that belongs to no group has no value here.
    portpairgroup_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_pair_groups.id', ondelete='RESTRICT'))
    # Dict-like collection keyed by each parameter's ``keyword``.
    service_function_parameters = orm.relationship(
        ServiceFunctionParam,
        collection_class=attribute_mapped_collection('keyword'),
        cascade='all, delete-orphan')

    __table_args__ = (
        sa.UniqueConstraint(
            ingress, egress,
            name='uniq_sfc_port_pairs0ingress0egress'
        ),
        model_base.BASEV2.__table_args__
    )


class ChainGroupAssoc(model_base.BASEV2):
    """Relation table between sfc_port_chains and sfc_port_pair_groups."""

    __tablename__ = 'sfc_chain_group_associations'
    portpairgroup_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_pair_groups.id', ondelete='RESTRICT'),
        primary_key=True, nullable=False)
    portchain_id = sa.Column(
        sa.String(UUID_LEN),
        sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE'),
        primary_key=True)
    # Ordering key used by PortChain.chain_group_associations.
    position = sa.Column(sa.Integer)


class PortPairGroup(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
    """Represents a port pair group model."""

    __tablename__ = 'sfc_port_pair_groups'
    name = sa.Column(sa.String(NAME_MAX_LEN))
    description = sa.Column(sa.String(DESCRIPTION_MAX_LEN))
    port_pairs = orm.relationship(
        PortPair,
        backref='port_pair_group'
    )
    chain_group_associations = orm.relationship(
        ChainGroupAssoc,
        backref='port_pair_groups')


class PortChain(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
    """Represents a Neutron service function Port Chain."""

    __tablename__ = 'sfc_port_chains'

    name = sa.Column(sa.String(NAME_MAX_LEN))
    description = sa.Column(sa.String(DESCRIPTION_MAX_LEN))
    # Loaded in ``position`` order and kept in an ordering_list collection.
    chain_group_associations = orm.relationship(
        ChainGroupAssoc,
        backref='port_chain',
        order_by="ChainGroupAssoc.position",
        collection_class=ordering_list('position'),
        cascade='all, delete-orphan')
    chain_classifier_associations = orm.relationship(
        ChainClassifierAssoc,
        backref='port_chain',
        cascade='all, delete-orphan')
    # Dict-like collection keyed by each parameter's ``keyword``.
    chain_parameters = orm.relationship(
        ChainParameter,
        collection_class=attribute_mapped_collection('keyword'),
        cascade='all, delete-orphan')


class SfcDbPlugin(
    ext_sfc.SfcPluginBase,
    common_db_mixin.CommonDbMixin
):
    """Mixin class to add port chain to db_plugin_base_v2."""

    # ------------------------------------------------------------------
    # Port chains
    # ------------------------------------------------------------------

    def _make_port_chain_dict(self, port_chain, fields=None):
        """Build the API dict for a port chain.

        :param port_chain: port chain object supporting item access.
        :param fields: optional field filter, passed to ``self._fields``.
        :returns: the result of ``self._fields`` applied to the dict.
        """
        res = {
            'id': port_chain['id'],
            'name': port_chain['name'],
            'tenant_id': port_chain['tenant_id'],
            'description': port_chain['description'],
            'port_pair_groups': [
                assoc['portpairgroup_id']
                for assoc in port_chain['chain_group_associations']
            ],
            'flow_classifiers': [
                assoc['flowclassifier_id']
                for assoc in port_chain['chain_classifier_associations']
            ],
            # Only the parameter objects are needed; each carries its own
            # keyword, so the mapping keys are not consulted.
            'chain_parameters': {
                param['keyword']: param['value']
                for param in six.itervalues(port_chain['chain_parameters'])
            }
        }
        return self._fields(res, fields)

    def _validate_port_pair_groups(self, context, pg_ids):
        """Reject a port pair group list already used by a port chain.

        :param context: request context providing the DB session.
        :param pg_ids: list of port pair group ids.
        :raises ext_sfc.InvalidPortPairGroups: if an existing port chain
            has a port pair group id list equal to ``pg_ids`` (list
            equality, so order matters).
        """
        with context.session.begin(subtransactions=True):
            query = self._model_query(context, PortChain)
            for port_chain_db in query.all():
                pc_pg_ids = [
                    assoc['portpairgroup_id']
                    for assoc in port_chain_db.chain_group_associations
                ]
                if pc_pg_ids == pg_ids:
                    raise ext_sfc.InvalidPortPairGroups(
                        port_pair_groups=pg_ids, port_chain=port_chain_db.id)

    def _validate_flow_classifiers(self, context, fc_ids):
        """Placeholder hook for flow classifier validation (no-op)."""
        # TODO(xiaodong): Validate flow classifiers if needed in future.
        pass

    def _setup_chain_group_associations(
        self, context, port_chain, pg_ids
    ):
        """Set the chain's port pair group associations to ``pg_ids``.

        An existing association row for (chain, group) is reused; otherwise
        a new ``ChainGroupAssoc`` is created.  The resulting list replaces
        ``port_chain.chain_group_associations``.
        """
        with context.session.begin(subtransactions=True):
            chain_group_associations = []
            for pg_id in pg_ids:
                query = self._model_query(context, ChainGroupAssoc)
                chain_group_association = query.filter_by(
                    portchain_id=port_chain.id, portpairgroup_id=pg_id
                ).first()
                if not chain_group_association:
                    chain_group_association = ChainGroupAssoc(
                        portpairgroup_id=pg_id
                    )
                chain_group_associations.append(chain_group_association)
            port_chain.chain_group_associations = chain_group_associations

    def _setup_chain_classifier_associations(
        self, context, port_chain, fc_ids
    ):
        """Set the chain's flow classifier associations to ``fc_ids``.

        An existing association row for (chain, classifier) is reused;
        otherwise a new ``ChainClassifierAssoc`` is created.  The resulting
        list replaces ``port_chain.chain_classifier_associations``.
        """
        with context.session.begin(subtransactions=True):
            chain_classifier_associations = []
            for fc_id in fc_ids:
                query = self._model_query(context, ChainClassifierAssoc)
                chain_classifier_association = query.filter_by(
                    portchain_id=port_chain.id, flowclassifier_id=fc_id
                ).first()
                if not chain_classifier_association:
                    chain_classifier_association = ChainClassifierAssoc(
                        flowclassifier_id=fc_id
                    )
                chain_classifier_associations.append(
                    chain_classifier_association)
            port_chain.chain_classifier_associations = (
                chain_classifier_associations)

    @log_helpers.log_method_call
    def create_port_chain(self, context, port_chain):
        """Create a port chain.

        :param context: request context providing the DB session.
        :param port_chain: request body; the attributes are read from
            ``port_chain['port_chain']``.
        :returns: dict describing the new port chain.
        :raises ext_sfc.PortPairGroupNotFound: unknown port pair group id.
        :raises ext_fc.FlowClassifierNotFound: unknown flow classifier id.
        :raises ext_fc.FlowClassifierInUse: a flow classifier already has
            a chain association.
        :raises ext_sfc.InvalidPortPairGroups: another chain already uses
            the same port pair group list.
        """
        pc = port_chain['port_chain']
        tenant_id = self._get_tenant_id_for_create(context, pc)
        with context.session.begin(subtransactions=True):
            chain_parameters = {
                key: ChainParameter(keyword=key, value=val)
                for key, val in six.iteritems(pc['chain_parameters'])}

            # Existence check only; the returned objects are not needed.
            pg_ids = pc['port_pair_groups']
            for pg_id in pg_ids:
                self._get_port_pair_group(context, pg_id)
            fc_ids = pc['flow_classifiers']
            fcs = [
                self._get_flow_classifier(context, fc_id)
                for fc_id in fc_ids
            ]
            for fc in fcs:
                if fc.chain_classifier_associations:
                    raise ext_fc.FlowClassifierInUse(id=fc.id)

            self._validate_port_pair_groups(context, pg_ids)
            self._validate_flow_classifiers(context, fc_ids)
            port_chain_db = PortChain(id=uuidutils.generate_uuid(),
                                      tenant_id=tenant_id,
                                      description=pc['description'],
                                      name=pc['name'],
                                      chain_parameters=chain_parameters)
            self._setup_chain_group_associations(
                context, port_chain_db, pg_ids)
            self._setup_chain_classifier_associations(
                context, port_chain_db, fc_ids)
            context.session.add(port_chain_db)

            return self._make_port_chain_dict(port_chain_db)

    @log_helpers.log_method_call
    def get_port_chains(self, context, filters=None, fields=None,
                        sorts=None, limit=None,
                        marker=None, page_reverse=False, default_sg=False):
        """Return the port chains matching ``filters`` as a list of dicts.

        ``default_sg`` is not used by this method; it is retained so that
        existing callers passing it keep working.
        """
        marker_obj = self._get_marker_obj(context, 'port_chain', limit, marker)
        return self._get_collection(context,
                                    PortChain,
                                    self._make_port_chain_dict,
                                    filters=filters, fields=fields,
                                    sorts=sorts,
                                    limit=limit, marker_obj=marker_obj,
                                    page_reverse=page_reverse)

    def get_port_chains_count(self, context, filters=None):
        """Return the number of port chains matching ``filters``."""
        return self._get_collection_count(context, PortChain,
                                          filters=filters)

    @log_helpers.log_method_call
    def get_port_chain(self, context, id, fields=None):
        """Return one port chain as a dict.

        :raises ext_sfc.PortChainNotFound: if ``id`` does not exist.
        """
        portchain = self._get_port_chain(context, id)
        return self._make_port_chain_dict(portchain, fields)

    @log_helpers.log_method_call
    def _get_port_chain(self, context, id):
        """Fetch a PortChain row, mapping a miss to PortChainNotFound."""
        try:
            return self._get_by_id(context, PortChain, id)
        except exc.NoResultFound:
            raise ext_sfc.PortChainNotFound(id=id)

    @log_helpers.log_method_call
    def delete_port_chain(self, context, id):
        """Delete a port chain.

        Deleting a non-existing port chain is logged at info level and
        otherwise ignored.
        """
        try:
            with context.session.begin(subtransactions=True):
                pc = self._get_port_chain(context, id)
                context.session.delete(pc)
        except ext_sfc.PortChainNotFound:
            LOG.info(_LI("Deleting a non-existing port chain."))

    @log_helpers.log_method_call
    def update_port_chain(self, context, id, port_chain):
        """Update a port chain.

        ``flow_classifiers`` replaces the chain's classifier associations;
        every other key in the request is assigned onto the row as-is.

        :param context: request context providing the DB session.
        :param id: id of the port chain to update.
        :param port_chain: request body; the attributes are read from
            ``port_chain['port_chain']``.
        :returns: dict describing the updated port chain.
        :raises ext_sfc.PortChainNotFound: if ``id`` does not exist.
        :raises ext_fc.FlowClassifierNotFound: unknown flow classifier id.
        :raises ext_fc.FlowClassifierInUse: a flow classifier is already
            associated with a different port chain.
        """
        p = port_chain['port_chain']
        with context.session.begin(subtransactions=True):
            pc = self._get_port_chain(context, id)
            for k, v in six.iteritems(p):
                if k == 'flow_classifiers':
                    for fc_id in v:
                        fc = self._get_flow_classifier(context, fc_id)
                        # Mirror the in-use check done on create, but let
                        # classifiers already bound to this chain through.
                        # flowclassifier_id is unique in the association
                        # table, so binding one owned by another chain
                        # could not be stored anyway.
                        if any(assoc.portchain_id != pc.id
                               for assoc in fc.chain_classifier_associations):
                            raise ext_fc.FlowClassifierInUse(id=fc.id)
                    self._setup_chain_classifier_associations(context, pc, v)
                else:
                    pc[k] = v
            return self._make_port_chain_dict(pc)

    # ------------------------------------------------------------------
    # Port pairs
    # ------------------------------------------------------------------

    def _make_port_pair_dict(self, port_pair, fields=None):
        """Build the API dict for a port pair.

        :param port_pair: port pair object supporting item access.
        :param fields: optional field filter, passed to ``self._fields``.
        :returns: the result of ``self._fields`` applied to the dict.
        """
        res = {
            'id': port_pair['id'],
            'name': port_pair['name'],
            'description': port_pair['description'],
            'tenant_id': port_pair['tenant_id'],
            'ingress': port_pair['ingress'],
            'egress': port_pair['egress'],
            # Only the parameter objects are needed; each carries its own
            # keyword, so the mapping keys are not consulted.
            'service_function_parameters': {
                param['keyword']: param['value']
                for param in six.itervalues(
                    port_pair['service_function_parameters'])
            }
        }

        return self._fields(res, fields)

    def _validate_port_pair_ingress_egress(self, ingress, egress):
        """Check that both ports have the same, non-empty ``device_id``.

        :param ingress: ingress port object.
        :param egress: egress port object.
        :raises ext_sfc.PortPairIngressNoHost: ingress has no device_id.
        :raises ext_sfc.PortpairEgressNoHost: egress has no device_id.
        :raises ext_sfc.PortPairIngressEgressDifferentHost: the two
            device_id values differ.
        """
        if 'device_id' not in ingress or not ingress['device_id']:
            raise ext_sfc.PortPairIngressNoHost(
                ingress=ingress['id']
            )
        if 'device_id' not in egress or not egress['device_id']:
            raise ext_sfc.PortpairEgressNoHost(
                egress=egress['id']
            )
        if ingress['device_id'] != egress['device_id']:
            raise ext_sfc.PortPairIngressEgressDifferentHost(
                ingress=ingress['id'],
                egress=egress['id'])

    @log_helpers.log_method_call
    def create_port_pair(self, context, port_pair):
        """Create a port pair.

        :param context: request context providing the DB session.
        :param port_pair: request body; the attributes are read from
            ``port_pair['port_pair']``.
        :returns: dict describing the new port pair.
        :raises ext_sfc.PortPairPortNotFound: ingress or egress port id
            does not exist.
        :raises ext_sfc.PortPairIngressNoHost,
            ext_sfc.PortpairEgressNoHost,
            ext_sfc.PortPairIngressEgressDifferentHost: see
            ``_validate_port_pair_ingress_egress``.
        """
        pp = port_pair['port_pair']
        tenant_id = self._get_tenant_id_for_create(context, pp)
        with context.session.begin(subtransactions=True):
            service_function_parameters = {
                key: ServiceFunctionParam(keyword=key, value=val)
                for key, val in six.iteritems(
                    pp['service_function_parameters']
                )
            }
            ingress = self._get_port(context, pp['ingress'])
            egress = self._get_port(context, pp['egress'])
            self._validate_port_pair_ingress_egress(ingress, egress)
            port_pair_db = PortPair(
                id=uuidutils.generate_uuid(),
                name=pp['name'],
                description=pp['description'],
                tenant_id=tenant_id,
                ingress=pp['ingress'],
                egress=pp['egress'],
                service_function_parameters=service_function_parameters
            )
            context.session.add(port_pair_db)
            return self._make_port_pair_dict(port_pair_db)

    @log_helpers.log_method_call
    def get_port_pairs(self, context, filters=None, fields=None,
                       sorts=None, limit=None, marker=None,
                       page_reverse=False):
        """Return the port pairs matching ``filters`` as a list of dicts."""
        marker_obj = self._get_marker_obj(context, 'port_pair',
                                          limit, marker)
        return self._get_collection(context,
                                    PortPair,
                                    self._make_port_pair_dict,
                                    filters=filters, fields=fields,
                                    sorts=sorts,
                                    limit=limit, marker_obj=marker_obj,
                                    page_reverse=page_reverse)

    def get_port_pairs_count(self, context, filters=None):
        """Return the number of port pairs matching ``filters``."""
        return self._get_collection_count(context, PortPair,
                                          filters=filters)

    @log_helpers.log_method_call
    def get_port_pair(self, context, id, fields=None):
        """Return one port pair as a dict.

        :raises ext_sfc.PortPairNotFound: if ``id`` does not exist.
        """
        port_pair = self._get_port_pair(context, id)
        return self._make_port_pair_dict(port_pair, fields)

    def _get_port_pair(self, context, id):
        """Fetch a PortPair row, mapping a miss to PortPairNotFound."""
        try:
            return self._get_by_id(context, PortPair, id)
        except exc.NoResultFound:
            raise ext_sfc.PortPairNotFound(id=id)

    def _get_port(self, context, id):
        """Fetch a Neutron port row, mapping a miss to PortPairPortNotFound."""
        try:
            return self._get_by_id(context, models_v2.Port, id)
        except exc.NoResultFound:
            raise ext_sfc.PortPairPortNotFound(id=id)

    @log_helpers.log_method_call
    def update_port_pair(self, context, id, port_pair):
        """Update a port pair with the attributes in the request body.

        :returns: dict describing the updated port pair.
        :raises ext_sfc.PortPairNotFound: if ``id`` does not exist.
        """
        new_pp = port_pair['port_pair']
        with context.session.begin(subtransactions=True):
            old_pp = self._get_port_pair(context, id)
            old_pp.update(new_pp)
            return self._make_port_pair_dict(old_pp)

    @log_helpers.log_method_call
    def delete_port_pair(self, context, id):
        """Delete a port pair.

        Deleting a non-existing port pair is logged at info level and
        otherwise ignored.

        :raises ext_sfc.PortPairInUse: the port pair belongs to a port
            pair group.
        """
        try:
            with context.session.begin(subtransactions=True):
                pp = self._get_port_pair(context, id)
                if pp.portpairgroup_id:
                    raise ext_sfc.PortPairInUse(id=id)
                context.session.delete(pp)
        except ext_sfc.PortPairNotFound:
            LOG.info(_LI("Deleting a non-existing port pair."))

    # ------------------------------------------------------------------
    # Port pair groups
    # ------------------------------------------------------------------

    def _make_port_pair_group_dict(self, port_pair_group, fields=None):
        """Build the API dict for a port pair group.

        :param port_pair_group: group object supporting item access.
        :param fields: optional field filter, passed to ``self._fields``.
        :returns: the result of ``self._fields`` applied to the dict.
        """
        res = {
            'id': port_pair_group['id'],
            'name': port_pair_group['name'],
            'description': port_pair_group['description'],
            'tenant_id': port_pair_group['tenant_id'],
            'port_pairs': [pp['id'] for pp in port_pair_group['port_pairs']],
        }

        return self._fields(res, fields)

    @log_helpers.log_method_call
    def create_port_pair_group(self, context, port_pair_group):
        """Create a port pair group.

        :param context: request context providing the DB session.
        :param port_pair_group: request body; the attributes are read from
            ``port_pair_group['port_pair_group']``.
        :returns: dict describing the new port pair group.
        :raises ext_sfc.PortPairNotFound: unknown port pair id.
        :raises ext_sfc.PortPairInUse: a port pair already belongs to a
            group.
        """
        pg = port_pair_group['port_pair_group']
        tenant_id = self._get_tenant_id_for_create(context, pg)

        with context.session.begin(subtransactions=True):
            portpairs_list = [self._get_port_pair(context, pp_id)
                              for pp_id in pg['port_pairs']]
            for portpair in portpairs_list:
                if portpair.portpairgroup_id:
                    raise ext_sfc.PortPairInUse(id=portpair.id)
            port_pair_group_db = PortPairGroup(
                id=uuidutils.generate_uuid(),
                name=pg['name'],
                description=pg['description'],
                tenant_id=tenant_id,
                port_pairs=portpairs_list)
            context.session.add(port_pair_group_db)
            return self._make_port_pair_group_dict(port_pair_group_db)

    @log_helpers.log_method_call
    def get_port_pair_groups(self, context, filters=None, fields=None,
                             sorts=None, limit=None, marker=None,
                             page_reverse=False):
        """Return the port pair groups matching ``filters`` as dicts."""
        marker_obj = self._get_marker_obj(context, 'port_pair_group',
                                          limit, marker)
        return self._get_collection(context,
                                    PortPairGroup,
                                    self._make_port_pair_group_dict,
                                    filters=filters, fields=fields,
                                    sorts=sorts,
                                    limit=limit, marker_obj=marker_obj,
                                    page_reverse=page_reverse)

    def get_port_pair_groups_count(self, context, filters=None):
        """Return the number of port pair groups matching ``filters``."""
        return self._get_collection_count(context, PortPairGroup,
                                          filters=filters)

    @log_helpers.log_method_call
    def get_port_pair_group(self, context, id, fields=None):
        """Return one port pair group as a dict.

        :raises ext_sfc.PortPairGroupNotFound: if ``id`` does not exist.
        """
        port_pair_group = self._get_port_pair_group(context, id)
        return self._make_port_pair_group_dict(port_pair_group, fields)

    def _get_port_pair_group(self, context, id):
        """Fetch a PortPairGroup row or raise PortPairGroupNotFound."""
        try:
            return self._get_by_id(context, PortPairGroup, id)
        except exc.NoResultFound:
            raise ext_sfc.PortPairGroupNotFound(id=id)

    def _get_flow_classifier(self, context, id):
        """Fetch a FlowClassifier row or raise FlowClassifierNotFound."""
        try:
            return self._get_by_id(context, fc_db.FlowClassifier, id)
        except exc.NoResultFound:
            raise ext_fc.FlowClassifierNotFound(id=id)

    @log_helpers.log_method_call
    def update_port_pair_group(self, context, id, port_pair_group):
        """Update a port pair group.

        ``port_pairs`` replaces the group's members; every other key in
        the request is assigned onto the row as-is.

        :param context: request context providing the DB session.
        :param id: id of the port pair group to update.
        :param port_pair_group: request body; the attributes are read from
            ``port_pair_group['port_pair_group']``.
        :returns: dict describing the updated port pair group.
        :raises ext_sfc.PortPairNotFound: unknown port pair id.
        :raises ext_sfc.PortPairInUse: a port pair belongs to a different
            group.
        :raises ext_sfc.PortPairGroupNotFound: if ``id`` does not exist.
        """
        new_pg = port_pair_group['port_pair_group']

        with context.session.begin(subtransactions=True):
            portpairs_list = [self._get_port_pair(context, pp_id)
                              for pp_id in new_pg.get('port_pairs', [])]
            for portpair in portpairs_list:
                if (
                    portpair.portpairgroup_id and
                    portpair.portpairgroup_id != id
                ):
                    raise ext_sfc.PortPairInUse(id=portpair.id)

            old_pg = self._get_port_pair_group(context, id)
            for k, v in six.iteritems(new_pg):
                if k == 'port_pairs':
                    # Reuse the rows fetched for validation above instead
                    # of querying each port pair a second time.
                    old_pg.port_pairs = portpairs_list
                else:
                    old_pg[k] = v

            return self._make_port_pair_group_dict(old_pg)

    @log_helpers.log_method_call
    def delete_port_pair_group(self, context, id):
        """Delete a port pair group.

        Deleting a non-existing port pair group is logged at info level
        and otherwise ignored.

        :raises ext_sfc.PortPairGroupInUse: the group is referenced by a
            port chain association.
        """
        try:
            with context.session.begin(subtransactions=True):
                pg = self._get_port_pair_group(context, id)
                if pg.chain_group_associations:
                    raise ext_sfc.PortPairGroupInUse(id=id)
                context.session.delete(pg)
        except ext_sfc.PortPairGroupNotFound:
            LOG.info(_LI("Deleting a non-existing port pair group."))