path: root/networking_sfc/services/sfc/drivers/ovs/driver.py
author    Ulas Kozat <ulas.kozat@huawei.com>  2015-12-28 16:05:13 -0800
committer Ulas Kozat <ulas.kozat@huawei.com>  2015-12-28 16:05:13 -0800
commit    c772a1dbc7ace58d099570d41a889adf851c8ba8 (patch)
tree      809aefa0dae407a1d9c12989f7e8f60891700d17 /networking_sfc/services/sfc/drivers/ovs/driver.py
parent    e671a915d887ae8f7751a54bb07ecb7ed8f2f25b (diff)
Added networking-sfc from openstack project with merge date Dec 23 2015 (stable/colorado, stable/brahmaputra)
Added patch 13 for subject "add missing db migration files"

Change-Id: Id51a160335a14870c1dd816a44baf9b1958b9ac6
Diffstat (limited to 'networking_sfc/services/sfc/drivers/ovs/driver.py')
-rw-r--r--  networking_sfc/services/sfc/drivers/ovs/driver.py  1076
1 file changed, 1076 insertions, 0 deletions
diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py
new file mode 100644
index 0000000..9dfc40d
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/driver.py
@@ -0,0 +1,1076 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+
+# from eventlet import greenthread
+
+from neutron.common import constants as nc_const
+from neutron.common import rpc as n_rpc
+
+from neutron import manager
+
+from neutron.i18n import _LE
+from neutron.i18n import _LW
+
+from neutron.plugins.common import constants as np_const
+
+
+from oslo_log import helpers as log_helpers
+from oslo_log import log as logging
+from oslo_serialization import jsonutils
+
+from networking_sfc.extensions import flowclassifier
+from networking_sfc.extensions import sfc
+from networking_sfc.services.sfc.common import exceptions as exc
+from networking_sfc.services.sfc.drivers import base as driver_base
+from networking_sfc.services.sfc.drivers.ovs import (
+    rpc_topics as sfc_topics)
+from networking_sfc.services.sfc.drivers.ovs import (
+    db as ovs_sfc_db)
+from networking_sfc.services.sfc.drivers.ovs import (
+    rpc as ovs_sfc_rpc)
+from networking_sfc.services.sfc.drivers.ovs import (
+ constants as ovs_const)
+
+
+LOG = logging.getLogger(__name__)
+
+
+class OVSSfcDriver(driver_base.SfcDriverBase,
+ ovs_sfc_db.OVSSfcDriverDB):
+ """Sfc Driver Base Class."""
+
+ def initialize(self):
+ super(OVSSfcDriver, self).initialize()
+ self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient(
+ sfc_topics.SFC_AGENT
+ )
+
+ self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context)
+ self._setup_rpc()
+
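+    # Example (illustrative sketch; the UUID is a placeholder): the
+    # integer ID pool set up above is used elsewhere in this driver
+    # roughly as:
+    #
+    #   path_id = self.id_pool.assign_intid('portchain', '<chain-uuid>')
+    #   intid = self.id_pool.get_intid_by_uuid('portchain', '<chain-uuid>')
+    #   self.id_pool.release_intid('portchain', intid)
+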
+ def _setup_rpc(self):
+        # Set up an RPC server
+ self.topic = sfc_topics.SFC_PLUGIN
+ self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)]
+ self.conn = n_rpc.create_connection(new=True)
+ self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
+ self.conn.consume_in_threads()
+
+ def _get_subnet(self, core_plugin, tenant_id, cidr):
+ filters = {'tenant_id': [tenant_id]}
+ subnets = core_plugin.get_subnets(self.admin_context, filters=filters)
+ cidr_set = netaddr.IPSet([cidr])
+
+ for subnet in subnets:
+ subnet_cidr_set = netaddr.IPSet([subnet['cidr']])
+ if cidr_set.issubset(subnet_cidr_set):
+ return subnet
+
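+    # Example (illustrative sketch with assumed CIDRs): the IPSet
+    # containment test above matches a classifier prefix against the
+    # first tenant subnet that fully contains it:
+    #
+    #   >>> netaddr.IPSet(['10.0.1.0/24']).issubset(
+    #   ...     netaddr.IPSet(['10.0.0.0/16']))
+    #   True
+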
+ def _get_fc_dst_subnet_gw_port(self, fc):
+ core_plugin = manager.NeutronManager.get_plugin()
+ subnet = self._get_subnet(core_plugin,
+ fc['tenant_id'],
+ fc['destination_ip_prefix'])
+
+ return self._get_port_subnet_gw_info(core_plugin, subnet)
+
+ def _get_port_subnet_gw_info_by_port_id(self, id):
+ core_plugin = manager.NeutronManager.get_plugin()
+ subnet = self._get_subnet_by_port(core_plugin, id)
+ return self._get_port_subnet_gw_info(core_plugin,
+ subnet)
+
+ def _get_port_subnet_gw_info(self, core_plugin, subnet):
+ filters = {
+ 'device_owner':
+ [nc_const.DEVICE_OWNER_ROUTER_INTF]
+ }
+ gw_ports = core_plugin.get_ports(self.admin_context, filters=filters)
+ for port in gw_ports:
+ for fixed_ip in port['fixed_ips']:
+ if subnet["id"] == fixed_ip['subnet_id']:
+ return (port['mac_address'],
+ subnet['cidr'],
+ subnet['network_id'])
+
+ raise exc.NoSubnetGateway(
+ type='subnet gateway',
+ cidr=subnet['cidr'])
+
+ def _get_subnet_by_port(self, core_plugin, id):
+ port = core_plugin.get_port(self.admin_context, id)
+ for ip in port['fixed_ips']:
+ subnet = core_plugin.get_subnet(self.admin_context,
+ ip["subnet_id"])
+            # currently only one subnet per port is supported
+ break
+
+ return subnet
+
+ @log_helpers.log_method_call
+ def _get_portgroup_members(self, context, pg_id):
+ next_group_members = []
+ group_intid = self.id_pool.get_intid_by_uuid('group', pg_id)
+ LOG.debug('group_intid: %s', group_intid)
+ pg = context._plugin.get_port_pair_group(context._plugin_context,
+ pg_id)
+ for pp_id in pg['port_pairs']:
+ pp = context._plugin.get_port_pair(context._plugin_context, pp_id)
+            filters = {}
+            if pp.get('ingress'):
+                filters['ingress'] = pp['ingress']
+            if pp.get('egress'):
+                filters['egress'] = pp['egress']
+ pd = self.get_port_detail_by_filter(filters)
+ if pd:
+ next_group_members.append(
+ dict(portpair_id=pd['id'], weight=1))
+ return group_intid, next_group_members
+
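+    # Example (illustrative sketch; the integer ID and UUIDs are
+    # placeholders): for a group with two port pairs the return value
+    # would look roughly like:
+    #
+    #   (5, [{'portpair_id': '<port-detail-uuid-1>', 'weight': 1},
+    #        {'portpair_id': '<port-detail-uuid-2>', 'weight': 1}])
+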
+ def _get_port_pair_detail_by_port_pair(self, context, port_pair_id):
+ pp = context._plugin.get_port_pair(context._plugin_context,
+ port_pair_id)
+        filters = {}
+        if pp.get('ingress'):
+            filters['ingress'] = pp['ingress']
+        if pp.get('egress'):
+            filters['egress'] = pp['egress']
+ pd = self.get_port_detail_by_filter(filters)
+
+ return pd
+
+ @log_helpers.log_method_call
+ def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id,
+ src_node, dst_node,
+ last_sf_node=None):
+ dst_ports = []
+ for fc in self._get_fcs_by_ids(fc_ids):
+ if fc.get('logical_source_port', ''):
+                # look up the source port
+ src_pd_filter = dict(egress=fc['logical_source_port'],
+ tenant_id=tenant_id
+ )
+ src_pd = self.get_port_detail_by_filter(src_pd_filter)
+
+ if not src_pd:
+ # Create source port detail
+ src_pd = self._create_port_detail(src_pd_filter)
+ LOG.debug('create src port detail: %s', src_pd)
+
+                # Create the association
+ assco_args = {'portpair_id': src_pd['id'],
+ 'pathnode_id': src_node['id'],
+ 'weight': 1,
+ }
+ sna = self.create_pathport_assoc(assco_args)
+ LOG.debug('create assoc src port with node: %s', sna)
+ src_node['portpair_details'].append(src_pd['id'])
+
+ if fc.get('logical_destination_port', ''):
+ dst_pd_filter = dict(ingress=fc['logical_destination_port'],
+ tenant_id=tenant_id
+ )
+ dst_pd = self.get_port_detail_by_filter(dst_pd_filter)
+
+ if not dst_pd:
+ # Create dst port detail
+ dst_pd = self._create_port_detail(dst_pd_filter)
+ LOG.debug('create dst port detail: %s', dst_pd)
+
+                # Create the association
+ dst_assco_args = {'portpair_id': dst_pd['id'],
+ 'pathnode_id': dst_node['id'],
+ 'weight': 1,
+ }
+ dna = self.create_pathport_assoc(dst_assco_args)
+ LOG.debug('create assoc dst port with node: %s', dna)
+ dst_node['portpair_details'].append(dst_pd['id'])
+
+ dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1))
+
+ if last_sf_node:
+ if last_sf_node['next_hop']:
+ next_hops = jsonutils.loads(last_sf_node['next_hop'])
+ next_hops.extend(dst_ports)
+ last_sf_node['next_hop'] = jsonutils.dumps(next_hops)
+                # update next-hop info of the previous node
+ self.update_path_node(last_sf_node['id'],
+ last_sf_node)
+ return dst_ports
+
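+    # Example (illustrative sketch with placeholder IDs): when
+    # last_sf_node is passed, its serialized next_hop list grows from
+    #
+    #   '[{"portpair_id": "sf-pd-1", "weight": 1}]'
+    #
+    # to
+    #
+    #   '[{"portpair_id": "sf-pd-1", "weight": 1},
+    #     {"portpair_id": "dst-pd-1", "weight": 1}]'
+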
+ def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id,
+ src_node=None, dst_node=None,
+ last_sf_node=None):
+ if not fc_ids:
+ return
+ for fc in self._get_fcs_by_ids(fc_ids):
+ if fc.get('logical_source_port', ''):
+ # delete source port detail
+ src_pd_filter = dict(egress=fc['logical_source_port'],
+ tenant_id=tenant_id
+ )
+ pds = self.get_port_details_by_filter(src_pd_filter)
+ if pds:
+ for pd in pds:
+                    # update src_node portpair_details reference info
+ if src_node and pd['id'] in src_node[
+ 'portpair_details'
+ ]:
+ src_node['portpair_details'].remove(pd['id'])
+ if len(pd['path_nodes']) == 1:
+ self.delete_port_detail(pd['id'])
+
+ if fc.get('logical_destination_port', ''):
+                # delete destination port detail
+ dst_pd_filter = dict(ingress=fc['logical_destination_port'],
+ tenant_id=tenant_id
+ )
+ pds = self.get_port_details_by_filter(dst_pd_filter)
+ if pds:
+ for pd in pds:
+                    # update dst_node portpair_details reference info
+ if dst_node and pd['id'] in dst_node[
+ 'portpair_details'
+ ]:
+ # update portpair_details of this node
+ dst_node['portpair_details'].remove(pd['id'])
+                        # update the last hop's (SF group) next-hop info
+ if last_sf_node:
+ next_hop = dict(portpair_id=pd['id'],
+ weight=1)
+ next_hops = jsonutils.loads(
+ last_sf_node['next_hop'])
+ next_hops.remove(next_hop)
+ last_sf_node['next_hop'] = jsonutils.dumps(
+ next_hops)
+ if len(pd['path_nodes']) == 1:
+ self.delete_port_detail(pd['id'])
+
+ if last_sf_node:
+            # update next-hop info of the previous node
+ self.update_path_node(last_sf_node['id'],
+ last_sf_node)
+
+ @log_helpers.log_method_call
+ def _create_portchain_path(self, context, port_chain):
+        path_nodes, dst_ports = [], []
+ # Create an assoc object for chain_id and path_id
+ # context = context._plugin_context
+ path_id = self.id_pool.assign_intid('portchain', port_chain['id'])
+
+ if not path_id:
+ LOG.error(_LE('No path_id available for creating port chain path'))
+ return
+
+ next_group_intid, next_group_members = self._get_portgroup_members(
+ context, port_chain['port_pair_groups'][0])
+
+ port_pair_groups = port_chain['port_pair_groups']
+ sf_path_length = len(port_pair_groups)
+ # Create a head node object for port chain
+ src_args = {'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.SRC_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xff,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': next_group_intid,
+ 'next_hop': jsonutils.dumps(next_group_members),
+ }
+ src_node = self.create_path_node(src_args)
+ LOG.debug('create src node: %s', src_node)
+ path_nodes.append(src_node)
+
+ # Create a destination node object for port chain
+ dst_args = {
+ 'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.DST_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xff - sf_path_length - 1,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': None,
+ 'next_hop': None
+ }
+ dst_node = self.create_path_node(dst_args)
+ LOG.debug('create dst node: %s', dst_node)
+ path_nodes.append(dst_node)
+
+ dst_ports = self._add_flowclassifier_port_assoc(
+ port_chain['flow_classifiers'],
+ port_chain['tenant_id'],
+ src_node,
+ dst_node
+ )
+
+ for i in range(sf_path_length):
+ cur_group_members = next_group_members
+ # next_group for next hop
+ if i < sf_path_length - 1:
+ next_group_intid, next_group_members = (
+ self._get_portgroup_members(
+ context, port_pair_groups[i + 1])
+ )
+ else:
+ next_group_intid = None
+                next_group_members = dst_ports or None
+
+ # Create a node object
+ node_args = {
+ 'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.SF_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xfe - i,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': next_group_intid,
+ 'next_hop': (
+ None if not next_group_members else
+ jsonutils.dumps(next_group_members)
+ )
+ }
+ sf_node = self.create_path_node(node_args)
+ LOG.debug('chain path node: %s', sf_node)
+            # Create the association objects that link the pathnode_id
+            # with the ingress of each port pair in the current group
+ for member in cur_group_members:
+ assco_args = {'portpair_id': member['portpair_id'],
+ 'pathnode_id': sf_node['id'],
+ 'weight': member['weight'], }
+ sfna = self.create_pathport_assoc(assco_args)
+ LOG.debug('create assoc port with node: %s', sfna)
+ sf_node['portpair_details'].append(member['portpair_id'])
+ path_nodes.append(sf_node)
+
+ return path_nodes
+
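+    # Example (illustrative sketch): for a chain with two port pair
+    # groups, the NSI numbering assigned above works out to:
+    #
+    #   src node:          nsi = 0xff (255)
+    #   SF node, group 0:  nsi = 0xfe (254)
+    #   SF node, group 1:  nsi = 0xfd (253)
+    #   dst node:          nsi = 0xff - 2 - 1 = 0xfc (252)
+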
+ def _delete_path_node_port_flowrule(self, node, port, fc_ids):
+        # if this port is not bound to a host, don't generate a flow rule
+ if not port['host_id']:
+ return
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ port,
+ None,
+ fc_ids)
+
+ self.ovs_driver_rpc.ask_agent_to_delete_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _delete_path_node_flowrule(self, node, fc_ids):
+ for each in node['portpair_details']:
+ port = self.get_port_detail_by_filter(dict(id=each))
+ if port:
+ self._delete_path_node_port_flowrule(
+ node, port, fc_ids)
+
+ @log_helpers.log_method_call
+ def _delete_portchain_path(self, context, portchain_id):
+ port_chain = context.current
+ first = self.get_path_node_by_filter(
+ filters={
+ 'portchain_id': portchain_id,
+ 'nsi': 0xff
+ }
+ )
+
+        # delete flow rules whose source port isn't assigned
+        # in the flow classifier
+ if first:
+ self._delete_src_node_flowrules(
+ first,
+ port_chain['flow_classifiers']
+ )
+
+ pds = self.get_path_nodes_by_filter(
+ dict(portchain_id=portchain_id))
+ if pds:
+ for pd in pds:
+ self._delete_path_node_flowrule(
+ pd,
+ port_chain['flow_classifiers']
+ )
+ self.delete_path_node(pd['id'])
+
+        # delete the port associations of the flow classifiers
+ self._remove_flowclassifier_port_assoc(
+ port_chain['flow_classifiers'],
+ port_chain['tenant_id']
+ )
+
+ # Delete the chainpathpair
+ intid = self.id_pool.get_intid_by_uuid(
+ 'portchain', portchain_id)
+ self.id_pool.release_intid('portchain', intid)
+
+ def _update_path_node_next_hops(self, flow_rule):
+ node_next_hops = []
+ if not flow_rule['next_hop']:
+ return None
+ next_hops = jsonutils.loads(flow_rule['next_hop'])
+ if not next_hops:
+ return None
+ for member in next_hops:
+ detail = {}
+ port_detail = self.get_port_detail_by_filter(
+ dict(id=member['portpair_id']))
+ if not port_detail or not port_detail['host_id']:
+ continue
+ detail['local_endpoint'] = port_detail['local_endpoint']
+ detail['weight'] = member['weight']
+ detail['mac_address'] = port_detail['mac_address']
+ detail['ingress'] = port_detail['ingress']
+            # resolve subnet gateway info for this hop's ingress port
+            mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id(
+                detail['ingress']
+            )
+            detail['gw_mac'] = mac
+            detail['cidr'] = cidr
+            detail['net_uuid'] = net_uuid
+            node_next_hops.append(detail)
+
+ flow_rule['next_hops'] = node_next_hops
+ flow_rule.pop('next_hop')
+
+ return node_next_hops
+
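+    # Example (illustrative sketch; all values are placeholders): each
+    # entry added to flow_rule['next_hops'] has this shape:
+    #
+    #   {'local_endpoint': '192.168.0.5', 'weight': 1,
+    #    'mac_address': 'fa:16:3e:aa:bb:cc', 'ingress': '<port-uuid>',
+    #    'gw_mac': 'fa:16:3e:dd:ee:ff', 'cidr': '10.0.1.0/24',
+    #    'net_uuid': '<network-uuid>'}
+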
+ def _build_portchain_flowrule_body(self, node, port,
+ add_fc_ids=None, del_fc_ids=None):
+ node_info = node.copy()
+ node_info.pop('tenant_id')
+ node_info.pop('portpair_details')
+
+ port_info = port.copy()
+ port_info.pop('tenant_id')
+ port_info.pop('id')
+ port_info.pop('path_nodes')
+ port_info.pop('host_id')
+
+ flow_rule = dict(node_info, **port_info)
+        # if this port belongs to an NSH/MPLS-aware VM, only notify
+        # the flow classifier for the first SF.
+ flow_rule['add_fcs'] = self._filter_flow_classifiers(
+ flow_rule, add_fc_ids)
+ flow_rule['del_fcs'] = self._filter_flow_classifiers(
+ flow_rule, del_fc_ids)
+
+ self._update_portchain_group_reference_count(flow_rule,
+ port['host_id'])
+
+ # update next hop info
+ self._update_path_node_next_hops(flow_rule)
+
+ return flow_rule
+
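+    # Example (illustrative sketch; values are placeholders): the
+    # resulting flow_rule is the path-node dict merged with the port
+    # detail dict, plus the fields filled in above, roughly:
+    #
+    #   {'nsp': 1, 'nsi': 254, 'node_type': ovs_const.SF_NODE,
+    #    'mac_address': 'fa:16:3e:aa:bb:cc', 'host': 'compute-1',
+    #    'group_refcnt': 1, 'add_fcs': [...], 'del_fcs': [...],
+    #    'next_hops': [...], ...}
+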
+ def _filter_flow_classifiers(self, flow_rule, fc_ids):
+ """Filter flow classifiers.
+
+ @return: list of the flow classifiers
+ """
+
+ fc_return = []
+
+ if not fc_ids:
+ return fc_return
+ fcs = self._get_fcs_by_ids(fc_ids)
+ for fc in fcs:
+ new_fc = fc.copy()
+ new_fc.pop('id')
+ new_fc.pop('name')
+ new_fc.pop('tenant_id')
+ new_fc.pop('description')
+
+ if ((flow_rule['node_type'] == ovs_const.SRC_NODE and
+ flow_rule['egress'] == fc['logical_source_port']
+ ) or
+ (flow_rule['node_type'] == ovs_const.DST_NODE and
+ flow_rule['ingress'] == fc['logical_destination_port']
+ )):
+ fc_return.append(new_fc)
+ elif flow_rule['node_type'] == ovs_const.SF_NODE:
+ fc_return.append(new_fc)
+
+ return fc_return
+
+ def _update_path_node_port_flowrules(self, node, port,
+ add_fc_ids=None, del_fc_ids=None):
+        # if this port is not bound to a host, don't generate a flow rule
+ if not port['host_id']:
+ return
+
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ port,
+ add_fc_ids,
+ del_fc_ids)
+
+ self.ovs_driver_rpc.ask_agent_to_update_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _update_path_node_flowrules(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ if node['portpair_details'] is None:
+ return
+ for each in node['portpair_details']:
+ port = self.get_port_detail_by_filter(dict(id=each))
+ if port:
+ self._update_path_node_port_flowrules(
+ node, port, add_fc_ids, del_fc_ids)
+
+ def _thread_update_path_nodes(self, nodes,
+ add_fc_ids=None, del_fc_ids=None):
+ for node in nodes:
+ self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids)
+ self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids)
+
+ def _get_portchain_fcs(self, port_chain):
+ return self._get_fcs_by_ids(port_chain['flow_classifiers'])
+
+ def _get_fcs_by_ids(self, fc_ids):
+ flow_classifiers = []
+ if not fc_ids:
+ return flow_classifiers
+
+ # Get the portchain flow classifiers
+ fc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ flowclassifier.FLOW_CLASSIFIER_EXT)
+ )
+ if not fc_plugin:
+            LOG.warning(_LW("Flow classifier service plugin not found"))
+ return flow_classifiers
+
+ for fc_id in fc_ids:
+ fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id)
+ flow_classifiers.append(fc)
+
+ return flow_classifiers
+
+ @log_helpers.log_method_call
+ def create_port_chain(self, context):
+ port_chain = context.current
+ path_nodes = self._create_portchain_path(context, port_chain)
+
+        # notify the agent with an async thread;
+        # currently we don't use greenthread.spawn
+ self._thread_update_path_nodes(
+ path_nodes,
+ port_chain['flow_classifiers'],
+ None)
+
+ @log_helpers.log_method_call
+ def delete_port_chain(self, context):
+ port_chain = context.current
+ portchain_id = port_chain['id']
+ LOG.debug("to delete portchain path")
+ self._delete_portchain_path(context, portchain_id)
+
+ def _get_diff_set(self, orig, cur):
+        orig_set = set(orig)
+        cur_set = set(cur)
+
+ to_del = orig_set.difference(cur_set)
+ to_add = cur_set.difference(orig_set)
+
+ return to_del, to_add
+
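+    # Example (illustrative sketch with placeholder classifier IDs):
+    #
+    #   orig = ['fc-1', 'fc-2'], cur = ['fc-2', 'fc-3']
+    #   => to_del = {'fc-1'}, to_add = {'fc-3'}
+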
+ @log_helpers.log_method_call
+ def update_port_chain(self, context):
+ port_chain = context.current
+ orig = context.original
+
+ del_fc_ids, add_fc_ids = self._get_diff_set(
+ orig['flow_classifiers'],
+ port_chain['flow_classifiers']
+ )
+ path_nodes = self.get_path_nodes_by_filter(
+ dict(portchain_id=port_chain['id'])
+ )
+ if not path_nodes:
+ return
+
+ sort_path_nodes = sorted(path_nodes,
+ key=lambda x: x['nsi'],
+ reverse=True)
+ if del_fc_ids:
+ self._thread_update_path_nodes(sort_path_nodes,
+ None,
+ del_fc_ids)
+ self._remove_flowclassifier_port_assoc(del_fc_ids,
+ port_chain['tenant_id'],
+ sort_path_nodes[0],
+ sort_path_nodes[-1],
+ sort_path_nodes[-2])
+
+ if add_fc_ids:
+ self._add_flowclassifier_port_assoc(add_fc_ids,
+ port_chain['tenant_id'],
+ sort_path_nodes[0],
+ sort_path_nodes[-1],
+ sort_path_nodes[-2])
+
+        # notify the agent with an async thread;
+        # currently we don't use greenthread.spawn
+ self._thread_update_path_nodes(sort_path_nodes,
+ add_fc_ids,
+ None)
+
+ @log_helpers.log_method_call
+ def create_port_pair_group(self, context):
+ group = context.current
+ self.id_pool.assign_intid('group', group['id'])
+
+ @log_helpers.log_method_call
+ def delete_port_pair_group(self, context):
+ group = context.current
+ group_intid = self.id_pool.get_intid_by_uuid('group', group['id'])
+ if group_intid:
+ self.id_pool.release_intid('group', group_intid)
+
+ @log_helpers.log_method_call
+ def update_port_pair_group(self, context):
+ current = context.current
+ original = context.original
+
+ if set(current['port_pairs']) == set(original['port_pairs']):
+ return
+
+ # Update the path_nodes and flows for each port chain that
+ # contains this port_pair_group
+ # Note: _get_port_pair_group is temporarily used here.
+        ppg_obj = context._plugin._get_port_pair_group(
+            context._plugin_context, current['id'])
+ port_chains = [assoc.portchain_id for assoc in
+ ppg_obj.chain_group_associations]
+
+ for chain_id in port_chains:
+ port_chain = context._plugin.get_port_chain(
+ context._plugin_context, chain_id)
+ group_intid = self.id_pool.get_intid_by_uuid('group',
+ current['id'])
+ # Get the previous node
+ prev_node = self.get_path_node_by_filter(
+ filters={'portchain_id': chain_id,
+ 'next_group_id': group_intid})
+ if not prev_node:
+ continue
+
+ before_update_prev_node = prev_node.copy()
+ # Update the previous node
+ _, curr_group_members = self._get_portgroup_members(context,
+ current['id'])
+ prev_node['next_hop'] = (
+ jsonutils.dumps(curr_group_members)
+ if curr_group_members else None
+ )
+ # update next hop to database
+ self.update_path_node(prev_node['id'], prev_node)
+ if prev_node['node_type'] == ovs_const.SRC_NODE:
+ self._delete_src_node_flowrules(
+ before_update_prev_node, port_chain['flow_classifiers'])
+ self._update_src_node_flowrules(
+ prev_node, port_chain['flow_classifiers'], None)
+ else:
+ self._delete_path_node_flowrule(
+ before_update_prev_node, port_chain['flow_classifiers'])
+ self._update_path_node_flowrules(
+ prev_node, port_chain['flow_classifiers'], None)
+
+            # Update the current node, located by its nsi value, which
+            # is derived from this group's position in the chain
+ curr_pos = port_chain['port_pair_groups'].index(current['id'])
+ curr_node = self.get_path_node_by_filter(
+ filters={'portchain_id': chain_id,
+ 'nsi': 0xfe - curr_pos})
+ if not curr_node:
+ continue
+
+ # Add the port-pair-details into the current node
+ for pp_id in (
+ set(current['port_pairs']) - set(original['port_pairs'])
+ ):
+ ppd = self._get_port_pair_detail_by_port_pair(context,
+ pp_id)
+ if not ppd:
+ LOG.debug('No port_pair_detail for the port_pair: %s',
+ pp_id)
+ LOG.debug("Failed to update port-pair-group")
+ return
+
+ assco_args = {'portpair_id': ppd['id'],
+ 'pathnode_id': curr_node['id'],
+ 'weight': 1, }
+ self.create_pathport_assoc(assco_args)
+ self._update_path_node_port_flowrules(
+ curr_node, ppd, port_chain['flow_classifiers'])
+
+ # Delete the port-pair-details from the current node
+ for pp_id in (
+ set(original['port_pairs']) - set(current['port_pairs'])
+ ):
+ ppd = self._get_port_pair_detail_by_port_pair(context,
+ pp_id)
+ if not ppd:
+ LOG.debug('No port_pair_detail for the port_pair: %s',
+ pp_id)
+ LOG.debug("Failed to update port-pair-group")
+ return
+ self._delete_path_node_port_flowrule(
+ curr_node, ppd, port_chain['flow_classifiers'])
+ self.delete_pathport_assoc(curr_node['id'], ppd['id'])
+
+ @log_helpers.log_method_call
+ def _get_portpair_detail_info(self, portpair_id):
+ """Get port detail.
+
+ @param: portpair_id: uuid
+ @return: (host_id, local_ip, network_type, segment_id,
+ service_insert_type): tuple
+ """
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ port_detail = core_plugin.get_port(self.admin_context, portpair_id)
+ host_id, local_ip, network_type, segment_id, mac_address = (
+ (None, ) * 5)
+
+ if port_detail:
+ host_id = port_detail['binding:host_id']
+ network_id = port_detail['network_id']
+ mac_address = port_detail['mac_address']
+ network_info = core_plugin.get_network(
+ self.admin_context, network_id)
+ network_type = network_info['provider:network_type']
+ segment_id = network_info['provider:segmentation_id']
+
+ if (
+ host_id and
+ network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN]
+ ):
+ driver = core_plugin.type_manager.drivers.get(network_type)
+ host_endpoint = driver.obj.get_endpoint_by_host(host_id)
+ local_ip = host_endpoint['ip_address']
+
+ return host_id, local_ip, network_type, segment_id, mac_address
+
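+    # Example (illustrative sketch; values are placeholders): for a port
+    # bound on a VXLAN network the returned tuple could look like:
+    #
+    #   ('compute-1', '192.168.0.5', 'vxlan', 5001, 'fa:16:3e:aa:bb:cc')
+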
+ @log_helpers.log_method_call
+ def _create_port_detail(self, port_pair):
+        # since the first node may not have an ingress port assigned and
+        # the last node may not have an egress port assigned, we use
+        # whichever port is present as the key to get the SF information.
+ port = None
+ if port_pair.get('ingress', None):
+ port = port_pair['ingress']
+ elif port_pair.get('egress', None):
+ port = port_pair['egress']
+
+ host_id, local_endpoint, network_type, segment_id, mac_address = (
+ self._get_portpair_detail_info(port))
+ port_detail = {
+ 'ingress': port_pair.get('ingress', None),
+ 'egress': port_pair.get('egress', None),
+ 'tenant_id': port_pair['tenant_id'],
+ 'host_id': host_id,
+ 'segment_id': segment_id,
+ 'network_type': network_type,
+ 'local_endpoint': local_endpoint,
+ 'mac_address': mac_address
+ }
+ r = self.create_port_detail(port_detail)
+ LOG.debug('create port detail: %s', r)
+ return r
+
+ @log_helpers.log_method_call
+ def create_port_pair(self, context):
+ port_pair = context.current
+ self._create_port_detail(port_pair)
+
+ @log_helpers.log_method_call
+ def delete_port_pair(self, context):
+ port_pair = context.current
+
+ pd_filter = dict(ingress=port_pair.get('ingress', None),
+ egress=port_pair.get('egress', None),
+ tenant_id=port_pair['tenant_id']
+ )
+ pds = self.get_port_details_by_filter(pd_filter)
+ if pds:
+ for pd in pds:
+ self.delete_port_detail(pd['id'])
+
+ @log_helpers.log_method_call
+ def update_port_pair(self, context):
+ pass
+
+ def get_flowrules_by_host_portid(self, context, host, port_id):
+ port_chain_flowrules = []
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return port_chain_flowrules
+ try:
+ port_detail_list = []
+            # a port may appear as an ingress or egress port only once.
+ ingress_port = self.get_port_detail_by_filter(
+ dict(ingress=port_id))
+ egress_port = self.get_port_detail_by_filter(
+ dict(egress=port_id))
+ if not ingress_port and not egress_port:
+ return None
+            # the SF may have migrated to another host
+ if ingress_port:
+ port_detail_list.append(ingress_port)
+ if ingress_port['host_id'] != host:
+ ingress_port.update(dict(host_id=host))
+
+ if egress_port:
+ port_detail_list.append(egress_port)
+ if egress_port['host_id'] != host:
+ egress_port.update(dict(host_id=host))
+
+            # this is an SF if both ingress and egress are present.
+            for ports in port_detail_list:
+ nodes_assocs = ports['path_nodes']
+ for assoc in nodes_assocs:
+ # update current path flow rule
+ node = self.get_path_node(assoc['pathnode_id'])
+ port_chain = sfc_plugin.get_port_chain(
+ context,
+ node['portchain_id'])
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ ports,
+ add_fc_ids=port_chain['flow_classifiers']
+ )
+ port_chain_flowrules.append(flow_rule)
+
+ # update the pre-path node flow rule
+ # if node['node_type'] != ovs_const.SRC_NODE:
+ # node_filter = dict(nsp=node['nsp'],
+ # nsi=node['nsi'] + 1
+ # )
+ # pre_node_list = self.get_path_nodes_by_filter(
+ # node_filter)
+ # if not pre_node_list:
+ # continue
+ # for pre_node in pre_node_list:
+ # self._update_path_node_flowrules(
+ # pre_node,
+ # add_fc_ids=port_chain['flow_classifiers'])
+
+ return port_chain_flowrules
+
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_flowrules_by_host_portid failed"))
+
+ def get_flow_classifier_by_portchain_id(self, context, portchain_id):
+ try:
+ flow_classifier_list = []
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return []
+
+ port_chain = sfc_plugin.get_port_chain(
+ context,
+ portchain_id)
+ flow_classifier_list = self._get_portchain_fcs(port_chain)
+ return flow_classifier_list
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_flow_classifier_by_portchain_id failed"))
+
+ def update_flowrule_status(self, context, id, status):
+ try:
+ flowrule_status = dict(status=status)
+ self.update_path_node(id, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("update_flowrule_status failed"))
+
+ def _update_src_node_flowrules(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ flow_rule = self._get_portchain_src_node_flowrule(node,
+ add_fc_ids,
+ del_fc_ids)
+ if not flow_rule:
+ return
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ pc_agents = core_plugin.get_agents(
+ self.admin_context,
+ filters={'agent_type': [nc_const.AGENT_TYPE_OVS]})
+ if not pc_agents:
+ return
+
+ for agent in pc_agents:
+ if agent['alive']:
+                # update host info in the flow rule
+ flow_rule['host'] = agent['host']
+ self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _delete_src_node_flowrules(self, node, del_fc_ids=None):
+ flow_rule = self._get_portchain_src_node_flowrule(node,
+ None, del_fc_ids)
+ if not flow_rule:
+ return
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ pc_agents = core_plugin.get_agents(
+ self.admin_context, filters={
+ 'agent_type': [nc_const.AGENT_TYPE_OVS]})
+ if not pc_agents:
+ return
+
+ for agent in pc_agents:
+ if agent['alive']:
+                # update host info in the flow rule
+ self._update_portchain_group_reference_count(flow_rule,
+ agent['host'])
+ self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def get_all_src_node_flowrules(self, context):
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return []
+ try:
+ frs = []
+ port_chains = sfc_plugin.get_port_chains(context)
+
+ for port_chain in port_chains:
+ # get the first node of this chain
+ node_filters = dict(portchain_id=port_chain['id'], nsi=0xff)
+ portchain_node = self.get_path_node_by_filter(node_filters)
+ if not portchain_node:
+ continue
+ flow_rule = self._get_portchain_src_node_flowrule(
+ portchain_node,
+ port_chain['flow_classifiers']
+ )
+ if not flow_rule:
+ continue
+ frs.append(flow_rule)
+ return frs
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_all_src_node_flowrules failed"))
+
+ def _get_portchain_src_node_flowrule(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ try:
+ add_fc_rt = []
+ del_fc_rt = []
+
+ if add_fc_ids:
+ for fc in self._get_fcs_by_ids(add_fc_ids):
+ if not fc.get('logical_source_port', None):
+ add_fc_rt.append(fc)
+
+ if del_fc_ids:
+ for fc in self._get_fcs_by_ids(del_fc_ids):
+ if not fc.get('logical_source_port', None):
+ del_fc_rt.append(fc)
+
+ if not add_fc_rt and not del_fc_rt:
+ return None
+
+ return self._build_portchain_flowrule_body_without_port(
+ node, add_fc_rt, del_fc_rt)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("_get_portchain_src_node_flowrule failed"))
+
+ def _update_portchain_group_reference_count(self, flow_rule, host):
+ group_refcnt = 0
+ flow_rule['host'] = host
+
+ if flow_rule['next_group_id'] is not None:
+ all_nodes = self.get_path_nodes_by_filter(
+ filters={'next_group_id': flow_rule['next_group_id'],
+ 'nsi': 0xff})
+ if all_nodes is not None:
+ for node in all_nodes:
+ if not node['portpair_details']:
+ group_refcnt += 1
+
+ port_details = self.get_port_details_by_filter(
+ dict(host_id=flow_rule['host']))
+ if port_details is not None:
+ for pd in port_details:
+ for path in pd['path_nodes']:
+ path_node = self.get_path_node(path['pathnode_id'])
+ if (
+ path_node['next_group_id'] ==
+ flow_rule['next_group_id']
+ ):
+ group_refcnt += 1
+
+ flow_rule['group_refcnt'] = group_refcnt
+
+ return group_refcnt
+
+ def _build_portchain_flowrule_body_without_port(self,
+ node,
+ add_fcs=None,
+ del_fcs=None):
+ flow_rule = node.copy()
+ flow_rule.pop('tenant_id')
+ flow_rule.pop('portpair_details')
+
+        # get network information from the first SF node
+ if not node['next_hop']:
+ return None
+
+ next_hops = jsonutils.loads(node['next_hop'])
+ if not next_hops:
+ return None
+
+ port_detail = self.get_port_detail_by_filter(
+ dict(id=next_hops[0]['portpair_id']))
+ if not port_detail:
+ return None
+
+ flow_rule['ingress'] = None
+ flow_rule['egress'] = None
+ flow_rule['network_type'] = port_detail['network_type']
+ flow_rule['segment_id'] = port_detail['segment_id']
+
+ flow_rule['add_fcs'] = add_fcs
+ flow_rule['del_fcs'] = del_fcs
+
+ # update next hop info
+ self._update_path_node_next_hops(flow_rule)
+ return flow_rule