aboutsummaryrefslogtreecommitdiffstats
path: root/networking_sfc/services/sfc
diff options
context:
space:
mode:
Diffstat (limited to 'networking_sfc/services/sfc')
-rw-r--r--networking_sfc/services/sfc/__init__.py0
-rw-r--r--networking_sfc/services/sfc/agent/__init__.py2
-rw-r--r--networking_sfc/services/sfc/agent/agent.py891
-rw-r--r--networking_sfc/services/sfc/agent/br_int.py48
-rw-r--r--networking_sfc/services/sfc/agent/br_phys.py34
-rw-r--r--networking_sfc/services/sfc/agent/br_tun.py35
-rw-r--r--networking_sfc/services/sfc/common/__init__.py0
-rw-r--r--networking_sfc/services/sfc/common/config.py27
-rw-r--r--networking_sfc/services/sfc/common/context.py85
-rw-r--r--networking_sfc/services/sfc/common/exceptions.py46
-rw-r--r--networking_sfc/services/sfc/common/ovs_ext_lib.py187
-rw-r--r--networking_sfc/services/sfc/driver_manager.py118
-rw-r--r--networking_sfc/services/sfc/drivers/__init__.py0
-rw-r--r--networking_sfc/services/sfc/drivers/base.py57
-rw-r--r--networking_sfc/services/sfc/drivers/dummy/__init__.py0
-rw-r--r--networking_sfc/services/sfc/drivers/dummy/dummy.py59
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/__init__.py0
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/constants.py57
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/db.py426
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/driver.py1076
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/rpc.py112
-rw-r--r--networking_sfc/services/sfc/drivers/ovs/rpc_topics.py21
-rw-r--r--networking_sfc/services/sfc/plugin.py200
23 files changed, 3481 insertions, 0 deletions
diff --git a/networking_sfc/services/sfc/__init__.py b/networking_sfc/services/sfc/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/services/sfc/__init__.py
diff --git a/networking_sfc/services/sfc/agent/__init__.py b/networking_sfc/services/sfc/agent/__init__.py
new file mode 100644
index 0000000..626812b
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/__init__.py
@@ -0,0 +1,2 @@
+from neutron.common import eventlet_utils
+eventlet_utils.monkey_patch()
diff --git a/networking_sfc/services/sfc/agent/agent.py b/networking_sfc/services/sfc/agent/agent.py
new file mode 100644
index 0000000..2537f9b
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/agent.py
@@ -0,0 +1,891 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import signal
+import six
+import sys
+
+from neutron.agent.common import config
+from neutron.agent.linux import ip_lib
+
+from networking_sfc.services.sfc.agent import br_int
+from networking_sfc.services.sfc.agent import br_phys
+from networking_sfc.services.sfc.agent import br_tun
+from networking_sfc.services.sfc.common import ovs_ext_lib
+from networking_sfc.services.sfc.drivers.ovs import constants
+from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics
+
+from neutron.agent import rpc as agent_rpc
+from neutron.common import config as common_config
+from neutron.common import constants as n_const
+from neutron.common import exceptions
+from neutron.common import rpc as n_rpc
+from neutron.common import topics
+from neutron.common import utils as q_utils
+from neutron.i18n import _LE
+from neutron.i18n import _LI
+from neutron.i18n import _LW
+from neutron.plugins.ml2.drivers.openvswitch.agent.common import (
+ constants as ovs_const)
+from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent
+
+from oslo_config import cfg
+from oslo_log import log as logging
+import oslo_messaging
+
+
+LOG = logging.getLogger(__name__)
+
+agent_opts = [
+ cfg.StrOpt('sfc_encap_mode', default='mpls',
+ help=_("The encapsulation mode of sfc.")),
+]
+
+cfg.CONF.register_opts(agent_opts, "AGENT")
+
+# This table is used to process the traffic across differet subnet scenario.
+# Flow 1: pri=1, ip,dl_dst=nexthop_mac,nw_src=nexthop_subnet. actions=
+# push_mpls:0x8847,set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port
+# or resubmit to table(INGRESS_TABLE)
+# Flow 2: pri=0, ip,dl_dst=nexthop_mac,, action=push_mpls:0x8847,
+# set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port or resubmit to
+# table(INGRESS_TABLE)
+ACROSS_SUBNET_TABLE = 5
+
+# The table has multiple flows that steer traffic for the different chains
+# to the ingress port of different service functions hosted on this Compute
+# node.
+INGRESS_TABLE = 10
+
+# port chain default flow rule priority
+PC_DEF_PRI = 20
+PC_INGRESS_PRI = 30
+
+
+class FeatureSupportError(exceptions.NeutronException):
+ message = _("Current sfc agent don't support %(feature)s ")
+
+
+class SfcPluginApi(object):
+ def __init__(self, topic, host):
+ self.host = host
+ self.target = oslo_messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(self.target)
+
+ def update_flowrules_status(self, context, flowrules_status):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'update_flowrules_status',
+ flowrules_status=flowrules_status)
+
+ def get_flowrules_by_host_portid(self, context, port_id):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'get_flowrules_by_host_portid',
+ host=self.host, port_id=port_id)
+
+ def get_all_src_node_flowrules(self, context):
+ cctxt = self.client.prepare()
+ return cctxt.call(
+ context, 'get_all_src_node_flowrules',
+ host=self.host)
+
+
+class OVSSfcAgent(ovs_neutron_agent.OVSNeutronAgent):
+ # history
+ # 1.0 Initial version
+ """This class will support MPLS frame
+
+ Ethernet + MPLS
+ IPv4 Packet:
+ +-------------------------------+---------------+--------------------+
+ |Outer Ethernet, ET=0x8847 | MPLS head, | original IP Packet |
+ +-------------------------------+---------------+--------------------+
+ """
+
+ target = oslo_messaging.Target(version='1.0')
+
+ def __init__(self, bridge_classes, integ_br, tun_br, local_ip,
+ bridge_mappings, polling_interval, tunnel_types=None,
+ veth_mtu=None, l2_population=False,
+ enable_distributed_routing=False,
+ minimize_polling=False,
+ ovsdb_monitor_respawn_interval=(
+ ovs_const.DEFAULT_OVSDBMON_RESPAWN),
+ arp_responder=False,
+ prevent_arp_spoofing=True,
+ use_veth_interconnection=False,
+ quitting_rpc_timeout=None):
+
+ """to get network info from ovs agent."""
+ super(OVSSfcAgent, self).__init__(
+ bridge_classes, integ_br, tun_br,
+ local_ip,
+ bridge_mappings, polling_interval, tunnel_types,
+ veth_mtu, l2_population,
+ enable_distributed_routing,
+ minimize_polling,
+ ovsdb_monitor_respawn_interval,
+ arp_responder,
+ prevent_arp_spoofing,
+ use_veth_interconnection,
+ quitting_rpc_timeout)
+
+ self.overlay_encap_mode = cfg.CONF.AGENT.sfc_encap_mode
+ self._sfc_setup_rpc()
+
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._clear_sfc_flow_on_int_br()
+ self._setup_src_node_flow_rules_with_mpls()
+
+ def _sfc_setup_rpc(self):
+ self.sfc_plugin_rpc = SfcPluginApi(
+ sfc_topics.SFC_PLUGIN, cfg.CONF.host)
+
+ self.topic = sfc_topics.SFC_AGENT
+ self.endpoints = [self]
+ consumers = [
+ [sfc_topics.PORTFLOW, topics.UPDATE],
+ [sfc_topics.PORTFLOW, topics.DELETE]
+ ]
+
+ # subscribe sfc plugin message
+ self.connection = agent_rpc.create_consumers(
+ self.endpoints,
+ self.topic,
+ consumers)
+
+ def _parse_flow_classifier(self, flow_classifier):
+ dl_type, nw_proto, source_port_masks, destination_port_masks = (
+ (None, ) * 4)
+
+ if (
+ not flow_classifier['source_port_range_min'] and
+ not flow_classifier['source_port_range_max']
+ ):
+ # wildcard
+ source_port_masks = ['0/0x0']
+ elif not flow_classifier['source_port_range_min']:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ 1,
+ flow_classifier['source_port_range_max'])
+ elif not flow_classifier['source_port_range_max']:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['source_port_range_min'],
+ 65535)
+ else:
+ source_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['source_port_range_min'],
+ flow_classifier['source_port_range_max'])
+
+ if (
+ not flow_classifier['destination_port_range_min'] and
+ not flow_classifier['destination_port_range_max']
+ ):
+ # wildcard
+ destination_port_masks = ['0/0x0']
+ elif not flow_classifier['destination_port_range_min']:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ 1,
+ flow_classifier['destination_port_range_max'])
+ elif not flow_classifier['destination_port_range_max']:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['destination_port_range_min'],
+ 65535)
+ else:
+ destination_port_masks = ovs_ext_lib.get_port_mask(
+ flow_classifier['destination_port_range_min'],
+ flow_classifier['destination_port_range_max'])
+
+ if "IPv4" == flow_classifier['ethertype']:
+ dl_type = 0x0800
+ if n_const.PROTO_NAME_TCP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_TCP
+ elif n_const.PROTO_NAME_UDP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_UDP
+ elif n_const.PROTO_NAME_ICMP == flow_classifier['protocol']:
+ nw_proto = n_const.PROTO_NUM_ICMP
+ else:
+ nw_proto = None
+ elif "IPv6" == flow_classifier['ethertype']:
+ LOG.error(_LE("Current portchain agent don't support Ipv6"))
+ else:
+ LOG.error(_LE("invalid protocol input"))
+ return (dl_type, nw_proto,
+ source_port_masks, destination_port_masks
+ )
+
+ def _clear_sfc_flow_on_int_br(self):
+ self.int_br.delete_group(group_id='all')
+ self.int_br.delete_flows(table=ACROSS_SUBNET_TABLE)
+ self.int_br.delete_flows(table=INGRESS_TABLE)
+ self.int_br.install_goto(dest_table_id=INGRESS_TABLE,
+ priority=PC_DEF_PRI,
+ dl_type=0x8847)
+ self.int_br.install_drop(table_id=INGRESS_TABLE)
+
+ def _get_flow_infos_from_flow_classifier(self, flow_classifier):
+ flow_infos = []
+ nw_src, nw_dst = ((None, ) * 2)
+
+ if "IPv4" != flow_classifier['ethertype']:
+ LOG.error(_LE("Current portchain agent don't support Ipv6"))
+ return flow_infos
+
+ # parse and transfer flow info to match field info
+ dl_type, nw_proto, source_port_masks, destination_port_masks = (
+ self._parse_flow_classifier(flow_classifier))
+
+ if flow_classifier['source_ip_prefix']:
+ nw_src = flow_classifier['source_ip_prefix']
+ else:
+ nw_src = '0.0.0.0/0.0.0.0'
+ if flow_classifier['destination_ip_prefix']:
+ nw_dst = flow_classifier['destination_ip_prefix']
+ else:
+ nw_dst = '0.0.0.0/0.0.0.0'
+
+ if source_port_masks and destination_port_masks:
+ for destination_port in destination_port_masks:
+ for source_port in source_port_masks:
+ if nw_proto is None:
+ flow_infos.append(dict(
+ dl_type=dl_type,
+ nw_src=nw_src,
+ nw_dst=nw_dst,
+ tp_src='%s' % source_port,
+ tp_dst='%s' % destination_port
+ ))
+ else:
+ flow_infos.append(dict(
+ dl_type=dl_type,
+ nw_proto=nw_proto,
+ nw_src=nw_src,
+ nw_dst=nw_dst,
+ tp_src='%s' % source_port,
+ tp_dst='%s' % destination_port
+ ))
+
+ return flow_infos
+
+ def _get_flow_infos_from_flow_classifier_list(self, flow_classifier_list):
+ flow_infos = []
+ if not flow_classifier_list:
+ return flow_infos
+ for flow_classifier in flow_classifier_list:
+ flow_infos.extend(
+ self._get_flow_infos_from_flow_classifier(flow_classifier)
+ )
+
+ return flow_infos
+
+ def _setup_local_switch_flows_on_int_br(
+ self, flowrule, flow_classifier_list,
+ actions, add_flow=True, match_inport=True
+ ):
+ inport_match = {}
+ priority = PC_DEF_PRI
+
+ if match_inport is True:
+ egress_port = self.int_br.get_vif_port_by_id(flowrule['egress'])
+ if egress_port:
+ inport_match = dict(in_port=egress_port.ofport)
+ priority = PC_INGRESS_PRI
+
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flow_classifier_list
+ ):
+ match_info = dict(inport_match, **flow_info)
+ if add_flow:
+ self.int_br.add_flow(
+ table=ovs_const.LOCAL_SWITCHING,
+ priority=priority,
+ actions=actions, **match_info
+ )
+ else:
+ self.int_br.delete_flows(
+ table=ovs_const.LOCAL_SWITCHING,
+ **match_info
+ )
+
+ def _update_destination_ingress_flow_rules(self, flowrule):
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flowrule['del_fcs']
+ ):
+ self.int_br.delete_flows(
+ table=ovs_const.LOCAL_SWITCHING,
+ in_port=self.patch_tun_ofport,
+ **flow_info
+ )
+ for flow_info in self._get_flow_infos_from_flow_classifier_list(
+ flowrule['add_fcs']
+ ):
+ inport_match = dict(in_port=self.patch_tun_ofport)
+ match_info = dict(inport_match, **flow_info)
+ self.int_br.install_normal(table_id=ovs_const.LOCAL_SWITCHING,
+ priority=PC_INGRESS_PRI,
+ **match_info)
+
+ def _setup_src_node_flow_rules_with_mpls(self):
+ flow_rules = self.sfc_plugin_rpc.get_all_src_node_flowrules(
+ self.context)
+ if not flow_rules:
+ return
+ for fr in flow_rules:
+ self._setup_egress_flow_rules_with_mpls(fr, False)
+ # if the traffic is from patch port, it means the destination
+ # is on the this host. so implement normal forward but not
+ # match the traffic from the source.
+ # Next step need to do is check if the traffic is from vRouter
+ # on the local host, also need to implement same normal process.
+ self._update_destination_ingress_flow_rules(fr)
+
+ def _setup_egress_flow_rules_with_mpls(self, flowrule, match_inport=True):
+ network_type = flowrule['network_type']
+ group_id = flowrule.get('next_group_id', None)
+ next_hops = flowrule.get('next_hops', None)
+ segmentation_id = flowrule['segment_id']
+
+ if not next_hops:
+ return
+
+ if network_type not in ovs_const.TUNNEL_NETWORK_TYPES:
+ LOG.warn(_LW("currently %s network is not supported,"
+ "only support tunnel type"
+ ),
+ network_type
+ )
+ return
+
+ # if the group is not none, install the egress rule for this SF
+ if (
+ (flowrule['node_type'] == constants.SRC_NODE or
+ flowrule['node_type'] == constants.SF_NODE) and group_id
+ ):
+ # 1st, install br-int flow rule on table ACROSS_SUBNET_TABLE
+ # and group table
+ buckets = []
+ for item in next_hops:
+ if item['net_uuid'] not in self.local_vlan_map:
+ self.provision_local_vlan(item['net_uuid'], network_type,
+ None, segmentation_id)
+ lvm = self.local_vlan_map[item['net_uuid']]
+ bucket = (
+ 'bucket=weight=%d, mod_dl_dst:%s,'
+ 'resubmit(,%d)' % (
+ item['weight'],
+ item['mac_address'],
+ ACROSS_SUBNET_TABLE
+ )
+ )
+ buckets.append(bucket)
+
+ no_across_subnet_actions_list = []
+ across_subnet_actions_list = []
+
+ push_mpls = (
+ "push_mpls:0x8847,"
+ "set_mpls_label:%d,"
+ "set_mpls_ttl:%d,"
+ "mod_vlan_vid:%d," %
+ ((flowrule['nsp'] << 8) | flowrule['nsi'],
+ flowrule['nsi'], lvm.vlan))
+
+ no_across_subnet_actions_list.append(push_mpls)
+ across_subnet_actions_list.append(push_mpls)
+
+ if item['local_endpoint'] == self.local_ip:
+ no_across_subnet_actions = (
+ "resubmit(,%d)" % INGRESS_TABLE)
+ across_subnet_actions = (
+ "mod_dl_src:%s, resubmit(,%d)" %
+ (item['gw_mac'], INGRESS_TABLE))
+ else:
+ # same subnet with next hop
+ no_across_subnet_actions = ("output:%s" %
+ self.patch_tun_ofport)
+ across_subnet_actions = ("mod_dl_src:%s, output:%s" %
+ (item['gw_mac'],
+ self.patch_tun_ofport))
+ no_across_subnet_actions_list.append(no_across_subnet_actions)
+ across_subnet_actions_list.append(across_subnet_actions)
+
+ self.int_br.add_flow(
+ table=ACROSS_SUBNET_TABLE,
+ priority=1,
+ dl_dst=item['mac_address'],
+ dl_type=0x0800,
+ nw_src=item['cidr'],
+ actions="%s" %
+ (','.join(no_across_subnet_actions_list)))
+ # different subnet with next hop
+ self.int_br.add_flow(
+ table=ACROSS_SUBNET_TABLE,
+ priority=0,
+ dl_dst=item['mac_address'],
+ actions="%s" %
+ (','.join(across_subnet_actions_list)))
+
+ buckets = ','.join(buckets)
+ group_content = self.int_br.dump_group_for_id(group_id)
+ if group_content.find('group_id=%d' % group_id) == -1:
+ self.int_br.add_group(group_id=group_id,
+ type='select', buckets=buckets)
+ else:
+ self.int_br.mod_group(group_id=group_id,
+ type='select', buckets=buckets)
+
+ # 2nd, install br-int flow rule on table 0 for egress traffic
+ # for egress traffic
+ enc_actions = ("group:%d" % group_id)
+ # to uninstall the removed flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=match_inport)
+ # to install the added flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['add_fcs'],
+ enc_actions,
+ add_flow=True,
+ match_inport=match_inport)
+
+ def _get_network_by_port(self, port_id):
+ for key, val in six.iteritems(self.network_ports):
+ if port_id in val:
+ return key
+
+ return None
+
+ def _setup_ingress_flow_rules_with_mpls(self, flowrule):
+ network_id = self._get_network_by_port(flowrule['ingress'])
+ if network_id:
+ # install br-int flow rule on table 0 for ingress traffic
+ lvm = self.local_vlan_map[network_id]
+ vif_port = lvm.vif_ports[flowrule['ingress']]
+ match_field = {}
+
+ actions = ("strip_vlan, pop_mpls:0x0800,"
+ "output:%s" % vif_port.ofport)
+ match_field = dict(
+ table=INGRESS_TABLE,
+ priority=1,
+ dl_dst=vif_port.vif_mac,
+ dl_vlan=lvm.vlan,
+ dl_type=0x8847,
+ mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1),
+ actions=actions)
+
+ self.int_br.add_flow(**match_field)
+
+ def _setup_last_egress_flow_rules_with_mpls(self, flowrule):
+ group_id = flowrule.get('next_group_id', None)
+
+ # check whether user assign the destination neutron port.
+ if (
+ constants.SF_NODE == flowrule['node_type'] and
+ not group_id and
+ flowrule['egress'] is not None
+ ):
+ # to uninstall the new removed flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=True
+ )
+
+ # to install the added flow classifiers
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['add_fcs'],
+ actions='normal',
+ add_flow=True,
+ match_inport=True)
+
+ def _get_flow_classifier_dest_port_info(self,
+ logical_destination_port,
+ flowrule):
+ for next_hop in flowrule['next_hops']:
+ # this flow classifier's destination port should match
+ # with the nexthop's ingress port id
+ if logical_destination_port in next_hop.values():
+ return next_hop
+
+ return None
+
+ def _update_flow_rules_with_mpls_enc(self, flowrule, flowrule_status):
+ try:
+ if flowrule.get('egress', None):
+ self._setup_egress_flow_rules_with_mpls(flowrule)
+ self._setup_last_egress_flow_rules_with_mpls(flowrule)
+ if flowrule.get('ingress', None):
+ self._setup_ingress_flow_rules_with_mpls(flowrule)
+
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ACTIVE
+ flowrule_status.append(flowrule_status_temp)
+ except Exception as e:
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ERROR
+ flowrule_status.append(flowrule_status_temp)
+ LOG.exception(e)
+ LOG.error(_LE("_update_flow_rules_with_mpls_enc failed"))
+
+ def _delete_ports_flowrules_by_id(self, ports_id):
+ flowrule_status = []
+ try:
+ LOG.debug("delete_port_id_flows received, ports_id= %s", ports_id)
+ count = 0
+ if ports_id:
+ for port_id in ports_id:
+ flowrule = (
+ self.sfc_plugin_rpc.get_flowrules_by_host_portid(
+ self.context, port_id
+ )
+ )
+ if flowrule:
+ self._treat_delete_flow_rules(
+ flowrule, flowrule_status)
+ LOG.debug(
+ "_delete_ports_flowrules_by_id received, count= %s", count)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("delete_port_id_flows failed"))
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def _delete_flow_rule_with_mpls_enc(self, flowrule, flowrule_status):
+ try:
+ LOG.debug("_delete_flow_rule_with_mpls_enc, flowrule = %s",
+ flowrule)
+ group_id = flowrule.get('next_group_id', None)
+
+ # delete tunnel table flow rule on br-int(egress match)
+ if flowrule['egress'] is not None:
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=True
+ )
+
+ # delete table INGRESS_TABLE ingress match flow rule
+ # on br-int(ingress match)
+ network_id = self._get_network_by_port(flowrule['ingress'])
+ if network_id:
+ # third, install br-int flow rule on table INGRESS_TABLE
+ # for ingress traffic
+ lvm = self.local_vlan_map[network_id]
+ vif_port = lvm.vif_ports[flowrule['ingress']]
+ self.int_br.delete_flows(
+ table=INGRESS_TABLE,
+ dl_type=0x8847,
+ dl_dst=vif_port.vif_mac,
+ mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1)
+ )
+
+ # delete group table, need to check again
+ if group_id and flowrule.get('group_refcnt', None) <= 1:
+ self.int_br.delete_group(group_id=group_id)
+ for item in flowrule['next_hops']:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=item['mac_address'])
+ elif (not group_id and
+ flowrule['egress'] is not None):
+ # to delete last hop flow rule
+ for each in flowrule['del_fcs']:
+ if each.get('logical_destination_port', None):
+ ldp = self._get_flow_classifier_dest_port_info(
+ each['logical_destination_port'],
+ flowrule
+ )
+ if ldp:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=ldp['mac_address'])
+
+ except Exception as e:
+ flowrule_status_temp = {}
+ flowrule_status_temp['id'] = flowrule['id']
+ flowrule_status_temp['status'] = constants.STATUS_ERROR
+ flowrule_status.append(flowrule_status_temp)
+ LOG.exception(e)
+ LOG.error(_LE("_delete_flow_rule_with_mpls_enc failed"))
+
+ def _treat_update_flow_rules(self, flowrule, flowrule_status):
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._update_flow_rules_with_mpls_enc(flowrule, flowrule_status)
+
+ def _treat_delete_flow_rules(self, flowrule, flowrule_status):
+ if self.overlay_encap_mode == 'eth_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'vxlan_nsh':
+ raise FeatureSupportError(feature=self.overlay_encap_mode)
+ elif self.overlay_encap_mode == 'mpls':
+ self._delete_flow_rule_with_mpls_enc(
+ flowrule, flowrule_status)
+
+ def update_flow_rules(self, context, **kwargs):
+ try:
+ flowrule_status = []
+ flowrules = kwargs['flowrule_entries']
+ LOG.debug("update_flow_rules received, flowrules = %s",
+ flowrules)
+
+ if flowrules:
+ self._treat_update_flow_rules(flowrules, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("update_flow_rules failed"))
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def delete_flow_rules(self, context, **kwargs):
+ try:
+ flowrule_status = []
+ flowrules = kwargs['flowrule_entries']
+ LOG.debug("delete_flow_rules received, flowrules= %s", flowrules)
+ if flowrules:
+ self._treat_delete_flow_rules(flowrules, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("delete_flow_rules failed"))
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ def update_src_node_flow_rules(self, context, **kwargs):
+ flowrule = kwargs['flowrule_entries']
+ if self.overlay_encap_mode == 'mpls':
+ self._setup_egress_flow_rules_with_mpls(flowrule,
+ match_inport=False)
+ self._update_destination_ingress_flow_rules(flowrule)
+
+ def _delete_src_node_flow_rules_with_mpls(self, flowrule,
+ match_inport=False):
+ LOG.debug("_delete_src_node_flow_rules_with_mpls, flowrule = %s",
+ flowrule)
+ group_id = flowrule.get('next_group_id', None)
+
+ # delete br-int table 0 full match flow
+ self._setup_local_switch_flows_on_int_br(
+ flowrule,
+ flowrule['del_fcs'],
+ None,
+ add_flow=False,
+ match_inport=False)
+
+ # delete group table, need to check again
+ if None != group_id and flowrule.get('group_refcnt', None) <= 1:
+ self.int_br.delete_group(group_id=group_id)
+ for item in flowrule['next_hops']:
+ self.int_br.delete_flows(
+ table=ACROSS_SUBNET_TABLE,
+ dl_dst=item['mac_address'])
+
+ def delete_src_node_flow_rules(self, context, **kwargs):
+ flowrule = kwargs['flowrule_entries']
+ if self.overlay_encap_mode == 'mpls':
+ self._delete_src_node_flow_rules_with_mpls(flowrule,
+ match_inport=False)
+ self._update_destination_ingress_flow_rules(flowrule)
+
+ def sfc_treat_devices_added_updated(self, port_id):
+ resync = False
+ flowrule_status = []
+ try:
+ LOG.debug("a new device %s is found", port_id)
+ flows_list = (
+ self.sfc_plugin_rpc.get_flowrules_by_host_portid(
+ self.context, port_id
+ )
+ )
+ if flows_list:
+ for flow in flows_list:
+ self._treat_update_flow_rules(flow, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("portchain_treat_devices_added_updated failed"))
+ resync = True
+
+ if flowrule_status:
+ self.sfc_plugin_rpc.update_flowrules_status(
+ self.context, flowrule_status)
+
+ return resync
+
+ def sfc_treat_devices_removed(self, port_ids):
+ resync = False
+ for port_id in port_ids:
+ LOG.info(_LI("a device %s is removed"), port_id)
+ try:
+ self._delete_ports_flowrules_by_id(port_id)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(
+ _LE("delete port flow rule failed for %(port_id)s"),
+ {'port_id': port_id}
+ )
+ resync = True
+
+ return resync
+
+ def treat_devices_added_or_updated(self, devices, ovs_restarted):
+ skipped_devices = []
+ need_binding_devices = []
+ security_disabled_devices = []
+ devices_details_list = (
+ self.plugin_rpc.get_devices_details_list_and_failed_devices(
+ self.context,
+ devices,
+ self.agent_id,
+ self.conf.host
+ )
+ )
+ if devices_details_list.get('failed_devices'):
+ # TODO(rossella_s): handle better the resync in next patches,
+ # this is just to preserve the current behavior
+ raise ovs_neutron_agent.DeviceListRetrievalError(devices=devices)
+
+ devices = devices_details_list.get('devices')
+ vif_by_id = self.int_br.get_vifs_by_ids(
+ [vif['device'] for vif in devices])
+ for details in devices:
+ device = details['device']
+ LOG.debug("Processing port: %s", device)
+ port = vif_by_id.get(device)
+ if not port:
+ # The port disappeared and cannot be processed
+ LOG.info(_LI("Port %s was not found on the integration bridge "
+ "and will therefore not be processed"), device)
+ skipped_devices.append(device)
+ continue
+
+ if 'port_id' in details:
+ LOG.info(_LI("Port %(device)s updated. Details: %(details)s"),
+ {'device': device, 'details': details})
+ details['vif_port'] = port
+ need_binding = self.treat_vif_port(port, details['port_id'],
+ details['network_id'],
+ details['network_type'],
+ details['physical_network'],
+ details['segmentation_id'],
+ details['admin_state_up'],
+ details['fixed_ips'],
+ details['device_owner'],
+ ovs_restarted)
+ if need_binding:
+ need_binding_devices.append(details)
+
+ port_security = details['port_security_enabled']
+ has_sgs = 'security_groups' in details
+ if not port_security or not has_sgs:
+ security_disabled_devices.append(device)
+ self._update_port_network(details['port_id'],
+ details['network_id'])
+ self.ext_manager.handle_port(self.context, details)
+ self.sfc_treat_devices_added_updated(details['port_id'])
+ else:
+ LOG.warn(_LW("Device %s not defined on plugin"), device)
+ if (port and port.ofport != -1):
+ self.port_dead(port)
+ return skipped_devices, need_binding_devices, security_disabled_devices
+
+ def process_deleted_ports(self, port_info):
+ # don't try to process removed ports as deleted ports since
+ # they are already gone
+ if 'removed' in port_info:
+ self.deleted_ports -= port_info['removed']
+ deleted_ports = list(self.deleted_ports)
+ while self.deleted_ports:
+ port_id = self.deleted_ports.pop()
+ port = self.int_br.get_vif_port_by_id(port_id)
+ self._clean_network_ports(port_id)
+ self.ext_manager.delete_port(self.context,
+ {"vif_port": port,
+ "port_id": port_id})
+ self.sfc_treat_devices_removed(port_id)
+ # move to dead VLAN so deleted ports no
+ # longer have access to the network
+ if port:
+ # don't log errors since there is a chance someone will be
+ # removing the port from the bridge at the same time
+ self.port_dead(port, log_errors=False)
+ self.port_unbound(port_id)
+ # Flush firewall rules after ports are put on dead VLAN to be
+ # more secure
+ self.sg_agent.remove_devices_filter(deleted_ports)
+
+
+def main():
+ cfg.CONF.register_opts(ip_lib.OPTS)
+ config.register_root_helper(cfg.CONF)
+ common_config.init(sys.argv[1:])
+ common_config.setup_logging()
+ q_utils.log_opt_values(LOG)
+
+ try:
+ agent_config = ovs_neutron_agent.create_agent_config_map(cfg.CONF)
+ except ValueError as e:
+ LOG.exception(e)
+ LOG.error(_LE('Agent terminated!'))
+ sys.exit(1)
+
+ is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper
+ if is_xen_compute_host:
+ # Force ip_lib to always use the root helper to ensure that ip
+ # commands target xen dom0 rather than domU.
+ cfg.CONF.set_default('ip_lib_force_root', True)
+
+ bridge_classes = {
+ 'br_int': br_int.OVSIntegrationBridge,
+ 'br_phys': br_phys.OVSPhysicalBridge,
+ 'br_tun': br_tun.OVSTunnelBridge,
+ }
+ try:
+ agent = OVSSfcAgent(bridge_classes, **agent_config)
+ except RuntimeError as e:
+ LOG.exception(e)
+ LOG.error(_LE("Agent terminated!"))
+ sys.exit(1)
+ signal.signal(signal.SIGTERM, agent._handle_sigterm)
+
+ # Start everything.
+ LOG.info(_LI("Agent initialized successfully, now running... "))
+ agent.daemon_loop()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/networking_sfc/services/sfc/agent/br_int.py b/networking_sfc/services/sfc/agent/br_int.py
new file mode 100644
index 0000000..1f88c01
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/br_int.py
@@ -0,0 +1,48 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+* references
+** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic
+"""
+
+from networking_sfc.services.sfc.common import ovs_ext_lib
+from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants
+from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import (
+ br_int)
+
+
+class OVSIntegrationBridge(
+ br_int.OVSIntegrationBridge,
+ ovs_ext_lib.OVSBridgeExt
+):
+ def setup_controllers(self, conf):
+ self.set_protocols("[]")
+ self.del_controller()
+
+ def delete_arp_spoofing_protection(self, port):
+ # there is an issue to delete icmp6, it will not effect and cause
+ # other flow rule get deleted.
+ # Raofei will raise a bug to neutron community.
+ self.delete_flows(table_id=constants.LOCAL_SWITCHING,
+ in_port=port, proto='arp')
+ self.delete_flows(table_id=constants.ARP_SPOOF_TABLE,
+ in_port=port)
+
+ def mod_flow(self, **kwargs):
+ ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs)
+
+ def run_ofctl(self, cmd, args, process_input=None):
+ return ovs_ext_lib.OVSBridgeExt.run_ofctl(
+ self, cmd, args, process_input=process_input)
diff --git a/networking_sfc/services/sfc/agent/br_phys.py b/networking_sfc/services/sfc/agent/br_phys.py
new file mode 100644
index 0000000..e9666e9
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/br_phys.py
@@ -0,0 +1,34 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+* references
+** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic
+"""
+from networking_sfc.services.sfc.common import ovs_ext_lib
+from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import (
+ br_phys)
+
+
+class OVSPhysicalBridge(br_phys.OVSPhysicalBridge, ovs_ext_lib.OVSBridgeExt):
+ def setup_controllers(self, conf):
+ self.set_protocols("[]")
+ self.del_controller()
+
+ def mod_flow(self, **kwargs):
+ ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs)
+
+ def run_ofctl(self, cmd, args, process_input=None):
+ return ovs_ext_lib.OVSBridgeExt.run_ofctl(
+ self, cmd, args, process_input=process_input)
diff --git a/networking_sfc/services/sfc/agent/br_tun.py b/networking_sfc/services/sfc/agent/br_tun.py
new file mode 100644
index 0000000..47a7cf9
--- /dev/null
+++ b/networking_sfc/services/sfc/agent/br_tun.py
@@ -0,0 +1,35 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+"""
+* references
+** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic
+"""
+
+from networking_sfc.services.sfc.common import ovs_ext_lib
+from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import (
+ br_tun)
+
+
+class OVSTunnelBridge(br_tun.OVSTunnelBridge, ovs_ext_lib.OVSBridgeExt):
+ def setup_controllers(self, conf):
+ self.set_protocols("[]")
+ self.del_controller()
+
+ def mod_flow(self, **kwargs):
+ ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs)
+
+ def run_ofctl(self, cmd, args, process_input=None):
+ return ovs_ext_lib.OVSBridgeExt.run_ofctl(
+ self, cmd, args, process_input=process_input)
diff --git a/networking_sfc/services/sfc/common/__init__.py b/networking_sfc/services/sfc/common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/services/sfc/common/__init__.py
diff --git a/networking_sfc/services/sfc/common/config.py b/networking_sfc/services/sfc/common/config.py
new file mode 100644
index 0000000..29acd1c
--- /dev/null
+++ b/networking_sfc/services/sfc/common/config.py
@@ -0,0 +1,27 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+
+
+SFC_DRIVER_OPTS = [
+ cfg.ListOpt('drivers',
+ default=['dummy'],
+ help=_("An ordered list of service chain drivers "
+ "entrypoints to be loaded from the "
+ "networking_sfc.sfc.drivers namespace.")),
+]
+
+
+cfg.CONF.register_opts(SFC_DRIVER_OPTS, "sfc")
diff --git a/networking_sfc/services/sfc/common/context.py b/networking_sfc/services/sfc/common/context.py
new file mode 100644
index 0000000..7d3b451
--- /dev/null
+++ b/networking_sfc/services/sfc/common/context.py
@@ -0,0 +1,85 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+class SfcPluginContext(object):
+ """SFC context base class."""
+ def __init__(self, plugin, plugin_context):
+ self._plugin = plugin
+ self._plugin_context = plugin_context
+
+
+class PortChainContext(SfcPluginContext):
+
+ def __init__(self, plugin, plugin_context, portchain,
+ original_portchain=None):
+ super(PortChainContext, self).__init__(plugin, plugin_context)
+ self._portchain = portchain
+ self._original_portchain = original_portchain
+
+ @property
+ def current(self):
+ return self._portchain
+
+ @property
+ def original(self):
+ return self._original_portchain
+
+
+class FlowClassifierContext(SfcPluginContext):
+ def __init__(self, plugin, plugin_context, flowclassifier,
+ original_flowclassifier=None):
+ super(FlowClassifierContext, self).__init__(plugin, plugin_context)
+ self._flowclassifier = flowclassifier
+ self._original_flowclassifier = original_flowclassifier
+
+ @property
+ def current(self):
+ return self._flowclassifier
+
+ @property
+ def original(self):
+ return self._original_flowclassifier
+
+
+class PortPairContext(SfcPluginContext):
+ def __init__(self, plugin, plugin_context, portpair,
+ original_portpair=None):
+ super(PortPairContext, self).__init__(plugin, plugin_context)
+ self._portpair = portpair
+ self._original_portpair = original_portpair
+
+ @property
+ def current(self):
+ return self._portpair
+
+ @property
+ def original(self):
+ return self._original_portpair
+
+
+class PortPairGroupContext(SfcPluginContext):
+ def __init__(self, plugin, plugin_context, portpairgroup,
+ original_portpairgroup=None):
+ super(PortPairGroupContext, self).__init__(plugin, plugin_context)
+ self._portpairgroup = portpairgroup
+ self._original_portpairgroup = original_portpairgroup
+
+ @property
+ def current(self):
+ return self._portpairgroup
+
+ @property
+ def original(self):
+ return self._original_portpairgroup
diff --git a/networking_sfc/services/sfc/common/exceptions.py b/networking_sfc/services/sfc/common/exceptions.py
new file mode 100644
index 0000000..7d1b9d9
--- /dev/null
+++ b/networking_sfc/services/sfc/common/exceptions.py
@@ -0,0 +1,46 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""Exceptions used by SFC plugin and drivers."""
+
+from neutron.common import exceptions
+
+
+class SfcDriverError(exceptions.NeutronException):
+ """SFC driver call failed."""
+ message = _("%(method)s failed.")
+
+
+class SfcException(exceptions.NeutronException):
+ """Base for SFC driver exceptions returned to user."""
+ pass
+
+
+class SfcBadRequest(exceptions.BadRequest, SfcException):
+ """Base for SFC driver bad request exceptions returned to user."""
+ pass
+
+
+class SfcNoSubnetGateway(SfcDriverError):
+ """No subnet gateway."""
+ message = _("There is no %(type)s of ip prefix %(cidr)s.")
+
+
+class SfcNoSuchSubnet(SfcDriverError):
+ """No such subnet."""
+ message = _("There is no %(type)s of %(cidr)s.")
+
+
+class FlowClassifierInvalid(SfcDriverError):
+ """Invalid flow classifier."""
+ message = _("There is no %(type)s assigned.")
diff --git a/networking_sfc/services/sfc/common/ovs_ext_lib.py b/networking_sfc/services/sfc/common/ovs_ext_lib.py
new file mode 100644
index 0000000..01fbd04
--- /dev/null
+++ b/networking_sfc/services/sfc/common/ovs_ext_lib.py
@@ -0,0 +1,187 @@
+# Copyright 2015 Huawei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import collections
+from neutron.agent.common import ovs_lib
+from neutron.agent.common import utils
+from neutron.common import exceptions
+from neutron.i18n import _LE
+from neutron.plugins.common import constants
+from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import (
+ ovs_bridge)
+from oslo_log import log as logging
+import six
+
+
+# Special return value for an invalid OVS ofport
+INVALID_OFPORT = '-1'
+
+LOG = logging.getLogger(__name__)
+
+
+def get_port_mask(min_port, max_port):
+ """get port/mask serial by port range."""
+ if min_port < 1 or max_port > 0xffff or min_port > max_port:
+ msg = _("the port range is invalid")
+ raise exceptions.InvalidInput(error_message=msg)
+ masks = []
+ while min_port <= max_port:
+ mask = 0xffff
+ while mask != 0:
+ next_mask = (mask << 1) & 0xffff
+ port_start = min_port & next_mask
+ port_end = min_port + (next_mask ^ 0xffff)
+ if port_start == min_port and port_end <= max_port:
+ mask = next_mask
+ else:
+ break
+ masks.append('0x%x/0x%x' % (min_port, mask))
+ min_port = min_port + (mask ^ 0xffff) + 1
+
+ return masks
+
+
+class OVSBridgeExt(ovs_bridge.OVSAgentBridge):
+ def setup_controllers(self, conf):
+ self.set_protocols("[]")
+ self.del_controller()
+
+ def dump_flows_full_match(self, flow_str):
+ retval = None
+ flows = self.run_ofctl("dump-flows", [flow_str])
+ if flows:
+ retval = '\n'.join(item for item in flows.splitlines()
+ if 'NXST' not in item and 'OFPST' not in item)
+ return retval
+
+ def mod_flow(self, **kwargs):
+ flow_copy = kwargs.copy()
+ flow_copy.pop('actions')
+ flow_str = ovs_lib._build_flow_expr_str(flow_copy, 'del')
+ dump_flows = self.dump_flows_full_match(flow_str)
+ if dump_flows == '':
+ self.do_action_flows('add', [kwargs])
+ else:
+ self.do_action_flows('mod', [kwargs])
+
+ def add_nsh_tunnel_port(self, port_name, remote_ip, local_ip,
+ tunnel_type=constants.TYPE_GRE,
+ vxlan_udp_port=constants.VXLAN_UDP_PORT,
+ dont_fragment=True,
+ in_nsp=None,
+ in_nsi=None):
+ attrs = [('type', tunnel_type)]
+ # This is an OrderedDict solely to make a test happy
+ options = collections.OrderedDict()
+ vxlan_uses_custom_udp_port = (
+ tunnel_type == constants.TYPE_VXLAN and
+ vxlan_udp_port != constants.VXLAN_UDP_PORT
+ )
+ if vxlan_uses_custom_udp_port:
+ options['dst_port'] = vxlan_udp_port
+ options['df_default'] = str(dont_fragment).lower()
+ options['remote_ip'] = 'flow'
+ options['local_ip'] = local_ip
+ options['in_key'] = 'flow'
+ options['out_key'] = 'flow'
+ if in_nsp is not None and in_nsi is not None:
+ options['nsp'] = str(in_nsp)
+ options['nsi'] = str(in_nsi)
+ elif in_nsp is None and in_nsi is None:
+ options['nsp'] = 'flow'
+ options['nsi'] = 'flow'
+ attrs.append(('options', options))
+ ofport = self.add_port(port_name, *attrs)
+ if (
+ tunnel_type == constants.TYPE_VXLAN and
+ ofport == INVALID_OFPORT
+ ):
+ LOG.error(
+ _LE('Unable to create VXLAN tunnel port for service chain. '
+ 'Please ensure that an openvswitch version that supports '
+ 'VXLAN for service chain is installed.')
+ )
+ return ofport
+
+ def run_ofctl(self, cmd, args, process_input=None):
+ # We need to dump-groups according to group Id,
+ # which is a feature of OpenFlow1.5
+ full_args = [
+ "ovs-ofctl", "-O openflow13", cmd, self.br_name
+ ] + args
+ try:
+ return utils.execute(full_args, run_as_root=True,
+ process_input=process_input)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("Unable to execute %(args)s."),
+ {'args': full_args})
+
+ def do_action_groups(self, action, kwargs_list):
+ group_strs = [_build_group_expr_str(kw, action) for kw in kwargs_list]
+ if action == 'add' or action == 'del':
+ self.run_ofctl('%s-groups' % action, ['-'], '\n'.join(group_strs))
+ elif action == 'mod':
+ self.run_ofctl('%s-group' % action, ['-'], '\n'.join(group_strs))
+ else:
+ msg = _("Action is illegal")
+ raise exceptions.InvalidInput(error_message=msg)
+
+ def add_group(self, **kwargs):
+ self.do_action_groups('add', [kwargs])
+
+ def mod_group(self, **kwargs):
+ self.do_action_groups('mod', [kwargs])
+
+ def delete_group(self, **kwargs):
+ self.do_action_groups('del', [kwargs])
+
+ def dump_group_for_id(self, group_id):
+ retval = None
+ group_str = "%d" % group_id
+ group = self.run_ofctl("dump-groups", [group_str])
+ if group:
+ retval = '\n'.join(item for item in group.splitlines()
+ if 'NXST' not in item)
+ return retval
+
+
+def _build_group_expr_str(group_dict, cmd):
+ group_expr_arr = []
+ buckets = None
+ groupId = None
+
+ if cmd != 'del':
+ if "group_id" not in group_dict:
+ msg = _("Must specify one groupId on groupo addition"
+ " or modification")
+ raise exceptions.InvalidInput(error_message=msg)
+ groupId = "group_id=%s" % group_dict.pop('group_id')
+
+ if "buckets" not in group_dict:
+ msg = _("Must specify one or more buckets on group addition"
+ " or modification")
+ raise exceptions.InvalidInput(error_message=msg)
+ buckets = "%s" % group_dict.pop('buckets')
+
+ if groupId:
+ group_expr_arr.append(groupId)
+
+ for key, value in six.iteritems(group_dict):
+ group_expr_arr.append("%s=%s" % (key, value))
+
+ if buckets:
+ group_expr_arr.append(buckets)
+
+ return ','.join(group_expr_arr)
diff --git a/networking_sfc/services/sfc/driver_manager.py b/networking_sfc/services/sfc/driver_manager.py
new file mode 100644
index 0000000..c8a212a
--- /dev/null
+++ b/networking_sfc/services/sfc/driver_manager.py
@@ -0,0 +1,118 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from oslo_log import log
+import stevedore
+
+from neutron.i18n import _LE
+from neutron.i18n import _LI
+
+from networking_sfc.services.sfc.common import exceptions as sfc_exc
+
+
+LOG = log.getLogger(__name__)
+cfg.CONF.import_opt('drivers',
+ 'networking_sfc.services.sfc.common.config',
+ group='sfc')
+
+
+class SfcDriverManager(stevedore.named.NamedExtensionManager):
+ """Implementation of SFC drivers."""
+
+ def __init__(self):
+ # Registered sfc drivers, keyed by name.
+ self.drivers = {}
+ # Ordered list of sfc drivers, defining
+ # the order in which the drivers are called.
+ self.ordered_drivers = []
+ LOG.info(_LI("Configured SFC drivers: %s"),
+ cfg.CONF.sfc.drivers)
+ super(SfcDriverManager, self).__init__('networking_sfc.sfc.drivers',
+ cfg.CONF.sfc.drivers,
+ invoke_on_load=True,
+ name_order=True)
+ LOG.info(_LI("Loaded SFC drivers: %s"), self.names())
+ self._register_drivers()
+
+ def _register_drivers(self):
+ """Register all SFC drivers.
+
+ This method should only be called once in the SfcDriverManager
+ constructor.
+ """
+ for ext in self:
+ self.drivers[ext.name] = ext
+ self.ordered_drivers.append(ext)
+ LOG.info(_LI("Registered SFC drivers: %s"),
+ [driver.name for driver in self.ordered_drivers])
+
+ def initialize(self):
+ # ServiceChain bulk operations requires each driver to support them
+ self.native_bulk_support = True
+ for driver in self.ordered_drivers:
+ LOG.info(_LI("Initializing SFC driver '%s'"), driver.name)
+ driver.obj.initialize()
+ self.native_bulk_support &= getattr(driver.obj,
+ 'native_bulk_support', True)
+
+ def _call_drivers(self, method_name, context):
+ """Helper method for calling a method across all SFC drivers.
+
+ :param method_name: name of the method to call
+ :param context: context parameter to pass to each method call
+ :param continue_on_failure: whether or not to continue to call
+ all SFC drivers once one has raised an exception
+ if any SFC driver call fails.
+ """
+ for driver in self.ordered_drivers:
+ try:
+ getattr(driver.obj, method_name)(context)
+ except Exception as e:
+ # This is an internal failure.
+ LOG.exception(e)
+ LOG.error(
+ _LE("SFC driver '%(name)s' failed in %(method)s"),
+ {'name': driver.name, 'method': method_name}
+ )
+ raise sfc_exc.SfcDriverError(
+ method=method_name
+ )
+
+ def create_port_chain(self, context):
+ self._call_drivers("create_port_chain", context)
+
+ def update_port_chain(self, context):
+ self._call_drivers("update_port_chain", context)
+
+ def delete_port_chain(self, context):
+ self._call_drivers("delete_port_chain", context)
+
+ def create_port_pair(self, context):
+ self._call_drivers("create_port_pair", context)
+
+ def update_port_pair(self, context):
+ self._call_drivers("update_port_pair", context)
+
+ def delete_port_pair(self, context):
+ self._call_drivers("delete_port_pair", context)
+
+ def create_port_pair_group(self, context):
+ self._call_drivers("create_port_pair_group", context)
+
+ def update_port_pair_group(self, context):
+ self._call_drivers("update_port_pair_group", context)
+
+ def delete_port_pair_group(self, context):
+ self._call_drivers("delete_port_pair_group", context)
diff --git a/networking_sfc/services/sfc/drivers/__init__.py b/networking_sfc/services/sfc/drivers/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/__init__.py
diff --git a/networking_sfc/services/sfc/drivers/base.py b/networking_sfc/services/sfc/drivers/base.py
new file mode 100644
index 0000000..0816789
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/base.py
@@ -0,0 +1,57 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+import six
+
+
+@six.add_metaclass(abc.ABCMeta)
+class SfcDriverBase(object):
+ """SFC Driver Base Class."""
+
+ @abc.abstractmethod
+ def create_port_chain(self, context):
+ pass
+
+ @abc.abstractmethod
+ def delete_port_chain(self, context):
+ pass
+
+ @abc.abstractmethod
+ def update_port_chain(self, context):
+ pass
+
+ @abc.abstractmethod
+ def create_port_pair(self, context):
+ pass
+
+ @abc.abstractmethod
+ def delete_port_pair(self, context):
+ pass
+
+ @abc.abstractmethod
+ def update_port_pair(self, context):
+ pass
+
+ @abc.abstractmethod
+ def create_port_pair_group(self, context):
+ pass
+
+ @abc.abstractmethod
+ def delete_port_pair_group(self, context):
+ pass
+
+ @abc.abstractmethod
+ def update_port_pair_group(self, context):
+ pass
diff --git a/networking_sfc/services/sfc/drivers/dummy/__init__.py b/networking_sfc/services/sfc/drivers/dummy/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/dummy/__init__.py
diff --git a/networking_sfc/services/sfc/drivers/dummy/dummy.py b/networking_sfc/services/sfc/drivers/dummy/dummy.py
new file mode 100644
index 0000000..1ddd7d0
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/dummy/dummy.py
@@ -0,0 +1,59 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import helpers as log_helpers
+
+from networking_sfc.services.sfc.drivers import base as sfc_driver
+
+
+class DummyDriver(sfc_driver.SfcDriverBase):
+ """SFC Driver Dummy Class."""
+ def initialize(self):
+ pass
+
+ @log_helpers.log_method_call
+ def create_port_chain(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def delete_port_chain(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def update_port_chain(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def create_port_pair_group(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def delete_port_pair_group(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def update_port_pair_group(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def create_port_pair(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def delete_port_pair(self, context):
+ pass
+
+ @log_helpers.log_method_call
+ def update_port_pair(self, context):
+ pass
diff --git a/networking_sfc/services/sfc/drivers/ovs/__init__.py b/networking_sfc/services/sfc/drivers/ovs/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/__init__.py
diff --git a/networking_sfc/services/sfc/drivers/ovs/constants.py b/networking_sfc/services/sfc/drivers/ovs/constants.py
new file mode 100644
index 0000000..30e2c37
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/constants.py
@@ -0,0 +1,57 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.common import constants as n_const
+
+
+INGRESS_DIR = 'ingress'
+EGRESS_DIR = 'egress'
+
+STATUS_BUILDING = 'building'
+STATUS_ACTIVE = 'active'
+STATUS_ERROR = 'error'
+STATUS_DELETING = 'deleting'
+
+
+PORTFLOW_OPT_ADD = 'add-flows'
+PROTFLOW_OPT_DELETE = 'delete-flows'
+PROTFLOW_OPT_UPDATE = 'update-flows'
+
+
+SRC_NODE = 'src_node'
+DST_NODE = 'dst_node'
+SF_NODE = 'sf_node'
+
+RES_TYPE_GROUP = 'group'
+RES_TYPE_NSP = 'nsp'
+
+INSERTION_TYPE_L2 = 'l2'
+INSERTION_TYPE_L3 = 'l3'
+INSERTION_TYPE_BITW = 'bitw'
+INSERTION_TYPE_TAP = 'tap'
+
+MAX_HASH = 16
+
+INSERTION_TYPE_DICT = {
+ n_const.DEVICE_OWNER_ROUTER_HA_INTF: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_ROUTER_INTF: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_ROUTER_GW: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_FLOATINGIP: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_DHCP: INSERTION_TYPE_TAP,
+ n_const.DEVICE_OWNER_DVR_INTERFACE: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_AGENT_GW: INSERTION_TYPE_L3,
+ n_const.DEVICE_OWNER_ROUTER_SNAT: INSERTION_TYPE_TAP,
+ n_const.DEVICE_OWNER_LOADBALANCER: INSERTION_TYPE_TAP,
+ 'compute': INSERTION_TYPE_L2
+}
diff --git a/networking_sfc/services/sfc/drivers/ovs/db.py b/networking_sfc/services/sfc/drivers/ovs/db.py
new file mode 100644
index 0000000..8d3c87d
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/db.py
@@ -0,0 +1,426 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import six
+
+from oslo_log import helpers as log_helpers
+from oslo_log import log as logging
+from oslo_utils import uuidutils
+
+from neutron.common import exceptions as n_exc
+from neutron import context as n_context
+from neutron.db import common_db_mixin
+from neutron.db import model_base
+from neutron.db import models_v2
+
+import sqlalchemy as sa
+from sqlalchemy import orm
+from sqlalchemy.orm import exc
+from sqlalchemy import sql
+
+
+LOG = logging.getLogger(__name__)
+
+
+class PortPairDetailNotFound(n_exc.NotFound):
+ message = _("Portchain port brief %(port_id)s could not be found")
+
+
+class NodeNotFound(n_exc.NotFound):
+ message = _("Portchain node %(node_id)s could not be found")
+
+
+# name changed to ChainPathId
+class UuidIntidAssoc(model_base.BASEV2, models_v2.HasId):
+ __tablename__ = 'sfc_uuid_intid_associations'
+ uuid = sa.Column(sa.String(36), primary_key=True)
+ intid = sa.Column(sa.Integer, unique=True, nullable=False)
+ type_ = sa.Column(sa.String(32), nullable=False)
+
+ def __init__(self, uuid, intid, type_):
+ self.uuid = uuid
+ self.intid = intid
+ self.type_ = type_
+
+
+def singleton(class_):
+ instances = {}
+
+ def getinstance(*args, **kwargs):
+ if class_ not in instances:
+ instances[class_] = class_(*args, **kwargs)
+ return instances[class_]
+ return getinstance
+
+
+@singleton
+class IDAllocation(object):
+ def __init__(self, context):
+ # Get the inital range from conf file.
+ conf_obj = {'group': [1, 255], 'portchain': [256, 65536]}
+ self.conf_obj = conf_obj
+ self.session = context.session
+
+ @log_helpers.log_method_call
+ def assign_intid(self, type_, uuid):
+ query = self.session.query(UuidIntidAssoc).filter_by(
+ type_=type_).order_by(UuidIntidAssoc.intid)
+
+ allocated_int_ids = {obj.intid for obj in query.all()}
+
+ # Find the first one from the available range that
+ # is not in allocated_int_ids
+ start, end = self.conf_obj[type_][0], self.conf_obj[type_][1]+1
+ for init_id in six.moves.range(start, end):
+ if init_id not in allocated_int_ids:
+ with self.session.begin(subtransactions=True):
+ uuid_intid = UuidIntidAssoc(
+ uuid, init_id, type_)
+ self.session.add(uuid_intid)
+ return init_id
+ else:
+ return None
+
+ @log_helpers.log_method_call
+ def get_intid_by_uuid(self, type_, uuid):
+
+ query_obj = self.session.query(UuidIntidAssoc).filter_by(
+ type_=type_, uuid=uuid).first()
+ if query_obj:
+ return query_obj.intid
+ else:
+ return None
+
+ @log_helpers.log_method_call
+ def release_intid(self, type_, intid):
+ """Release int id.
+
+ @param: type_: str
+ @param: intid: int
+ """
+ with self.session.begin(subtransactions=True):
+ query_obj = self.session.query(UuidIntidAssoc).filter_by(
+ intid=intid, type_=type_).first()
+
+ if query_obj:
+ self.session.delete(query_obj)
+
+
+class PathPortAssoc(model_base.BASEV2):
+ """path port association table.
+
+ It represents the association table which associate path_nodes with
+ portpair_details.
+ """
+ __tablename__ = 'sfc_path_port_associations'
+ pathnode_id = sa.Column(sa.String(36),
+ sa.ForeignKey(
+ 'sfc_path_nodes.id', ondelete='CASCADE'),
+ primary_key=True)
+ portpair_id = sa.Column(sa.String(36),
+ sa.ForeignKey('sfc_portpair_details.id',
+ ondelete='CASCADE'),
+ primary_key=True)
+ weight = sa.Column(sa.Integer, nullable=False, default=1)
+
+
+class PortPairDetail(model_base.BASEV2, models_v2.HasId,
+ models_v2.HasTenant):
+ __tablename__ = 'sfc_portpair_details'
+ ingress = sa.Column(sa.String(36), nullable=True)
+ egress = sa.Column(sa.String(36), nullable=True)
+ host_id = sa.Column(sa.String(255), nullable=False)
+ mac_address = sa.Column(sa.String(32), nullable=False)
+ network_type = sa.Column(sa.String(8))
+ segment_id = sa.Column(sa.Integer)
+ local_endpoint = sa.Column(sa.String(64), nullable=False)
+ path_nodes = orm.relationship(PathPortAssoc,
+ backref='port_pair_detail',
+ lazy="joined",
+ cascade='all,delete')
+
+
+class PathNode(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant):
+ __tablename__ = 'sfc_path_nodes'
+ nsp = sa.Column(sa.Integer, nullable=False)
+ nsi = sa.Column(sa.Integer, nullable=False)
+ node_type = sa.Column(sa.String(32))
+ portchain_id = sa.Column(
+ sa.String(255),
+ sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE'))
+ status = sa.Column(sa.String(32))
+ portpair_details = orm.relationship(PathPortAssoc,
+ backref='path_nodes',
+ lazy="joined",
+ cascade='all,delete')
+ next_group_id = sa.Column(sa.Integer)
+ next_hop = sa.Column(sa.String(512))
+
+
+class OVSSfcDriverDB(common_db_mixin.CommonDbMixin):
+ def initialize(self):
+ self.admin_context = n_context.get_admin_context()
+
+ def _make_pathnode_dict(self, node, fields=None):
+ res = {'id': node['id'],
+ 'tenant_id': node['tenant_id'],
+ 'node_type': node['node_type'],
+ 'nsp': node['nsp'],
+ 'nsi': node['nsi'],
+ 'next_group_id': node['next_group_id'],
+ 'next_hop': node['next_hop'],
+ 'portchain_id': node['portchain_id'],
+ 'status': node['status'],
+ 'portpair_details': [pair_detail['portpair_id']
+ for pair_detail in node['portpair_details']
+ ]
+ }
+
+ return self._fields(res, fields)
+
+ def _make_port_detail_dict(self, port, fields=None):
+ res = {'id': port['id'],
+ 'tenant_id': port['tenant_id'],
+ 'host_id': port['host_id'],
+ 'ingress': port.get('ingress', None),
+ 'egress': port.get('egress', None),
+ 'segment_id': port['segment_id'],
+ 'local_endpoint': port['local_endpoint'],
+ 'mac_address': port['mac_address'],
+ 'network_type': port['network_type'],
+ 'path_nodes': [{'pathnode_id': node['pathnode_id'],
+ 'weight': node['weight']}
+ for node in port['path_nodes']]
+ }
+
+ return self._fields(res, fields)
+
+ def _make_pathport_assoc_dict(self, assoc, fields=None):
+ res = {'pathnode_id': assoc['pathnode_id'],
+ 'portpair_id': assoc['portpair_id'],
+ 'weight': assoc['weight'],
+ }
+
+ return self._fields(res, fields)
+
+ def _get_path_node(self, id):
+ try:
+ node = self._get_by_id(self.admin_context, PathNode, id)
+ except exc.NoResultFound:
+ raise NodeNotFound(node_id=id)
+ return node
+
+ def _get_port_detail(self, id):
+ try:
+ port = self._get_by_id(self.admin_context, PortPairDetail, id)
+ except exc.NoResultFound:
+ raise PortPairDetailNotFound(port_id=id)
+ return port
+
+ def create_port_detail(self, port):
+ with self.admin_context.session.begin(subtransactions=True):
+ args = self._filter_non_model_columns(port, PortPairDetail)
+ args['id'] = uuidutils.generate_uuid()
+ port_obj = PortPairDetail(**args)
+ self.admin_context.session.add(port_obj)
+ return self._make_port_detail_dict(port_obj)
+
+ def create_path_node(self, node):
+ with self.admin_context.session.begin(subtransactions=True):
+ args = self._filter_non_model_columns(node, PathNode)
+ args['id'] = uuidutils.generate_uuid()
+ node_obj = PathNode(**args)
+ self.admin_context.session.add(node_obj)
+ return self._make_pathnode_dict(node_obj)
+
+ def create_pathport_assoc(self, assoc):
+ with self.admin_context.session.begin(subtransactions=True):
+ args = self._filter_non_model_columns(assoc, PathPortAssoc)
+ assoc_obj = PathPortAssoc(**args)
+ self.admin_context.session.add(assoc_obj)
+ return self._make_pathport_assoc_dict(assoc_obj)
+
+ def delete_pathport_assoc(self, pathnode_id, portdetail_id):
+ with self.admin_context.session.begin(subtransactions=True):
+ self.admin_context.session.query(PathPortAssoc).filter_by(
+ pathnode_id=pathnode_id,
+ portpair_id=portdetail_id).delete()
+
+ def update_port_detail(self, id, port):
+ with self.admin_context.session.begin(subtransactions=True):
+ port_obj = self._get_port_detail(id)
+ for key, value in six.iteritems(port):
+ if key == 'path_nodes':
+ pns = []
+ for pn in value:
+ pn_id = pn['pathnode_id']
+ self._get_path_node(pn_id)
+ query = self._model_query(
+ self.admin_context, PathPortAssoc)
+ pn_association = query.filter_by(
+ pathnode_id=pn_id,
+ portpair_id=id
+ ).first()
+ if not pn_association:
+ pn_association = PathPortAssoc(
+ pathnode_id=pn_id,
+ portpair_id=id,
+ weight=pn.get('weight', 1)
+ )
+ pns.append(pn_association)
+ port_obj[key] = pns
+ else:
+ port_obj[key] = value
+ port_obj.update(port)
+ return self._make_port_detail_dict(port_obj)
+
+ def update_path_node(self, id, node):
+ with self.admin_context.session.begin(subtransactions=True):
+ node_obj = self._get_path_node(id)
+ for key, value in six.iteritems(node):
+ if key == 'portpair_details':
+ pds = []
+ for pd_id in value:
+ self._get_port_detail(pd_id)
+ query = self._model_query(
+ self.admin_context, PathPortAssoc)
+ pd_association = query.filter_by(
+ pathnode_id=id,
+ portpair_id=pd_id
+ ).first()
+ if not pd_association:
+ pd_association = PathPortAssoc(
+ pathnode_id=id,
+ portpair_id=pd_id
+ )
+ pds.append(pd_association)
+ node_obj[key] = pds
+ else:
+ node_obj[key] = value
+ return self._make_pathnode_dict(node_obj)
+
+ def delete_port_detail(self, id):
+ with self.admin_context.session.begin(subtransactions=True):
+ port_obj = self._get_port_detail(id)
+ self.admin_context.session.delete(port_obj)
+
+ def delete_path_node(self, id):
+ with self.admin_context.session.begin(subtransactions=True):
+ node_obj = self._get_path_node(id)
+ self.admin_context.session.delete(node_obj)
+
+ def get_port_detail(self, id):
+ with self.admin_context.session.begin(subtransactions=True):
+ port_obj = self._get_port_detail(id)
+ return self._make_port_detail_dict(port_obj)
+
+ def get_port_detail_without_exception(self, id):
+ with self.admin_context.session.begin(subtransactions=True):
+ try:
+ port = self._get_by_id(
+ self.admin_context, PortPairDetail, id)
+ except exc.NoResultFound:
+ return None
+ return self._make_port_detail_dict(port)
+
+ def get_path_node(self, id):
+ with self.admin_context.session.begin(subtransactions=True):
+ node_obj = self._get_path_node(id)
+ return self._make_pathnode_dict(node_obj)
+
+ def get_path_nodes_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ with self.admin_context.session.begin(subtransactions=True):
+ qry = self._get_path_nodes_by_filter(
+ filters, fields, sorts, limit,
+ marker, page_reverse
+ )
+ all_items = qry.all()
+ if all_items:
+ return [self._make_pathnode_dict(item) for item in all_items]
+
+ return None
+
+ def get_path_node_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ with self.admin_context.session.begin(subtransactions=True):
+ qry = self._get_path_nodes_by_filter(
+ filters, fields, sorts, limit,
+ marker, page_reverse)
+ first = qry.first()
+ if first:
+ return self._make_pathnode_dict(first)
+
+ return None
+
+ def _get_path_nodes_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ qry = self.admin_context.session.query(PathNode)
+ if filters:
+ for key, value in six.iteritems(filters):
+ column = getattr(PathNode, key, None)
+ if column:
+ if not value:
+ qry = qry.filter(sql.false())
+ else:
+ qry = qry.filter(column == value)
+ return qry
+
+ def get_port_details_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ with self.admin_context.session.begin(subtransactions=True):
+ qry = self._get_port_details_by_filter(
+ filters, fields, sorts, limit,
+ marker, page_reverse)
+ all_items = qry.all()
+ if all_items:
+ return [
+ self._make_port_detail_dict(item)
+ for item in all_items
+ ]
+
+ return None
+
+ def get_port_detail_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ with self.admin_context.session.begin(subtransactions=True):
+ qry = self._get_port_details_by_filter(
+ filters, fields, sorts, limit,
+ marker, page_reverse)
+ first = qry.first()
+ if first:
+ return self._make_port_detail_dict(first)
+
+ return None
+
+ def _get_port_details_by_filter(self, filters=None, fields=None,
+ sorts=None, limit=None, marker=None,
+ page_reverse=False):
+ qry = self.admin_context.session.query(PortPairDetail)
+ if filters:
+ for key, value in six.iteritems(filters):
+ column = getattr(PortPairDetail, key, None)
+ if column:
+ if not value:
+ qry = qry.filter(sql.false())
+ else:
+ qry = qry.filter(column == value)
+
+ return qry
diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py
new file mode 100644
index 0000000..9dfc40d
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/driver.py
@@ -0,0 +1,1076 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+
+# from eventlet import greenthread
+
+from neutron.common import constants as nc_const
+from neutron.common import rpc as n_rpc
+
+from neutron import manager
+
+from neutron.i18n import _LE
+from neutron.i18n import _LW
+
+from neutron.plugins.common import constants as np_const
+
+
+from oslo_log import helpers as log_helpers
+from oslo_log import log as logging
+from oslo_serialization import jsonutils
+
+from networking_sfc.extensions import flowclassifier
+from networking_sfc.extensions import sfc
+from networking_sfc.services.sfc.common import exceptions as exc
+from networking_sfc.services.sfc.drivers import base as driver_base
+from networking_sfc.services.sfc.drivers.ovs import(
+ rpc_topics as sfc_topics)
+from networking_sfc.services.sfc.drivers.ovs import(
+ db as ovs_sfc_db)
+from networking_sfc.services.sfc.drivers.ovs import(
+ rpc as ovs_sfc_rpc)
+from networking_sfc.services.sfc.drivers.ovs import (
+ constants as ovs_const)
+
+
+LOG = logging.getLogger(__name__)
+
+
+class OVSSfcDriver(driver_base.SfcDriverBase,
+ ovs_sfc_db.OVSSfcDriverDB):
+ """Sfc Driver Base Class."""
+
+ def initialize(self):
+ super(OVSSfcDriver, self).initialize()
+ self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient(
+ sfc_topics.SFC_AGENT
+ )
+
+ self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context)
+ self._setup_rpc()
+
+ def _setup_rpc(self):
+ # Setup a rpc server
+ self.topic = sfc_topics.SFC_PLUGIN
+ self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)]
+ self.conn = n_rpc.create_connection(new=True)
+ self.conn.create_consumer(self.topic, self.endpoints, fanout=False)
+ self.conn.consume_in_threads()
+
+ def _get_subnet(self, core_plugin, tenant_id, cidr):
+ filters = {'tenant_id': [tenant_id]}
+ subnets = core_plugin.get_subnets(self.admin_context, filters=filters)
+ cidr_set = netaddr.IPSet([cidr])
+
+ for subnet in subnets:
+ subnet_cidr_set = netaddr.IPSet([subnet['cidr']])
+ if cidr_set.issubset(subnet_cidr_set):
+ return subnet
+
+ def _get_fc_dst_subnet_gw_port(self, fc):
+ core_plugin = manager.NeutronManager.get_plugin()
+ subnet = self._get_subnet(core_plugin,
+ fc['tenant_id'],
+ fc['destination_ip_prefix'])
+
+ return self._get_port_subnet_gw_info(core_plugin, subnet)
+
+ def _get_port_subnet_gw_info_by_port_id(self, id):
+ core_plugin = manager.NeutronManager.get_plugin()
+ subnet = self._get_subnet_by_port(core_plugin, id)
+ return self._get_port_subnet_gw_info(core_plugin,
+ subnet)
+
+ def _get_port_subnet_gw_info(self, core_plugin, subnet):
+ filters = {
+ 'device_owner':
+ [nc_const.DEVICE_OWNER_ROUTER_INTF]
+ }
+ gw_ports = core_plugin.get_ports(self.admin_context, filters=filters)
+ for port in gw_ports:
+ for fixed_ip in port['fixed_ips']:
+ if subnet["id"] == fixed_ip['subnet_id']:
+ return (port['mac_address'],
+ subnet['cidr'],
+ subnet['network_id'])
+
+ raise exc.NoSubnetGateway(
+ type='subnet gateway',
+ cidr=subnet['cidr'])
+
+ def _get_subnet_by_port(self, core_plugin, id):
+ port = core_plugin.get_port(self.admin_context, id)
+ for ip in port['fixed_ips']:
+ subnet = core_plugin.get_subnet(self.admin_context,
+ ip["subnet_id"])
+ # currently only support one subnet for a port
+ break
+
+ return subnet
+
+ @log_helpers.log_method_call
+ def _get_portgroup_members(self, context, pg_id):
+ next_group_members = []
+ group_intid = self.id_pool.get_intid_by_uuid('group', pg_id)
+ LOG.debug('group_intid: %s', group_intid)
+ pg = context._plugin.get_port_pair_group(context._plugin_context,
+ pg_id)
+ for pp_id in pg['port_pairs']:
+ pp = context._plugin.get_port_pair(context._plugin_context, pp_id)
+ filters = {}
+ if pp.get('ingress', None):
+ filters = dict(dict(ingress=pp['ingress']), **filters)
+ if pp.get('egress', None):
+ filters = dict(dict(egress=pp['egress']), **filters)
+ pd = self.get_port_detail_by_filter(filters)
+ if pd:
+ next_group_members.append(
+ dict(portpair_id=pd['id'], weight=1))
+ return group_intid, next_group_members
+
+ def _get_port_pair_detail_by_port_pair(self, context, port_pair_id):
+ pp = context._plugin.get_port_pair(context._plugin_context,
+ port_pair_id)
+ filters = {}
+ if pp.get('ingress', None):
+ filters = dict(dict(ingress=pp['ingress']), **filters)
+ if pp.get('egress', None):
+ filters = dict(dict(egress=pp['egress']), **filters)
+ pd = self.get_port_detail_by_filter(filters)
+
+ return pd
+
+ @log_helpers.log_method_call
+ def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id,
+ src_node, dst_node,
+ last_sf_node=None):
+ dst_ports = []
+ for fc in self._get_fcs_by_ids(fc_ids):
+ if fc.get('logical_source_port', ''):
+ # lookup the source port
+ src_pd_filter = dict(egress=fc['logical_source_port'],
+ tenant_id=tenant_id
+ )
+ src_pd = self.get_port_detail_by_filter(src_pd_filter)
+
+ if not src_pd:
+ # Create source port detail
+ src_pd = self._create_port_detail(src_pd_filter)
+ LOG.debug('create src port detail: %s', src_pd)
+
+ # Create associate relationship
+ assco_args = {'portpair_id': src_pd['id'],
+ 'pathnode_id': src_node['id'],
+ 'weight': 1,
+ }
+ sna = self.create_pathport_assoc(assco_args)
+ LOG.debug('create assoc src port with node: %s', sna)
+ src_node['portpair_details'].append(src_pd['id'])
+
+ if fc.get('logical_destination_port', ''):
+ dst_pd_filter = dict(ingress=fc['logical_destination_port'],
+ tenant_id=tenant_id
+ )
+ dst_pd = self.get_port_detail_by_filter(dst_pd_filter)
+
+ if not dst_pd:
+ # Create dst port detail
+ dst_pd = self._create_port_detail(dst_pd_filter)
+ LOG.debug('create dst port detail: %s', dst_pd)
+
+ # Create associate relationship
+ dst_assco_args = {'portpair_id': dst_pd['id'],
+ 'pathnode_id': dst_node['id'],
+ 'weight': 1,
+ }
+ dna = self.create_pathport_assoc(dst_assco_args)
+ LOG.debug('create assoc dst port with node: %s', dna)
+ dst_node['portpair_details'].append(dst_pd['id'])
+
+ dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1))
+
+ if last_sf_node:
+ if last_sf_node['next_hop']:
+ next_hops = jsonutils.loads(last_sf_node['next_hop'])
+ next_hops.extend(dst_ports)
+ last_sf_node['next_hop'] = jsonutils.dumps(next_hops)
+ # update nexthop info of pre node
+ self.update_path_node(last_sf_node['id'],
+ last_sf_node)
+ return dst_ports
+
+ def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id,
+ src_node=None, dst_node=None,
+ last_sf_node=None):
+ if not fc_ids:
+ return
+ for fc in self._get_fcs_by_ids(fc_ids):
+ if fc.get('logical_source_port', ''):
+ # delete source port detail
+ src_pd_filter = dict(egress=fc['logical_source_port'],
+ tenant_id=tenant_id
+ )
+ pds = self.get_port_details_by_filter(src_pd_filter)
+ if pds:
+ for pd in pds:
+ # update src_node portpair_details refence info
+ if src_node and pd['id'] in src_node[
+ 'portpair_details'
+ ]:
+ src_node['portpair_details'].remove(pd['id'])
+ if len(pd['path_nodes']) == 1:
+ self.delete_port_detail(pd['id'])
+
+ if fc.get('logical_destination_port', ''):
+ # Create dst port detail
+ dst_pd_filter = dict(ingress=fc['logical_destination_port'],
+ tenant_id=tenant_id
+ )
+ pds = self.get_port_details_by_filter(dst_pd_filter)
+ if pds:
+ for pd in pds:
+ # update dst_node portpair_details refence info
+ if dst_node and pd['id'] in dst_node[
+ 'portpair_details'
+ ]:
+ # update portpair_details of this node
+ dst_node['portpair_details'].remove(pd['id'])
+ # update last hop(SF-group) next hop info
+ if last_sf_node:
+ next_hop = dict(portpair_id=pd['id'],
+ weight=1)
+ next_hops = jsonutils.loads(
+ last_sf_node['next_hop'])
+ next_hops.remove(next_hop)
+ last_sf_node['next_hop'] = jsonutils.dumps(
+ next_hops)
+ if len(pd['path_nodes']) == 1:
+ self.delete_port_detail(pd['id'])
+
+ if last_sf_node:
+ # update nexthop info of pre node
+ self.update_path_node(last_sf_node['id'],
+ last_sf_node)
+
+ @log_helpers.log_method_call
+ def _create_portchain_path(self, context, port_chain):
+ src_node, src_pd, dst_node, dst_pd = (({}, ) * 4)
+ path_nodes, dst_ports = [], []
+ # Create an assoc object for chain_id and path_id
+ # context = context._plugin_context
+ path_id = self.id_pool.assign_intid('portchain', port_chain['id'])
+
+ if not path_id:
+ LOG.error(_LE('No path_id available for creating port chain path'))
+ return
+
+ next_group_intid, next_group_members = self._get_portgroup_members(
+ context, port_chain['port_pair_groups'][0])
+
+ port_pair_groups = port_chain['port_pair_groups']
+ sf_path_length = len(port_pair_groups)
+ # Create a head node object for port chain
+ src_args = {'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.SRC_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xff,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': next_group_intid,
+ 'next_hop': jsonutils.dumps(next_group_members),
+ }
+ src_node = self.create_path_node(src_args)
+ LOG.debug('create src node: %s', src_node)
+ path_nodes.append(src_node)
+
+ # Create a destination node object for port chain
+ dst_args = {
+ 'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.DST_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xff - sf_path_length - 1,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': None,
+ 'next_hop': None
+ }
+ dst_node = self.create_path_node(dst_args)
+ LOG.debug('create dst node: %s', dst_node)
+ path_nodes.append(dst_node)
+
+ dst_ports = self._add_flowclassifier_port_assoc(
+ port_chain['flow_classifiers'],
+ port_chain['tenant_id'],
+ src_node,
+ dst_node
+ )
+
+ for i in range(sf_path_length):
+ cur_group_members = next_group_members
+ # next_group for next hop
+ if i < sf_path_length - 1:
+ next_group_intid, next_group_members = (
+ self._get_portgroup_members(
+ context, port_pair_groups[i + 1])
+ )
+ else:
+ next_group_intid = None
+ next_group_members = None if not dst_ports else dst_ports
+
+ # Create a node object
+ node_args = {
+ 'tenant_id': port_chain['tenant_id'],
+ 'node_type': ovs_const.SF_NODE,
+ 'nsp': path_id,
+ 'nsi': 0xfe - i,
+ 'portchain_id': port_chain['id'],
+ 'status': ovs_const.STATUS_BUILDING,
+ 'next_group_id': next_group_intid,
+ 'next_hop': (
+ None if not next_group_members else
+ jsonutils.dumps(next_group_members)
+ )
+ }
+ sf_node = self.create_path_node(node_args)
+ LOG.debug('chain path node: %s', sf_node)
+ # Create the assocation objects that combine the pathnode_id with
+ # the ingress of the port_pairs in the current group
+ # when port_group does not reach tail
+ for member in cur_group_members:
+ assco_args = {'portpair_id': member['portpair_id'],
+ 'pathnode_id': sf_node['id'],
+ 'weight': member['weight'], }
+ sfna = self.create_pathport_assoc(assco_args)
+ LOG.debug('create assoc port with node: %s', sfna)
+ sf_node['portpair_details'].append(member['portpair_id'])
+ path_nodes.append(sf_node)
+
+ return path_nodes
+
+ def _delete_path_node_port_flowrule(self, node, port, fc_ids):
+ # if this port is not binding, don't to generate flow rule
+ if not port['host_id']:
+ return
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ port,
+ None,
+ fc_ids)
+
+ self.ovs_driver_rpc.ask_agent_to_delete_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _delete_path_node_flowrule(self, node, fc_ids):
+ for each in node['portpair_details']:
+ port = self.get_port_detail_by_filter(dict(id=each))
+ if port:
+ self._delete_path_node_port_flowrule(
+ node, port, fc_ids)
+
+ @log_helpers.log_method_call
+ def _delete_portchain_path(self, context, portchain_id):
+ port_chain = context.current
+ first = self.get_path_node_by_filter(
+ filters={
+ 'portchain_id': portchain_id,
+ 'nsi': 0xff
+ }
+ )
+
+ # delete flow rules which source port isn't assigned
+ # in flow classifier
+ if first:
+ self._delete_src_node_flowrules(
+ first,
+ port_chain['flow_classifiers']
+ )
+
+ pds = self.get_path_nodes_by_filter(
+ dict(portchain_id=portchain_id))
+ if pds:
+ for pd in pds:
+ self._delete_path_node_flowrule(
+ pd,
+ port_chain['flow_classifiers']
+ )
+ self.delete_path_node(pd['id'])
+
+ # delete the ports on the traffic classifier
+ self._remove_flowclassifier_port_assoc(
+ port_chain['flow_classifiers'],
+ port_chain['tenant_id']
+ )
+
+ # Delete the chainpathpair
+ intid = self.id_pool.get_intid_by_uuid(
+ 'portchain', portchain_id)
+ self.id_pool.release_intid('portchain', intid)
+
+ def _update_path_node_next_hops(self, flow_rule):
+ node_next_hops = []
+ if not flow_rule['next_hop']:
+ return None
+ next_hops = jsonutils.loads(flow_rule['next_hop'])
+ if not next_hops:
+ return None
+ for member in next_hops:
+ detail = {}
+ port_detail = self.get_port_detail_by_filter(
+ dict(id=member['portpair_id']))
+ if not port_detail or not port_detail['host_id']:
+ continue
+ detail['local_endpoint'] = port_detail['local_endpoint']
+ detail['weight'] = member['weight']
+ detail['mac_address'] = port_detail['mac_address']
+ detail['ingress'] = port_detail['ingress']
+ node_next_hops.append(detail)
+
+ mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id(
+ detail['ingress']
+ )
+
+ detail['gw_mac'] = mac
+ detail['cidr'] = cidr
+ detail['net_uuid'] = net_uuid
+
+ flow_rule['next_hops'] = node_next_hops
+ flow_rule.pop('next_hop')
+
+ return node_next_hops
+
+ def _build_portchain_flowrule_body(self, node, port,
+ add_fc_ids=None, del_fc_ids=None):
+ node_info = node.copy()
+ node_info.pop('tenant_id')
+ node_info.pop('portpair_details')
+
+ port_info = port.copy()
+ port_info.pop('tenant_id')
+ port_info.pop('id')
+ port_info.pop('path_nodes')
+ port_info.pop('host_id')
+
+ flow_rule = dict(node_info, **port_info)
+ # if this port is belong to NSH/MPLS-aware vm, only to
+ # notify the flow classifier for 1st SF.
+ flow_rule['add_fcs'] = self._filter_flow_classifiers(
+ flow_rule, add_fc_ids)
+ flow_rule['del_fcs'] = self._filter_flow_classifiers(
+ flow_rule, del_fc_ids)
+
+ self._update_portchain_group_reference_count(flow_rule,
+ port['host_id'])
+
+ # update next hop info
+ self._update_path_node_next_hops(flow_rule)
+
+ return flow_rule
+
+ def _filter_flow_classifiers(self, flow_rule, fc_ids):
+ """Filter flow classifiers.
+
+ @return: list of the flow classifiers
+ """
+
+ fc_return = []
+
+ if not fc_ids:
+ return fc_return
+ fcs = self._get_fcs_by_ids(fc_ids)
+ for fc in fcs:
+ new_fc = fc.copy()
+ new_fc.pop('id')
+ new_fc.pop('name')
+ new_fc.pop('tenant_id')
+ new_fc.pop('description')
+
+ if ((flow_rule['node_type'] == ovs_const.SRC_NODE and
+ flow_rule['egress'] == fc['logical_source_port']
+ ) or
+ (flow_rule['node_type'] == ovs_const.DST_NODE and
+ flow_rule['ingress'] == fc['logical_destination_port']
+ )):
+ fc_return.append(new_fc)
+ elif flow_rule['node_type'] == ovs_const.SF_NODE:
+ fc_return.append(new_fc)
+
+ return fc_return
+
+ def _update_path_node_port_flowrules(self, node, port,
+ add_fc_ids=None, del_fc_ids=None):
+ # if this port is not binding, don't to generate flow rule
+ if not port['host_id']:
+ return
+
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ port,
+ add_fc_ids,
+ del_fc_ids)
+
+ self.ovs_driver_rpc.ask_agent_to_update_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _update_path_node_flowrules(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ if node['portpair_details'] is None:
+ return
+ for each in node['portpair_details']:
+ port = self.get_port_detail_by_filter(dict(id=each))
+ if port:
+ self._update_path_node_port_flowrules(
+ node, port, add_fc_ids, del_fc_ids)
+
+ def _thread_update_path_nodes(self, nodes,
+ add_fc_ids=None, del_fc_ids=None):
+ for node in nodes:
+ self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids)
+ self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids)
+
+ def _get_portchain_fcs(self, port_chain):
+ return self._get_fcs_by_ids(port_chain['flow_classifiers'])
+
+ def _get_fcs_by_ids(self, fc_ids):
+ flow_classifiers = []
+ if not fc_ids:
+ return flow_classifiers
+
+ # Get the portchain flow classifiers
+ fc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ flowclassifier.FLOW_CLASSIFIER_EXT)
+ )
+ if not fc_plugin:
+ LOG.warn(_LW("Not found the flow classifier service plugin"))
+ return flow_classifiers
+
+ for fc_id in fc_ids:
+ fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id)
+ flow_classifiers.append(fc)
+
+ return flow_classifiers
+
+ @log_helpers.log_method_call
+ def create_port_chain(self, context):
+ port_chain = context.current
+ path_nodes = self._create_portchain_path(context, port_chain)
+
+ # notify agent with async thread
+ # current we don't use greenthread.spawn
+ self._thread_update_path_nodes(
+ path_nodes,
+ port_chain['flow_classifiers'],
+ None)
+
+ @log_helpers.log_method_call
+ def delete_port_chain(self, context):
+ port_chain = context.current
+ portchain_id = port_chain['id']
+ LOG.debug("to delete portchain path")
+ self._delete_portchain_path(context, portchain_id)
+
+ def _get_diff_set(self, orig, cur):
+ orig_set = set(item for item in orig)
+ cur_set = set(item for item in cur)
+
+ to_del = orig_set.difference(cur_set)
+ to_add = cur_set.difference(orig_set)
+
+ return to_del, to_add
+
+ @log_helpers.log_method_call
+ def update_port_chain(self, context):
+ port_chain = context.current
+ orig = context.original
+
+ del_fc_ids, add_fc_ids = self._get_diff_set(
+ orig['flow_classifiers'],
+ port_chain['flow_classifiers']
+ )
+ path_nodes = self.get_path_nodes_by_filter(
+ dict(portchain_id=port_chain['id'])
+ )
+ if not path_nodes:
+ return
+
+ sort_path_nodes = sorted(path_nodes,
+ key=lambda x: x['nsi'],
+ reverse=True)
+ if del_fc_ids:
+ self._thread_update_path_nodes(sort_path_nodes,
+ None,
+ del_fc_ids)
+ self._remove_flowclassifier_port_assoc(del_fc_ids,
+ port_chain['tenant_id'],
+ sort_path_nodes[0],
+ sort_path_nodes[-1],
+ sort_path_nodes[-2])
+
+ if add_fc_ids:
+ self._add_flowclassifier_port_assoc(add_fc_ids,
+ port_chain['tenant_id'],
+ sort_path_nodes[0],
+ sort_path_nodes[-1],
+ sort_path_nodes[-2])
+
+ # notify agent with async thread
+ # current we don't use greenthread.spawn
+ self._thread_update_path_nodes(sort_path_nodes,
+ add_fc_ids,
+ None)
+
+ @log_helpers.log_method_call
+ def create_port_pair_group(self, context):
+ group = context.current
+ self.id_pool.assign_intid('group', group['id'])
+
+ @log_helpers.log_method_call
+ def delete_port_pair_group(self, context):
+ group = context.current
+ group_intid = self.id_pool.get_intid_by_uuid('group', group['id'])
+ if group_intid:
+ self.id_pool.release_intid('group', group_intid)
+
+ @log_helpers.log_method_call
+ def update_port_pair_group(self, context):
+ current = context.current
+ original = context.original
+
+ if set(current['port_pairs']) == set(original['port_pairs']):
+ return
+
+ # Update the path_nodes and flows for each port chain that
+ # contains this port_pair_group
+ # Note: _get_port_pair_group is temporarily used here.
+ ppg_obj = context._plugin._get_port_pair_group(context._plugin_context,
+ current['id'])
+ port_chains = [assoc.portchain_id for assoc in
+ ppg_obj.chain_group_associations]
+
+ for chain_id in port_chains:
+ port_chain = context._plugin.get_port_chain(
+ context._plugin_context, chain_id)
+ group_intid = self.id_pool.get_intid_by_uuid('group',
+ current['id'])
+ # Get the previous node
+ prev_node = self.get_path_node_by_filter(
+ filters={'portchain_id': chain_id,
+ 'next_group_id': group_intid})
+ if not prev_node:
+ continue
+
+ before_update_prev_node = prev_node.copy()
+ # Update the previous node
+ _, curr_group_members = self._get_portgroup_members(context,
+ current['id'])
+ prev_node['next_hop'] = (
+ jsonutils.dumps(curr_group_members)
+ if curr_group_members else None
+ )
+ # update next hop to database
+ self.update_path_node(prev_node['id'], prev_node)
+ if prev_node['node_type'] == ovs_const.SRC_NODE:
+ self._delete_src_node_flowrules(
+ before_update_prev_node, port_chain['flow_classifiers'])
+ self._update_src_node_flowrules(
+ prev_node, port_chain['flow_classifiers'], None)
+ else:
+ self._delete_path_node_flowrule(
+ before_update_prev_node, port_chain['flow_classifiers'])
+ self._update_path_node_flowrules(
+ prev_node, port_chain['flow_classifiers'], None)
+
+ # Update the current node
+ # to find the current node by using the node's next_group_id
+ # if this node is the last, next_group_id would be None
+ curr_pos = port_chain['port_pair_groups'].index(current['id'])
+ curr_node = self.get_path_node_by_filter(
+ filters={'portchain_id': chain_id,
+ 'nsi': 0xfe - curr_pos})
+ if not curr_node:
+ continue
+
+ # Add the port-pair-details into the current node
+ for pp_id in (
+ set(current['port_pairs']) - set(original['port_pairs'])
+ ):
+ ppd = self._get_port_pair_detail_by_port_pair(context,
+ pp_id)
+ if not ppd:
+ LOG.debug('No port_pair_detail for the port_pair: %s',
+ pp_id)
+ LOG.debug("Failed to update port-pair-group")
+ return
+
+ assco_args = {'portpair_id': ppd['id'],
+ 'pathnode_id': curr_node['id'],
+ 'weight': 1, }
+ self.create_pathport_assoc(assco_args)
+ self._update_path_node_port_flowrules(
+ curr_node, ppd, port_chain['flow_classifiers'])
+
+ # Delete the port-pair-details from the current node
+ for pp_id in (
+ set(original['port_pairs']) - set(current['port_pairs'])
+ ):
+ ppd = self._get_port_pair_detail_by_port_pair(context,
+ pp_id)
+ if not ppd:
+ LOG.debug('No port_pair_detail for the port_pair: %s',
+ pp_id)
+ LOG.debug("Failed to update port-pair-group")
+ return
+ self._delete_path_node_port_flowrule(
+ curr_node, ppd, port_chain['flow_classifiers'])
+ self.delete_pathport_assoc(curr_node['id'], ppd['id'])
+
+ @log_helpers.log_method_call
+ def _get_portpair_detail_info(self, portpair_id):
+ """Get port detail.
+
+ @param: portpair_id: uuid
+ @return: (host_id, local_ip, network_type, segment_id,
+ service_insert_type): tuple
+ """
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ port_detail = core_plugin.get_port(self.admin_context, portpair_id)
+ host_id, local_ip, network_type, segment_id, mac_address = (
+ (None, ) * 5)
+
+ if port_detail:
+ host_id = port_detail['binding:host_id']
+ network_id = port_detail['network_id']
+ mac_address = port_detail['mac_address']
+ network_info = core_plugin.get_network(
+ self.admin_context, network_id)
+ network_type = network_info['provider:network_type']
+ segment_id = network_info['provider:segmentation_id']
+
+ if (
+ host_id and
+ network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN]
+ ):
+ driver = core_plugin.type_manager.drivers.get(network_type)
+ host_endpoint = driver.obj.get_endpoint_by_host(host_id)
+ local_ip = host_endpoint['ip_address']
+
+ return host_id, local_ip, network_type, segment_id, mac_address
+
+ @log_helpers.log_method_call
+ def _create_port_detail(self, port_pair):
+ # since first node may not assign the ingress port, and last node may
+ # not assign the egress port. we use one of the
+ # port as the key to get the SF information.
+ port = None
+ if port_pair.get('ingress', None):
+ port = port_pair['ingress']
+ elif port_pair.get('egress', None):
+ port = port_pair['egress']
+
+ host_id, local_endpoint, network_type, segment_id, mac_address = (
+ self._get_portpair_detail_info(port))
+ port_detail = {
+ 'ingress': port_pair.get('ingress', None),
+ 'egress': port_pair.get('egress', None),
+ 'tenant_id': port_pair['tenant_id'],
+ 'host_id': host_id,
+ 'segment_id': segment_id,
+ 'network_type': network_type,
+ 'local_endpoint': local_endpoint,
+ 'mac_address': mac_address
+ }
+ r = self.create_port_detail(port_detail)
+ LOG.debug('create port detail: %s', r)
+ return r
+
+ @log_helpers.log_method_call
+ def create_port_pair(self, context):
+ port_pair = context.current
+ self._create_port_detail(port_pair)
+
+ @log_helpers.log_method_call
+ def delete_port_pair(self, context):
+ port_pair = context.current
+
+ pd_filter = dict(ingress=port_pair.get('ingress', None),
+ egress=port_pair.get('egress', None),
+ tenant_id=port_pair['tenant_id']
+ )
+ pds = self.get_port_details_by_filter(pd_filter)
+ if pds:
+ for pd in pds:
+ self.delete_port_detail(pd['id'])
+
+ @log_helpers.log_method_call
+ def update_port_pair(self, context):
+ pass
+
+ def get_flowrules_by_host_portid(self, context, host, port_id):
+ port_chain_flowrules = []
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return port_chain_flowrules
+ try:
+ port_detail_list = []
+ # one port only may be in egress/ingress port once time.
+ ingress_port = self.get_port_detail_by_filter(
+ dict(ingress=port_id))
+ egress_port = self.get_port_detail_by_filter(
+ dict(egress=port_id))
+ if not ingress_port and not egress_port:
+ return None
+ # SF migrate to other host
+ if ingress_port:
+ port_detail_list.append(ingress_port)
+ if ingress_port['host_id'] != host:
+ ingress_port.update(dict(host_id=host))
+
+ if egress_port:
+ port_detail_list.append(egress_port)
+ if egress_port['host_id'] != host:
+ egress_port.update(dict(host_id=host))
+
+ # this is a SF if there are both egress and engress.
+ for i, ports in enumerate(port_detail_list):
+ nodes_assocs = ports['path_nodes']
+ for assoc in nodes_assocs:
+ # update current path flow rule
+ node = self.get_path_node(assoc['pathnode_id'])
+ port_chain = sfc_plugin.get_port_chain(
+ context,
+ node['portchain_id'])
+ flow_rule = self._build_portchain_flowrule_body(
+ node,
+ ports,
+ add_fc_ids=port_chain['flow_classifiers']
+ )
+ port_chain_flowrules.append(flow_rule)
+
+ # update the pre-path node flow rule
+ # if node['node_type'] != ovs_const.SRC_NODE:
+ # node_filter = dict(nsp=node['nsp'],
+ # nsi=node['nsi'] + 1
+ # )
+ # pre_node_list = self.get_path_nodes_by_filter(
+ # node_filter)
+ # if not pre_node_list:
+ # continue
+ # for pre_node in pre_node_list:
+ # self._update_path_node_flowrules(
+ # pre_node,
+ # add_fc_ids=port_chain['flow_classifiers'])
+
+ return port_chain_flowrules
+
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_flowrules_by_host_portid failed"))
+
+ def get_flow_classifier_by_portchain_id(self, context, portchain_id):
+ try:
+ flow_classifier_list = []
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return []
+
+ port_chain = sfc_plugin.get_port_chain(
+ context,
+ portchain_id)
+ flow_classifier_list = self._get_portchain_fcs(port_chain)
+ return flow_classifier_list
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_flow_classifier_by_portchain_id failed"))
+
+ def update_flowrule_status(self, context, id, status):
+ try:
+ flowrule_status = dict(status=status)
+ self.update_path_node(id, flowrule_status)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("update_flowrule_status failed"))
+
+ def _update_src_node_flowrules(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ flow_rule = self._get_portchain_src_node_flowrule(node,
+ add_fc_ids,
+ del_fc_ids)
+ if not flow_rule:
+ return
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ pc_agents = core_plugin.get_agents(
+ self.admin_context,
+ filters={'agent_type': [nc_const.AGENT_TYPE_OVS]})
+ if not pc_agents:
+ return
+
+ for agent in pc_agents:
+ if agent['alive']:
+ # update host info to flow rule
+ flow_rule['host'] = agent['host']
+ self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def _delete_src_node_flowrules(self, node, del_fc_ids=None):
+ flow_rule = self._get_portchain_src_node_flowrule(node,
+ None, del_fc_ids)
+ if not flow_rule:
+ return
+
+ core_plugin = manager.NeutronManager.get_plugin()
+ pc_agents = core_plugin.get_agents(
+ self.admin_context, filters={
+ 'agent_type': [nc_const.AGENT_TYPE_OVS]})
+ if not pc_agents:
+ return
+
+ for agent in pc_agents:
+ if agent['alive']:
+ # update host info to flow rule
+ self._update_portchain_group_reference_count(flow_rule,
+ agent['host'])
+ self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules(
+ self.admin_context,
+ flow_rule)
+
+ def get_all_src_node_flowrules(self, context):
+ sfc_plugin = (
+ manager.NeutronManager.get_service_plugins().get(
+ sfc.SFC_EXT
+ )
+ )
+ if not sfc_plugin:
+ return []
+ try:
+ frs = []
+ port_chains = sfc_plugin.get_port_chains(context)
+
+ for port_chain in port_chains:
+ # get the first node of this chain
+ node_filters = dict(portchain_id=port_chain['id'], nsi=0xff)
+ portchain_node = self.get_path_node_by_filter(node_filters)
+ if not portchain_node:
+ continue
+ flow_rule = self._get_portchain_src_node_flowrule(
+ portchain_node,
+ port_chain['flow_classifiers']
+ )
+ if not flow_rule:
+ continue
+ frs.append(flow_rule)
+ return frs
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("get_all_src_node_flowrules failed"))
+
+ def _get_portchain_src_node_flowrule(self, node,
+ add_fc_ids=None, del_fc_ids=None):
+ try:
+ add_fc_rt = []
+ del_fc_rt = []
+
+ if add_fc_ids:
+ for fc in self._get_fcs_by_ids(add_fc_ids):
+ if not fc.get('logical_source_port', None):
+ add_fc_rt.append(fc)
+
+ if del_fc_ids:
+ for fc in self._get_fcs_by_ids(del_fc_ids):
+ if not fc.get('logical_source_port', None):
+ del_fc_rt.append(fc)
+
+ if not add_fc_rt and not del_fc_rt:
+ return None
+
+ return self._build_portchain_flowrule_body_without_port(
+ node, add_fc_rt, del_fc_rt)
+ except Exception as e:
+ LOG.exception(e)
+ LOG.error(_LE("_get_portchain_src_node_flowrule failed"))
+
+ def _update_portchain_group_reference_count(self, flow_rule, host):
+ group_refcnt = 0
+ flow_rule['host'] = host
+
+ if flow_rule['next_group_id'] is not None:
+ all_nodes = self.get_path_nodes_by_filter(
+ filters={'next_group_id': flow_rule['next_group_id'],
+ 'nsi': 0xff})
+ if all_nodes is not None:
+ for node in all_nodes:
+ if not node['portpair_details']:
+ group_refcnt += 1
+
+ port_details = self.get_port_details_by_filter(
+ dict(host_id=flow_rule['host']))
+ if port_details is not None:
+ for pd in port_details:
+ for path in pd['path_nodes']:
+ path_node = self.get_path_node(path['pathnode_id'])
+ if (
+ path_node['next_group_id'] ==
+ flow_rule['next_group_id']
+ ):
+ group_refcnt += 1
+
+ flow_rule['group_refcnt'] = group_refcnt
+
+ return group_refcnt
+
+ def _build_portchain_flowrule_body_without_port(self,
+ node,
+ add_fcs=None,
+ del_fcs=None):
+ flow_rule = node.copy()
+ flow_rule.pop('tenant_id')
+ flow_rule.pop('portpair_details')
+
+ # according to the first sf node get network information
+ if not node['next_hop']:
+ return None
+
+ next_hops = jsonutils.loads(node['next_hop'])
+ if not next_hops:
+ return None
+
+ port_detail = self.get_port_detail_by_filter(
+ dict(id=next_hops[0]['portpair_id']))
+ if not port_detail:
+ return None
+
+ flow_rule['ingress'] = None
+ flow_rule['egress'] = None
+ flow_rule['network_type'] = port_detail['network_type']
+ flow_rule['segment_id'] = port_detail['segment_id']
+
+ flow_rule['add_fcs'] = add_fcs
+ flow_rule['del_fcs'] = del_fcs
+
+ # update next hop info
+ self._update_path_node_next_hops(flow_rule)
+ return flow_rule
diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc.py b/networking_sfc/services/sfc/drivers/ovs/rpc.py
new file mode 100644
index 0000000..a5ac0bc
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/rpc.py
@@ -0,0 +1,112 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics
+from neutron.common import rpc as n_rpc
+from neutron.common import topics
+from neutron.i18n import _LI
+
+from oslo_log import log as logging
+import oslo_messaging
+
+LOG = logging.getLogger(__name__)
+
+
+class SfcRpcCallback(object):
+ """Sfc RPC server."""
+
+ def __init__(self, driver):
+ self.target = oslo_messaging.Target(version='1.0')
+ self.driver = driver
+
+ def get_flowrules_by_host_portid(self, context, **kwargs):
+ host = kwargs.get('host')
+ port_id = kwargs.get('port_id')
+ LOG.debug('from port-chain service plugin')
+ pcfrs = self.driver.get_flowrules_by_host_portid(
+ context, host, port_id)
+ LOG.debug('host: %s, port_id: %s', host, port_id)
+ return pcfrs
+
+ def get_flow_classifier_by_portchain_id(self, context, **kwargs):
+ portchain_id = kwargs.get('portchain_id')
+ pcfcs = self.driver.get_flow_classifier_by_portchain_id(
+ context,
+ portchain_id)
+ LOG.debug('portchain id: %s', portchain_id)
+ return pcfcs
+
+ def get_all_src_node_flowrules(self, context, **kwargs):
+ host = kwargs.get('host')
+ pcfcs = self.driver.get_all_src_node_flowrules(
+ context)
+ LOG.debug('portchain get_src_node_flowrules, host: %s', host)
+ return pcfcs
+
+ def update_flowrules_status(self, context, **kwargs):
+ flowrules_status = kwargs.get('flowrules_status')
+ LOG.info(_LI('update_flowrules_status: %s'), flowrules_status)
+ for flowrule_dict in flowrules_status:
+ self.driver.update_flowrule_status(
+ context, flowrule_dict['id'], flowrule_dict['status'])
+
+
+class SfcAgentRpcClient(object):
+ """RPC client for ovs sfc agent."""
+
+ def __init__(self, topic=sfc_topics.SFC_AGENT):
+ self.topic = topic
+ target = oslo_messaging.Target(topic=topic, version='1.0')
+ self.client = n_rpc.get_client(target)
+
+ def ask_agent_to_update_flow_rules(self, context, flows):
+ LOG.debug('Ask agent on the specific host to update flows ')
+ LOG.debug('flows: %s', flows)
+ host = flows.get('host')
+ cctxt = self.client.prepare(
+ topic=topics.get_topic_name(
+ self.topic, sfc_topics.PORTFLOW, topics.UPDATE),
+ server=host)
+ cctxt.cast(context, 'update_flow_rules', flowrule_entries=flows)
+
+ def ask_agent_to_delete_flow_rules(self, context, flows):
+ LOG.debug('Ask agent on the specific host to delete flows ')
+ LOG.debug('flows: %s', flows)
+ host = flows.get('host')
+ cctxt = self.client.prepare(
+ topic=topics.get_topic_name(
+ self.topic, sfc_topics.PORTFLOW, topics.DELETE),
+ server=host)
+ cctxt.cast(context, 'delete_flow_rules', flowrule_entries=flows)
+
+ def ask_agent_to_update_src_node_flow_rules(self, context, flows):
+ LOG.debug('Ask agent on the specific host to update src node flows ')
+ LOG.debug('flows: %s', flows)
+ host = flows.get('host')
+ cctxt = self.client.prepare(
+ topic=topics.get_topic_name(
+ self.topic, sfc_topics.PORTFLOW, topics.UPDATE),
+ server=host)
+ cctxt.cast(context, 'update_src_node_flow_rules',
+ flowrule_entries=flows)
+
+ def ask_agent_to_delete_src_node_flow_rules(self, context, flows):
+ LOG.debug('Ask agent on the specific host to delete src node flows')
+ LOG.debug('flows: %s', flows)
+ host = flows.get('host')
+ cctxt = self.client.prepare(
+ topic=topics.get_topic_name(
+ self.topic, sfc_topics.PORTFLOW, topics.DELETE),
+ server=host)
+ cctxt.cast(context, 'delete_src_node_flow_rules',
+ flowrule_entries=flows)
diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py
new file mode 100644
index 0000000..a35ff4f
--- /dev/null
+++ b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py
@@ -0,0 +1,21 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+AGENT = 'q-agent-notifier'
+
+SFC_PLUGIN = 'q-sfc-plugin'
+SFC_AGENT = 'q-sfc-agent'
+SFC_FLOW = 'q-sfc-flow'
+
+PORTFLOW = 'portflowrule'
diff --git a/networking_sfc/services/sfc/plugin.py b/networking_sfc/services/sfc/plugin.py
new file mode 100644
index 0000000..f41c8e1
--- /dev/null
+++ b/networking_sfc/services/sfc/plugin.py
@@ -0,0 +1,200 @@
+# Copyright 2015 Futurewei. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import helpers as log_helpers
+from oslo_log import log as logging
+from oslo_utils import excutils
+
+from neutron.i18n import _LE
+
+from networking_sfc.db import sfc_db
+from networking_sfc.extensions import sfc as sfc_ext
+from networking_sfc.services.sfc.common import context as sfc_ctx
+from networking_sfc.services.sfc.common import exceptions as sfc_exc
+from networking_sfc.services.sfc import driver_manager as sfc_driver
+
+
+LOG = logging.getLogger(__name__)
+
+
+class SfcPlugin(sfc_db.SfcDbPlugin):
+ """SFC plugin implementation."""
+
+ supported_extension_aliases = [sfc_ext.SFC_EXT]
+ path_prefix = sfc_ext.SFC_PREFIX
+
+ def __init__(self):
+ self.driver_manager = sfc_driver.SfcDriverManager()
+ super(SfcPlugin, self).__init__()
+ self.driver_manager.initialize()
+
+ @log_helpers.log_method_call
+ def create_port_chain(self, context, port_chain):
+ port_chain_db = super(SfcPlugin, self).create_port_chain(
+ context, port_chain)
+ portchain_db_context = sfc_ctx.PortChainContext(
+ self, context, port_chain_db)
+ try:
+ self.driver_manager.create_port_chain(portchain_db_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Create port chain failed, "
+ "deleting port_chain '%s'"),
+ port_chain_db['id'])
+ self.delete_port_chain(context, port_chain_db['id'])
+
+ return port_chain_db
+
+ @log_helpers.log_method_call
+ def update_port_chain(self, context, portchain_id, port_chain):
+ original_portchain = self.get_port_chain(context, portchain_id)
+ updated_portchain = super(SfcPlugin, self).update_port_chain(
+ context, portchain_id, port_chain)
+ portchain_db_context = sfc_ctx.PortChainContext(
+ self, context, updated_portchain,
+ original_portchain=original_portchain)
+
+ try:
+ self.driver_manager.update_port_chain(portchain_db_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Update port chain failed, port_chain '%s'"),
+ updated_portchain['id'])
+
+ # TODO(qijing): should we rollback the database update here?
+ return updated_portchain
+
+ @log_helpers.log_method_call
+ def delete_port_chain(self, context, portchain_id):
+ pc = self.get_port_chain(context, portchain_id)
+ pc_context = sfc_ctx.PortChainContext(self, context, pc)
+ try:
+ self.driver_manager.delete_port_chain(pc_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Delete port chain failed, portchain '%s'"),
+ portchain_id)
+ # TODO(qijing): unsync in case deleted in driver but fail in database
+ super(SfcPlugin, self).delete_port_chain(context, portchain_id)
+
+ @log_helpers.log_method_call
+ def create_port_pair(self, context, port_pair):
+ portpair_db = super(SfcPlugin, self).create_port_pair(
+ context, port_pair)
+ portpair_context = sfc_ctx.PortPairContext(
+ self, context, portpair_db)
+ try:
+ self.driver_manager.create_port_pair(portpair_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Create port pair failed, "
+ "deleting port_pair '%s'"),
+ portpair_db['id'])
+ self.delete_port_pair(context, portpair_db['id'])
+
+ return portpair_db
+
+ @log_helpers.log_method_call
+ def update_port_pair(self, context, portpair_id, port_pair):
+ original_portpair = self.get_port_pair(context, portpair_id)
+ updated_portpair = super(SfcPlugin, self).update_port_pair(
+ context, portpair_id, port_pair)
+ portpair_context = sfc_ctx.PortPairContext(
+ self, context, updated_portpair,
+ original_portpair=original_portpair)
+ try:
+ self.driver_manager.update_port_pair(portpair_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Update port pair failed, port_pair '%s'"),
+ updated_portpair['id'])
+
+ return updated_portpair
+
+ @log_helpers.log_method_call
+ def delete_port_pair(self, context, portpair_id):
+ portpair = self.get_port_pair(context, portpair_id)
+ portpair_context = sfc_ctx.PortPairContext(
+ self, context, portpair)
+ try:
+ self.driver_manager.delete_port_pair(portpair_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Delete port pair failed, port_pair '%s'"),
+ portpair_id)
+
+ super(SfcPlugin, self).delete_port_pair(context, portpair_id)
+
+ @log_helpers.log_method_call
+ def create_port_pair_group(self, context, port_pair_group):
+ portpairgroup_db = super(SfcPlugin, self).create_port_pair_group(
+ context, port_pair_group)
+ portpairgroup_context = sfc_ctx.PortPairGroupContext(
+ self, context, portpairgroup_db)
+ try:
+ self.driver_manager.create_port_pair_group(portpairgroup_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Create port pair group failed, "
+ "deleting port_pair_group '%s'"),
+ portpairgroup_db['id'])
+ self.delete_port_pair_group(context, portpairgroup_db['id'])
+
+ return portpairgroup_db
+
+ @log_helpers.log_method_call
+ def update_port_pair_group(
+ self, context, portpairgroup_id, port_pair_group
+ ):
+ original_portpairgroup = self.get_port_pair_group(
+ context, portpairgroup_id)
+ updated_portpairgroup = super(SfcPlugin, self).update_port_pair_group(
+ context, portpairgroup_id, port_pair_group)
+ portpairgroup_context = sfc_ctx.PortPairGroupContext(
+ self, context, updated_portpairgroup,
+ original_portpairgroup=original_portpairgroup)
+ try:
+ self.driver_manager.update_port_pair_group(portpairgroup_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Update port pair group failed, "
+ "port_pair_group '%s'"),
+ updated_portpairgroup['id'])
+
+ return updated_portpairgroup
+
+ @log_helpers.log_method_call
+ def delete_port_pair_group(self, context, portpairgroup_id):
+ portpairgroup = self.get_port_pair_group(context, portpairgroup_id)
+ portpairgroup_context = sfc_ctx.PortPairGroupContext(
+ self, context, portpairgroup)
+ try:
+ self.driver_manager.delete_port_pair_group(portpairgroup_context)
+ except sfc_exc.SfcDriverError as e:
+ LOG.exception(e)
+ with excutils.save_and_reraise_exception():
+ LOG.error(_LE("Delete port pair group failed, "
+ "port_pair_group '%s'"),
+ portpairgroup_id)
+
+ super(SfcPlugin, self).delete_port_pair_group(context,
+ portpairgroup_id)