diff options
Diffstat (limited to 'networking_sfc/services/sfc/drivers')
-rw-r--r-- | networking_sfc/services/sfc/drivers/__init__.py | 0 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/base.py | 57 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/dummy/__init__.py | 0 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/dummy/dummy.py | 59 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/__init__.py | 0 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/constants.py | 57 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/db.py | 426 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/driver.py | 1076 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/rpc.py | 112 | ||||
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/rpc_topics.py | 21 |
10 files changed, 0 insertions, 1808 deletions
diff --git a/networking_sfc/services/sfc/drivers/__init__.py b/networking_sfc/services/sfc/drivers/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/base.py b/networking_sfc/services/sfc/drivers/base.py deleted file mode 100644 index 0816789..0000000 --- a/networking_sfc/services/sfc/drivers/base.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import six - - -@six.add_metaclass(abc.ABCMeta) -class SfcDriverBase(object): - """SFC Driver Base Class.""" - - @abc.abstractmethod - def create_port_chain(self, context): - pass - - @abc.abstractmethod - def delete_port_chain(self, context): - pass - - @abc.abstractmethod - def update_port_chain(self, context): - pass - - @abc.abstractmethod - def create_port_pair(self, context): - pass - - @abc.abstractmethod - def delete_port_pair(self, context): - pass - - @abc.abstractmethod - def update_port_pair(self, context): - pass - - @abc.abstractmethod - def create_port_pair_group(self, context): - pass - - @abc.abstractmethod - def delete_port_pair_group(self, context): - pass - - @abc.abstractmethod - def update_port_pair_group(self, context): - pass diff --git a/networking_sfc/services/sfc/drivers/dummy/__init__.py b/networking_sfc/services/sfc/drivers/dummy/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/dummy/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/dummy/dummy.py b/networking_sfc/services/sfc/drivers/dummy/dummy.py deleted file mode 100644 index 1ddd7d0..0000000 --- a/networking_sfc/services/sfc/drivers/dummy/dummy.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import helpers as log_helpers - -from networking_sfc.services.sfc.drivers import base as sfc_driver - - -class DummyDriver(sfc_driver.SfcDriverBase): - """SFC Driver Dummy Class.""" - def initialize(self): - pass - - @log_helpers.log_method_call - def create_port_chain(self, context): - pass - - @log_helpers.log_method_call - def delete_port_chain(self, context): - pass - - @log_helpers.log_method_call - def update_port_chain(self, context): - pass - - @log_helpers.log_method_call - def create_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def delete_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def update_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def create_port_pair(self, context): - pass - - @log_helpers.log_method_call - def delete_port_pair(self, context): - pass - - @log_helpers.log_method_call - def update_port_pair(self, context): - pass diff --git a/networking_sfc/services/sfc/drivers/ovs/__init__.py b/networking_sfc/services/sfc/drivers/ovs/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/ovs/constants.py b/networking_sfc/services/sfc/drivers/ovs/constants.py deleted file mode 100644 index 30e2c37..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/constants.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.common import constants as n_const - - -INGRESS_DIR = 'ingress' -EGRESS_DIR = 'egress' - -STATUS_BUILDING = 'building' -STATUS_ACTIVE = 'active' -STATUS_ERROR = 'error' -STATUS_DELETING = 'deleting' - - -PORTFLOW_OPT_ADD = 'add-flows' -PROTFLOW_OPT_DELETE = 'delete-flows' -PROTFLOW_OPT_UPDATE = 'update-flows' - - -SRC_NODE = 'src_node' -DST_NODE = 'dst_node' -SF_NODE = 'sf_node' - -RES_TYPE_GROUP = 'group' -RES_TYPE_NSP = 'nsp' - -INSERTION_TYPE_L2 = 'l2' -INSERTION_TYPE_L3 = 'l3' -INSERTION_TYPE_BITW = 'bitw' -INSERTION_TYPE_TAP = 'tap' - -MAX_HASH = 16 - -INSERTION_TYPE_DICT = { - n_const.DEVICE_OWNER_ROUTER_HA_INTF: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_INTF: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_GW: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_FLOATINGIP: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_DHCP: INSERTION_TYPE_TAP, - n_const.DEVICE_OWNER_DVR_INTERFACE: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_AGENT_GW: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_SNAT: INSERTION_TYPE_TAP, - n_const.DEVICE_OWNER_LOADBALANCER: INSERTION_TYPE_TAP, - 'compute': INSERTION_TYPE_L2 -} diff --git a/networking_sfc/services/sfc/drivers/ovs/db.py b/networking_sfc/services/sfc/drivers/ovs/db.py deleted file mode 100644 index 8d3c87d..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/db.py +++ /dev/null @@ -1,426 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import six - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_utils import uuidutils - -from neutron.common import exceptions as n_exc -from neutron import context as n_context -from neutron.db import common_db_mixin -from neutron.db import model_base -from neutron.db import models_v2 - -import sqlalchemy as sa -from sqlalchemy import orm -from sqlalchemy.orm import exc -from sqlalchemy import sql - - -LOG = logging.getLogger(__name__) - - -class PortPairDetailNotFound(n_exc.NotFound): - message = _("Portchain port brief %(port_id)s could not be found") - - -class NodeNotFound(n_exc.NotFound): - message = _("Portchain node %(node_id)s could not be found") - - -# name changed to ChainPathId -class UuidIntidAssoc(model_base.BASEV2, models_v2.HasId): - __tablename__ = 'sfc_uuid_intid_associations' - uuid = sa.Column(sa.String(36), primary_key=True) - intid = sa.Column(sa.Integer, unique=True, nullable=False) - type_ = sa.Column(sa.String(32), nullable=False) - - def __init__(self, uuid, intid, type_): - self.uuid = uuid - self.intid = intid - self.type_ = type_ - - -def singleton(class_): - instances = {} - - def getinstance(*args, **kwargs): - if class_ not in instances: - instances[class_] = class_(*args, **kwargs) - return instances[class_] - return getinstance - - -@singleton -class IDAllocation(object): - def __init__(self, context): - # Get the inital range from conf file. - conf_obj = {'group': [1, 255], 'portchain': [256, 65536]} - self.conf_obj = conf_obj - self.session = context.session - - @log_helpers.log_method_call - def assign_intid(self, type_, uuid): - query = self.session.query(UuidIntidAssoc).filter_by( - type_=type_).order_by(UuidIntidAssoc.intid) - - allocated_int_ids = {obj.intid for obj in query.all()} - - # Find the first one from the available range that - # is not in allocated_int_ids - start, end = self.conf_obj[type_][0], self.conf_obj[type_][1]+1 - for init_id in six.moves.range(start, end): - if init_id not in allocated_int_ids: - with self.session.begin(subtransactions=True): - uuid_intid = UuidIntidAssoc( - uuid, init_id, type_) - self.session.add(uuid_intid) - return init_id - else: - return None - - @log_helpers.log_method_call - def get_intid_by_uuid(self, type_, uuid): - - query_obj = self.session.query(UuidIntidAssoc).filter_by( - type_=type_, uuid=uuid).first() - if query_obj: - return query_obj.intid - else: - return None - - @log_helpers.log_method_call - def release_intid(self, type_, intid): - """Release int id. - - @param: type_: str - @param: intid: int - """ - with self.session.begin(subtransactions=True): - query_obj = self.session.query(UuidIntidAssoc).filter_by( - intid=intid, type_=type_).first() - - if query_obj: - self.session.delete(query_obj) - - -class PathPortAssoc(model_base.BASEV2): - """path port association table. - - It represents the association table which associate path_nodes with - portpair_details. - """ - __tablename__ = 'sfc_path_port_associations' - pathnode_id = sa.Column(sa.String(36), - sa.ForeignKey( - 'sfc_path_nodes.id', ondelete='CASCADE'), - primary_key=True) - portpair_id = sa.Column(sa.String(36), - sa.ForeignKey('sfc_portpair_details.id', - ondelete='CASCADE'), - primary_key=True) - weight = sa.Column(sa.Integer, nullable=False, default=1) - - -class PortPairDetail(model_base.BASEV2, models_v2.HasId, - models_v2.HasTenant): - __tablename__ = 'sfc_portpair_details' - ingress = sa.Column(sa.String(36), nullable=True) - egress = sa.Column(sa.String(36), nullable=True) - host_id = sa.Column(sa.String(255), nullable=False) - mac_address = sa.Column(sa.String(32), nullable=False) - network_type = sa.Column(sa.String(8)) - segment_id = sa.Column(sa.Integer) - local_endpoint = sa.Column(sa.String(64), nullable=False) - path_nodes = orm.relationship(PathPortAssoc, - backref='port_pair_detail', - lazy="joined", - cascade='all,delete') - - -class PathNode(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): - __tablename__ = 'sfc_path_nodes' - nsp = sa.Column(sa.Integer, nullable=False) - nsi = sa.Column(sa.Integer, nullable=False) - node_type = sa.Column(sa.String(32)) - portchain_id = sa.Column( - sa.String(255), - sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE')) - status = sa.Column(sa.String(32)) - portpair_details = orm.relationship(PathPortAssoc, - backref='path_nodes', - lazy="joined", - cascade='all,delete') - next_group_id = sa.Column(sa.Integer) - next_hop = sa.Column(sa.String(512)) - - -class OVSSfcDriverDB(common_db_mixin.CommonDbMixin): - def initialize(self): - self.admin_context = n_context.get_admin_context() - - def _make_pathnode_dict(self, node, fields=None): - res = {'id': node['id'], - 'tenant_id': node['tenant_id'], - 'node_type': node['node_type'], - 'nsp': node['nsp'], - 'nsi': node['nsi'], - 'next_group_id': node['next_group_id'], - 'next_hop': node['next_hop'], - 'portchain_id': node['portchain_id'], - 'status': node['status'], - 'portpair_details': [pair_detail['portpair_id'] - for pair_detail in node['portpair_details'] - ] - } - - return self._fields(res, fields) - - def _make_port_detail_dict(self, port, fields=None): - res = {'id': port['id'], - 'tenant_id': port['tenant_id'], - 'host_id': port['host_id'], - 'ingress': port.get('ingress', None), - 'egress': port.get('egress', None), - 'segment_id': port['segment_id'], - 'local_endpoint': port['local_endpoint'], - 'mac_address': port['mac_address'], - 'network_type': port['network_type'], - 'path_nodes': [{'pathnode_id': node['pathnode_id'], - 'weight': node['weight']} - for node in port['path_nodes']] - } - - return self._fields(res, fields) - - def _make_pathport_assoc_dict(self, assoc, fields=None): - res = {'pathnode_id': assoc['pathnode_id'], - 'portpair_id': assoc['portpair_id'], - 'weight': assoc['weight'], - } - - return self._fields(res, fields) - - def _get_path_node(self, id): - try: - node = self._get_by_id(self.admin_context, PathNode, id) - except exc.NoResultFound: - raise NodeNotFound(node_id=id) - return node - - def _get_port_detail(self, id): - try: - port = self._get_by_id(self.admin_context, PortPairDetail, id) - except exc.NoResultFound: - raise PortPairDetailNotFound(port_id=id) - return port - - def create_port_detail(self, port): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(port, PortPairDetail) - args['id'] = uuidutils.generate_uuid() - port_obj = PortPairDetail(**args) - self.admin_context.session.add(port_obj) - return self._make_port_detail_dict(port_obj) - - def create_path_node(self, node): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(node, PathNode) - args['id'] = uuidutils.generate_uuid() - node_obj = PathNode(**args) - self.admin_context.session.add(node_obj) - return self._make_pathnode_dict(node_obj) - - def create_pathport_assoc(self, assoc): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(assoc, PathPortAssoc) - assoc_obj = PathPortAssoc(**args) - self.admin_context.session.add(assoc_obj) - return self._make_pathport_assoc_dict(assoc_obj) - - def delete_pathport_assoc(self, pathnode_id, portdetail_id): - with self.admin_context.session.begin(subtransactions=True): - self.admin_context.session.query(PathPortAssoc).filter_by( - pathnode_id=pathnode_id, - portpair_id=portdetail_id).delete() - - def update_port_detail(self, id, port): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - for key, value in six.iteritems(port): - if key == 'path_nodes': - pns = [] - for pn in value: - pn_id = pn['pathnode_id'] - self._get_path_node(pn_id) - query = self._model_query( - self.admin_context, PathPortAssoc) - pn_association = query.filter_by( - pathnode_id=pn_id, - portpair_id=id - ).first() - if not pn_association: - pn_association = PathPortAssoc( - pathnode_id=pn_id, - portpair_id=id, - weight=pn.get('weight', 1) - ) - pns.append(pn_association) - port_obj[key] = pns - else: - port_obj[key] = value - port_obj.update(port) - return self._make_port_detail_dict(port_obj) - - def update_path_node(self, id, node): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - for key, value in six.iteritems(node): - if key == 'portpair_details': - pds = [] - for pd_id in value: - self._get_port_detail(pd_id) - query = self._model_query( - self.admin_context, PathPortAssoc) - pd_association = query.filter_by( - pathnode_id=id, - portpair_id=pd_id - ).first() - if not pd_association: - pd_association = PathPortAssoc( - pathnode_id=id, - portpair_id=pd_id - ) - pds.append(pd_association) - node_obj[key] = pds - else: - node_obj[key] = value - return self._make_pathnode_dict(node_obj) - - def delete_port_detail(self, id): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - self.admin_context.session.delete(port_obj) - - def delete_path_node(self, id): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - self.admin_context.session.delete(node_obj) - - def get_port_detail(self, id): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - return self._make_port_detail_dict(port_obj) - - def get_port_detail_without_exception(self, id): - with self.admin_context.session.begin(subtransactions=True): - try: - port = self._get_by_id( - self.admin_context, PortPairDetail, id) - except exc.NoResultFound: - return None - return self._make_port_detail_dict(port) - - def get_path_node(self, id): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - return self._make_pathnode_dict(node_obj) - - def get_path_nodes_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_path_nodes_by_filter( - filters, fields, sorts, limit, - marker, page_reverse - ) - all_items = qry.all() - if all_items: - return [self._make_pathnode_dict(item) for item in all_items] - - return None - - def get_path_node_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_path_nodes_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - first = qry.first() - if first: - return self._make_pathnode_dict(first) - - return None - - def _get_path_nodes_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - qry = self.admin_context.session.query(PathNode) - if filters: - for key, value in six.iteritems(filters): - column = getattr(PathNode, key, None) - if column: - if not value: - qry = qry.filter(sql.false()) - else: - qry = qry.filter(column == value) - return qry - - def get_port_details_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_port_details_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - all_items = qry.all() - if all_items: - return [ - self._make_port_detail_dict(item) - for item in all_items - ] - - return None - - def get_port_detail_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_port_details_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - first = qry.first() - if first: - return self._make_port_detail_dict(first) - - return None - - def _get_port_details_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - qry = self.admin_context.session.query(PortPairDetail) - if filters: - for key, value in six.iteritems(filters): - column = getattr(PortPairDetail, key, None) - if column: - if not value: - qry = qry.filter(sql.false()) - else: - qry = qry.filter(column == value) - - return qry diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py deleted file mode 100644 index 9dfc40d..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/driver.py +++ /dev/null @@ -1,1076 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import netaddr - -# from eventlet import greenthread - -from neutron.common import constants as nc_const -from neutron.common import rpc as n_rpc - -from neutron import manager - -from neutron.i18n import _LE -from neutron.i18n import _LW - -from neutron.plugins.common import constants as np_const - - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_serialization import jsonutils - -from networking_sfc.extensions import flowclassifier -from networking_sfc.extensions import sfc -from networking_sfc.services.sfc.common import exceptions as exc -from networking_sfc.services.sfc.drivers import base as driver_base -from networking_sfc.services.sfc.drivers.ovs import( - rpc_topics as sfc_topics) -from networking_sfc.services.sfc.drivers.ovs import( - db as ovs_sfc_db) -from networking_sfc.services.sfc.drivers.ovs import( - rpc as ovs_sfc_rpc) -from networking_sfc.services.sfc.drivers.ovs import ( - constants as ovs_const) - - -LOG = logging.getLogger(__name__) - - -class OVSSfcDriver(driver_base.SfcDriverBase, - ovs_sfc_db.OVSSfcDriverDB): - """Sfc Driver Base Class.""" - - def initialize(self): - super(OVSSfcDriver, self).initialize() - self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( - sfc_topics.SFC_AGENT - ) - - self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) - self._setup_rpc() - - def _setup_rpc(self): - # Setup a rpc server - self.topic = sfc_topics.SFC_PLUGIN - self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] - self.conn = n_rpc.create_connection(new=True) - self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() - - def _get_subnet(self, core_plugin, tenant_id, cidr): - filters = {'tenant_id': [tenant_id]} - subnets = core_plugin.get_subnets(self.admin_context, filters=filters) - cidr_set = netaddr.IPSet([cidr]) - - for subnet in subnets: - subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) - if cidr_set.issubset(subnet_cidr_set): - return subnet - - def _get_fc_dst_subnet_gw_port(self, fc): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet(core_plugin, - fc['tenant_id'], - fc['destination_ip_prefix']) - - return self._get_port_subnet_gw_info(core_plugin, subnet) - - def _get_port_subnet_gw_info_by_port_id(self, id): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet_by_port(core_plugin, id) - return self._get_port_subnet_gw_info(core_plugin, - subnet) - - def _get_port_subnet_gw_info(self, core_plugin, subnet): - filters = { - 'device_owner': - [nc_const.DEVICE_OWNER_ROUTER_INTF] - } - gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) - for port in gw_ports: - for fixed_ip in port['fixed_ips']: - if subnet["id"] == fixed_ip['subnet_id']: - return (port['mac_address'], - subnet['cidr'], - subnet['network_id']) - - raise exc.NoSubnetGateway( - type='subnet gateway', - cidr=subnet['cidr']) - - def _get_subnet_by_port(self, core_plugin, id): - port = core_plugin.get_port(self.admin_context, id) - for ip in port['fixed_ips']: - subnet = core_plugin.get_subnet(self.admin_context, - ip["subnet_id"]) - # currently only support one subnet for a port - break - - return subnet - - @log_helpers.log_method_call - def _get_portgroup_members(self, context, pg_id): - next_group_members = [] - group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) - LOG.debug('group_intid: %s', group_intid) - pg = context._plugin.get_port_pair_group(context._plugin_context, - pg_id) - for pp_id in pg['port_pairs']: - pp = context._plugin.get_port_pair(context._plugin_context, pp_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - if pd: - next_group_members.append( - dict(portpair_id=pd['id'], weight=1)) - return group_intid, next_group_members - - def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): - pp = context._plugin.get_port_pair(context._plugin_context, - port_pair_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - - return pd - - @log_helpers.log_method_call - def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node, dst_node, - last_sf_node=None): - dst_ports = [] - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # lookup the source port - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - src_pd = self.get_port_detail_by_filter(src_pd_filter) - - if not src_pd: - # Create source port detail - src_pd = self._create_port_detail(src_pd_filter) - LOG.debug('create src port detail: %s', src_pd) - - # Create associate relationship - assco_args = {'portpair_id': src_pd['id'], - 'pathnode_id': src_node['id'], - 'weight': 1, - } - sna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc src port with node: %s', sna) - src_node['portpair_details'].append(src_pd['id']) - - if fc.get('logical_destination_port', ''): - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - dst_pd = self.get_port_detail_by_filter(dst_pd_filter) - - if not dst_pd: - # Create dst port detail - dst_pd = self._create_port_detail(dst_pd_filter) - LOG.debug('create dst port detail: %s', dst_pd) - - # Create associate relationship - dst_assco_args = {'portpair_id': dst_pd['id'], - 'pathnode_id': dst_node['id'], - 'weight': 1, - } - dna = self.create_pathport_assoc(dst_assco_args) - LOG.debug('create assoc dst port with node: %s', dna) - dst_node['portpair_details'].append(dst_pd['id']) - - dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) - - if last_sf_node: - if last_sf_node['next_hop']: - next_hops = jsonutils.loads(last_sf_node['next_hop']) - next_hops.extend(dst_ports) - last_sf_node['next_hop'] = jsonutils.dumps(next_hops) - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - return dst_ports - - def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node=None, dst_node=None, - last_sf_node=None): - if not fc_ids: - return - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # delete source port detail - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(src_pd_filter) - if pds: - for pd in pds: - # update src_node portpair_details refence info - if src_node and pd['id'] in src_node[ - 'portpair_details' - ]: - src_node['portpair_details'].remove(pd['id']) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if fc.get('logical_destination_port', ''): - # Create dst port detail - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(dst_pd_filter) - if pds: - for pd in pds: - # update dst_node portpair_details refence info - if dst_node and pd['id'] in dst_node[ - 'portpair_details' - ]: - # update portpair_details of this node - dst_node['portpair_details'].remove(pd['id']) - # update last hop(SF-group) next hop info - if last_sf_node: - next_hop = dict(portpair_id=pd['id'], - weight=1) - next_hops = jsonutils.loads( - last_sf_node['next_hop']) - next_hops.remove(next_hop) - last_sf_node['next_hop'] = jsonutils.dumps( - next_hops) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if last_sf_node: - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - - @log_helpers.log_method_call - def _create_portchain_path(self, context, port_chain): - src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) - path_nodes, dst_ports = [], [] - # Create an assoc object for chain_id and path_id - # context = context._plugin_context - path_id = self.id_pool.assign_intid('portchain', port_chain['id']) - - if not path_id: - LOG.error(_LE('No path_id available for creating port chain path')) - return - - next_group_intid, next_group_members = self._get_portgroup_members( - context, port_chain['port_pair_groups'][0]) - - port_pair_groups = port_chain['port_pair_groups'] - sf_path_length = len(port_pair_groups) - # Create a head node object for port chain - src_args = {'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SRC_NODE, - 'nsp': path_id, - 'nsi': 0xff, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': jsonutils.dumps(next_group_members), - } - src_node = self.create_path_node(src_args) - LOG.debug('create src node: %s', src_node) - path_nodes.append(src_node) - - # Create a destination node object for port chain - dst_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.DST_NODE, - 'nsp': path_id, - 'nsi': 0xff - sf_path_length - 1, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': None, - 'next_hop': None - } - dst_node = self.create_path_node(dst_args) - LOG.debug('create dst node: %s', dst_node) - path_nodes.append(dst_node) - - dst_ports = self._add_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'], - src_node, - dst_node - ) - - for i in range(sf_path_length): - cur_group_members = next_group_members - # next_group for next hop - if i < sf_path_length - 1: - next_group_intid, next_group_members = ( - self._get_portgroup_members( - context, port_pair_groups[i + 1]) - ) - else: - next_group_intid = None - next_group_members = None if not dst_ports else dst_ports - - # Create a node object - node_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SF_NODE, - 'nsp': path_id, - 'nsi': 0xfe - i, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': ( - None if not next_group_members else - jsonutils.dumps(next_group_members) - ) - } - sf_node = self.create_path_node(node_args) - LOG.debug('chain path node: %s', sf_node) - # Create the assocation objects that combine the pathnode_id with - # the ingress of the port_pairs in the current group - # when port_group does not reach tail - for member in cur_group_members: - assco_args = {'portpair_id': member['portpair_id'], - 'pathnode_id': sf_node['id'], - 'weight': member['weight'], } - sfna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc port with node: %s', sfna) - sf_node['portpair_details'].append(member['portpair_id']) - path_nodes.append(sf_node) - - return path_nodes - - def _delete_path_node_port_flowrule(self, node, port, fc_ids): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - flow_rule = self._build_portchain_flowrule_body( - node, - port, - None, - fc_ids) - - self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( - self.admin_context, - flow_rule) - - def _delete_path_node_flowrule(self, node, fc_ids): - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._delete_path_node_port_flowrule( - node, port, fc_ids) - - @log_helpers.log_method_call - def _delete_portchain_path(self, context, portchain_id): - port_chain = context.current - first = self.get_path_node_by_filter( - filters={ - 'portchain_id': portchain_id, - 'nsi': 0xff - } - ) - - # delete flow rules which source port isn't assigned - # in flow classifier - if first: - self._delete_src_node_flowrules( - first, - port_chain['flow_classifiers'] - ) - - pds = self.get_path_nodes_by_filter( - dict(portchain_id=portchain_id)) - if pds: - for pd in pds: - self._delete_path_node_flowrule( - pd, - port_chain['flow_classifiers'] - ) - self.delete_path_node(pd['id']) - - # delete the ports on the traffic classifier - self._remove_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'] - ) - - # Delete the chainpathpair - intid = self.id_pool.get_intid_by_uuid( - 'portchain', portchain_id) - self.id_pool.release_intid('portchain', intid) - - def _update_path_node_next_hops(self, flow_rule): - node_next_hops = [] - if not flow_rule['next_hop']: - return None - next_hops = jsonutils.loads(flow_rule['next_hop']) - if not next_hops: - return None - for member in next_hops: - detail = {} - port_detail = self.get_port_detail_by_filter( - dict(id=member['portpair_id'])) - if not port_detail or not port_detail['host_id']: - continue - detail['local_endpoint'] = port_detail['local_endpoint'] - detail['weight'] = member['weight'] - detail['mac_address'] = port_detail['mac_address'] - detail['ingress'] = port_detail['ingress'] - node_next_hops.append(detail) - - mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( - detail['ingress'] - ) - - detail['gw_mac'] = mac - detail['cidr'] = cidr - detail['net_uuid'] = net_uuid - - flow_rule['next_hops'] = node_next_hops - flow_rule.pop('next_hop') - - return node_next_hops - - def _build_portchain_flowrule_body(self, node, port, - add_fc_ids=None, del_fc_ids=None): - node_info = node.copy() - node_info.pop('tenant_id') - node_info.pop('portpair_details') - - port_info = port.copy() - port_info.pop('tenant_id') - port_info.pop('id') - port_info.pop('path_nodes') - port_info.pop('host_id') - - flow_rule = dict(node_info, **port_info) - # if this port is belong to NSH/MPLS-aware vm, only to - # notify the flow classifier for 1st SF. - flow_rule['add_fcs'] = self._filter_flow_classifiers( - flow_rule, add_fc_ids) - flow_rule['del_fcs'] = self._filter_flow_classifiers( - flow_rule, del_fc_ids) - - self._update_portchain_group_reference_count(flow_rule, - port['host_id']) - - # update next hop info - self._update_path_node_next_hops(flow_rule) - - return flow_rule - - def _filter_flow_classifiers(self, flow_rule, fc_ids): - """Filter flow classifiers. - - @return: list of the flow classifiers - """ - - fc_return = [] - - if not fc_ids: - return fc_return - fcs = self._get_fcs_by_ids(fc_ids) - for fc in fcs: - new_fc = fc.copy() - new_fc.pop('id') - new_fc.pop('name') - new_fc.pop('tenant_id') - new_fc.pop('description') - - if ((flow_rule['node_type'] == ovs_const.SRC_NODE and - flow_rule['egress'] == fc['logical_source_port'] - ) or - (flow_rule['node_type'] == ovs_const.DST_NODE and - flow_rule['ingress'] == fc['logical_destination_port'] - )): - fc_return.append(new_fc) - elif flow_rule['node_type'] == ovs_const.SF_NODE: - fc_return.append(new_fc) - - return fc_return - - def _update_path_node_port_flowrules(self, node, port, - add_fc_ids=None, del_fc_ids=None): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - - flow_rule = self._build_portchain_flowrule_body( - node, - port, - add_fc_ids, - del_fc_ids) - - self.ovs_driver_rpc.ask_agent_to_update_flow_rules( - self.admin_context, - flow_rule) - - def _update_path_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - if node['portpair_details'] is None: - return - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._update_path_node_port_flowrules( - node, port, add_fc_ids, del_fc_ids) - - def _thread_update_path_nodes(self, nodes, - add_fc_ids=None, del_fc_ids=None): - for node in nodes: - self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) - self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) - - def _get_portchain_fcs(self, port_chain): - return self._get_fcs_by_ids(port_chain['flow_classifiers']) - - def _get_fcs_by_ids(self, fc_ids): - flow_classifiers = [] - if not fc_ids: - return flow_classifiers - - # Get the portchain flow classifiers - fc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - flowclassifier.FLOW_CLASSIFIER_EXT) - ) - if not fc_plugin: - LOG.warn(_LW("Not found the flow classifier service plugin")) - return flow_classifiers - - for fc_id in fc_ids: - fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) - flow_classifiers.append(fc) - - return flow_classifiers - - @log_helpers.log_method_call - def create_port_chain(self, context): - port_chain = context.current - path_nodes = self._create_portchain_path(context, port_chain) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes( - path_nodes, - port_chain['flow_classifiers'], - None) - - @log_helpers.log_method_call - def delete_port_chain(self, context): - port_chain = context.current - portchain_id = port_chain['id'] - LOG.debug("to delete portchain path") - self._delete_portchain_path(context, portchain_id) - - def _get_diff_set(self, orig, cur): - orig_set = set(item for item in orig) - cur_set = set(item for item in cur) - - to_del = orig_set.difference(cur_set) - to_add = cur_set.difference(orig_set) - - return to_del, to_add - - @log_helpers.log_method_call - def update_port_chain(self, context): - port_chain = context.current - orig = context.original - - del_fc_ids, add_fc_ids = self._get_diff_set( - orig['flow_classifiers'], - port_chain['flow_classifiers'] - ) - path_nodes = self.get_path_nodes_by_filter( - dict(portchain_id=port_chain['id']) - ) - if not path_nodes: - return - - sort_path_nodes = sorted(path_nodes, - key=lambda x: x['nsi'], - reverse=True) - if del_fc_ids: - self._thread_update_path_nodes(sort_path_nodes, - None, - del_fc_ids) - self._remove_flowclassifier_port_assoc(del_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - if add_fc_ids: - self._add_flowclassifier_port_assoc(add_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes(sort_path_nodes, - add_fc_ids, - None) - - @log_helpers.log_method_call - def create_port_pair_group(self, context): - group = context.current - self.id_pool.assign_intid('group', group['id']) - - @log_helpers.log_method_call - def delete_port_pair_group(self, context): - group = context.current - group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) - if group_intid: - self.id_pool.release_intid('group', group_intid) - - @log_helpers.log_method_call - def update_port_pair_group(self, context): - current = context.current - original = context.original - - if set(current['port_pairs']) == set(original['port_pairs']): - return - - # Update the path_nodes and flows for each port chain that - # contains this port_pair_group - # Note: _get_port_pair_group is temporarily used here. - ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, - current['id']) - port_chains = [assoc.portchain_id for assoc in - ppg_obj.chain_group_associations] - - for chain_id in port_chains: - port_chain = context._plugin.get_port_chain( - context._plugin_context, chain_id) - group_intid = self.id_pool.get_intid_by_uuid('group', - current['id']) - # Get the previous node - prev_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'next_group_id': group_intid}) - if not prev_node: - continue - - before_update_prev_node = prev_node.copy() - # Update the previous node - _, curr_group_members = self._get_portgroup_members(context, - current['id']) - prev_node['next_hop'] = ( - jsonutils.dumps(curr_group_members) - if curr_group_members else None - ) - # update next hop to database - self.update_path_node(prev_node['id'], prev_node) - if prev_node['node_type'] == ovs_const.SRC_NODE: - self._delete_src_node_flowrules( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_src_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - else: - self._delete_path_node_flowrule( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_path_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - - # Update the current node - # to find the current node by using the node's next_group_id - # if this node is the last, next_group_id would be None - curr_pos = port_chain['port_pair_groups'].index(current['id']) - curr_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'nsi': 0xfe - curr_pos}) - if not curr_node: - continue - - # Add the port-pair-details into the current node - for pp_id in ( - set(current['port_pairs']) - set(original['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - - assco_args = {'portpair_id': ppd['id'], - 'pathnode_id': curr_node['id'], - 'weight': 1, } - self.create_pathport_assoc(assco_args) - self._update_path_node_port_flowrules( - curr_node, ppd, port_chain['flow_classifiers']) - - # Delete the port-pair-details from the current node - for pp_id in ( - set(original['port_pairs']) - set(current['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - self._delete_path_node_port_flowrule( - curr_node, ppd, port_chain['flow_classifiers']) - self.delete_pathport_assoc(curr_node['id'], ppd['id']) - - @log_helpers.log_method_call - def _get_portpair_detail_info(self, portpair_id): - """Get port detail. - - @param: portpair_id: uuid - @return: (host_id, local_ip, network_type, segment_id, - service_insert_type): tuple - """ - - core_plugin = manager.NeutronManager.get_plugin() - port_detail = core_plugin.get_port(self.admin_context, portpair_id) - host_id, local_ip, network_type, segment_id, mac_address = ( - (None, ) * 5) - - if port_detail: - host_id = port_detail['binding:host_id'] - network_id = port_detail['network_id'] - mac_address = port_detail['mac_address'] - network_info = core_plugin.get_network( - self.admin_context, network_id) - network_type = network_info['provider:network_type'] - segment_id = network_info['provider:segmentation_id'] - - if ( - host_id and - network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] - ): - driver = core_plugin.type_manager.drivers.get(network_type) - host_endpoint = driver.obj.get_endpoint_by_host(host_id) - local_ip = host_endpoint['ip_address'] - - return host_id, local_ip, network_type, segment_id, mac_address - - @log_helpers.log_method_call - def _create_port_detail(self, port_pair): - # since first node may not assign the ingress port, and last node may - # not assign the egress port. we use one of the - # port as the key to get the SF information. - port = None - if port_pair.get('ingress', None): - port = port_pair['ingress'] - elif port_pair.get('egress', None): - port = port_pair['egress'] - - host_id, local_endpoint, network_type, segment_id, mac_address = ( - self._get_portpair_detail_info(port)) - port_detail = { - 'ingress': port_pair.get('ingress', None), - 'egress': port_pair.get('egress', None), - 'tenant_id': port_pair['tenant_id'], - 'host_id': host_id, - 'segment_id': segment_id, - 'network_type': network_type, - 'local_endpoint': local_endpoint, - 'mac_address': mac_address - } - r = self.create_port_detail(port_detail) - LOG.debug('create port detail: %s', r) - return r - - @log_helpers.log_method_call - def create_port_pair(self, context): - port_pair = context.current - self._create_port_detail(port_pair) - - @log_helpers.log_method_call - def delete_port_pair(self, context): - port_pair = context.current - - pd_filter = dict(ingress=port_pair.get('ingress', None), - egress=port_pair.get('egress', None), - tenant_id=port_pair['tenant_id'] - ) - pds = self.get_port_details_by_filter(pd_filter) - if pds: - for pd in pds: - self.delete_port_detail(pd['id']) - - @log_helpers.log_method_call - def update_port_pair(self, context): - pass - - def get_flowrules_by_host_portid(self, context, host, port_id): - port_chain_flowrules = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return port_chain_flowrules - try: - port_detail_list = [] - # one port only may be in egress/ingress port once time. - ingress_port = self.get_port_detail_by_filter( - dict(ingress=port_id)) - egress_port = self.get_port_detail_by_filter( - dict(egress=port_id)) - if not ingress_port and not egress_port: - return None - # SF migrate to other host - if ingress_port: - port_detail_list.append(ingress_port) - if ingress_port['host_id'] != host: - ingress_port.update(dict(host_id=host)) - - if egress_port: - port_detail_list.append(egress_port) - if egress_port['host_id'] != host: - egress_port.update(dict(host_id=host)) - - # this is a SF if there are both egress and engress. - for i, ports in enumerate(port_detail_list): - nodes_assocs = ports['path_nodes'] - for assoc in nodes_assocs: - # update current path flow rule - node = self.get_path_node(assoc['pathnode_id']) - port_chain = sfc_plugin.get_port_chain( - context, - node['portchain_id']) - flow_rule = self._build_portchain_flowrule_body( - node, - ports, - add_fc_ids=port_chain['flow_classifiers'] - ) - port_chain_flowrules.append(flow_rule) - - # update the pre-path node flow rule - # if node['node_type'] != ovs_const.SRC_NODE: - # node_filter = dict(nsp=node['nsp'], - # nsi=node['nsi'] + 1 - # ) - # pre_node_list = self.get_path_nodes_by_filter( - # node_filter) - # if not pre_node_list: - # continue - # for pre_node in pre_node_list: - # self._update_path_node_flowrules( - # pre_node, - # add_fc_ids=port_chain['flow_classifiers']) - - return port_chain_flowrules - - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flowrules_by_host_portid failed")) - - def get_flow_classifier_by_portchain_id(self, context, portchain_id): - try: - flow_classifier_list = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - - port_chain = sfc_plugin.get_port_chain( - context, - portchain_id) - flow_classifier_list = self._get_portchain_fcs(port_chain) - return flow_classifier_list - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) - - def update_flowrule_status(self, context, id, status): - try: - flowrule_status = dict(status=status) - self.update_path_node(id, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("update_flowrule_status failed")) - - def _update_src_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - add_fc_ids, - del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, - filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - flow_rule['host'] = agent['host'] - self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( - self.admin_context, - flow_rule) - - def _delete_src_node_flowrules(self, node, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - None, del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, filters={ - 'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - self._update_portchain_group_reference_count(flow_rule, - agent['host']) - self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( - self.admin_context, - flow_rule) - - def get_all_src_node_flowrules(self, context): - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - try: - frs = [] - port_chains = sfc_plugin.get_port_chains(context) - - for port_chain in port_chains: - # get the first node of this chain - node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) - portchain_node = self.get_path_node_by_filter(node_filters) - if not portchain_node: - continue - flow_rule = self._get_portchain_src_node_flowrule( - portchain_node, - port_chain['flow_classifiers'] - ) - if not flow_rule: - continue - frs.append(flow_rule) - return frs - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_all_src_node_flowrules failed")) - - def _get_portchain_src_node_flowrule(self, node, - add_fc_ids=None, del_fc_ids=None): - try: - add_fc_rt = [] - del_fc_rt = [] - - if add_fc_ids: - for fc in self._get_fcs_by_ids(add_fc_ids): - if not fc.get('logical_source_port', None): - add_fc_rt.append(fc) - - if del_fc_ids: - for fc in self._get_fcs_by_ids(del_fc_ids): - if not fc.get('logical_source_port', None): - del_fc_rt.append(fc) - - if not add_fc_rt and not del_fc_rt: - return None - - return self._build_portchain_flowrule_body_without_port( - node, add_fc_rt, del_fc_rt) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("_get_portchain_src_node_flowrule failed")) - - def _update_portchain_group_reference_count(self, flow_rule, host): - group_refcnt = 0 - flow_rule['host'] = host - - if flow_rule['next_group_id'] is not None: - all_nodes = self.get_path_nodes_by_filter( - filters={'next_group_id': flow_rule['next_group_id'], - 'nsi': 0xff}) - if all_nodes is not None: - for node in all_nodes: - if not node['portpair_details']: - group_refcnt += 1 - - port_details = self.get_port_details_by_filter( - dict(host_id=flow_rule['host'])) - if port_details is not None: - for pd in port_details: - for path in pd['path_nodes']: - path_node = self.get_path_node(path['pathnode_id']) - if ( - path_node['next_group_id'] == - flow_rule['next_group_id'] - ): - group_refcnt += 1 - - flow_rule['group_refcnt'] = group_refcnt - - return group_refcnt - - def _build_portchain_flowrule_body_without_port(self, - node, - add_fcs=None, - del_fcs=None): - flow_rule = node.copy() - flow_rule.pop('tenant_id') - flow_rule.pop('portpair_details') - - # according to the first sf node get network information - if not node['next_hop']: - return None - - next_hops = jsonutils.loads(node['next_hop']) - if not next_hops: - return None - - port_detail = self.get_port_detail_by_filter( - dict(id=next_hops[0]['portpair_id'])) - if not port_detail: - return None - - flow_rule['ingress'] = None - flow_rule['egress'] = None - flow_rule['network_type'] = port_detail['network_type'] - flow_rule['segment_id'] = port_detail['segment_id'] - - flow_rule['add_fcs'] = add_fcs - flow_rule['del_fcs'] = del_fcs - - # update next hop info - self._update_path_node_next_hops(flow_rule) - return flow_rule diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc.py b/networking_sfc/services/sfc/drivers/ovs/rpc.py deleted file mode 100644 index a5ac0bc..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/rpc.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics -from neutron.common import rpc as n_rpc -from neutron.common import topics -from neutron.i18n import _LI - -from oslo_log import log as logging -import oslo_messaging - -LOG = logging.getLogger(__name__) - - -class SfcRpcCallback(object): - """Sfc RPC server.""" - - def __init__(self, driver): - self.target = oslo_messaging.Target(version='1.0') - self.driver = driver - - def get_flowrules_by_host_portid(self, context, **kwargs): - host = kwargs.get('host') - port_id = kwargs.get('port_id') - LOG.debug('from port-chain service plugin') - pcfrs = self.driver.get_flowrules_by_host_portid( - context, host, port_id) - LOG.debug('host: %s, port_id: %s', host, port_id) - return pcfrs - - def get_flow_classifier_by_portchain_id(self, context, **kwargs): - portchain_id = kwargs.get('portchain_id') - pcfcs = self.driver.get_flow_classifier_by_portchain_id( - context, - portchain_id) - LOG.debug('portchain id: %s', portchain_id) - return pcfcs - - def get_all_src_node_flowrules(self, context, **kwargs): - host = kwargs.get('host') - pcfcs = self.driver.get_all_src_node_flowrules( - context) - LOG.debug('portchain get_src_node_flowrules, host: %s', host) - return pcfcs - - def update_flowrules_status(self, context, **kwargs): - flowrules_status = kwargs.get('flowrules_status') - LOG.info(_LI('update_flowrules_status: %s'), flowrules_status) - for flowrule_dict in flowrules_status: - self.driver.update_flowrule_status( - context, flowrule_dict['id'], flowrule_dict['status']) - - -class SfcAgentRpcClient(object): - """RPC client for ovs sfc agent.""" - - def __init__(self, topic=sfc_topics.SFC_AGENT): - self.topic = topic - target = oslo_messaging.Target(topic=topic, version='1.0') - self.client = n_rpc.get_client(target) - - def ask_agent_to_update_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to update flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.UPDATE), - server=host) - cctxt.cast(context, 'update_flow_rules', flowrule_entries=flows) - - def ask_agent_to_delete_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to delete flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.DELETE), - server=host) - cctxt.cast(context, 'delete_flow_rules', flowrule_entries=flows) - - def ask_agent_to_update_src_node_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to update src node flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.UPDATE), - server=host) - cctxt.cast(context, 'update_src_node_flow_rules', - flowrule_entries=flows) - - def ask_agent_to_delete_src_node_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to delete src node flows') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.DELETE), - server=host) - cctxt.cast(context, 'delete_src_node_flow_rules', - flowrule_entries=flows) diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py deleted file mode 100644 index a35ff4f..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -AGENT = 'q-agent-notifier' - -SFC_PLUGIN = 'q-sfc-plugin' -SFC_AGENT = 'q-sfc-agent' -SFC_FLOW = 'q-sfc-flow' - -PORTFLOW = 'portflowrule' |