Diffstat (limited to 'networking_sfc/services/sfc/agent/agent.py')
-rw-r--r--  networking_sfc/services/sfc/agent/agent.py  891
1 files changed, 891 insertions, 0 deletions
diff --git a/networking_sfc/services/sfc/agent/agent.py b/networking_sfc/services/sfc/agent/agent.py
new file mode 100644
index 0000000..2537f9b
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/agent.py
@@ -0,0 +1,891 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import signal
+import six
+import sys
+
+from neutron.agent.common import config
+from neutron.agent.linux import ip_lib
+
+from networking_sfc.services.sfc.agent import br_int
+from networking_sfc.services.sfc.agent import br_phys
+from networking_sfc.services.sfc.agent import br_tun
+from networking_sfc.services.sfc.common import ovs_ext_lib
+from networking_sfc.services.sfc.drivers.ovs import constants
+from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics
+
+from neutron.agent import rpc as agent_rpc
+from neutron.common import config as common_config
+from neutron.common import constants as n_const
+from neutron.common import exceptions
+from neutron.common import rpc as n_rpc
+from neutron.common import topics
+from neutron.common import utils as q_utils
+from neutron.i18n import _LE
+from neutron.i18n import _LI
+from neutron.i18n import _LW
+from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
+ constants as ovs_const)
+from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import oslo_messaging
+
+
+LOG = logging.getLogger(__name__)
+
+agent_opts = [
+ cfg.StrOpt('sfc_encap_mode', default='mpls',
+ help=_("The encapsulation mode of sfc.")),
+]
+
+cfg.CONF.register_opts(agent_opts, "AGENT")
+
+# This table is used to process traffic whose next hop may be in a
+# different subnet.
+# Flow 1: pri=1, ip,dl_dst=nexthop_mac,nw_src=nexthop_subnet,
+#   actions=push_mpls:0x8847,set_mpls_label,set_mpls_ttl,push_vlan,
+#   output:<patch port> or resubmit to table INGRESS_TABLE
+# Flow 2: pri=0, ip,dl_dst=nexthop_mac,
+#   actions=push_mpls:0x8847,set_mpls_label,set_mpls_ttl,push_vlan,
+#   output:<patch port> or resubmit to table INGRESS_TABLE
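+# Illustrative examples of the flows this table ends up holding
+# (MAC, subnet, VLAN and label values below are hypothetical):
+#   priority=1,ip,dl_dst=fa:16:3e:00:00:01,nw_src=10.0.0.0/24
+#     actions=push_mpls:0x8847,set_mpls_label:511,set_mpls_ttl:255,
+#             mod_vlan_vid:5,resubmit(,INGRESS_TABLE)
+#   priority=0,ip,dl_dst=fa:16:3e:00:00:01
+#     actions=push_mpls:0x8847,set_mpls_label:511,set_mpls_ttl:255,
+#             mod_vlan_vid:5,mod_dl_src:<gw_mac>,output:<patch-tun port>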
+ACROSS_SUBNET_TABLE = 5
+
+# The table has multiple flows that steer traffic for the different chains
+# to the ingress port of different service functions hosted on this Compute
+# node.
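+# Illustrative entry (MAC, VLAN and port values are hypothetical):
+#   priority=1,mpls,dl_vlan=5,dl_dst=fa:16:3e:00:00:02,
+#   mpls_label=<(nsp << 8) | (nsi + 1)>
+#     actions=strip_vlan,pop_mpls:0x0800,output:<SF ingress ofport>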
+INGRESS_TABLE = 10
+
+# port chain default flow rule priority
+PC_DEF_PRI = 20
+PC_INGRESS_PRI = 30
+
+
+class FeatureSupportError(exceptions.NeutronException):
+    message = _("The SFC agent does not support %(feature)s")
+
+
+class SfcPluginApi(object):
+ def __init__(self, topic, host):
+ self.host = host
+ self.target = oslo_messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(self.target)
+
+ def update_flowrules_status(self, context, flowrules_status):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'update_flowrules_status',
+ flowrules_status=flowrules_status)
+
+ def get_flowrules_by_host_portid(self, context, port_id):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'get_flowrules_by_host_portid',
+ host=self.host, port_id=port_id)
+
+ def get_all_src_node_flowrules(self, context):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'get_all_src_node_flowrules',
+ host=self.host)
+
+
+class OVSSfcAgent(ovs_neutron_agent.OVSNeutronAgent):
+    # history
+    #   1.0 Initial version
+    """This agent supports MPLS-encapsulated frames.
+
+    Ethernet + MPLS
+    IPv4 packet:
+    +---------------------------+-------------+--------------------+
+    | Outer Ethernet, ET=0x8847 | MPLS header | Original IP packet |
+    +---------------------------+-------------+--------------------+
+    """
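+    # Pipeline overview (derived from the flow rules installed below):
+    #   table 0             - classify egress traffic and send it to a group
+    #   group table         - load-balance over next-hop SF instances
+    #   ACROSS_SUBNET_TABLE - push the MPLS label and VLAN, then forward
+    #   INGRESS_TABLE       - pop the MPLS label and deliver to the local SF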
+
+ target = oslo_messaging.Target(version='1.0')
+
+ def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
+ bridge_mappings, polling_interval, tunnel_types=None,
+ veth_mtu=None, l2_population=False,
+ enable_distributed_routing=False,
+ minimize_polling=False,
+ ovsdb_monitor_respawn_interval=(
+ ovs_const.DEFAULT_OVSDBMON_RESPAWN),
+ arp_responder=False,
+ prevent_arp_spoofing=True,
+ use_veth_interconnection=False,
+ quitting_rpc_timeout=None):
+
+ """to get network info from ovs agent."""
+ super(OVSSfcAgent, self).__init__(
+ bridge_classes, integ_br, tun_br,
+ local_ip,
+ bridge_mappings, polling_interval, tunnel_types,
+ veth_mtu, l2_population,
+ enable_distributed_routing,
+ minimize_polling,
+ ovsdb_monitor_respawn_interval,
+ arp_responder,
+ prevent_arp_spoofing,
+ use_veth_interconnection,
+ quitting_rpc_timeout)
+
+ self.overlay_encap_mode = cfg.CONF.AGENT.sfc_encap_mode
+ self._sfc_setup_rpc()
+
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._clear_sfc_flow_on_int_br()
+ self._setup_src_node_flow_rules_with_mpls()
+
+ def _sfc_setup_rpc(self):
+ self.sfc_plugin_rpc = SfcPluginApi(
+ sfc_topics.SFC_PLUGIN, cfg.CONF.host)
+
+ self.topic = sfc_topics.SFC_AGENT
+ self.endpoints = [self]
+ consumers = [
+ [sfc_topics.PORTFLOW, topics.UPDATE],
+ [sfc_topics.PORTFLOW, topics.DELETE]
+ ]
+
+        # subscribe to SFC plugin messages
+ self.connection = agent_rpc.create_consumers(
+ self.endpoints,
+ self.topic,
+ consumers)
+
+ def _parse_flow_classifier(self, flow_classifier):
+ dl_type, nw_proto, source_port_masks, destination_port_masks = (
+ (None, ) * 4)
+
+ if (
+ not flow_classifier['source_port_range_min'] and
+ not flow_classifier['source_port_range_max']
+ ):
+ # wildcard
+ source_port_masks = ['0/0x0']
+ elif not flow_classifier['source_port_range_min']:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ 1,
+ flow_classifier['source_port_range_max'])
+ elif not flow_classifier['source_port_range_max']:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['source_port_range_min'],
+ 65535)
+ else:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['source_port_range_min'],
+ flow_classifier['source_port_range_max'])
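+        # get_port_mask() expands a port range into "port/mask" pairs
+        # that OVS can match on; e.g. a range of 100-103 would be split
+        # into two masked matches covering 100-101 and 102-103 (the
+        # exact string formatting is up to ovs_ext_lib).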
+
+ if (
+ not flow_classifier['destination_port_range_min'] and
+ not flow_classifier['destination_port_range_max']
+ ):
+ # wildcard
+ destination_port_masks = ['0/0x0']
+ elif not flow_classifier['destination_port_range_min']:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ 1,
+ flow_classifier['destination_port_range_max'])
+ elif not flow_classifier['destination_port_range_max']:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['destination_port_range_min'],
+ 65535)
+ else:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['destination_port_range_min'],
+ flow_classifier['destination_port_range_max'])
+
+ if "IPv4" == flow_classifier['ethertype']:
+ dl_type = 0x0800
+ if n_const.PROTO_NAME_TCP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_TCP
+ elif n_const.PROTO_NAME_UDP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_UDP
+ elif n_const.PROTO_NAME_ICMP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_ICMP
+ else:
+ nw_proto = None
+        elif "IPv6" == flow_classifier['ethertype']:
+            LOG.error(_LE("The port chain agent does not support IPv6"))
+        else:
+            LOG.error(_LE("Invalid ethertype in flow classifier"))
+ return (dl_type, nw_proto,
+ source_port_masks, destination_port_masks
+ )
+
+ def _clear_sfc_flow_on_int_br(self):
+ self.int_br.delete_group(group_id='all')
+ self.int_br.delete_flows(table=ACROSS_SUBNET_TABLE)
+ self.int_br.delete_flows(table=INGRESS_TABLE)
+ self.int_br.install_goto(dest_table_id=INGRESS_TABLE,
+ priority=PC_DEF_PRI,
+ dl_type=0x8847)
+ self.int_br.install_drop(table_id=INGRESS_TABLE)
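+        # After this reset (install_goto is assumed to target table 0 by
+        # default), table 0 forwards MPLS traffic (dl_type=0x8847) to
+        # INGRESS_TABLE, which drops everything until per-chain ingress
+        # flows are installed.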
+
+ def _get_flow_infos_from_flow_classifier(self, flow_classifier):
+ flow_infos = []
+ nw_src, nw_dst = ((None, ) * 2)
+
+ if "IPv4" != flow_classifier['ethertype']:
+            LOG.error(_LE("The port chain agent does not support IPv6"))
+ return flow_infos
+
+        # parse the flow classifier and translate it into match fields
+ dl_type, nw_proto, source_port_masks, destination_port_masks = (
+ self._parse_flow_classifier(flow_classifier))
+
+ if flow_classifier['source_ip_prefix']:
+ nw_src = flow_classifier['source_ip_prefix']
+ else:
+ nw_src = '0.0.0.0/0.0.0.0'
+ if flow_classifier['destination_ip_prefix']:
+ nw_dst = flow_classifier['destination_ip_prefix']
+ else:
+ nw_dst = '0.0.0.0/0.0.0.0'
+
+ if source_port_masks and destination_port_masks:
+ for destination_port in destination_port_masks:
+ for source_port in source_port_masks:
+ if nw_proto is None:
+ flow_infos.append(dict(
+ dl_type=dl_type,
+ nw_src=nw_src,
+ nw_dst=nw_dst,
+ tp_src='%s' % source_port,
+ tp_dst='%s' % destination_port
+ ))
+ else:
+ flow_infos.append(dict(
+ dl_type=dl_type,
+ nw_proto=nw_proto,
+ nw_src=nw_src,
+ nw_dst=nw_dst,
+ tp_src='%s' % source_port,
+ tp_dst='%s' % destination_port
+ ))
+
+ return flow_infos
+
+ def _get_flow_infos_from_flow_classifier_list(self, flow_classifier_list):
+ flow_infos = []
+ if not flow_classifier_list:
+ return flow_infos
+ for flow_classifier in flow_classifier_list:
+ flow_infos.extend(
+ self._get_flow_infos_from_flow_classifier(flow_classifier)
+ )
+
+ return flow_infos
+
+ def _setup_local_switch_flows_on_int_br(
+ self, flowrule, flow_classifier_list,
+ actions, add_flow=True, match_inport=True
+ ):
+ inport_match = {}
+ priority = PC_DEF_PRI
+
+ if match_inport is True:
+ egress_port = self.int_br.get_vif_port_by_id(flowrule['egress'])
+ if egress_port:
+ inport_match = dict(in_port=egress_port.ofport)
+ priority = PC_INGRESS_PRI
+
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flow_classifier_list
+ ):
+ match_info = dict(inport_match, **flow_info)
+ if add_flow:
+ self.int_br.add_flow(
+ table=ovs_const.LOCAL_SWITCHING,
+ priority=priority,
+ actions=actions, **match_info
+ )
+ else:
+ self.int_br.delete_flows(
+ table=ovs_const.LOCAL_SWITCHING,
+ **match_info
+ )
+
+ def _update_destination_ingress_flow_rules(self, flowrule):
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flowrule['del_fcs']
+ ):
+ self.int_br.delete_flows(
+ table=ovs_const.LOCAL_SWITCHING,
+ in_port=self.patch_tun_ofport,
+ **flow_info
+ )
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flowrule['add_fcs']
+ ):
+ inport_match = dict(in_port=self.patch_tun_ofport)
+ match_info = dict(inport_match, **flow_info)
+ self.int_br.install_normal(table_id=ovs_const.LOCAL_SWITCHING,
+ priority=PC_INGRESS_PRI,
+ **match_info)
+
+ def _setup_src_node_flow_rules_with_mpls(self):
+ flow_rules = self.sfc_plugin_rpc.get_all_src_node_flowrules(
+ self.context)
+ if not flow_rules:
+ return
+ for fr in flow_rules:
+ self._setup_egress_flow_rules_with_mpls(fr, False)
+            # If the traffic comes in from the patch port, the destination
+            # is on this host, so apply NORMAL forwarding instead of
+            # matching it as traffic from the source.
+            # TODO: also apply the same NORMAL handling when the traffic
+            # comes from a vRouter on the local host.
+ self._update_destination_ingress_flow_rules(fr)
+
+ def _setup_egress_flow_rules_with_mpls(self, flowrule, match_inport=True):
+ network_type = flowrule['network_type']
+ group_id = flowrule.get('next_group_id', None)
+ next_hops = flowrule.get('next_hops', None)
+ segmentation_id = flowrule['segment_id']
+
+ if not next_hops:
+ return
+
+ if network_type not in ovs_const.TUNNEL_NETWORK_TYPES:
+            LOG.warn(_LW("The %s network type is not supported; "
+                         "only tunnel network types are supported."),
+                     network_type)
+ return
+
+ # if the group is not none, install the egress rule for this SF
+ if (
+ (flowrule['node_type'] == constants.SRC_NODE or
+ flowrule['node_type'] == constants.SF_NODE) and group_id
+ ):
+ # 1st, install br-int flow rule on table ACROSS_SUBNET_TABLE
+ # and group table
+ buckets = []
+ for item in next_hops:
+ if item['net_uuid'] not in self.local_vlan_map:
+ self.provision_local_vlan(item['net_uuid'], network_type,
+ None, segmentation_id)
+ lvm = self.local_vlan_map[item['net_uuid']]
+ bucket = (
+ 'bucket=weight=%d, mod_dl_dst:%s,'
+ 'resubmit(,%d)' % (
+ item['weight'],
+ item['mac_address'],
+ ACROSS_SUBNET_TABLE
+ )
+ )
+ buckets.append(bucket)
+
+ no_across_subnet_actions_list = []
+ across_subnet_actions_list = []
+
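+                # The MPLS label encodes the chain path as
+                # (nsp << 8) | nsi, e.g. nsp=1 and nsi=255 (illustrative
+                # values) give label 511; the MPLS TTL carries nsi.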
+ push_mpls = (
+ "push_mpls:0x8847,"
+ "set_mpls_label:%d,"
+ "set_mpls_ttl:%d,"
+ "mod_vlan_vid:%d," %
+ ((flowrule['nsp'] << 8) | flowrule['nsi'],
+ flowrule['nsi'], lvm.vlan))
+
+ no_across_subnet_actions_list.append(push_mpls)
+ across_subnet_actions_list.append(push_mpls)
+
+ if item['local_endpoint'] == self.local_ip:
+ no_across_subnet_actions = (
+ "resubmit(,%d)" % INGRESS_TABLE)
+ across_subnet_actions = (
+ "mod_dl_src:%s, resubmit(,%d)" %
+ (item['gw_mac'], INGRESS_TABLE))
+                else:
+                    # the next hop is on a remote host, send the frame
+                    # out through the patch port towards br-tun
+ no_across_subnet_actions = ("output:%s" %
+ self.patch_tun_ofport)
+ across_subnet_actions = ("mod_dl_src:%s, output:%s" %
+ (item['gw_mac'],
+ self.patch_tun_ofport))
+ no_across_subnet_actions_list.append(no_across_subnet_actions)
+ across_subnet_actions_list.append(across_subnet_actions)
+
+ self.int_br.add_flow(
+ table=ACROSS_SUBNET_TABLE,
+ priority=1,
+ dl_dst=item['mac_address'],
+ dl_type=0x0800,
+ nw_src=item['cidr'],
+ actions="%s" %
+ (','.join(no_across_subnet_actions_list)))
+ # different subnet with next hop
+ self.int_br.add_flow(
+ table=ACROSS_SUBNET_TABLE,
+ priority=0,
+ dl_dst=item['mac_address'],
+ actions="%s" %
+ (','.join(across_subnet_actions_list)))
+
+ buckets = ','.join(buckets)
+ group_content = self.int_br.dump_group_for_id(group_id)
+ if group_content.find('group_id=%d' % group_id) == -1:
+ self.int_br.add_group(group_id=group_id,
+ type='select', buckets=buckets)
+ else:
+ self.int_br.mod_group(group_id=group_id,
+ type='select', buckets=buckets)
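+            # The resulting group looks roughly like (values are
+            # illustrative):
+            #   group_id=1,type=select,
+            #     bucket=weight=1,mod_dl_dst:fa:16:3e:00:00:01,resubmit(,5),
+            #     bucket=weight=1,mod_dl_dst:fa:16:3e:00:00:02,resubmit(,5)
+            # so traffic is spread across the next-hop SF instances.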
+
+            # 2nd, install the br-int flow rule on table 0
+            # for egress traffic
+ enc_actions = ("group:%d" % group_id)
+ # to uninstall the removed flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=match_inport)
+ # to install the added flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['add_fcs'],
+ enc_actions,
+ add_flow=True,
+ match_inport=match_inport)
+
+ def _get_network_by_port(self, port_id):
+ for key, val in six.iteritems(self.network_ports):
+ if port_id in val:
+ return key
+
+ return None
+
+ def _setup_ingress_flow_rules_with_mpls(self, flowrule):
+ network_id = self._get_network_by_port(flowrule['ingress'])
+ if network_id:
+ # install br-int flow rule on table 0 for ingress traffic
+ lvm = self.local_vlan_map[network_id]
+ vif_port = lvm.vif_ports[flowrule['ingress']]
+ match_field = {}
+
+ actions = ("strip_vlan, pop_mpls:0x0800,"
+ "output:%s" % vif_port.ofport)
+ match_field = dict(
+ table=INGRESS_TABLE,
+ priority=1,
+ dl_dst=vif_port.vif_mac,
+ dl_vlan=lvm.vlan,
+ dl_type=0x8847,
+ mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1),
+ actions=actions)
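+            # The previous hop pushes the label using its own nsi, which
+            # (as the egress rules suggest) is one greater than this flow
+            # rule's nsi, hence the nsi + 1 in the match above.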
+
+ self.int_br.add_flow(**match_field)
+
+ def _setup_last_egress_flow_rules_with_mpls(self, flowrule):
+ group_id = flowrule.get('next_group_id', None)
+
+        # check whether the user assigned a destination neutron port
+ if (
+ constants.SF_NODE == flowrule['node_type'] and
+ not group_id and
+ flowrule['egress'] is not None
+ ):
+            # uninstall the removed flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=True
+ )
+
+ # to install the added flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['add_fcs'],
+ actions='normal',
+ add_flow=True,
+ match_inport=True)
+
+ def _get_flow_classifier_dest_port_info(self,
+ logical_destination_port,
+ flowrule):
+ for next_hop in flowrule['next_hops']:
+ # this flow classifier's destination port should match
+ # with the nexthop's ingress port id
+ if logical_destination_port in next_hop.values():
+ return next_hop
+
+ return None
+
+ def _update_flow_rules_with_mpls_enc(self, flowrule, flowrule_status):
+ try:
+ if flowrule.get('egress', None):
+ self._setup_egress_flow_rules_with_mpls(flowrule)
+ self._setup_last_egress_flow_rules_with_mpls(flowrule)
+ if flowrule.get('ingress', None):
+ self._setup_ingress_flow_rules_with_mpls(flowrule)
+
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ACTIVE
+ flowrule_status.append(flowrule_status_temp)
+ except Exception as e:
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ERROR
+ flowrule_status.append(flowrule_status_temp)
+ LOG.exception(e)
+ LOG.error(_LE("_update_flow_rules_with_mpls_enc failed"))
+
+ def _delete_ports_flowrules_by_id(self, ports_id):
+ flowrule_status = []
+ try:
+ LOG.debug("delete_port_id_flows received, ports_id= %s", ports_id)
+ count = 0
+ if ports_id:
+ for port_id in ports_id:
+ flowrule = (
+ self.sfc_plugin_rpc.get_flowrules_by_host_portid(
+ self.context, port_id
+ )
+ )
+                    if flowrule:
+                        count += 1
+                        self._treat_delete_flow_rules(
+                            flowrule, flowrule_status)
+ LOG.debug(
+ "_delete_ports_flowrules_by_id received, count= %s", count)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("delete_port_id_flows failed"))
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def _delete_flow_rule_with_mpls_enc(self, flowrule, flowrule_status):
+ try:
+ LOG.debug("_delete_flow_rule_with_mpls_enc, flowrule = %s",
+ flowrule)
+ group_id = flowrule.get('next_group_id', None)
+
+            # delete the table 0 flow rules on br-int (egress match)
+ if flowrule['egress'] is not None:
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=True
+ )
+
+ # delete table INGRESS_TABLE ingress match flow rule
+ # on br-int(ingress match)
+ network_id = self._get_network_by_port(flowrule['ingress'])
+ if network_id:
+                # delete the br-int flow rule on table INGRESS_TABLE
+                # for ingress traffic
+ lvm = self.local_vlan_map[network_id]
+ vif_port = lvm.vif_ports[flowrule['ingress']]
+ self.int_br.delete_flows(
+ table=INGRESS_TABLE,
+ dl_type=0x8847,
+ dl_dst=vif_port.vif_mac,
+ mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1)
+ )
+
+            # delete the group entry (this logic needs double-checking)
+            if group_id and flowrule.get('group_refcnt', 0) <= 1:
+ self.int_br.delete_group(group_id=group_id)
+ for item in flowrule['next_hops']:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=item['mac_address'])
+ elif (not group_id and
+ flowrule['egress'] is not None):
+ # to delete last hop flow rule
+ for each in flowrule['del_fcs']:
+ if each.get('logical_destination_port', None):
+ ldp = self._get_flow_classifier_dest_port_info(
+ each['logical_destination_port'],
+ flowrule
+ )
+ if ldp:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=ldp['mac_address'])
+
+ except Exception as e:
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ERROR
+ flowrule_status.append(flowrule_status_temp)
+ LOG.exception(e)
+ LOG.error(_LE("_delete_flow_rule_with_mpls_enc failed"))
+
+ def _treat_update_flow_rules(self, flowrule, flowrule_status):
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._update_flow_rules_with_mpls_enc(flowrule, flowrule_status)
+
+ def _treat_delete_flow_rules(self, flowrule, flowrule_status):
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._delete_flow_rule_with_mpls_enc(
+ flowrule, flowrule_status)
+
+ def update_flow_rules(self, context, **kwargs):
+ try:
+ flowrule_status = []
+ flowrules = kwargs['flowrule_entries']
+ LOG.debug("update_flow_rules received, flowrules = %s",
+ flowrules)
+
+ if flowrules:
+ self._treat_update_flow_rules(flowrules, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("update_flow_rules failed"))
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def delete_flow_rules(self, context, **kwargs):
+ try:
+ flowrule_status = []
+ flowrules = kwargs['flowrule_entries']
+ LOG.debug("delete_flow_rules received, flowrules= %s", flowrules)
+ if flowrules:
+ self._treat_delete_flow_rules(flowrules, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("delete_flow_rules failed"))
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def update_src_node_flow_rules(self, context, **kwargs):
+ flowrule = kwargs['flowrule_entries']
+ if self.overlay_encap_mode == 'mpls':
+ self._setup_egress_flow_rules_with_mpls(flowrule,
+ match_inport=False)
+ self._update_destination_ingress_flow_rules(flowrule)
+
+ def _delete_src_node_flow_rules_with_mpls(self, flowrule,
+ match_inport=False):
+ LOG.debug("_delete_src_node_flow_rules_with_mpls, flowrule = %s",
+ flowrule)
+ group_id = flowrule.get('next_group_id', None)
+
+ # delete br-int table 0 full match flow
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=False)
+
+        # delete the group entry (this logic needs double-checking)
+        if group_id is not None and flowrule.get('group_refcnt', 0) <= 1:
+ self.int_br.delete_group(group_id=group_id)
+ for item in flowrule['next_hops']:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=item['mac_address'])
+
+ def delete_src_node_flow_rules(self, context, **kwargs):
+ flowrule = kwargs['flowrule_entries']
+ if self.overlay_encap_mode == 'mpls':
+ self._delete_src_node_flow_rules_with_mpls(flowrule,
+ match_inport=False)
+ self._update_destination_ingress_flow_rules(flowrule)
+
+ def sfc_treat_devices_added_updated(self, port_id):
+ resync = False
+ flowrule_status = []
+ try:
+ LOG.debug("a new device %s is found", port_id)
+ flows_list = (
+ self.sfc_plugin_rpc.get_flowrules_by_host_portid(
+ self.context, port_id
+ )
+ )
+ if flows_list:
+ for flow in flows_list:
+ self._treat_update_flow_rules(flow, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+            LOG.error(_LE("sfc_treat_devices_added_updated failed"))
+ resync = True
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ return resync
+
+ def sfc_treat_devices_removed(self, port_ids):
+ resync = False
+ for port_id in port_ids:
+ LOG.info(_LI("a device %s is removed"), port_id)
+ try:
+                self._delete_ports_flowrules_by_id([port_id])
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(
+ _LE("delete port flow rule failed for %(port_id)s"),
+ {'port_id': port_id}
+ )
+ resync = True
+
+ return resync
+
+ def treat_devices_added_or_updated(self, devices, ovs_restarted):
+ skipped_devices = []
+ need_binding_devices = []
+ security_disabled_devices = []
+ devices_details_list = (
+ self.plugin_rpc.get_devices_details_list_and_failed_devices(
+ self.context,
+ devices,
+ self.agent_id,
+ self.conf.host
+ )
+ )
+ if devices_details_list.get('failed_devices'):
+ # TODO(rossella_s): handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise ovs_neutron_agent.DeviceListRetrievalError(devices=devices)
+
+ devices = devices_details_list.get('devices')
+ vif_by_id = self.int_br.get_vifs_by_ids(
+ [vif['device'] for vif in devices])
+ for details in devices:
+ device = details['device']
+ LOG.debug("Processing port: %s", device)
+ port = vif_by_id.get(device)
+ if not port:
+ # The port disappeared and cannot be processed
+ LOG.info(_LI("Port %s was not found on the integration bridge "
+ "and will therefore not be processed"), device)
+ skipped_devices.append(device)
+ continue
+
+ if 'port_id' in details:
+ LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
+ {'device': device, 'details': details})
+ details['vif_port'] = port
+ need_binding = self.treat_vif_port(port, details['port_id'],
+ details['network_id'],
+ details['network_type'],
+ details['physical_network'],
+ details['segmentation_id'],
+ details['admin_state_up'],
+ details['fixed_ips'],
+ details['device_owner'],
+ ovs_restarted)
+ if need_binding:
+ need_binding_devices.append(details)
+
+ port_security = details['port_security_enabled']
+ has_sgs = 'security_groups' in details
+ if not port_security or not has_sgs:
+ security_disabled_devices.append(device)
+ self._update_port_network(details['port_id'],
+ details['network_id'])
+ self.ext_manager.handle_port(self.context, details)
+ self.sfc_treat_devices_added_updated(details['port_id'])
+ else:
+ LOG.warn(_LW("Device %s not defined on plugin"), device)
+ if (port and port.ofport != -1):
+ self.port_dead(port)
+ return skipped_devices, need_binding_devices, security_disabled_devices
+
+ def process_deleted_ports(self, port_info):
+ # don't try to process removed ports as deleted ports since
+ # they are already gone
+ if 'removed' in port_info:
+ self.deleted_ports -= port_info['removed']
+ deleted_ports = list(self.deleted_ports)
+ while self.deleted_ports:
+ port_id = self.deleted_ports.pop()
+ port = self.int_br.get_vif_port_by_id(port_id)
+ self._clean_network_ports(port_id)
+ self.ext_manager.delete_port(self.context,
+ {"vif_port": port,
+ "port_id": port_id})
+            self.sfc_treat_devices_removed([port_id])
+ # move to dead VLAN so deleted ports no
+ # longer have access to the network
+ if port:
+ # don't log errors since there is a chance someone will be
+ # removing the port from the bridge at the same time
+ self.port_dead(port, log_errors=False)
+ self.port_unbound(port_id)
+ # Flush firewall rules after ports are put on dead VLAN to be
+ # more secure
+ self.sg_agent.remove_devices_filter(deleted_ports)
+
+
+def main():
+ cfg.CONF.register_opts(ip_lib.OPTS)
+ config.register_root_helper(cfg.CONF)
+ common_config.init(sys.argv[1:])
+ common_config.setup_logging()
+ q_utils.log_opt_values(LOG)
+
+ try:
+ agent_config = ovs_neutron_agent.create_agent_config_map(cfg.CONF)
+ except ValueError as e:
+ LOG.exception(e)
+ LOG.error(_LE('Agent terminated!'))
+ sys.exit(1)
+
+ is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper
+ if is_xen_compute_host:
+ # Force ip_lib to always use the root helper to ensure that ip
+ # commands target xen dom0 rather than domU.
+ cfg.CONF.set_default('ip_lib_force_root', True)
+
+ bridge_classes = {
+ 'br_int': br_int.OVSIntegrationBridge,
+ 'br_phys': br_phys.OVSPhysicalBridge,
+ 'br_tun': br_tun.OVSTunnelBridge,
+ }
+ try:
+ agent = OVSSfcAgent(bridge_classes, **agent_config)
+ except RuntimeError as e:
+ LOG.exception(e)
+ LOG.error(_LE("Agent terminated!"))
+ sys.exit(1)
+ signal.signal(signal.SIGTERM, agent._handle_sigterm)
+
+ # Start everything.
+ LOG.info(_LI("Agent initialized successfully, now running... "))
+ agent.daemon_loop()
+
+
+if __name__ == "__main__":
+ main()