From c772a1dbc7ace58d099570d41a889adf851c8ba8 Mon Sep 17 00:00:00 2001 From: Ulas Kozat Date: Mon, 28 Dec 2015 16:05:13 -0800 Subject: Added networking-sfc from openstack project with merge date Dec 23 2015 Added patch 13 for subject "add missing db migration files" Change-Id: Id51a160335a14870c1dd816a44baf9b1958b9ac6 --- networking_sfc/services/sfc/drivers/__init__.py | 0 networking_sfc/services/sfc/drivers/base.py | 57 ++ .../services/sfc/drivers/dummy/__init__.py | 0 networking_sfc/services/sfc/drivers/dummy/dummy.py | 59 ++ .../services/sfc/drivers/ovs/__init__.py | 0 .../services/sfc/drivers/ovs/constants.py | 57 ++ networking_sfc/services/sfc/drivers/ovs/db.py | 426 ++++++++ networking_sfc/services/sfc/drivers/ovs/driver.py | 1076 ++++++++++++++++++++ networking_sfc/services/sfc/drivers/ovs/rpc.py | 112 ++ .../services/sfc/drivers/ovs/rpc_topics.py | 21 + 10 files changed, 1808 insertions(+) create mode 100644 networking_sfc/services/sfc/drivers/__init__.py create mode 100644 networking_sfc/services/sfc/drivers/base.py create mode 100644 networking_sfc/services/sfc/drivers/dummy/__init__.py create mode 100644 networking_sfc/services/sfc/drivers/dummy/dummy.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/__init__.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/constants.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/db.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/driver.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/rpc.py create mode 100644 networking_sfc/services/sfc/drivers/ovs/rpc_topics.py (limited to 'networking_sfc/services/sfc/drivers') diff --git a/networking_sfc/services/sfc/drivers/__init__.py b/networking_sfc/services/sfc/drivers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/networking_sfc/services/sfc/drivers/base.py b/networking_sfc/services/sfc/drivers/base.py new file mode 100644 index 0000000..0816789 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/base.py @@ -0,0 +1,57 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class SfcDriverBase(object): + """SFC Driver Base Class.""" + + @abc.abstractmethod + def create_port_chain(self, context): + pass + + @abc.abstractmethod + def delete_port_chain(self, context): + pass + + @abc.abstractmethod + def update_port_chain(self, context): + pass + + @abc.abstractmethod + def create_port_pair(self, context): + pass + + @abc.abstractmethod + def delete_port_pair(self, context): + pass + + @abc.abstractmethod + def update_port_pair(self, context): + pass + + @abc.abstractmethod + def create_port_pair_group(self, context): + pass + + @abc.abstractmethod + def delete_port_pair_group(self, context): + pass + + @abc.abstractmethod + def update_port_pair_group(self, context): + pass diff --git a/networking_sfc/services/sfc/drivers/dummy/__init__.py b/networking_sfc/services/sfc/drivers/dummy/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/networking_sfc/services/sfc/drivers/dummy/dummy.py b/networking_sfc/services/sfc/drivers/dummy/dummy.py new file mode 100644 index 0000000..1ddd7d0 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/dummy/dummy.py @@ -0,0 +1,59 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import helpers as log_helpers + +from networking_sfc.services.sfc.drivers import base as sfc_driver + + +class DummyDriver(sfc_driver.SfcDriverBase): + """SFC Driver Dummy Class.""" + def initialize(self): + pass + + @log_helpers.log_method_call + def create_port_chain(self, context): + pass + + @log_helpers.log_method_call + def delete_port_chain(self, context): + pass + + @log_helpers.log_method_call + def update_port_chain(self, context): + pass + + @log_helpers.log_method_call + def create_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def delete_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def update_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def create_port_pair(self, context): + pass + + @log_helpers.log_method_call + def delete_port_pair(self, context): + pass + + @log_helpers.log_method_call + def update_port_pair(self, context): + pass diff --git a/networking_sfc/services/sfc/drivers/ovs/__init__.py b/networking_sfc/services/sfc/drivers/ovs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/networking_sfc/services/sfc/drivers/ovs/constants.py b/networking_sfc/services/sfc/drivers/ovs/constants.py new file mode 100644 index 0000000..30e2c37 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/constants.py @@ -0,0 +1,57 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import constants as n_const + + +INGRESS_DIR = 'ingress' +EGRESS_DIR = 'egress' + +STATUS_BUILDING = 'building' +STATUS_ACTIVE = 'active' +STATUS_ERROR = 'error' +STATUS_DELETING = 'deleting' + + +PORTFLOW_OPT_ADD = 'add-flows' +PROTFLOW_OPT_DELETE = 'delete-flows' +PROTFLOW_OPT_UPDATE = 'update-flows' + + +SRC_NODE = 'src_node' +DST_NODE = 'dst_node' +SF_NODE = 'sf_node' + +RES_TYPE_GROUP = 'group' +RES_TYPE_NSP = 'nsp' + +INSERTION_TYPE_L2 = 'l2' +INSERTION_TYPE_L3 = 'l3' +INSERTION_TYPE_BITW = 'bitw' +INSERTION_TYPE_TAP = 'tap' + +MAX_HASH = 16 + +INSERTION_TYPE_DICT = { + n_const.DEVICE_OWNER_ROUTER_HA_INTF: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_INTF: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_GW: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_FLOATINGIP: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_DHCP: INSERTION_TYPE_TAP, + n_const.DEVICE_OWNER_DVR_INTERFACE: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_AGENT_GW: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_SNAT: INSERTION_TYPE_TAP, + n_const.DEVICE_OWNER_LOADBALANCER: INSERTION_TYPE_TAP, + 'compute': INSERTION_TYPE_L2 +} diff --git a/networking_sfc/services/sfc/drivers/ovs/db.py b/networking_sfc/services/sfc/drivers/ovs/db.py new file mode 100644 index 0000000..8d3c87d --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/db.py @@ -0,0 +1,426 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+# + +import six + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_utils import uuidutils + +from neutron.common import exceptions as n_exc +from neutron import context as n_context +from neutron.db import common_db_mixin +from neutron.db import model_base +from neutron.db import models_v2 + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import exc +from sqlalchemy import sql + + +LOG = logging.getLogger(__name__) + + +class PortPairDetailNotFound(n_exc.NotFound): + message = _("Portchain port brief %(port_id)s could not be found") + + +class NodeNotFound(n_exc.NotFound): + message = _("Portchain node %(node_id)s could not be found") + + +# name changed to ChainPathId +class UuidIntidAssoc(model_base.BASEV2, models_v2.HasId): + __tablename__ = 'sfc_uuid_intid_associations' + uuid = sa.Column(sa.String(36), primary_key=True) + intid = sa.Column(sa.Integer, unique=True, nullable=False) + type_ = sa.Column(sa.String(32), nullable=False) + + def __init__(self, uuid, intid, type_): + self.uuid = uuid + self.intid = intid + self.type_ = type_ + + +def singleton(class_): + instances = {} + + def getinstance(*args, **kwargs): + if class_ not in instances: + instances[class_] = class_(*args, **kwargs) + return instances[class_] + return getinstance + + +@singleton +class IDAllocation(object): + def __init__(self, context): + # Get the inital range from conf file. + conf_obj = {'group': [1, 255], 'portchain': [256, 65536]} + self.conf_obj = conf_obj + self.session = context.session + + @log_helpers.log_method_call + def assign_intid(self, type_, uuid): + query = self.session.query(UuidIntidAssoc).filter_by( + type_=type_).order_by(UuidIntidAssoc.intid) + + allocated_int_ids = {obj.intid for obj in query.all()} + + # Find the first one from the available range that + # is not in allocated_int_ids + start, end = self.conf_obj[type_][0], self.conf_obj[type_][1]+1 + for init_id in six.moves.range(start, end): + if init_id not in allocated_int_ids: + with self.session.begin(subtransactions=True): + uuid_intid = UuidIntidAssoc( + uuid, init_id, type_) + self.session.add(uuid_intid) + return init_id + else: + return None + + @log_helpers.log_method_call + def get_intid_by_uuid(self, type_, uuid): + + query_obj = self.session.query(UuidIntidAssoc).filter_by( + type_=type_, uuid=uuid).first() + if query_obj: + return query_obj.intid + else: + return None + + @log_helpers.log_method_call + def release_intid(self, type_, intid): + """Release int id. + + @param: type_: str + @param: intid: int + """ + with self.session.begin(subtransactions=True): + query_obj = self.session.query(UuidIntidAssoc).filter_by( + intid=intid, type_=type_).first() + + if query_obj: + self.session.delete(query_obj) + + +class PathPortAssoc(model_base.BASEV2): + """path port association table. + + It represents the association table which associate path_nodes with + portpair_details. 
+ """ + __tablename__ = 'sfc_path_port_associations' + pathnode_id = sa.Column(sa.String(36), + sa.ForeignKey( + 'sfc_path_nodes.id', ondelete='CASCADE'), + primary_key=True) + portpair_id = sa.Column(sa.String(36), + sa.ForeignKey('sfc_portpair_details.id', + ondelete='CASCADE'), + primary_key=True) + weight = sa.Column(sa.Integer, nullable=False, default=1) + + +class PortPairDetail(model_base.BASEV2, models_v2.HasId, + models_v2.HasTenant): + __tablename__ = 'sfc_portpair_details' + ingress = sa.Column(sa.String(36), nullable=True) + egress = sa.Column(sa.String(36), nullable=True) + host_id = sa.Column(sa.String(255), nullable=False) + mac_address = sa.Column(sa.String(32), nullable=False) + network_type = sa.Column(sa.String(8)) + segment_id = sa.Column(sa.Integer) + local_endpoint = sa.Column(sa.String(64), nullable=False) + path_nodes = orm.relationship(PathPortAssoc, + backref='port_pair_detail', + lazy="joined", + cascade='all,delete') + + +class PathNode(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + __tablename__ = 'sfc_path_nodes' + nsp = sa.Column(sa.Integer, nullable=False) + nsi = sa.Column(sa.Integer, nullable=False) + node_type = sa.Column(sa.String(32)) + portchain_id = sa.Column( + sa.String(255), + sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE')) + status = sa.Column(sa.String(32)) + portpair_details = orm.relationship(PathPortAssoc, + backref='path_nodes', + lazy="joined", + cascade='all,delete') + next_group_id = sa.Column(sa.Integer) + next_hop = sa.Column(sa.String(512)) + + +class OVSSfcDriverDB(common_db_mixin.CommonDbMixin): + def initialize(self): + self.admin_context = n_context.get_admin_context() + + def _make_pathnode_dict(self, node, fields=None): + res = {'id': node['id'], + 'tenant_id': node['tenant_id'], + 'node_type': node['node_type'], + 'nsp': node['nsp'], + 'nsi': node['nsi'], + 'next_group_id': node['next_group_id'], + 'next_hop': node['next_hop'], + 'portchain_id': node['portchain_id'], + 'status': node['status'], + 'portpair_details': [pair_detail['portpair_id'] + for pair_detail in node['portpair_details'] + ] + } + + return self._fields(res, fields) + + def _make_port_detail_dict(self, port, fields=None): + res = {'id': port['id'], + 'tenant_id': port['tenant_id'], + 'host_id': port['host_id'], + 'ingress': port.get('ingress', None), + 'egress': port.get('egress', None), + 'segment_id': port['segment_id'], + 'local_endpoint': port['local_endpoint'], + 'mac_address': port['mac_address'], + 'network_type': port['network_type'], + 'path_nodes': [{'pathnode_id': node['pathnode_id'], + 'weight': node['weight']} + for node in port['path_nodes']] + } + + return self._fields(res, fields) + + def _make_pathport_assoc_dict(self, assoc, fields=None): + res = {'pathnode_id': assoc['pathnode_id'], + 'portpair_id': assoc['portpair_id'], + 'weight': assoc['weight'], + } + + return self._fields(res, fields) + + def _get_path_node(self, id): + try: + node = self._get_by_id(self.admin_context, PathNode, id) + except exc.NoResultFound: + raise NodeNotFound(node_id=id) + return node + + def _get_port_detail(self, id): + try: + port = self._get_by_id(self.admin_context, PortPairDetail, id) + except exc.NoResultFound: + raise PortPairDetailNotFound(port_id=id) + return port + + def create_port_detail(self, port): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(port, PortPairDetail) + args['id'] = uuidutils.generate_uuid() + port_obj = PortPairDetail(**args) + 
self.admin_context.session.add(port_obj) + return self._make_port_detail_dict(port_obj) + + def create_path_node(self, node): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(node, PathNode) + args['id'] = uuidutils.generate_uuid() + node_obj = PathNode(**args) + self.admin_context.session.add(node_obj) + return self._make_pathnode_dict(node_obj) + + def create_pathport_assoc(self, assoc): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(assoc, PathPortAssoc) + assoc_obj = PathPortAssoc(**args) + self.admin_context.session.add(assoc_obj) + return self._make_pathport_assoc_dict(assoc_obj) + + def delete_pathport_assoc(self, pathnode_id, portdetail_id): + with self.admin_context.session.begin(subtransactions=True): + self.admin_context.session.query(PathPortAssoc).filter_by( + pathnode_id=pathnode_id, + portpair_id=portdetail_id).delete() + + def update_port_detail(self, id, port): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + for key, value in six.iteritems(port): + if key == 'path_nodes': + pns = [] + for pn in value: + pn_id = pn['pathnode_id'] + self._get_path_node(pn_id) + query = self._model_query( + self.admin_context, PathPortAssoc) + pn_association = query.filter_by( + pathnode_id=pn_id, + portpair_id=id + ).first() + if not pn_association: + pn_association = PathPortAssoc( + pathnode_id=pn_id, + portpair_id=id, + weight=pn.get('weight', 1) + ) + pns.append(pn_association) + port_obj[key] = pns + else: + port_obj[key] = value + port_obj.update(port) + return self._make_port_detail_dict(port_obj) + + def update_path_node(self, id, node): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + for key, value in six.iteritems(node): + if key == 'portpair_details': + pds = [] + for pd_id in value: + self._get_port_detail(pd_id) + query = self._model_query( + self.admin_context, PathPortAssoc) + pd_association = query.filter_by( + pathnode_id=id, + portpair_id=pd_id + ).first() + if not pd_association: + pd_association = PathPortAssoc( + pathnode_id=id, + portpair_id=pd_id + ) + pds.append(pd_association) + node_obj[key] = pds + else: + node_obj[key] = value + return self._make_pathnode_dict(node_obj) + + def delete_port_detail(self, id): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + self.admin_context.session.delete(port_obj) + + def delete_path_node(self, id): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + self.admin_context.session.delete(node_obj) + + def get_port_detail(self, id): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + return self._make_port_detail_dict(port_obj) + + def get_port_detail_without_exception(self, id): + with self.admin_context.session.begin(subtransactions=True): + try: + port = self._get_by_id( + self.admin_context, PortPairDetail, id) + except exc.NoResultFound: + return None + return self._make_port_detail_dict(port) + + def get_path_node(self, id): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + return self._make_pathnode_dict(node_obj) + + def get_path_nodes_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry 
= self._get_path_nodes_by_filter( + filters, fields, sorts, limit, + marker, page_reverse + ) + all_items = qry.all() + if all_items: + return [self._make_pathnode_dict(item) for item in all_items] + + return None + + def get_path_node_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_path_nodes_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + first = qry.first() + if first: + return self._make_pathnode_dict(first) + + return None + + def _get_path_nodes_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + qry = self.admin_context.session.query(PathNode) + if filters: + for key, value in six.iteritems(filters): + column = getattr(PathNode, key, None) + if column: + if not value: + qry = qry.filter(sql.false()) + else: + qry = qry.filter(column == value) + return qry + + def get_port_details_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_port_details_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + all_items = qry.all() + if all_items: + return [ + self._make_port_detail_dict(item) + for item in all_items + ] + + return None + + def get_port_detail_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_port_details_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + first = qry.first() + if first: + return self._make_port_detail_dict(first) + + return None + + def _get_port_details_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + qry = self.admin_context.session.query(PortPairDetail) + if filters: + for key, value in six.iteritems(filters): + column = getattr(PortPairDetail, key, None) + if column: + if not value: + qry = qry.filter(sql.false()) + else: + qry = qry.filter(column == value) + + return qry diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py new file mode 100644 index 0000000..9dfc40d --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/driver.py @@ -0,0 +1,1076 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+ +import netaddr + +# from eventlet import greenthread + +from neutron.common import constants as nc_const +from neutron.common import rpc as n_rpc + +from neutron import manager + +from neutron.i18n import _LE +from neutron.i18n import _LW + +from neutron.plugins.common import constants as np_const + + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from networking_sfc.extensions import flowclassifier +from networking_sfc.extensions import sfc +from networking_sfc.services.sfc.common import exceptions as exc +from networking_sfc.services.sfc.drivers import base as driver_base +from networking_sfc.services.sfc.drivers.ovs import( + rpc_topics as sfc_topics) +from networking_sfc.services.sfc.drivers.ovs import( + db as ovs_sfc_db) +from networking_sfc.services.sfc.drivers.ovs import( + rpc as ovs_sfc_rpc) +from networking_sfc.services.sfc.drivers.ovs import ( + constants as ovs_const) + + +LOG = logging.getLogger(__name__) + + +class OVSSfcDriver(driver_base.SfcDriverBase, + ovs_sfc_db.OVSSfcDriverDB): + """Sfc Driver Base Class.""" + + def initialize(self): + super(OVSSfcDriver, self).initialize() + self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( + sfc_topics.SFC_AGENT + ) + + self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) + self._setup_rpc() + + def _setup_rpc(self): + # Setup a rpc server + self.topic = sfc_topics.SFC_PLUGIN + self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] + self.conn = n_rpc.create_connection(new=True) + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) + self.conn.consume_in_threads() + + def _get_subnet(self, core_plugin, tenant_id, cidr): + filters = {'tenant_id': [tenant_id]} + subnets = core_plugin.get_subnets(self.admin_context, filters=filters) + cidr_set = netaddr.IPSet([cidr]) + + for subnet in subnets: + subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) + if cidr_set.issubset(subnet_cidr_set): + return subnet + + def _get_fc_dst_subnet_gw_port(self, fc): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet(core_plugin, + fc['tenant_id'], + fc['destination_ip_prefix']) + + return self._get_port_subnet_gw_info(core_plugin, subnet) + + def _get_port_subnet_gw_info_by_port_id(self, id): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet_by_port(core_plugin, id) + return self._get_port_subnet_gw_info(core_plugin, + subnet) + + def _get_port_subnet_gw_info(self, core_plugin, subnet): + filters = { + 'device_owner': + [nc_const.DEVICE_OWNER_ROUTER_INTF] + } + gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) + for port in gw_ports: + for fixed_ip in port['fixed_ips']: + if subnet["id"] == fixed_ip['subnet_id']: + return (port['mac_address'], + subnet['cidr'], + subnet['network_id']) + + raise exc.NoSubnetGateway( + type='subnet gateway', + cidr=subnet['cidr']) + + def _get_subnet_by_port(self, core_plugin, id): + port = core_plugin.get_port(self.admin_context, id) + for ip in port['fixed_ips']: + subnet = core_plugin.get_subnet(self.admin_context, + ip["subnet_id"]) + # currently only support one subnet for a port + break + + return subnet + + @log_helpers.log_method_call + def _get_portgroup_members(self, context, pg_id): + next_group_members = [] + group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) + LOG.debug('group_intid: %s', group_intid) + pg = context._plugin.get_port_pair_group(context._plugin_context, + pg_id) + for pp_id in pg['port_pairs']: + pp = 
context._plugin.get_port_pair(context._plugin_context, pp_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + if pd: + next_group_members.append( + dict(portpair_id=pd['id'], weight=1)) + return group_intid, next_group_members + + def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): + pp = context._plugin.get_port_pair(context._plugin_context, + port_pair_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + + return pd + + @log_helpers.log_method_call + def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node, dst_node, + last_sf_node=None): + dst_ports = [] + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # lookup the source port + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + src_pd = self.get_port_detail_by_filter(src_pd_filter) + + if not src_pd: + # Create source port detail + src_pd = self._create_port_detail(src_pd_filter) + LOG.debug('create src port detail: %s', src_pd) + + # Create associate relationship + assco_args = {'portpair_id': src_pd['id'], + 'pathnode_id': src_node['id'], + 'weight': 1, + } + sna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc src port with node: %s', sna) + src_node['portpair_details'].append(src_pd['id']) + + if fc.get('logical_destination_port', ''): + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + dst_pd = self.get_port_detail_by_filter(dst_pd_filter) + + if not dst_pd: + # Create dst port detail + dst_pd = self._create_port_detail(dst_pd_filter) + LOG.debug('create dst port detail: %s', dst_pd) + + # Create associate relationship + dst_assco_args = {'portpair_id': dst_pd['id'], + 'pathnode_id': dst_node['id'], + 'weight': 1, + } + dna = self.create_pathport_assoc(dst_assco_args) + LOG.debug('create assoc dst port with node: %s', dna) + dst_node['portpair_details'].append(dst_pd['id']) + + dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) + + if last_sf_node: + if last_sf_node['next_hop']: + next_hops = jsonutils.loads(last_sf_node['next_hop']) + next_hops.extend(dst_ports) + last_sf_node['next_hop'] = jsonutils.dumps(next_hops) + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + return dst_ports + + def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node=None, dst_node=None, + last_sf_node=None): + if not fc_ids: + return + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # delete source port detail + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(src_pd_filter) + if pds: + for pd in pds: + # update src_node portpair_details refence info + if src_node and pd['id'] in src_node[ + 'portpair_details' + ]: + src_node['portpair_details'].remove(pd['id']) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if fc.get('logical_destination_port', ''): + # Create dst port detail + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(dst_pd_filter) + if pds: + for 
pd in pds: + # update dst_node portpair_details refence info + if dst_node and pd['id'] in dst_node[ + 'portpair_details' + ]: + # update portpair_details of this node + dst_node['portpair_details'].remove(pd['id']) + # update last hop(SF-group) next hop info + if last_sf_node: + next_hop = dict(portpair_id=pd['id'], + weight=1) + next_hops = jsonutils.loads( + last_sf_node['next_hop']) + next_hops.remove(next_hop) + last_sf_node['next_hop'] = jsonutils.dumps( + next_hops) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if last_sf_node: + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + + @log_helpers.log_method_call + def _create_portchain_path(self, context, port_chain): + src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) + path_nodes, dst_ports = [], [] + # Create an assoc object for chain_id and path_id + # context = context._plugin_context + path_id = self.id_pool.assign_intid('portchain', port_chain['id']) + + if not path_id: + LOG.error(_LE('No path_id available for creating port chain path')) + return + + next_group_intid, next_group_members = self._get_portgroup_members( + context, port_chain['port_pair_groups'][0]) + + port_pair_groups = port_chain['port_pair_groups'] + sf_path_length = len(port_pair_groups) + # Create a head node object for port chain + src_args = {'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SRC_NODE, + 'nsp': path_id, + 'nsi': 0xff, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': jsonutils.dumps(next_group_members), + } + src_node = self.create_path_node(src_args) + LOG.debug('create src node: %s', src_node) + path_nodes.append(src_node) + + # Create a destination node object for port chain + dst_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.DST_NODE, + 'nsp': path_id, + 'nsi': 0xff - sf_path_length - 1, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': None, + 'next_hop': None + } + dst_node = self.create_path_node(dst_args) + LOG.debug('create dst node: %s', dst_node) + path_nodes.append(dst_node) + + dst_ports = self._add_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'], + src_node, + dst_node + ) + + for i in range(sf_path_length): + cur_group_members = next_group_members + # next_group for next hop + if i < sf_path_length - 1: + next_group_intid, next_group_members = ( + self._get_portgroup_members( + context, port_pair_groups[i + 1]) + ) + else: + next_group_intid = None + next_group_members = None if not dst_ports else dst_ports + + # Create a node object + node_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SF_NODE, + 'nsp': path_id, + 'nsi': 0xfe - i, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': ( + None if not next_group_members else + jsonutils.dumps(next_group_members) + ) + } + sf_node = self.create_path_node(node_args) + LOG.debug('chain path node: %s', sf_node) + # Create the assocation objects that combine the pathnode_id with + # the ingress of the port_pairs in the current group + # when port_group does not reach tail + for member in cur_group_members: + assco_args = {'portpair_id': member['portpair_id'], + 'pathnode_id': sf_node['id'], + 'weight': member['weight'], } + sfna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc port with node: %s', 
sfna) + sf_node['portpair_details'].append(member['portpair_id']) + path_nodes.append(sf_node) + + return path_nodes + + def _delete_path_node_port_flowrule(self, node, port, fc_ids): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + flow_rule = self._build_portchain_flowrule_body( + node, + port, + None, + fc_ids) + + self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( + self.admin_context, + flow_rule) + + def _delete_path_node_flowrule(self, node, fc_ids): + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._delete_path_node_port_flowrule( + node, port, fc_ids) + + @log_helpers.log_method_call + def _delete_portchain_path(self, context, portchain_id): + port_chain = context.current + first = self.get_path_node_by_filter( + filters={ + 'portchain_id': portchain_id, + 'nsi': 0xff + } + ) + + # delete flow rules which source port isn't assigned + # in flow classifier + if first: + self._delete_src_node_flowrules( + first, + port_chain['flow_classifiers'] + ) + + pds = self.get_path_nodes_by_filter( + dict(portchain_id=portchain_id)) + if pds: + for pd in pds: + self._delete_path_node_flowrule( + pd, + port_chain['flow_classifiers'] + ) + self.delete_path_node(pd['id']) + + # delete the ports on the traffic classifier + self._remove_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'] + ) + + # Delete the chainpathpair + intid = self.id_pool.get_intid_by_uuid( + 'portchain', portchain_id) + self.id_pool.release_intid('portchain', intid) + + def _update_path_node_next_hops(self, flow_rule): + node_next_hops = [] + if not flow_rule['next_hop']: + return None + next_hops = jsonutils.loads(flow_rule['next_hop']) + if not next_hops: + return None + for member in next_hops: + detail = {} + port_detail = self.get_port_detail_by_filter( + dict(id=member['portpair_id'])) + if not port_detail or not port_detail['host_id']: + continue + detail['local_endpoint'] = port_detail['local_endpoint'] + detail['weight'] = member['weight'] + detail['mac_address'] = port_detail['mac_address'] + detail['ingress'] = port_detail['ingress'] + node_next_hops.append(detail) + + mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( + detail['ingress'] + ) + + detail['gw_mac'] = mac + detail['cidr'] = cidr + detail['net_uuid'] = net_uuid + + flow_rule['next_hops'] = node_next_hops + flow_rule.pop('next_hop') + + return node_next_hops + + def _build_portchain_flowrule_body(self, node, port, + add_fc_ids=None, del_fc_ids=None): + node_info = node.copy() + node_info.pop('tenant_id') + node_info.pop('portpair_details') + + port_info = port.copy() + port_info.pop('tenant_id') + port_info.pop('id') + port_info.pop('path_nodes') + port_info.pop('host_id') + + flow_rule = dict(node_info, **port_info) + # if this port is belong to NSH/MPLS-aware vm, only to + # notify the flow classifier for 1st SF. + flow_rule['add_fcs'] = self._filter_flow_classifiers( + flow_rule, add_fc_ids) + flow_rule['del_fcs'] = self._filter_flow_classifiers( + flow_rule, del_fc_ids) + + self._update_portchain_group_reference_count(flow_rule, + port['host_id']) + + # update next hop info + self._update_path_node_next_hops(flow_rule) + + return flow_rule + + def _filter_flow_classifiers(self, flow_rule, fc_ids): + """Filter flow classifiers. 
+ + @return: list of the flow classifiers + """ + + fc_return = [] + + if not fc_ids: + return fc_return + fcs = self._get_fcs_by_ids(fc_ids) + for fc in fcs: + new_fc = fc.copy() + new_fc.pop('id') + new_fc.pop('name') + new_fc.pop('tenant_id') + new_fc.pop('description') + + if ((flow_rule['node_type'] == ovs_const.SRC_NODE and + flow_rule['egress'] == fc['logical_source_port'] + ) or + (flow_rule['node_type'] == ovs_const.DST_NODE and + flow_rule['ingress'] == fc['logical_destination_port'] + )): + fc_return.append(new_fc) + elif flow_rule['node_type'] == ovs_const.SF_NODE: + fc_return.append(new_fc) + + return fc_return + + def _update_path_node_port_flowrules(self, node, port, + add_fc_ids=None, del_fc_ids=None): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + + flow_rule = self._build_portchain_flowrule_body( + node, + port, + add_fc_ids, + del_fc_ids) + + self.ovs_driver_rpc.ask_agent_to_update_flow_rules( + self.admin_context, + flow_rule) + + def _update_path_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + if node['portpair_details'] is None: + return + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._update_path_node_port_flowrules( + node, port, add_fc_ids, del_fc_ids) + + def _thread_update_path_nodes(self, nodes, + add_fc_ids=None, del_fc_ids=None): + for node in nodes: + self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) + self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) + + def _get_portchain_fcs(self, port_chain): + return self._get_fcs_by_ids(port_chain['flow_classifiers']) + + def _get_fcs_by_ids(self, fc_ids): + flow_classifiers = [] + if not fc_ids: + return flow_classifiers + + # Get the portchain flow classifiers + fc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + flowclassifier.FLOW_CLASSIFIER_EXT) + ) + if not fc_plugin: + LOG.warn(_LW("Not found the flow classifier service plugin")) + return flow_classifiers + + for fc_id in fc_ids: + fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) + flow_classifiers.append(fc) + + return flow_classifiers + + @log_helpers.log_method_call + def create_port_chain(self, context): + port_chain = context.current + path_nodes = self._create_portchain_path(context, port_chain) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes( + path_nodes, + port_chain['flow_classifiers'], + None) + + @log_helpers.log_method_call + def delete_port_chain(self, context): + port_chain = context.current + portchain_id = port_chain['id'] + LOG.debug("to delete portchain path") + self._delete_portchain_path(context, portchain_id) + + def _get_diff_set(self, orig, cur): + orig_set = set(item for item in orig) + cur_set = set(item for item in cur) + + to_del = orig_set.difference(cur_set) + to_add = cur_set.difference(orig_set) + + return to_del, to_add + + @log_helpers.log_method_call + def update_port_chain(self, context): + port_chain = context.current + orig = context.original + + del_fc_ids, add_fc_ids = self._get_diff_set( + orig['flow_classifiers'], + port_chain['flow_classifiers'] + ) + path_nodes = self.get_path_nodes_by_filter( + dict(portchain_id=port_chain['id']) + ) + if not path_nodes: + return + + sort_path_nodes = sorted(path_nodes, + key=lambda x: x['nsi'], + reverse=True) + if del_fc_ids: + self._thread_update_path_nodes(sort_path_nodes, + None, + del_fc_ids) + 
self._remove_flowclassifier_port_assoc(del_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + if add_fc_ids: + self._add_flowclassifier_port_assoc(add_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes(sort_path_nodes, + add_fc_ids, + None) + + @log_helpers.log_method_call + def create_port_pair_group(self, context): + group = context.current + self.id_pool.assign_intid('group', group['id']) + + @log_helpers.log_method_call + def delete_port_pair_group(self, context): + group = context.current + group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) + if group_intid: + self.id_pool.release_intid('group', group_intid) + + @log_helpers.log_method_call + def update_port_pair_group(self, context): + current = context.current + original = context.original + + if set(current['port_pairs']) == set(original['port_pairs']): + return + + # Update the path_nodes and flows for each port chain that + # contains this port_pair_group + # Note: _get_port_pair_group is temporarily used here. + ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, + current['id']) + port_chains = [assoc.portchain_id for assoc in + ppg_obj.chain_group_associations] + + for chain_id in port_chains: + port_chain = context._plugin.get_port_chain( + context._plugin_context, chain_id) + group_intid = self.id_pool.get_intid_by_uuid('group', + current['id']) + # Get the previous node + prev_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'next_group_id': group_intid}) + if not prev_node: + continue + + before_update_prev_node = prev_node.copy() + # Update the previous node + _, curr_group_members = self._get_portgroup_members(context, + current['id']) + prev_node['next_hop'] = ( + jsonutils.dumps(curr_group_members) + if curr_group_members else None + ) + # update next hop to database + self.update_path_node(prev_node['id'], prev_node) + if prev_node['node_type'] == ovs_const.SRC_NODE: + self._delete_src_node_flowrules( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_src_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + else: + self._delete_path_node_flowrule( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_path_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + + # Update the current node + # to find the current node by using the node's next_group_id + # if this node is the last, next_group_id would be None + curr_pos = port_chain['port_pair_groups'].index(current['id']) + curr_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'nsi': 0xfe - curr_pos}) + if not curr_node: + continue + + # Add the port-pair-details into the current node + for pp_id in ( + set(current['port_pairs']) - set(original['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + + assco_args = {'portpair_id': ppd['id'], + 'pathnode_id': curr_node['id'], + 'weight': 1, } + self.create_pathport_assoc(assco_args) + self._update_path_node_port_flowrules( + curr_node, ppd, port_chain['flow_classifiers']) + + # Delete the port-pair-details from the current node + for pp_id in ( + set(original['port_pairs']) - 
set(current['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + self._delete_path_node_port_flowrule( + curr_node, ppd, port_chain['flow_classifiers']) + self.delete_pathport_assoc(curr_node['id'], ppd['id']) + + @log_helpers.log_method_call + def _get_portpair_detail_info(self, portpair_id): + """Get port detail. + + @param: portpair_id: uuid + @return: (host_id, local_ip, network_type, segment_id, + service_insert_type): tuple + """ + + core_plugin = manager.NeutronManager.get_plugin() + port_detail = core_plugin.get_port(self.admin_context, portpair_id) + host_id, local_ip, network_type, segment_id, mac_address = ( + (None, ) * 5) + + if port_detail: + host_id = port_detail['binding:host_id'] + network_id = port_detail['network_id'] + mac_address = port_detail['mac_address'] + network_info = core_plugin.get_network( + self.admin_context, network_id) + network_type = network_info['provider:network_type'] + segment_id = network_info['provider:segmentation_id'] + + if ( + host_id and + network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] + ): + driver = core_plugin.type_manager.drivers.get(network_type) + host_endpoint = driver.obj.get_endpoint_by_host(host_id) + local_ip = host_endpoint['ip_address'] + + return host_id, local_ip, network_type, segment_id, mac_address + + @log_helpers.log_method_call + def _create_port_detail(self, port_pair): + # since first node may not assign the ingress port, and last node may + # not assign the egress port. we use one of the + # port as the key to get the SF information. + port = None + if port_pair.get('ingress', None): + port = port_pair['ingress'] + elif port_pair.get('egress', None): + port = port_pair['egress'] + + host_id, local_endpoint, network_type, segment_id, mac_address = ( + self._get_portpair_detail_info(port)) + port_detail = { + 'ingress': port_pair.get('ingress', None), + 'egress': port_pair.get('egress', None), + 'tenant_id': port_pair['tenant_id'], + 'host_id': host_id, + 'segment_id': segment_id, + 'network_type': network_type, + 'local_endpoint': local_endpoint, + 'mac_address': mac_address + } + r = self.create_port_detail(port_detail) + LOG.debug('create port detail: %s', r) + return r + + @log_helpers.log_method_call + def create_port_pair(self, context): + port_pair = context.current + self._create_port_detail(port_pair) + + @log_helpers.log_method_call + def delete_port_pair(self, context): + port_pair = context.current + + pd_filter = dict(ingress=port_pair.get('ingress', None), + egress=port_pair.get('egress', None), + tenant_id=port_pair['tenant_id'] + ) + pds = self.get_port_details_by_filter(pd_filter) + if pds: + for pd in pds: + self.delete_port_detail(pd['id']) + + @log_helpers.log_method_call + def update_port_pair(self, context): + pass + + def get_flowrules_by_host_portid(self, context, host, port_id): + port_chain_flowrules = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return port_chain_flowrules + try: + port_detail_list = [] + # one port only may be in egress/ingress port once time. 
+ ingress_port = self.get_port_detail_by_filter( + dict(ingress=port_id)) + egress_port = self.get_port_detail_by_filter( + dict(egress=port_id)) + if not ingress_port and not egress_port: + return None + # SF migrate to other host + if ingress_port: + port_detail_list.append(ingress_port) + if ingress_port['host_id'] != host: + ingress_port.update(dict(host_id=host)) + + if egress_port: + port_detail_list.append(egress_port) + if egress_port['host_id'] != host: + egress_port.update(dict(host_id=host)) + + # this is a SF if there are both egress and engress. + for i, ports in enumerate(port_detail_list): + nodes_assocs = ports['path_nodes'] + for assoc in nodes_assocs: + # update current path flow rule + node = self.get_path_node(assoc['pathnode_id']) + port_chain = sfc_plugin.get_port_chain( + context, + node['portchain_id']) + flow_rule = self._build_portchain_flowrule_body( + node, + ports, + add_fc_ids=port_chain['flow_classifiers'] + ) + port_chain_flowrules.append(flow_rule) + + # update the pre-path node flow rule + # if node['node_type'] != ovs_const.SRC_NODE: + # node_filter = dict(nsp=node['nsp'], + # nsi=node['nsi'] + 1 + # ) + # pre_node_list = self.get_path_nodes_by_filter( + # node_filter) + # if not pre_node_list: + # continue + # for pre_node in pre_node_list: + # self._update_path_node_flowrules( + # pre_node, + # add_fc_ids=port_chain['flow_classifiers']) + + return port_chain_flowrules + + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flowrules_by_host_portid failed")) + + def get_flow_classifier_by_portchain_id(self, context, portchain_id): + try: + flow_classifier_list = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + + port_chain = sfc_plugin.get_port_chain( + context, + portchain_id) + flow_classifier_list = self._get_portchain_fcs(port_chain) + return flow_classifier_list + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) + + def update_flowrule_status(self, context, id, status): + try: + flowrule_status = dict(status=status) + self.update_path_node(id, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("update_flowrule_status failed")) + + def _update_src_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + add_fc_ids, + del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, + filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + flow_rule['host'] = agent['host'] + self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( + self.admin_context, + flow_rule) + + def _delete_src_node_flowrules(self, node, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + None, del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, filters={ + 'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + self._update_portchain_group_reference_count(flow_rule, + agent['host']) + self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( + self.admin_context, + flow_rule) + + def get_all_src_node_flowrules(self, 
context): + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + try: + frs = [] + port_chains = sfc_plugin.get_port_chains(context) + + for port_chain in port_chains: + # get the first node of this chain + node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) + portchain_node = self.get_path_node_by_filter(node_filters) + if not portchain_node: + continue + flow_rule = self._get_portchain_src_node_flowrule( + portchain_node, + port_chain['flow_classifiers'] + ) + if not flow_rule: + continue + frs.append(flow_rule) + return frs + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_all_src_node_flowrules failed")) + + def _get_portchain_src_node_flowrule(self, node, + add_fc_ids=None, del_fc_ids=None): + try: + add_fc_rt = [] + del_fc_rt = [] + + if add_fc_ids: + for fc in self._get_fcs_by_ids(add_fc_ids): + if not fc.get('logical_source_port', None): + add_fc_rt.append(fc) + + if del_fc_ids: + for fc in self._get_fcs_by_ids(del_fc_ids): + if not fc.get('logical_source_port', None): + del_fc_rt.append(fc) + + if not add_fc_rt and not del_fc_rt: + return None + + return self._build_portchain_flowrule_body_without_port( + node, add_fc_rt, del_fc_rt) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("_get_portchain_src_node_flowrule failed")) + + def _update_portchain_group_reference_count(self, flow_rule, host): + group_refcnt = 0 + flow_rule['host'] = host + + if flow_rule['next_group_id'] is not None: + all_nodes = self.get_path_nodes_by_filter( + filters={'next_group_id': flow_rule['next_group_id'], + 'nsi': 0xff}) + if all_nodes is not None: + for node in all_nodes: + if not node['portpair_details']: + group_refcnt += 1 + + port_details = self.get_port_details_by_filter( + dict(host_id=flow_rule['host'])) + if port_details is not None: + for pd in port_details: + for path in pd['path_nodes']: + path_node = self.get_path_node(path['pathnode_id']) + if ( + path_node['next_group_id'] == + flow_rule['next_group_id'] + ): + group_refcnt += 1 + + flow_rule['group_refcnt'] = group_refcnt + + return group_refcnt + + def _build_portchain_flowrule_body_without_port(self, + node, + add_fcs=None, + del_fcs=None): + flow_rule = node.copy() + flow_rule.pop('tenant_id') + flow_rule.pop('portpair_details') + + # according to the first sf node get network information + if not node['next_hop']: + return None + + next_hops = jsonutils.loads(node['next_hop']) + if not next_hops: + return None + + port_detail = self.get_port_detail_by_filter( + dict(id=next_hops[0]['portpair_id'])) + if not port_detail: + return None + + flow_rule['ingress'] = None + flow_rule['egress'] = None + flow_rule['network_type'] = port_detail['network_type'] + flow_rule['segment_id'] = port_detail['segment_id'] + + flow_rule['add_fcs'] = add_fcs + flow_rule['del_fcs'] = del_fcs + + # update next hop info + self._update_path_node_next_hops(flow_rule) + return flow_rule diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc.py b/networking_sfc/services/sfc/drivers/ovs/rpc.py new file mode 100644 index 0000000..a5ac0bc --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/rpc.py @@ -0,0 +1,112 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics +from neutron.common import rpc as n_rpc +from neutron.common import topics +from neutron.i18n import _LI + +from oslo_log import log as logging +import oslo_messaging + +LOG = logging.getLogger(__name__) + + +class SfcRpcCallback(object): + """Sfc RPC server.""" + + def __init__(self, driver): + self.target = oslo_messaging.Target(version='1.0') + self.driver = driver + + def get_flowrules_by_host_portid(self, context, **kwargs): + host = kwargs.get('host') + port_id = kwargs.get('port_id') + LOG.debug('from port-chain service plugin') + pcfrs = self.driver.get_flowrules_by_host_portid( + context, host, port_id) + LOG.debug('host: %s, port_id: %s', host, port_id) + return pcfrs + + def get_flow_classifier_by_portchain_id(self, context, **kwargs): + portchain_id = kwargs.get('portchain_id') + pcfcs = self.driver.get_flow_classifier_by_portchain_id( + context, + portchain_id) + LOG.debug('portchain id: %s', portchain_id) + return pcfcs + + def get_all_src_node_flowrules(self, context, **kwargs): + host = kwargs.get('host') + pcfcs = self.driver.get_all_src_node_flowrules( + context) + LOG.debug('portchain get_src_node_flowrules, host: %s', host) + return pcfcs + + def update_flowrules_status(self, context, **kwargs): + flowrules_status = kwargs.get('flowrules_status') + LOG.info(_LI('update_flowrules_status: %s'), flowrules_status) + for flowrule_dict in flowrules_status: + self.driver.update_flowrule_status( + context, flowrule_dict['id'], flowrule_dict['status']) + + +class SfcAgentRpcClient(object): + """RPC client for ovs sfc agent.""" + + def __init__(self, topic=sfc_topics.SFC_AGENT): + self.topic = topic + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def ask_agent_to_update_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to update flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.UPDATE), + server=host) + cctxt.cast(context, 'update_flow_rules', flowrule_entries=flows) + + def ask_agent_to_delete_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to delete flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.DELETE), + server=host) + cctxt.cast(context, 'delete_flow_rules', flowrule_entries=flows) + + def ask_agent_to_update_src_node_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to update src node flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.UPDATE), + server=host) + cctxt.cast(context, 'update_src_node_flow_rules', + flowrule_entries=flows) + + def ask_agent_to_delete_src_node_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to delete src node flows') + 
LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.DELETE), + server=host) + cctxt.cast(context, 'delete_src_node_flow_rules', + flowrule_entries=flows) diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py new file mode 100644 index 0000000..a35ff4f --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py @@ -0,0 +1,21 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +AGENT = 'q-agent-notifier' + +SFC_PLUGIN = 'q-sfc-plugin' +SFC_AGENT = 'q-sfc-agent' +SFC_FLOW = 'q-sfc-flow' + +PORTFLOW = 'portflowrule' -- cgit 1.2.3-korg
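
The integer-ID pool introduced in db.py above (IDAllocation.assign_intid: group IDs drawn from 1-255, port-chain path IDs from 256-65536, first free integer wins) can be illustrated without a SQLAlchemy session. The sketch below is a simplified, database-free stand-in only: the allocate helper and the in-memory dict are illustrative names, not part of the patch, and the ranges are the ones hard-coded in IDAllocation.__init__ (which the in-code comment says should eventually come from a config file).

    # Standalone sketch of the first-free-integer allocation performed by
    # IDAllocation.assign_intid, using the same hard-coded ranges.  The real
    # implementation persists assignments in sfc_uuid_intid_associations and
    # iterates with six.moves.range for Python 2/3 compatibility.

    ID_RANGES = {'group': (1, 255), 'portchain': (256, 65536)}


    def allocate(type_, uuid, allocated):
        """Assign the first free integer ID in the range for ``type_``.

        ``allocated`` maps (type_, uuid) -> intid and stands in for rows of
        the association table.  Returns None when the range is exhausted,
        matching assign_intid.
        """
        start, end = ID_RANGES[type_]
        in_use = {i for (t, _u), i in allocated.items() if t == type_}
        for intid in range(start, end + 1):
            if intid not in in_use:
                allocated[(type_, uuid)] = intid
                return intid
        return None


    if __name__ == '__main__':
        assigned = {}
        print(allocate('group', 'ppg-1', assigned))        # 1
        print(allocate('group', 'ppg-2', assigned))        # 2
        print(allocate('portchain', 'chain-1', assigned))  # 256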
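
Similarly, the path-node numbering that _create_portchain_path (driver.py above) writes into sfc_path_nodes can be summarized in a short standalone sketch: the chain gets an integer path ID (NSP) from the 'portchain' pool, the source node starts at NSI 0xff, each port-pair-group hop takes 0xfe - i, and the destination node sits at 0xff - number_of_groups - 1. This is a simplified illustration of the layout only; build_path_layout and the plain dicts are illustrative names, not part of the patch.

    # Simplified, standalone illustration of the NSP/NSI layout produced by
    # OVSSfcDriver._create_portchain_path.  No Neutron or database access;
    # it only mirrors the numbering scheme used in the patch above.

    SRC_NODE = 'src_node'
    SF_NODE = 'sf_node'
    DST_NODE = 'dst_node'


    def build_path_layout(path_id, num_port_pair_groups):
        """Return (node_type, nsp, nsi) dicts ordered by decreasing NSI."""
        nodes = [{'node_type': SRC_NODE, 'nsp': path_id, 'nsi': 0xff}]
        # One service-function node per port-pair-group; NSI drops per hop.
        for i in range(num_port_pair_groups):
            nodes.append({'node_type': SF_NODE, 'nsp': path_id,
                          'nsi': 0xfe - i})
        # The destination node sits one NSI below the last SF hop.
        nodes.append({'node_type': DST_NODE, 'nsp': path_id,
                      'nsi': 0xff - num_port_pair_groups - 1})
        return nodes


    if __name__ == '__main__':
        # A chain with two port-pair-groups and path id 256 (the first value
        # the 'portchain' pool can hand out):
        for node in build_path_layout(256, 2):
            print(node)
        # src_node nsi=255, sf_node nsi=254, sf_node nsi=253, dst_node nsi=252

In the driver these values are persisted per node in sfc_path_nodes and copied into the flow-rule dicts pushed to the agent over RPC.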