diff options
author | Ulas Kozat <ulas.kozat@huawei.com> | 2015-12-28 16:05:13 -0800 |
---|---|---|
committer | Ulas Kozat <ulas.kozat@huawei.com> | 2015-12-28 16:05:13 -0800 |
commit | c772a1dbc7ace58d099570d41a889adf851c8ba8 (patch) | |
tree | 809aefa0dae407a1d9c12989f7e8f60891700d17 /networking_sfc/services/sfc | |
parent | e671a915d887ae8f7751a54bb07ecb7ed8f2f25b (diff) |
Added networking-sfc from openstack project with merge date Dec 23 2015stable/coloradostable/brahmaputra
Added patch 13 for subject "add missing db migration files"
Change-Id: Id51a160335a14870c1dd816a44baf9b1958b9ac6
Diffstat (limited to 'networking_sfc/services/sfc')
23 files changed, 3481 insertions, 0 deletions
diff --git a/networking_sfc/services/sfc/__init__.py b/networking_sfc/services/sfc/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/services/sfc/__init__.py diff --git a/networking_sfc/services/sfc/agent/__init__.py b/networking_sfc/services/sfc/agent/__init__.py new file mode 100644 index 0000000..626812b --- /dev/null +++ b/networking_sfc/services/sfc/agent/__init__.py @@ -0,0 +1,2 @@ +from neutron.common import eventlet_utils +eventlet_utils.monkey_patch() diff --git a/networking_sfc/services/sfc/agent/agent.py b/networking_sfc/services/sfc/agent/agent.py new file mode 100644 index 0000000..2537f9b --- /dev/null +++ b/networking_sfc/services/sfc/agent/agent.py @@ -0,0 +1,891 @@ +# Copyright 2015 Huawei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import signal +import six +import sys + +from neutron.agent.common import config +from neutron.agent.linux import ip_lib + +from networking_sfc.services.sfc.agent import br_int +from networking_sfc.services.sfc.agent import br_phys +from networking_sfc.services.sfc.agent import br_tun +from networking_sfc.services.sfc.common import ovs_ext_lib +from networking_sfc.services.sfc.drivers.ovs import constants +from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics + +from neutron.agent import rpc as agent_rpc +from neutron.common import config as common_config +from neutron.common import constants as n_const +from neutron.common import exceptions +from neutron.common import rpc as n_rpc +from neutron.common import topics +from neutron.common import utils as q_utils +from neutron.i18n import _LE +from neutron.i18n import _LI +from neutron.i18n import _LW +from neutron.plugins.ml2.drivers.openvswitch.agent.common import ( + constants as ovs_const) +from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging + + +LOG = logging.getLogger(__name__) + +agent_opts = [ + cfg.StrOpt('sfc_encap_mode', default='mpls', + help=_("The encapsulation mode of sfc.")), +] + +cfg.CONF.register_opts(agent_opts, "AGENT") + +# This table is used to process the traffic across differet subnet scenario. +# Flow 1: pri=1, ip,dl_dst=nexthop_mac,nw_src=nexthop_subnet. actions= +# push_mpls:0x8847,set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port +# or resubmit to table(INGRESS_TABLE) +# Flow 2: pri=0, ip,dl_dst=nexthop_mac,, action=push_mpls:0x8847, +# set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port or resubmit to +# table(INGRESS_TABLE) +ACROSS_SUBNET_TABLE = 5 + +# The table has multiple flows that steer traffic for the different chains +# to the ingress port of different service functions hosted on this Compute +# node. +INGRESS_TABLE = 10 + +# port chain default flow rule priority +PC_DEF_PRI = 20 +PC_INGRESS_PRI = 30 + + +class FeatureSupportError(exceptions.NeutronException): + message = _("Current sfc agent don't support %(feature)s ") + + +class SfcPluginApi(object): + def __init__(self, topic, host): + self.host = host + self.target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(self.target) + + def update_flowrules_status(self, context, flowrules_status): + cctxt = self.client.prepare() + return cctxt.call( + context, 'update_flowrules_status', + flowrules_status=flowrules_status) + + def get_flowrules_by_host_portid(self, context, port_id): + cctxt = self.client.prepare() + return cctxt.call( + context, 'get_flowrules_by_host_portid', + host=self.host, port_id=port_id) + + def get_all_src_node_flowrules(self, context): + cctxt = self.client.prepare() + return cctxt.call( + context, 'get_all_src_node_flowrules', + host=self.host) + + +class OVSSfcAgent(ovs_neutron_agent.OVSNeutronAgent): + # history + # 1.0 Initial version + """This class will support MPLS frame + + Ethernet + MPLS + IPv4 Packet: + +-------------------------------+---------------+--------------------+ + |Outer Ethernet, ET=0x8847 | MPLS head, | original IP Packet | + +-------------------------------+---------------+--------------------+ + """ + + target = oslo_messaging.Target(version='1.0') + + def __init__(self, bridge_classes, integ_br, tun_br, local_ip, + bridge_mappings, polling_interval, tunnel_types=None, + veth_mtu=None, l2_population=False, + enable_distributed_routing=False, + minimize_polling=False, + ovsdb_monitor_respawn_interval=( + ovs_const.DEFAULT_OVSDBMON_RESPAWN), + arp_responder=False, + prevent_arp_spoofing=True, + use_veth_interconnection=False, + quitting_rpc_timeout=None): + + """to get network info from ovs agent.""" + super(OVSSfcAgent, self).__init__( + bridge_classes, integ_br, tun_br, + local_ip, + bridge_mappings, polling_interval, tunnel_types, + veth_mtu, l2_population, + enable_distributed_routing, + minimize_polling, + ovsdb_monitor_respawn_interval, + arp_responder, + prevent_arp_spoofing, + use_veth_interconnection, + quitting_rpc_timeout) + + self.overlay_encap_mode = cfg.CONF.AGENT.sfc_encap_mode + self._sfc_setup_rpc() + + if self.overlay_encap_mode == 'eth_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'vxlan_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'mpls': + self._clear_sfc_flow_on_int_br() + self._setup_src_node_flow_rules_with_mpls() + + def _sfc_setup_rpc(self): + self.sfc_plugin_rpc = SfcPluginApi( + sfc_topics.SFC_PLUGIN, cfg.CONF.host) + + self.topic = sfc_topics.SFC_AGENT + self.endpoints = [self] + consumers = [ + [sfc_topics.PORTFLOW, topics.UPDATE], + [sfc_topics.PORTFLOW, topics.DELETE] + ] + + # subscribe sfc plugin message + self.connection = agent_rpc.create_consumers( + self.endpoints, + self.topic, + consumers) + + def _parse_flow_classifier(self, flow_classifier): + dl_type, nw_proto, source_port_masks, destination_port_masks = ( + (None, ) * 4) + + if ( + not flow_classifier['source_port_range_min'] and + not flow_classifier['source_port_range_max'] + ): + # wildcard + source_port_masks = ['0/0x0'] + elif not flow_classifier['source_port_range_min']: + source_port_masks = ovs_ext_lib.get_port_mask( + 1, + flow_classifier['source_port_range_max']) + elif not flow_classifier['source_port_range_max']: + source_port_masks = ovs_ext_lib.get_port_mask( + flow_classifier['source_port_range_min'], + 65535) + else: + source_port_masks = ovs_ext_lib.get_port_mask( + flow_classifier['source_port_range_min'], + flow_classifier['source_port_range_max']) + + if ( + not flow_classifier['destination_port_range_min'] and + not flow_classifier['destination_port_range_max'] + ): + # wildcard + destination_port_masks = ['0/0x0'] + elif not flow_classifier['destination_port_range_min']: + destination_port_masks = ovs_ext_lib.get_port_mask( + 1, + flow_classifier['destination_port_range_max']) + elif not flow_classifier['destination_port_range_max']: + destination_port_masks = ovs_ext_lib.get_port_mask( + flow_classifier['destination_port_range_min'], + 65535) + else: + destination_port_masks = ovs_ext_lib.get_port_mask( + flow_classifier['destination_port_range_min'], + flow_classifier['destination_port_range_max']) + + if "IPv4" == flow_classifier['ethertype']: + dl_type = 0x0800 + if n_const.PROTO_NAME_TCP == flow_classifier['protocol']: + nw_proto = n_const.PROTO_NUM_TCP + elif n_const.PROTO_NAME_UDP == flow_classifier['protocol']: + nw_proto = n_const.PROTO_NUM_UDP + elif n_const.PROTO_NAME_ICMP == flow_classifier['protocol']: + nw_proto = n_const.PROTO_NUM_ICMP + else: + nw_proto = None + elif "IPv6" == flow_classifier['ethertype']: + LOG.error(_LE("Current portchain agent don't support Ipv6")) + else: + LOG.error(_LE("invalid protocol input")) + return (dl_type, nw_proto, + source_port_masks, destination_port_masks + ) + + def _clear_sfc_flow_on_int_br(self): + self.int_br.delete_group(group_id='all') + self.int_br.delete_flows(table=ACROSS_SUBNET_TABLE) + self.int_br.delete_flows(table=INGRESS_TABLE) + self.int_br.install_goto(dest_table_id=INGRESS_TABLE, + priority=PC_DEF_PRI, + dl_type=0x8847) + self.int_br.install_drop(table_id=INGRESS_TABLE) + + def _get_flow_infos_from_flow_classifier(self, flow_classifier): + flow_infos = [] + nw_src, nw_dst = ((None, ) * 2) + + if "IPv4" != flow_classifier['ethertype']: + LOG.error(_LE("Current portchain agent don't support Ipv6")) + return flow_infos + + # parse and transfer flow info to match field info + dl_type, nw_proto, source_port_masks, destination_port_masks = ( + self._parse_flow_classifier(flow_classifier)) + + if flow_classifier['source_ip_prefix']: + nw_src = flow_classifier['source_ip_prefix'] + else: + nw_src = '0.0.0.0/0.0.0.0' + if flow_classifier['destination_ip_prefix']: + nw_dst = flow_classifier['destination_ip_prefix'] + else: + nw_dst = '0.0.0.0/0.0.0.0' + + if source_port_masks and destination_port_masks: + for destination_port in destination_port_masks: + for source_port in source_port_masks: + if nw_proto is None: + flow_infos.append(dict( + dl_type=dl_type, + nw_src=nw_src, + nw_dst=nw_dst, + tp_src='%s' % source_port, + tp_dst='%s' % destination_port + )) + else: + flow_infos.append(dict( + dl_type=dl_type, + nw_proto=nw_proto, + nw_src=nw_src, + nw_dst=nw_dst, + tp_src='%s' % source_port, + tp_dst='%s' % destination_port + )) + + return flow_infos + + def _get_flow_infos_from_flow_classifier_list(self, flow_classifier_list): + flow_infos = [] + if not flow_classifier_list: + return flow_infos + for flow_classifier in flow_classifier_list: + flow_infos.extend( + self._get_flow_infos_from_flow_classifier(flow_classifier) + ) + + return flow_infos + + def _setup_local_switch_flows_on_int_br( + self, flowrule, flow_classifier_list, + actions, add_flow=True, match_inport=True + ): + inport_match = {} + priority = PC_DEF_PRI + + if match_inport is True: + egress_port = self.int_br.get_vif_port_by_id(flowrule['egress']) + if egress_port: + inport_match = dict(in_port=egress_port.ofport) + priority = PC_INGRESS_PRI + + for flow_info in self._get_flow_infos_from_flow_classifier_list( + flow_classifier_list + ): + match_info = dict(inport_match, **flow_info) + if add_flow: + self.int_br.add_flow( + table=ovs_const.LOCAL_SWITCHING, + priority=priority, + actions=actions, **match_info + ) + else: + self.int_br.delete_flows( + table=ovs_const.LOCAL_SWITCHING, + **match_info + ) + + def _update_destination_ingress_flow_rules(self, flowrule): + for flow_info in self._get_flow_infos_from_flow_classifier_list( + flowrule['del_fcs'] + ): + self.int_br.delete_flows( + table=ovs_const.LOCAL_SWITCHING, + in_port=self.patch_tun_ofport, + **flow_info + ) + for flow_info in self._get_flow_infos_from_flow_classifier_list( + flowrule['add_fcs'] + ): + inport_match = dict(in_port=self.patch_tun_ofport) + match_info = dict(inport_match, **flow_info) + self.int_br.install_normal(table_id=ovs_const.LOCAL_SWITCHING, + priority=PC_INGRESS_PRI, + **match_info) + + def _setup_src_node_flow_rules_with_mpls(self): + flow_rules = self.sfc_plugin_rpc.get_all_src_node_flowrules( + self.context) + if not flow_rules: + return + for fr in flow_rules: + self._setup_egress_flow_rules_with_mpls(fr, False) + # if the traffic is from patch port, it means the destination + # is on the this host. so implement normal forward but not + # match the traffic from the source. + # Next step need to do is check if the traffic is from vRouter + # on the local host, also need to implement same normal process. + self._update_destination_ingress_flow_rules(fr) + + def _setup_egress_flow_rules_with_mpls(self, flowrule, match_inport=True): + network_type = flowrule['network_type'] + group_id = flowrule.get('next_group_id', None) + next_hops = flowrule.get('next_hops', None) + segmentation_id = flowrule['segment_id'] + + if not next_hops: + return + + if network_type not in ovs_const.TUNNEL_NETWORK_TYPES: + LOG.warn(_LW("currently %s network is not supported," + "only support tunnel type" + ), + network_type + ) + return + + # if the group is not none, install the egress rule for this SF + if ( + (flowrule['node_type'] == constants.SRC_NODE or + flowrule['node_type'] == constants.SF_NODE) and group_id + ): + # 1st, install br-int flow rule on table ACROSS_SUBNET_TABLE + # and group table + buckets = [] + for item in next_hops: + if item['net_uuid'] not in self.local_vlan_map: + self.provision_local_vlan(item['net_uuid'], network_type, + None, segmentation_id) + lvm = self.local_vlan_map[item['net_uuid']] + bucket = ( + 'bucket=weight=%d, mod_dl_dst:%s,' + 'resubmit(,%d)' % ( + item['weight'], + item['mac_address'], + ACROSS_SUBNET_TABLE + ) + ) + buckets.append(bucket) + + no_across_subnet_actions_list = [] + across_subnet_actions_list = [] + + push_mpls = ( + "push_mpls:0x8847," + "set_mpls_label:%d," + "set_mpls_ttl:%d," + "mod_vlan_vid:%d," % + ((flowrule['nsp'] << 8) | flowrule['nsi'], + flowrule['nsi'], lvm.vlan)) + + no_across_subnet_actions_list.append(push_mpls) + across_subnet_actions_list.append(push_mpls) + + if item['local_endpoint'] == self.local_ip: + no_across_subnet_actions = ( + "resubmit(,%d)" % INGRESS_TABLE) + across_subnet_actions = ( + "mod_dl_src:%s, resubmit(,%d)" % + (item['gw_mac'], INGRESS_TABLE)) + else: + # same subnet with next hop + no_across_subnet_actions = ("output:%s" % + self.patch_tun_ofport) + across_subnet_actions = ("mod_dl_src:%s, output:%s" % + (item['gw_mac'], + self.patch_tun_ofport)) + no_across_subnet_actions_list.append(no_across_subnet_actions) + across_subnet_actions_list.append(across_subnet_actions) + + self.int_br.add_flow( + table=ACROSS_SUBNET_TABLE, + priority=1, + dl_dst=item['mac_address'], + dl_type=0x0800, + nw_src=item['cidr'], + actions="%s" % + (','.join(no_across_subnet_actions_list))) + # different subnet with next hop + self.int_br.add_flow( + table=ACROSS_SUBNET_TABLE, + priority=0, + dl_dst=item['mac_address'], + actions="%s" % + (','.join(across_subnet_actions_list))) + + buckets = ','.join(buckets) + group_content = self.int_br.dump_group_for_id(group_id) + if group_content.find('group_id=%d' % group_id) == -1: + self.int_br.add_group(group_id=group_id, + type='select', buckets=buckets) + else: + self.int_br.mod_group(group_id=group_id, + type='select', buckets=buckets) + + # 2nd, install br-int flow rule on table 0 for egress traffic + # for egress traffic + enc_actions = ("group:%d" % group_id) + # to uninstall the removed flow classifiers + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['del_fcs'], + None, + add_flow=False, + match_inport=match_inport) + # to install the added flow classifiers + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['add_fcs'], + enc_actions, + add_flow=True, + match_inport=match_inport) + + def _get_network_by_port(self, port_id): + for key, val in six.iteritems(self.network_ports): + if port_id in val: + return key + + return None + + def _setup_ingress_flow_rules_with_mpls(self, flowrule): + network_id = self._get_network_by_port(flowrule['ingress']) + if network_id: + # install br-int flow rule on table 0 for ingress traffic + lvm = self.local_vlan_map[network_id] + vif_port = lvm.vif_ports[flowrule['ingress']] + match_field = {} + + actions = ("strip_vlan, pop_mpls:0x0800," + "output:%s" % vif_port.ofport) + match_field = dict( + table=INGRESS_TABLE, + priority=1, + dl_dst=vif_port.vif_mac, + dl_vlan=lvm.vlan, + dl_type=0x8847, + mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1), + actions=actions) + + self.int_br.add_flow(**match_field) + + def _setup_last_egress_flow_rules_with_mpls(self, flowrule): + group_id = flowrule.get('next_group_id', None) + + # check whether user assign the destination neutron port. + if ( + constants.SF_NODE == flowrule['node_type'] and + not group_id and + flowrule['egress'] is not None + ): + # to uninstall the new removed flow classifiers + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['del_fcs'], + None, + add_flow=False, + match_inport=True + ) + + # to install the added flow classifiers + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['add_fcs'], + actions='normal', + add_flow=True, + match_inport=True) + + def _get_flow_classifier_dest_port_info(self, + logical_destination_port, + flowrule): + for next_hop in flowrule['next_hops']: + # this flow classifier's destination port should match + # with the nexthop's ingress port id + if logical_destination_port in next_hop.values(): + return next_hop + + return None + + def _update_flow_rules_with_mpls_enc(self, flowrule, flowrule_status): + try: + if flowrule.get('egress', None): + self._setup_egress_flow_rules_with_mpls(flowrule) + self._setup_last_egress_flow_rules_with_mpls(flowrule) + if flowrule.get('ingress', None): + self._setup_ingress_flow_rules_with_mpls(flowrule) + + flowrule_status_temp = {} + flowrule_status_temp['id'] = flowrule['id'] + flowrule_status_temp['status'] = constants.STATUS_ACTIVE + flowrule_status.append(flowrule_status_temp) + except Exception as e: + flowrule_status_temp = {} + flowrule_status_temp['id'] = flowrule['id'] + flowrule_status_temp['status'] = constants.STATUS_ERROR + flowrule_status.append(flowrule_status_temp) + LOG.exception(e) + LOG.error(_LE("_update_flow_rules_with_mpls_enc failed")) + + def _delete_ports_flowrules_by_id(self, ports_id): + flowrule_status = [] + try: + LOG.debug("delete_port_id_flows received, ports_id= %s", ports_id) + count = 0 + if ports_id: + for port_id in ports_id: + flowrule = ( + self.sfc_plugin_rpc.get_flowrules_by_host_portid( + self.context, port_id + ) + ) + if flowrule: + self._treat_delete_flow_rules( + flowrule, flowrule_status) + LOG.debug( + "_delete_ports_flowrules_by_id received, count= %s", count) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("delete_port_id_flows failed")) + if flowrule_status: + self.sfc_plugin_rpc.update_flowrules_status( + self.context, flowrule_status) + + def _delete_flow_rule_with_mpls_enc(self, flowrule, flowrule_status): + try: + LOG.debug("_delete_flow_rule_with_mpls_enc, flowrule = %s", + flowrule) + group_id = flowrule.get('next_group_id', None) + + # delete tunnel table flow rule on br-int(egress match) + if flowrule['egress'] is not None: + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['del_fcs'], + None, + add_flow=False, + match_inport=True + ) + + # delete table INGRESS_TABLE ingress match flow rule + # on br-int(ingress match) + network_id = self._get_network_by_port(flowrule['ingress']) + if network_id: + # third, install br-int flow rule on table INGRESS_TABLE + # for ingress traffic + lvm = self.local_vlan_map[network_id] + vif_port = lvm.vif_ports[flowrule['ingress']] + self.int_br.delete_flows( + table=INGRESS_TABLE, + dl_type=0x8847, + dl_dst=vif_port.vif_mac, + mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1) + ) + + # delete group table, need to check again + if group_id and flowrule.get('group_refcnt', None) <= 1: + self.int_br.delete_group(group_id=group_id) + for item in flowrule['next_hops']: + self.int_br.delete_flows( + table=ACROSS_SUBNET_TABLE, + dl_dst=item['mac_address']) + elif (not group_id and + flowrule['egress'] is not None): + # to delete last hop flow rule + for each in flowrule['del_fcs']: + if each.get('logical_destination_port', None): + ldp = self._get_flow_classifier_dest_port_info( + each['logical_destination_port'], + flowrule + ) + if ldp: + self.int_br.delete_flows( + table=ACROSS_SUBNET_TABLE, + dl_dst=ldp['mac_address']) + + except Exception as e: + flowrule_status_temp = {} + flowrule_status_temp['id'] = flowrule['id'] + flowrule_status_temp['status'] = constants.STATUS_ERROR + flowrule_status.append(flowrule_status_temp) + LOG.exception(e) + LOG.error(_LE("_delete_flow_rule_with_mpls_enc failed")) + + def _treat_update_flow_rules(self, flowrule, flowrule_status): + if self.overlay_encap_mode == 'eth_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'vxlan_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'mpls': + self._update_flow_rules_with_mpls_enc(flowrule, flowrule_status) + + def _treat_delete_flow_rules(self, flowrule, flowrule_status): + if self.overlay_encap_mode == 'eth_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'vxlan_nsh': + raise FeatureSupportError(feature=self.overlay_encap_mode) + elif self.overlay_encap_mode == 'mpls': + self._delete_flow_rule_with_mpls_enc( + flowrule, flowrule_status) + + def update_flow_rules(self, context, **kwargs): + try: + flowrule_status = [] + flowrules = kwargs['flowrule_entries'] + LOG.debug("update_flow_rules received, flowrules = %s", + flowrules) + + if flowrules: + self._treat_update_flow_rules(flowrules, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("update_flow_rules failed")) + + if flowrule_status: + self.sfc_plugin_rpc.update_flowrules_status( + self.context, flowrule_status) + + def delete_flow_rules(self, context, **kwargs): + try: + flowrule_status = [] + flowrules = kwargs['flowrule_entries'] + LOG.debug("delete_flow_rules received, flowrules= %s", flowrules) + if flowrules: + self._treat_delete_flow_rules(flowrules, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("delete_flow_rules failed")) + + if flowrule_status: + self.sfc_plugin_rpc.update_flowrules_status( + self.context, flowrule_status) + + def update_src_node_flow_rules(self, context, **kwargs): + flowrule = kwargs['flowrule_entries'] + if self.overlay_encap_mode == 'mpls': + self._setup_egress_flow_rules_with_mpls(flowrule, + match_inport=False) + self._update_destination_ingress_flow_rules(flowrule) + + def _delete_src_node_flow_rules_with_mpls(self, flowrule, + match_inport=False): + LOG.debug("_delete_src_node_flow_rules_with_mpls, flowrule = %s", + flowrule) + group_id = flowrule.get('next_group_id', None) + + # delete br-int table 0 full match flow + self._setup_local_switch_flows_on_int_br( + flowrule, + flowrule['del_fcs'], + None, + add_flow=False, + match_inport=False) + + # delete group table, need to check again + if None != group_id and flowrule.get('group_refcnt', None) <= 1: + self.int_br.delete_group(group_id=group_id) + for item in flowrule['next_hops']: + self.int_br.delete_flows( + table=ACROSS_SUBNET_TABLE, + dl_dst=item['mac_address']) + + def delete_src_node_flow_rules(self, context, **kwargs): + flowrule = kwargs['flowrule_entries'] + if self.overlay_encap_mode == 'mpls': + self._delete_src_node_flow_rules_with_mpls(flowrule, + match_inport=False) + self._update_destination_ingress_flow_rules(flowrule) + + def sfc_treat_devices_added_updated(self, port_id): + resync = False + flowrule_status = [] + try: + LOG.debug("a new device %s is found", port_id) + flows_list = ( + self.sfc_plugin_rpc.get_flowrules_by_host_portid( + self.context, port_id + ) + ) + if flows_list: + for flow in flows_list: + self._treat_update_flow_rules(flow, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("portchain_treat_devices_added_updated failed")) + resync = True + + if flowrule_status: + self.sfc_plugin_rpc.update_flowrules_status( + self.context, flowrule_status) + + return resync + + def sfc_treat_devices_removed(self, port_ids): + resync = False + for port_id in port_ids: + LOG.info(_LI("a device %s is removed"), port_id) + try: + self._delete_ports_flowrules_by_id(port_id) + except Exception as e: + LOG.exception(e) + LOG.error( + _LE("delete port flow rule failed for %(port_id)s"), + {'port_id': port_id} + ) + resync = True + + return resync + + def treat_devices_added_or_updated(self, devices, ovs_restarted): + skipped_devices = [] + need_binding_devices = [] + security_disabled_devices = [] + devices_details_list = ( + self.plugin_rpc.get_devices_details_list_and_failed_devices( + self.context, + devices, + self.agent_id, + self.conf.host + ) + ) + if devices_details_list.get('failed_devices'): + # TODO(rossella_s): handle better the resync in next patches, + # this is just to preserve the current behavior + raise ovs_neutron_agent.DeviceListRetrievalError(devices=devices) + + devices = devices_details_list.get('devices') + vif_by_id = self.int_br.get_vifs_by_ids( + [vif['device'] for vif in devices]) + for details in devices: + device = details['device'] + LOG.debug("Processing port: %s", device) + port = vif_by_id.get(device) + if not port: + # The port disappeared and cannot be processed + LOG.info(_LI("Port %s was not found on the integration bridge " + "and will therefore not be processed"), device) + skipped_devices.append(device) + continue + + if 'port_id' in details: + LOG.info(_LI("Port %(device)s updated. Details: %(details)s"), + {'device': device, 'details': details}) + details['vif_port'] = port + need_binding = self.treat_vif_port(port, details['port_id'], + details['network_id'], + details['network_type'], + details['physical_network'], + details['segmentation_id'], + details['admin_state_up'], + details['fixed_ips'], + details['device_owner'], + ovs_restarted) + if need_binding: + need_binding_devices.append(details) + + port_security = details['port_security_enabled'] + has_sgs = 'security_groups' in details + if not port_security or not has_sgs: + security_disabled_devices.append(device) + self._update_port_network(details['port_id'], + details['network_id']) + self.ext_manager.handle_port(self.context, details) + self.sfc_treat_devices_added_updated(details['port_id']) + else: + LOG.warn(_LW("Device %s not defined on plugin"), device) + if (port and port.ofport != -1): + self.port_dead(port) + return skipped_devices, need_binding_devices, security_disabled_devices + + def process_deleted_ports(self, port_info): + # don't try to process removed ports as deleted ports since + # they are already gone + if 'removed' in port_info: + self.deleted_ports -= port_info['removed'] + deleted_ports = list(self.deleted_ports) + while self.deleted_ports: + port_id = self.deleted_ports.pop() + port = self.int_br.get_vif_port_by_id(port_id) + self._clean_network_ports(port_id) + self.ext_manager.delete_port(self.context, + {"vif_port": port, + "port_id": port_id}) + self.sfc_treat_devices_removed(port_id) + # move to dead VLAN so deleted ports no + # longer have access to the network + if port: + # don't log errors since there is a chance someone will be + # removing the port from the bridge at the same time + self.port_dead(port, log_errors=False) + self.port_unbound(port_id) + # Flush firewall rules after ports are put on dead VLAN to be + # more secure + self.sg_agent.remove_devices_filter(deleted_ports) + + +def main(): + cfg.CONF.register_opts(ip_lib.OPTS) + config.register_root_helper(cfg.CONF) + common_config.init(sys.argv[1:]) + common_config.setup_logging() + q_utils.log_opt_values(LOG) + + try: + agent_config = ovs_neutron_agent.create_agent_config_map(cfg.CONF) + except ValueError as e: + LOG.exception(e) + LOG.error(_LE('Agent terminated!')) + sys.exit(1) + + is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper + if is_xen_compute_host: + # Force ip_lib to always use the root helper to ensure that ip + # commands target xen dom0 rather than domU. + cfg.CONF.set_default('ip_lib_force_root', True) + + bridge_classes = { + 'br_int': br_int.OVSIntegrationBridge, + 'br_phys': br_phys.OVSPhysicalBridge, + 'br_tun': br_tun.OVSTunnelBridge, + } + try: + agent = OVSSfcAgent(bridge_classes, **agent_config) + except RuntimeError as e: + LOG.exception(e) + LOG.error(_LE("Agent terminated!")) + sys.exit(1) + signal.signal(signal.SIGTERM, agent._handle_sigterm) + + # Start everything. + LOG.info(_LI("Agent initialized successfully, now running... ")) + agent.daemon_loop() + + +if __name__ == "__main__": + main() diff --git a/networking_sfc/services/sfc/agent/br_int.py b/networking_sfc/services/sfc/agent/br_int.py new file mode 100644 index 0000000..1f88c01 --- /dev/null +++ b/networking_sfc/services/sfc/agent/br_int.py @@ -0,0 +1,48 @@ +# Copyright 2015 Huawei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +* references +** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic +""" + +from networking_sfc.services.sfc.common import ovs_ext_lib +from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants +from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( + br_int) + + +class OVSIntegrationBridge( + br_int.OVSIntegrationBridge, + ovs_ext_lib.OVSBridgeExt +): + def setup_controllers(self, conf): + self.set_protocols("[]") + self.del_controller() + + def delete_arp_spoofing_protection(self, port): + # there is an issue to delete icmp6, it will not effect and cause + # other flow rule get deleted. + # Raofei will raise a bug to neutron community. + self.delete_flows(table_id=constants.LOCAL_SWITCHING, + in_port=port, proto='arp') + self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, + in_port=port) + + def mod_flow(self, **kwargs): + ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) + + def run_ofctl(self, cmd, args, process_input=None): + return ovs_ext_lib.OVSBridgeExt.run_ofctl( + self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/agent/br_phys.py b/networking_sfc/services/sfc/agent/br_phys.py new file mode 100644 index 0000000..e9666e9 --- /dev/null +++ b/networking_sfc/services/sfc/agent/br_phys.py @@ -0,0 +1,34 @@ +# Copyright 2015 Huawei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +* references +** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic +""" +from networking_sfc.services.sfc.common import ovs_ext_lib +from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( + br_phys) + + +class OVSPhysicalBridge(br_phys.OVSPhysicalBridge, ovs_ext_lib.OVSBridgeExt): + def setup_controllers(self, conf): + self.set_protocols("[]") + self.del_controller() + + def mod_flow(self, **kwargs): + ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) + + def run_ofctl(self, cmd, args, process_input=None): + return ovs_ext_lib.OVSBridgeExt.run_ofctl( + self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/agent/br_tun.py b/networking_sfc/services/sfc/agent/br_tun.py new file mode 100644 index 0000000..47a7cf9 --- /dev/null +++ b/networking_sfc/services/sfc/agent/br_tun.py @@ -0,0 +1,35 @@ +# Copyright 2015 Huawei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +* references +** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic +""" + +from networking_sfc.services.sfc.common import ovs_ext_lib +from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( + br_tun) + + +class OVSTunnelBridge(br_tun.OVSTunnelBridge, ovs_ext_lib.OVSBridgeExt): + def setup_controllers(self, conf): + self.set_protocols("[]") + self.del_controller() + + def mod_flow(self, **kwargs): + ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) + + def run_ofctl(self, cmd, args, process_input=None): + return ovs_ext_lib.OVSBridgeExt.run_ofctl( + self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/common/__init__.py b/networking_sfc/services/sfc/common/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/services/sfc/common/__init__.py diff --git a/networking_sfc/services/sfc/common/config.py b/networking_sfc/services/sfc/common/config.py new file mode 100644 index 0000000..29acd1c --- /dev/null +++ b/networking_sfc/services/sfc/common/config.py @@ -0,0 +1,27 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg + + +SFC_DRIVER_OPTS = [ + cfg.ListOpt('drivers', + default=['dummy'], + help=_("An ordered list of service chain drivers " + "entrypoints to be loaded from the " + "networking_sfc.sfc.drivers namespace.")), +] + + +cfg.CONF.register_opts(SFC_DRIVER_OPTS, "sfc") diff --git a/networking_sfc/services/sfc/common/context.py b/networking_sfc/services/sfc/common/context.py new file mode 100644 index 0000000..7d3b451 --- /dev/null +++ b/networking_sfc/services/sfc/common/context.py @@ -0,0 +1,85 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +class SfcPluginContext(object): + """SFC context base class.""" + def __init__(self, plugin, plugin_context): + self._plugin = plugin + self._plugin_context = plugin_context + + +class PortChainContext(SfcPluginContext): + + def __init__(self, plugin, plugin_context, portchain, + original_portchain=None): + super(PortChainContext, self).__init__(plugin, plugin_context) + self._portchain = portchain + self._original_portchain = original_portchain + + @property + def current(self): + return self._portchain + + @property + def original(self): + return self._original_portchain + + +class FlowClassifierContext(SfcPluginContext): + def __init__(self, plugin, plugin_context, flowclassifier, + original_flowclassifier=None): + super(FlowClassifierContext, self).__init__(plugin, plugin_context) + self._flowclassifier = flowclassifier + self._original_flowclassifier = original_flowclassifier + + @property + def current(self): + return self._flowclassifier + + @property + def original(self): + return self._original_flowclassifier + + +class PortPairContext(SfcPluginContext): + def __init__(self, plugin, plugin_context, portpair, + original_portpair=None): + super(PortPairContext, self).__init__(plugin, plugin_context) + self._portpair = portpair + self._original_portpair = original_portpair + + @property + def current(self): + return self._portpair + + @property + def original(self): + return self._original_portpair + + +class PortPairGroupContext(SfcPluginContext): + def __init__(self, plugin, plugin_context, portpairgroup, + original_portpairgroup=None): + super(PortPairGroupContext, self).__init__(plugin, plugin_context) + self._portpairgroup = portpairgroup + self._original_portpairgroup = original_portpairgroup + + @property + def current(self): + return self._portpairgroup + + @property + def original(self): + return self._original_portpairgroup diff --git a/networking_sfc/services/sfc/common/exceptions.py b/networking_sfc/services/sfc/common/exceptions.py new file mode 100644 index 0000000..7d1b9d9 --- /dev/null +++ b/networking_sfc/services/sfc/common/exceptions.py @@ -0,0 +1,46 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Exceptions used by SFC plugin and drivers.""" + +from neutron.common import exceptions + + +class SfcDriverError(exceptions.NeutronException): + """SFC driver call failed.""" + message = _("%(method)s failed.") + + +class SfcException(exceptions.NeutronException): + """Base for SFC driver exceptions returned to user.""" + pass + + +class SfcBadRequest(exceptions.BadRequest, SfcException): + """Base for SFC driver bad request exceptions returned to user.""" + pass + + +class SfcNoSubnetGateway(SfcDriverError): + """No subnet gateway.""" + message = _("There is no %(type)s of ip prefix %(cidr)s.") + + +class SfcNoSuchSubnet(SfcDriverError): + """No such subnet.""" + message = _("There is no %(type)s of %(cidr)s.") + + +class FlowClassifierInvalid(SfcDriverError): + """Invalid flow classifier.""" + message = _("There is no %(type)s assigned.") diff --git a/networking_sfc/services/sfc/common/ovs_ext_lib.py b/networking_sfc/services/sfc/common/ovs_ext_lib.py new file mode 100644 index 0000000..01fbd04 --- /dev/null +++ b/networking_sfc/services/sfc/common/ovs_ext_lib.py @@ -0,0 +1,187 @@ +# Copyright 2015 Huawei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +from neutron.agent.common import ovs_lib +from neutron.agent.common import utils +from neutron.common import exceptions +from neutron.i18n import _LE +from neutron.plugins.common import constants +from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( + ovs_bridge) +from oslo_log import log as logging +import six + + +# Special return value for an invalid OVS ofport +INVALID_OFPORT = '-1' + +LOG = logging.getLogger(__name__) + + +def get_port_mask(min_port, max_port): + """get port/mask serial by port range.""" + if min_port < 1 or max_port > 0xffff or min_port > max_port: + msg = _("the port range is invalid") + raise exceptions.InvalidInput(error_message=msg) + masks = [] + while min_port <= max_port: + mask = 0xffff + while mask != 0: + next_mask = (mask << 1) & 0xffff + port_start = min_port & next_mask + port_end = min_port + (next_mask ^ 0xffff) + if port_start == min_port and port_end <= max_port: + mask = next_mask + else: + break + masks.append('0x%x/0x%x' % (min_port, mask)) + min_port = min_port + (mask ^ 0xffff) + 1 + + return masks + + +class OVSBridgeExt(ovs_bridge.OVSAgentBridge): + def setup_controllers(self, conf): + self.set_protocols("[]") + self.del_controller() + + def dump_flows_full_match(self, flow_str): + retval = None + flows = self.run_ofctl("dump-flows", [flow_str]) + if flows: + retval = '\n'.join(item for item in flows.splitlines() + if 'NXST' not in item and 'OFPST' not in item) + return retval + + def mod_flow(self, **kwargs): + flow_copy = kwargs.copy() + flow_copy.pop('actions') + flow_str = ovs_lib._build_flow_expr_str(flow_copy, 'del') + dump_flows = self.dump_flows_full_match(flow_str) + if dump_flows == '': + self.do_action_flows('add', [kwargs]) + else: + self.do_action_flows('mod', [kwargs]) + + def add_nsh_tunnel_port(self, port_name, remote_ip, local_ip, + tunnel_type=constants.TYPE_GRE, + vxlan_udp_port=constants.VXLAN_UDP_PORT, + dont_fragment=True, + in_nsp=None, + in_nsi=None): + attrs = [('type', tunnel_type)] + # This is an OrderedDict solely to make a test happy + options = collections.OrderedDict() + vxlan_uses_custom_udp_port = ( + tunnel_type == constants.TYPE_VXLAN and + vxlan_udp_port != constants.VXLAN_UDP_PORT + ) + if vxlan_uses_custom_udp_port: + options['dst_port'] = vxlan_udp_port + options['df_default'] = str(dont_fragment).lower() + options['remote_ip'] = 'flow' + options['local_ip'] = local_ip + options['in_key'] = 'flow' + options['out_key'] = 'flow' + if in_nsp is not None and in_nsi is not None: + options['nsp'] = str(in_nsp) + options['nsi'] = str(in_nsi) + elif in_nsp is None and in_nsi is None: + options['nsp'] = 'flow' + options['nsi'] = 'flow' + attrs.append(('options', options)) + ofport = self.add_port(port_name, *attrs) + if ( + tunnel_type == constants.TYPE_VXLAN and + ofport == INVALID_OFPORT + ): + LOG.error( + _LE('Unable to create VXLAN tunnel port for service chain. ' + 'Please ensure that an openvswitch version that supports ' + 'VXLAN for service chain is installed.') + ) + return ofport + + def run_ofctl(self, cmd, args, process_input=None): + # We need to dump-groups according to group Id, + # which is a feature of OpenFlow1.5 + full_args = [ + "ovs-ofctl", "-O openflow13", cmd, self.br_name + ] + args + try: + return utils.execute(full_args, run_as_root=True, + process_input=process_input) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("Unable to execute %(args)s."), + {'args': full_args}) + + def do_action_groups(self, action, kwargs_list): + group_strs = [_build_group_expr_str(kw, action) for kw in kwargs_list] + if action == 'add' or action == 'del': + self.run_ofctl('%s-groups' % action, ['-'], '\n'.join(group_strs)) + elif action == 'mod': + self.run_ofctl('%s-group' % action, ['-'], '\n'.join(group_strs)) + else: + msg = _("Action is illegal") + raise exceptions.InvalidInput(error_message=msg) + + def add_group(self, **kwargs): + self.do_action_groups('add', [kwargs]) + + def mod_group(self, **kwargs): + self.do_action_groups('mod', [kwargs]) + + def delete_group(self, **kwargs): + self.do_action_groups('del', [kwargs]) + + def dump_group_for_id(self, group_id): + retval = None + group_str = "%d" % group_id + group = self.run_ofctl("dump-groups", [group_str]) + if group: + retval = '\n'.join(item for item in group.splitlines() + if 'NXST' not in item) + return retval + + +def _build_group_expr_str(group_dict, cmd): + group_expr_arr = [] + buckets = None + groupId = None + + if cmd != 'del': + if "group_id" not in group_dict: + msg = _("Must specify one groupId on groupo addition" + " or modification") + raise exceptions.InvalidInput(error_message=msg) + groupId = "group_id=%s" % group_dict.pop('group_id') + + if "buckets" not in group_dict: + msg = _("Must specify one or more buckets on group addition" + " or modification") + raise exceptions.InvalidInput(error_message=msg) + buckets = "%s" % group_dict.pop('buckets') + + if groupId: + group_expr_arr.append(groupId) + + for key, value in six.iteritems(group_dict): + group_expr_arr.append("%s=%s" % (key, value)) + + if buckets: + group_expr_arr.append(buckets) + + return ','.join(group_expr_arr) diff --git a/networking_sfc/services/sfc/driver_manager.py b/networking_sfc/services/sfc/driver_manager.py new file mode 100644 index 0000000..c8a212a --- /dev/null +++ b/networking_sfc/services/sfc/driver_manager.py @@ -0,0 +1,118 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_config import cfg +from oslo_log import log +import stevedore + +from neutron.i18n import _LE +from neutron.i18n import _LI + +from networking_sfc.services.sfc.common import exceptions as sfc_exc + + +LOG = log.getLogger(__name__) +cfg.CONF.import_opt('drivers', + 'networking_sfc.services.sfc.common.config', + group='sfc') + + +class SfcDriverManager(stevedore.named.NamedExtensionManager): + """Implementation of SFC drivers.""" + + def __init__(self): + # Registered sfc drivers, keyed by name. + self.drivers = {} + # Ordered list of sfc drivers, defining + # the order in which the drivers are called. + self.ordered_drivers = [] + LOG.info(_LI("Configured SFC drivers: %s"), + cfg.CONF.sfc.drivers) + super(SfcDriverManager, self).__init__('networking_sfc.sfc.drivers', + cfg.CONF.sfc.drivers, + invoke_on_load=True, + name_order=True) + LOG.info(_LI("Loaded SFC drivers: %s"), self.names()) + self._register_drivers() + + def _register_drivers(self): + """Register all SFC drivers. + + This method should only be called once in the SfcDriverManager + constructor. + """ + for ext in self: + self.drivers[ext.name] = ext + self.ordered_drivers.append(ext) + LOG.info(_LI("Registered SFC drivers: %s"), + [driver.name for driver in self.ordered_drivers]) + + def initialize(self): + # ServiceChain bulk operations requires each driver to support them + self.native_bulk_support = True + for driver in self.ordered_drivers: + LOG.info(_LI("Initializing SFC driver '%s'"), driver.name) + driver.obj.initialize() + self.native_bulk_support &= getattr(driver.obj, + 'native_bulk_support', True) + + def _call_drivers(self, method_name, context): + """Helper method for calling a method across all SFC drivers. + + :param method_name: name of the method to call + :param context: context parameter to pass to each method call + :param continue_on_failure: whether or not to continue to call + all SFC drivers once one has raised an exception + if any SFC driver call fails. + """ + for driver in self.ordered_drivers: + try: + getattr(driver.obj, method_name)(context) + except Exception as e: + # This is an internal failure. + LOG.exception(e) + LOG.error( + _LE("SFC driver '%(name)s' failed in %(method)s"), + {'name': driver.name, 'method': method_name} + ) + raise sfc_exc.SfcDriverError( + method=method_name + ) + + def create_port_chain(self, context): + self._call_drivers("create_port_chain", context) + + def update_port_chain(self, context): + self._call_drivers("update_port_chain", context) + + def delete_port_chain(self, context): + self._call_drivers("delete_port_chain", context) + + def create_port_pair(self, context): + self._call_drivers("create_port_pair", context) + + def update_port_pair(self, context): + self._call_drivers("update_port_pair", context) + + def delete_port_pair(self, context): + self._call_drivers("delete_port_pair", context) + + def create_port_pair_group(self, context): + self._call_drivers("create_port_pair_group", context) + + def update_port_pair_group(self, context): + self._call_drivers("update_port_pair_group", context) + + def delete_port_pair_group(self, context): + self._call_drivers("delete_port_pair_group", context) diff --git a/networking_sfc/services/sfc/drivers/__init__.py b/networking_sfc/services/sfc/drivers/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/__init__.py diff --git a/networking_sfc/services/sfc/drivers/base.py b/networking_sfc/services/sfc/drivers/base.py new file mode 100644 index 0000000..0816789 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/base.py @@ -0,0 +1,57 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc +import six + + +@six.add_metaclass(abc.ABCMeta) +class SfcDriverBase(object): + """SFC Driver Base Class.""" + + @abc.abstractmethod + def create_port_chain(self, context): + pass + + @abc.abstractmethod + def delete_port_chain(self, context): + pass + + @abc.abstractmethod + def update_port_chain(self, context): + pass + + @abc.abstractmethod + def create_port_pair(self, context): + pass + + @abc.abstractmethod + def delete_port_pair(self, context): + pass + + @abc.abstractmethod + def update_port_pair(self, context): + pass + + @abc.abstractmethod + def create_port_pair_group(self, context): + pass + + @abc.abstractmethod + def delete_port_pair_group(self, context): + pass + + @abc.abstractmethod + def update_port_pair_group(self, context): + pass diff --git a/networking_sfc/services/sfc/drivers/dummy/__init__.py b/networking_sfc/services/sfc/drivers/dummy/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/dummy/__init__.py diff --git a/networking_sfc/services/sfc/drivers/dummy/dummy.py b/networking_sfc/services/sfc/drivers/dummy/dummy.py new file mode 100644 index 0000000..1ddd7d0 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/dummy/dummy.py @@ -0,0 +1,59 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import helpers as log_helpers + +from networking_sfc.services.sfc.drivers import base as sfc_driver + + +class DummyDriver(sfc_driver.SfcDriverBase): + """SFC Driver Dummy Class.""" + def initialize(self): + pass + + @log_helpers.log_method_call + def create_port_chain(self, context): + pass + + @log_helpers.log_method_call + def delete_port_chain(self, context): + pass + + @log_helpers.log_method_call + def update_port_chain(self, context): + pass + + @log_helpers.log_method_call + def create_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def delete_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def update_port_pair_group(self, context): + pass + + @log_helpers.log_method_call + def create_port_pair(self, context): + pass + + @log_helpers.log_method_call + def delete_port_pair(self, context): + pass + + @log_helpers.log_method_call + def update_port_pair(self, context): + pass diff --git a/networking_sfc/services/sfc/drivers/ovs/__init__.py b/networking_sfc/services/sfc/drivers/ovs/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/__init__.py diff --git a/networking_sfc/services/sfc/drivers/ovs/constants.py b/networking_sfc/services/sfc/drivers/ovs/constants.py new file mode 100644 index 0000000..30e2c37 --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/constants.py @@ -0,0 +1,57 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.common import constants as n_const + + +INGRESS_DIR = 'ingress' +EGRESS_DIR = 'egress' + +STATUS_BUILDING = 'building' +STATUS_ACTIVE = 'active' +STATUS_ERROR = 'error' +STATUS_DELETING = 'deleting' + + +PORTFLOW_OPT_ADD = 'add-flows' +PROTFLOW_OPT_DELETE = 'delete-flows' +PROTFLOW_OPT_UPDATE = 'update-flows' + + +SRC_NODE = 'src_node' +DST_NODE = 'dst_node' +SF_NODE = 'sf_node' + +RES_TYPE_GROUP = 'group' +RES_TYPE_NSP = 'nsp' + +INSERTION_TYPE_L2 = 'l2' +INSERTION_TYPE_L3 = 'l3' +INSERTION_TYPE_BITW = 'bitw' +INSERTION_TYPE_TAP = 'tap' + +MAX_HASH = 16 + +INSERTION_TYPE_DICT = { + n_const.DEVICE_OWNER_ROUTER_HA_INTF: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_INTF: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_GW: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_FLOATINGIP: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_DHCP: INSERTION_TYPE_TAP, + n_const.DEVICE_OWNER_DVR_INTERFACE: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_AGENT_GW: INSERTION_TYPE_L3, + n_const.DEVICE_OWNER_ROUTER_SNAT: INSERTION_TYPE_TAP, + n_const.DEVICE_OWNER_LOADBALANCER: INSERTION_TYPE_TAP, + 'compute': INSERTION_TYPE_L2 +} diff --git a/networking_sfc/services/sfc/drivers/ovs/db.py b/networking_sfc/services/sfc/drivers/ovs/db.py new file mode 100644 index 0000000..8d3c87d --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/db.py @@ -0,0 +1,426 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +import six + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_utils import uuidutils + +from neutron.common import exceptions as n_exc +from neutron import context as n_context +from neutron.db import common_db_mixin +from neutron.db import model_base +from neutron.db import models_v2 + +import sqlalchemy as sa +from sqlalchemy import orm +from sqlalchemy.orm import exc +from sqlalchemy import sql + + +LOG = logging.getLogger(__name__) + + +class PortPairDetailNotFound(n_exc.NotFound): + message = _("Portchain port brief %(port_id)s could not be found") + + +class NodeNotFound(n_exc.NotFound): + message = _("Portchain node %(node_id)s could not be found") + + +# name changed to ChainPathId +class UuidIntidAssoc(model_base.BASEV2, models_v2.HasId): + __tablename__ = 'sfc_uuid_intid_associations' + uuid = sa.Column(sa.String(36), primary_key=True) + intid = sa.Column(sa.Integer, unique=True, nullable=False) + type_ = sa.Column(sa.String(32), nullable=False) + + def __init__(self, uuid, intid, type_): + self.uuid = uuid + self.intid = intid + self.type_ = type_ + + +def singleton(class_): + instances = {} + + def getinstance(*args, **kwargs): + if class_ not in instances: + instances[class_] = class_(*args, **kwargs) + return instances[class_] + return getinstance + + +@singleton +class IDAllocation(object): + def __init__(self, context): + # Get the inital range from conf file. + conf_obj = {'group': [1, 255], 'portchain': [256, 65536]} + self.conf_obj = conf_obj + self.session = context.session + + @log_helpers.log_method_call + def assign_intid(self, type_, uuid): + query = self.session.query(UuidIntidAssoc).filter_by( + type_=type_).order_by(UuidIntidAssoc.intid) + + allocated_int_ids = {obj.intid for obj in query.all()} + + # Find the first one from the available range that + # is not in allocated_int_ids + start, end = self.conf_obj[type_][0], self.conf_obj[type_][1]+1 + for init_id in six.moves.range(start, end): + if init_id not in allocated_int_ids: + with self.session.begin(subtransactions=True): + uuid_intid = UuidIntidAssoc( + uuid, init_id, type_) + self.session.add(uuid_intid) + return init_id + else: + return None + + @log_helpers.log_method_call + def get_intid_by_uuid(self, type_, uuid): + + query_obj = self.session.query(UuidIntidAssoc).filter_by( + type_=type_, uuid=uuid).first() + if query_obj: + return query_obj.intid + else: + return None + + @log_helpers.log_method_call + def release_intid(self, type_, intid): + """Release int id. + + @param: type_: str + @param: intid: int + """ + with self.session.begin(subtransactions=True): + query_obj = self.session.query(UuidIntidAssoc).filter_by( + intid=intid, type_=type_).first() + + if query_obj: + self.session.delete(query_obj) + + +class PathPortAssoc(model_base.BASEV2): + """path port association table. + + It represents the association table which associate path_nodes with + portpair_details. + """ + __tablename__ = 'sfc_path_port_associations' + pathnode_id = sa.Column(sa.String(36), + sa.ForeignKey( + 'sfc_path_nodes.id', ondelete='CASCADE'), + primary_key=True) + portpair_id = sa.Column(sa.String(36), + sa.ForeignKey('sfc_portpair_details.id', + ondelete='CASCADE'), + primary_key=True) + weight = sa.Column(sa.Integer, nullable=False, default=1) + + +class PortPairDetail(model_base.BASEV2, models_v2.HasId, + models_v2.HasTenant): + __tablename__ = 'sfc_portpair_details' + ingress = sa.Column(sa.String(36), nullable=True) + egress = sa.Column(sa.String(36), nullable=True) + host_id = sa.Column(sa.String(255), nullable=False) + mac_address = sa.Column(sa.String(32), nullable=False) + network_type = sa.Column(sa.String(8)) + segment_id = sa.Column(sa.Integer) + local_endpoint = sa.Column(sa.String(64), nullable=False) + path_nodes = orm.relationship(PathPortAssoc, + backref='port_pair_detail', + lazy="joined", + cascade='all,delete') + + +class PathNode(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): + __tablename__ = 'sfc_path_nodes' + nsp = sa.Column(sa.Integer, nullable=False) + nsi = sa.Column(sa.Integer, nullable=False) + node_type = sa.Column(sa.String(32)) + portchain_id = sa.Column( + sa.String(255), + sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE')) + status = sa.Column(sa.String(32)) + portpair_details = orm.relationship(PathPortAssoc, + backref='path_nodes', + lazy="joined", + cascade='all,delete') + next_group_id = sa.Column(sa.Integer) + next_hop = sa.Column(sa.String(512)) + + +class OVSSfcDriverDB(common_db_mixin.CommonDbMixin): + def initialize(self): + self.admin_context = n_context.get_admin_context() + + def _make_pathnode_dict(self, node, fields=None): + res = {'id': node['id'], + 'tenant_id': node['tenant_id'], + 'node_type': node['node_type'], + 'nsp': node['nsp'], + 'nsi': node['nsi'], + 'next_group_id': node['next_group_id'], + 'next_hop': node['next_hop'], + 'portchain_id': node['portchain_id'], + 'status': node['status'], + 'portpair_details': [pair_detail['portpair_id'] + for pair_detail in node['portpair_details'] + ] + } + + return self._fields(res, fields) + + def _make_port_detail_dict(self, port, fields=None): + res = {'id': port['id'], + 'tenant_id': port['tenant_id'], + 'host_id': port['host_id'], + 'ingress': port.get('ingress', None), + 'egress': port.get('egress', None), + 'segment_id': port['segment_id'], + 'local_endpoint': port['local_endpoint'], + 'mac_address': port['mac_address'], + 'network_type': port['network_type'], + 'path_nodes': [{'pathnode_id': node['pathnode_id'], + 'weight': node['weight']} + for node in port['path_nodes']] + } + + return self._fields(res, fields) + + def _make_pathport_assoc_dict(self, assoc, fields=None): + res = {'pathnode_id': assoc['pathnode_id'], + 'portpair_id': assoc['portpair_id'], + 'weight': assoc['weight'], + } + + return self._fields(res, fields) + + def _get_path_node(self, id): + try: + node = self._get_by_id(self.admin_context, PathNode, id) + except exc.NoResultFound: + raise NodeNotFound(node_id=id) + return node + + def _get_port_detail(self, id): + try: + port = self._get_by_id(self.admin_context, PortPairDetail, id) + except exc.NoResultFound: + raise PortPairDetailNotFound(port_id=id) + return port + + def create_port_detail(self, port): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(port, PortPairDetail) + args['id'] = uuidutils.generate_uuid() + port_obj = PortPairDetail(**args) + self.admin_context.session.add(port_obj) + return self._make_port_detail_dict(port_obj) + + def create_path_node(self, node): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(node, PathNode) + args['id'] = uuidutils.generate_uuid() + node_obj = PathNode(**args) + self.admin_context.session.add(node_obj) + return self._make_pathnode_dict(node_obj) + + def create_pathport_assoc(self, assoc): + with self.admin_context.session.begin(subtransactions=True): + args = self._filter_non_model_columns(assoc, PathPortAssoc) + assoc_obj = PathPortAssoc(**args) + self.admin_context.session.add(assoc_obj) + return self._make_pathport_assoc_dict(assoc_obj) + + def delete_pathport_assoc(self, pathnode_id, portdetail_id): + with self.admin_context.session.begin(subtransactions=True): + self.admin_context.session.query(PathPortAssoc).filter_by( + pathnode_id=pathnode_id, + portpair_id=portdetail_id).delete() + + def update_port_detail(self, id, port): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + for key, value in six.iteritems(port): + if key == 'path_nodes': + pns = [] + for pn in value: + pn_id = pn['pathnode_id'] + self._get_path_node(pn_id) + query = self._model_query( + self.admin_context, PathPortAssoc) + pn_association = query.filter_by( + pathnode_id=pn_id, + portpair_id=id + ).first() + if not pn_association: + pn_association = PathPortAssoc( + pathnode_id=pn_id, + portpair_id=id, + weight=pn.get('weight', 1) + ) + pns.append(pn_association) + port_obj[key] = pns + else: + port_obj[key] = value + port_obj.update(port) + return self._make_port_detail_dict(port_obj) + + def update_path_node(self, id, node): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + for key, value in six.iteritems(node): + if key == 'portpair_details': + pds = [] + for pd_id in value: + self._get_port_detail(pd_id) + query = self._model_query( + self.admin_context, PathPortAssoc) + pd_association = query.filter_by( + pathnode_id=id, + portpair_id=pd_id + ).first() + if not pd_association: + pd_association = PathPortAssoc( + pathnode_id=id, + portpair_id=pd_id + ) + pds.append(pd_association) + node_obj[key] = pds + else: + node_obj[key] = value + return self._make_pathnode_dict(node_obj) + + def delete_port_detail(self, id): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + self.admin_context.session.delete(port_obj) + + def delete_path_node(self, id): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + self.admin_context.session.delete(node_obj) + + def get_port_detail(self, id): + with self.admin_context.session.begin(subtransactions=True): + port_obj = self._get_port_detail(id) + return self._make_port_detail_dict(port_obj) + + def get_port_detail_without_exception(self, id): + with self.admin_context.session.begin(subtransactions=True): + try: + port = self._get_by_id( + self.admin_context, PortPairDetail, id) + except exc.NoResultFound: + return None + return self._make_port_detail_dict(port) + + def get_path_node(self, id): + with self.admin_context.session.begin(subtransactions=True): + node_obj = self._get_path_node(id) + return self._make_pathnode_dict(node_obj) + + def get_path_nodes_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_path_nodes_by_filter( + filters, fields, sorts, limit, + marker, page_reverse + ) + all_items = qry.all() + if all_items: + return [self._make_pathnode_dict(item) for item in all_items] + + return None + + def get_path_node_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_path_nodes_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + first = qry.first() + if first: + return self._make_pathnode_dict(first) + + return None + + def _get_path_nodes_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + qry = self.admin_context.session.query(PathNode) + if filters: + for key, value in six.iteritems(filters): + column = getattr(PathNode, key, None) + if column: + if not value: + qry = qry.filter(sql.false()) + else: + qry = qry.filter(column == value) + return qry + + def get_port_details_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_port_details_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + all_items = qry.all() + if all_items: + return [ + self._make_port_detail_dict(item) + for item in all_items + ] + + return None + + def get_port_detail_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + with self.admin_context.session.begin(subtransactions=True): + qry = self._get_port_details_by_filter( + filters, fields, sorts, limit, + marker, page_reverse) + first = qry.first() + if first: + return self._make_port_detail_dict(first) + + return None + + def _get_port_details_by_filter(self, filters=None, fields=None, + sorts=None, limit=None, marker=None, + page_reverse=False): + qry = self.admin_context.session.query(PortPairDetail) + if filters: + for key, value in six.iteritems(filters): + column = getattr(PortPairDetail, key, None) + if column: + if not value: + qry = qry.filter(sql.false()) + else: + qry = qry.filter(column == value) + + return qry diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py new file mode 100644 index 0000000..9dfc40d --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/driver.py @@ -0,0 +1,1076 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr + +# from eventlet import greenthread + +from neutron.common import constants as nc_const +from neutron.common import rpc as n_rpc + +from neutron import manager + +from neutron.i18n import _LE +from neutron.i18n import _LW + +from neutron.plugins.common import constants as np_const + + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from networking_sfc.extensions import flowclassifier +from networking_sfc.extensions import sfc +from networking_sfc.services.sfc.common import exceptions as exc +from networking_sfc.services.sfc.drivers import base as driver_base +from networking_sfc.services.sfc.drivers.ovs import( + rpc_topics as sfc_topics) +from networking_sfc.services.sfc.drivers.ovs import( + db as ovs_sfc_db) +from networking_sfc.services.sfc.drivers.ovs import( + rpc as ovs_sfc_rpc) +from networking_sfc.services.sfc.drivers.ovs import ( + constants as ovs_const) + + +LOG = logging.getLogger(__name__) + + +class OVSSfcDriver(driver_base.SfcDriverBase, + ovs_sfc_db.OVSSfcDriverDB): + """Sfc Driver Base Class.""" + + def initialize(self): + super(OVSSfcDriver, self).initialize() + self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( + sfc_topics.SFC_AGENT + ) + + self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) + self._setup_rpc() + + def _setup_rpc(self): + # Setup a rpc server + self.topic = sfc_topics.SFC_PLUGIN + self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] + self.conn = n_rpc.create_connection(new=True) + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) + self.conn.consume_in_threads() + + def _get_subnet(self, core_plugin, tenant_id, cidr): + filters = {'tenant_id': [tenant_id]} + subnets = core_plugin.get_subnets(self.admin_context, filters=filters) + cidr_set = netaddr.IPSet([cidr]) + + for subnet in subnets: + subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) + if cidr_set.issubset(subnet_cidr_set): + return subnet + + def _get_fc_dst_subnet_gw_port(self, fc): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet(core_plugin, + fc['tenant_id'], + fc['destination_ip_prefix']) + + return self._get_port_subnet_gw_info(core_plugin, subnet) + + def _get_port_subnet_gw_info_by_port_id(self, id): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet_by_port(core_plugin, id) + return self._get_port_subnet_gw_info(core_plugin, + subnet) + + def _get_port_subnet_gw_info(self, core_plugin, subnet): + filters = { + 'device_owner': + [nc_const.DEVICE_OWNER_ROUTER_INTF] + } + gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) + for port in gw_ports: + for fixed_ip in port['fixed_ips']: + if subnet["id"] == fixed_ip['subnet_id']: + return (port['mac_address'], + subnet['cidr'], + subnet['network_id']) + + raise exc.NoSubnetGateway( + type='subnet gateway', + cidr=subnet['cidr']) + + def _get_subnet_by_port(self, core_plugin, id): + port = core_plugin.get_port(self.admin_context, id) + for ip in port['fixed_ips']: + subnet = core_plugin.get_subnet(self.admin_context, + ip["subnet_id"]) + # currently only support one subnet for a port + break + + return subnet + + @log_helpers.log_method_call + def _get_portgroup_members(self, context, pg_id): + next_group_members = [] + group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) + LOG.debug('group_intid: %s', group_intid) + pg = context._plugin.get_port_pair_group(context._plugin_context, + pg_id) + for pp_id in pg['port_pairs']: + pp = context._plugin.get_port_pair(context._plugin_context, pp_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + if pd: + next_group_members.append( + dict(portpair_id=pd['id'], weight=1)) + return group_intid, next_group_members + + def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): + pp = context._plugin.get_port_pair(context._plugin_context, + port_pair_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + + return pd + + @log_helpers.log_method_call + def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node, dst_node, + last_sf_node=None): + dst_ports = [] + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # lookup the source port + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + src_pd = self.get_port_detail_by_filter(src_pd_filter) + + if not src_pd: + # Create source port detail + src_pd = self._create_port_detail(src_pd_filter) + LOG.debug('create src port detail: %s', src_pd) + + # Create associate relationship + assco_args = {'portpair_id': src_pd['id'], + 'pathnode_id': src_node['id'], + 'weight': 1, + } + sna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc src port with node: %s', sna) + src_node['portpair_details'].append(src_pd['id']) + + if fc.get('logical_destination_port', ''): + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + dst_pd = self.get_port_detail_by_filter(dst_pd_filter) + + if not dst_pd: + # Create dst port detail + dst_pd = self._create_port_detail(dst_pd_filter) + LOG.debug('create dst port detail: %s', dst_pd) + + # Create associate relationship + dst_assco_args = {'portpair_id': dst_pd['id'], + 'pathnode_id': dst_node['id'], + 'weight': 1, + } + dna = self.create_pathport_assoc(dst_assco_args) + LOG.debug('create assoc dst port with node: %s', dna) + dst_node['portpair_details'].append(dst_pd['id']) + + dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) + + if last_sf_node: + if last_sf_node['next_hop']: + next_hops = jsonutils.loads(last_sf_node['next_hop']) + next_hops.extend(dst_ports) + last_sf_node['next_hop'] = jsonutils.dumps(next_hops) + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + return dst_ports + + def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node=None, dst_node=None, + last_sf_node=None): + if not fc_ids: + return + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # delete source port detail + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(src_pd_filter) + if pds: + for pd in pds: + # update src_node portpair_details refence info + if src_node and pd['id'] in src_node[ + 'portpair_details' + ]: + src_node['portpair_details'].remove(pd['id']) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if fc.get('logical_destination_port', ''): + # Create dst port detail + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(dst_pd_filter) + if pds: + for pd in pds: + # update dst_node portpair_details refence info + if dst_node and pd['id'] in dst_node[ + 'portpair_details' + ]: + # update portpair_details of this node + dst_node['portpair_details'].remove(pd['id']) + # update last hop(SF-group) next hop info + if last_sf_node: + next_hop = dict(portpair_id=pd['id'], + weight=1) + next_hops = jsonutils.loads( + last_sf_node['next_hop']) + next_hops.remove(next_hop) + last_sf_node['next_hop'] = jsonutils.dumps( + next_hops) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if last_sf_node: + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + + @log_helpers.log_method_call + def _create_portchain_path(self, context, port_chain): + src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) + path_nodes, dst_ports = [], [] + # Create an assoc object for chain_id and path_id + # context = context._plugin_context + path_id = self.id_pool.assign_intid('portchain', port_chain['id']) + + if not path_id: + LOG.error(_LE('No path_id available for creating port chain path')) + return + + next_group_intid, next_group_members = self._get_portgroup_members( + context, port_chain['port_pair_groups'][0]) + + port_pair_groups = port_chain['port_pair_groups'] + sf_path_length = len(port_pair_groups) + # Create a head node object for port chain + src_args = {'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SRC_NODE, + 'nsp': path_id, + 'nsi': 0xff, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': jsonutils.dumps(next_group_members), + } + src_node = self.create_path_node(src_args) + LOG.debug('create src node: %s', src_node) + path_nodes.append(src_node) + + # Create a destination node object for port chain + dst_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.DST_NODE, + 'nsp': path_id, + 'nsi': 0xff - sf_path_length - 1, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': None, + 'next_hop': None + } + dst_node = self.create_path_node(dst_args) + LOG.debug('create dst node: %s', dst_node) + path_nodes.append(dst_node) + + dst_ports = self._add_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'], + src_node, + dst_node + ) + + for i in range(sf_path_length): + cur_group_members = next_group_members + # next_group for next hop + if i < sf_path_length - 1: + next_group_intid, next_group_members = ( + self._get_portgroup_members( + context, port_pair_groups[i + 1]) + ) + else: + next_group_intid = None + next_group_members = None if not dst_ports else dst_ports + + # Create a node object + node_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SF_NODE, + 'nsp': path_id, + 'nsi': 0xfe - i, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': ( + None if not next_group_members else + jsonutils.dumps(next_group_members) + ) + } + sf_node = self.create_path_node(node_args) + LOG.debug('chain path node: %s', sf_node) + # Create the assocation objects that combine the pathnode_id with + # the ingress of the port_pairs in the current group + # when port_group does not reach tail + for member in cur_group_members: + assco_args = {'portpair_id': member['portpair_id'], + 'pathnode_id': sf_node['id'], + 'weight': member['weight'], } + sfna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc port with node: %s', sfna) + sf_node['portpair_details'].append(member['portpair_id']) + path_nodes.append(sf_node) + + return path_nodes + + def _delete_path_node_port_flowrule(self, node, port, fc_ids): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + flow_rule = self._build_portchain_flowrule_body( + node, + port, + None, + fc_ids) + + self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( + self.admin_context, + flow_rule) + + def _delete_path_node_flowrule(self, node, fc_ids): + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._delete_path_node_port_flowrule( + node, port, fc_ids) + + @log_helpers.log_method_call + def _delete_portchain_path(self, context, portchain_id): + port_chain = context.current + first = self.get_path_node_by_filter( + filters={ + 'portchain_id': portchain_id, + 'nsi': 0xff + } + ) + + # delete flow rules which source port isn't assigned + # in flow classifier + if first: + self._delete_src_node_flowrules( + first, + port_chain['flow_classifiers'] + ) + + pds = self.get_path_nodes_by_filter( + dict(portchain_id=portchain_id)) + if pds: + for pd in pds: + self._delete_path_node_flowrule( + pd, + port_chain['flow_classifiers'] + ) + self.delete_path_node(pd['id']) + + # delete the ports on the traffic classifier + self._remove_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'] + ) + + # Delete the chainpathpair + intid = self.id_pool.get_intid_by_uuid( + 'portchain', portchain_id) + self.id_pool.release_intid('portchain', intid) + + def _update_path_node_next_hops(self, flow_rule): + node_next_hops = [] + if not flow_rule['next_hop']: + return None + next_hops = jsonutils.loads(flow_rule['next_hop']) + if not next_hops: + return None + for member in next_hops: + detail = {} + port_detail = self.get_port_detail_by_filter( + dict(id=member['portpair_id'])) + if not port_detail or not port_detail['host_id']: + continue + detail['local_endpoint'] = port_detail['local_endpoint'] + detail['weight'] = member['weight'] + detail['mac_address'] = port_detail['mac_address'] + detail['ingress'] = port_detail['ingress'] + node_next_hops.append(detail) + + mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( + detail['ingress'] + ) + + detail['gw_mac'] = mac + detail['cidr'] = cidr + detail['net_uuid'] = net_uuid + + flow_rule['next_hops'] = node_next_hops + flow_rule.pop('next_hop') + + return node_next_hops + + def _build_portchain_flowrule_body(self, node, port, + add_fc_ids=None, del_fc_ids=None): + node_info = node.copy() + node_info.pop('tenant_id') + node_info.pop('portpair_details') + + port_info = port.copy() + port_info.pop('tenant_id') + port_info.pop('id') + port_info.pop('path_nodes') + port_info.pop('host_id') + + flow_rule = dict(node_info, **port_info) + # if this port is belong to NSH/MPLS-aware vm, only to + # notify the flow classifier for 1st SF. + flow_rule['add_fcs'] = self._filter_flow_classifiers( + flow_rule, add_fc_ids) + flow_rule['del_fcs'] = self._filter_flow_classifiers( + flow_rule, del_fc_ids) + + self._update_portchain_group_reference_count(flow_rule, + port['host_id']) + + # update next hop info + self._update_path_node_next_hops(flow_rule) + + return flow_rule + + def _filter_flow_classifiers(self, flow_rule, fc_ids): + """Filter flow classifiers. + + @return: list of the flow classifiers + """ + + fc_return = [] + + if not fc_ids: + return fc_return + fcs = self._get_fcs_by_ids(fc_ids) + for fc in fcs: + new_fc = fc.copy() + new_fc.pop('id') + new_fc.pop('name') + new_fc.pop('tenant_id') + new_fc.pop('description') + + if ((flow_rule['node_type'] == ovs_const.SRC_NODE and + flow_rule['egress'] == fc['logical_source_port'] + ) or + (flow_rule['node_type'] == ovs_const.DST_NODE and + flow_rule['ingress'] == fc['logical_destination_port'] + )): + fc_return.append(new_fc) + elif flow_rule['node_type'] == ovs_const.SF_NODE: + fc_return.append(new_fc) + + return fc_return + + def _update_path_node_port_flowrules(self, node, port, + add_fc_ids=None, del_fc_ids=None): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + + flow_rule = self._build_portchain_flowrule_body( + node, + port, + add_fc_ids, + del_fc_ids) + + self.ovs_driver_rpc.ask_agent_to_update_flow_rules( + self.admin_context, + flow_rule) + + def _update_path_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + if node['portpair_details'] is None: + return + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._update_path_node_port_flowrules( + node, port, add_fc_ids, del_fc_ids) + + def _thread_update_path_nodes(self, nodes, + add_fc_ids=None, del_fc_ids=None): + for node in nodes: + self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) + self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) + + def _get_portchain_fcs(self, port_chain): + return self._get_fcs_by_ids(port_chain['flow_classifiers']) + + def _get_fcs_by_ids(self, fc_ids): + flow_classifiers = [] + if not fc_ids: + return flow_classifiers + + # Get the portchain flow classifiers + fc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + flowclassifier.FLOW_CLASSIFIER_EXT) + ) + if not fc_plugin: + LOG.warn(_LW("Not found the flow classifier service plugin")) + return flow_classifiers + + for fc_id in fc_ids: + fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) + flow_classifiers.append(fc) + + return flow_classifiers + + @log_helpers.log_method_call + def create_port_chain(self, context): + port_chain = context.current + path_nodes = self._create_portchain_path(context, port_chain) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes( + path_nodes, + port_chain['flow_classifiers'], + None) + + @log_helpers.log_method_call + def delete_port_chain(self, context): + port_chain = context.current + portchain_id = port_chain['id'] + LOG.debug("to delete portchain path") + self._delete_portchain_path(context, portchain_id) + + def _get_diff_set(self, orig, cur): + orig_set = set(item for item in orig) + cur_set = set(item for item in cur) + + to_del = orig_set.difference(cur_set) + to_add = cur_set.difference(orig_set) + + return to_del, to_add + + @log_helpers.log_method_call + def update_port_chain(self, context): + port_chain = context.current + orig = context.original + + del_fc_ids, add_fc_ids = self._get_diff_set( + orig['flow_classifiers'], + port_chain['flow_classifiers'] + ) + path_nodes = self.get_path_nodes_by_filter( + dict(portchain_id=port_chain['id']) + ) + if not path_nodes: + return + + sort_path_nodes = sorted(path_nodes, + key=lambda x: x['nsi'], + reverse=True) + if del_fc_ids: + self._thread_update_path_nodes(sort_path_nodes, + None, + del_fc_ids) + self._remove_flowclassifier_port_assoc(del_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + if add_fc_ids: + self._add_flowclassifier_port_assoc(add_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes(sort_path_nodes, + add_fc_ids, + None) + + @log_helpers.log_method_call + def create_port_pair_group(self, context): + group = context.current + self.id_pool.assign_intid('group', group['id']) + + @log_helpers.log_method_call + def delete_port_pair_group(self, context): + group = context.current + group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) + if group_intid: + self.id_pool.release_intid('group', group_intid) + + @log_helpers.log_method_call + def update_port_pair_group(self, context): + current = context.current + original = context.original + + if set(current['port_pairs']) == set(original['port_pairs']): + return + + # Update the path_nodes and flows for each port chain that + # contains this port_pair_group + # Note: _get_port_pair_group is temporarily used here. + ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, + current['id']) + port_chains = [assoc.portchain_id for assoc in + ppg_obj.chain_group_associations] + + for chain_id in port_chains: + port_chain = context._plugin.get_port_chain( + context._plugin_context, chain_id) + group_intid = self.id_pool.get_intid_by_uuid('group', + current['id']) + # Get the previous node + prev_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'next_group_id': group_intid}) + if not prev_node: + continue + + before_update_prev_node = prev_node.copy() + # Update the previous node + _, curr_group_members = self._get_portgroup_members(context, + current['id']) + prev_node['next_hop'] = ( + jsonutils.dumps(curr_group_members) + if curr_group_members else None + ) + # update next hop to database + self.update_path_node(prev_node['id'], prev_node) + if prev_node['node_type'] == ovs_const.SRC_NODE: + self._delete_src_node_flowrules( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_src_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + else: + self._delete_path_node_flowrule( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_path_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + + # Update the current node + # to find the current node by using the node's next_group_id + # if this node is the last, next_group_id would be None + curr_pos = port_chain['port_pair_groups'].index(current['id']) + curr_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'nsi': 0xfe - curr_pos}) + if not curr_node: + continue + + # Add the port-pair-details into the current node + for pp_id in ( + set(current['port_pairs']) - set(original['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + + assco_args = {'portpair_id': ppd['id'], + 'pathnode_id': curr_node['id'], + 'weight': 1, } + self.create_pathport_assoc(assco_args) + self._update_path_node_port_flowrules( + curr_node, ppd, port_chain['flow_classifiers']) + + # Delete the port-pair-details from the current node + for pp_id in ( + set(original['port_pairs']) - set(current['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + self._delete_path_node_port_flowrule( + curr_node, ppd, port_chain['flow_classifiers']) + self.delete_pathport_assoc(curr_node['id'], ppd['id']) + + @log_helpers.log_method_call + def _get_portpair_detail_info(self, portpair_id): + """Get port detail. + + @param: portpair_id: uuid + @return: (host_id, local_ip, network_type, segment_id, + service_insert_type): tuple + """ + + core_plugin = manager.NeutronManager.get_plugin() + port_detail = core_plugin.get_port(self.admin_context, portpair_id) + host_id, local_ip, network_type, segment_id, mac_address = ( + (None, ) * 5) + + if port_detail: + host_id = port_detail['binding:host_id'] + network_id = port_detail['network_id'] + mac_address = port_detail['mac_address'] + network_info = core_plugin.get_network( + self.admin_context, network_id) + network_type = network_info['provider:network_type'] + segment_id = network_info['provider:segmentation_id'] + + if ( + host_id and + network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] + ): + driver = core_plugin.type_manager.drivers.get(network_type) + host_endpoint = driver.obj.get_endpoint_by_host(host_id) + local_ip = host_endpoint['ip_address'] + + return host_id, local_ip, network_type, segment_id, mac_address + + @log_helpers.log_method_call + def _create_port_detail(self, port_pair): + # since first node may not assign the ingress port, and last node may + # not assign the egress port. we use one of the + # port as the key to get the SF information. + port = None + if port_pair.get('ingress', None): + port = port_pair['ingress'] + elif port_pair.get('egress', None): + port = port_pair['egress'] + + host_id, local_endpoint, network_type, segment_id, mac_address = ( + self._get_portpair_detail_info(port)) + port_detail = { + 'ingress': port_pair.get('ingress', None), + 'egress': port_pair.get('egress', None), + 'tenant_id': port_pair['tenant_id'], + 'host_id': host_id, + 'segment_id': segment_id, + 'network_type': network_type, + 'local_endpoint': local_endpoint, + 'mac_address': mac_address + } + r = self.create_port_detail(port_detail) + LOG.debug('create port detail: %s', r) + return r + + @log_helpers.log_method_call + def create_port_pair(self, context): + port_pair = context.current + self._create_port_detail(port_pair) + + @log_helpers.log_method_call + def delete_port_pair(self, context): + port_pair = context.current + + pd_filter = dict(ingress=port_pair.get('ingress', None), + egress=port_pair.get('egress', None), + tenant_id=port_pair['tenant_id'] + ) + pds = self.get_port_details_by_filter(pd_filter) + if pds: + for pd in pds: + self.delete_port_detail(pd['id']) + + @log_helpers.log_method_call + def update_port_pair(self, context): + pass + + def get_flowrules_by_host_portid(self, context, host, port_id): + port_chain_flowrules = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return port_chain_flowrules + try: + port_detail_list = [] + # one port only may be in egress/ingress port once time. + ingress_port = self.get_port_detail_by_filter( + dict(ingress=port_id)) + egress_port = self.get_port_detail_by_filter( + dict(egress=port_id)) + if not ingress_port and not egress_port: + return None + # SF migrate to other host + if ingress_port: + port_detail_list.append(ingress_port) + if ingress_port['host_id'] != host: + ingress_port.update(dict(host_id=host)) + + if egress_port: + port_detail_list.append(egress_port) + if egress_port['host_id'] != host: + egress_port.update(dict(host_id=host)) + + # this is a SF if there are both egress and engress. + for i, ports in enumerate(port_detail_list): + nodes_assocs = ports['path_nodes'] + for assoc in nodes_assocs: + # update current path flow rule + node = self.get_path_node(assoc['pathnode_id']) + port_chain = sfc_plugin.get_port_chain( + context, + node['portchain_id']) + flow_rule = self._build_portchain_flowrule_body( + node, + ports, + add_fc_ids=port_chain['flow_classifiers'] + ) + port_chain_flowrules.append(flow_rule) + + # update the pre-path node flow rule + # if node['node_type'] != ovs_const.SRC_NODE: + # node_filter = dict(nsp=node['nsp'], + # nsi=node['nsi'] + 1 + # ) + # pre_node_list = self.get_path_nodes_by_filter( + # node_filter) + # if not pre_node_list: + # continue + # for pre_node in pre_node_list: + # self._update_path_node_flowrules( + # pre_node, + # add_fc_ids=port_chain['flow_classifiers']) + + return port_chain_flowrules + + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flowrules_by_host_portid failed")) + + def get_flow_classifier_by_portchain_id(self, context, portchain_id): + try: + flow_classifier_list = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + + port_chain = sfc_plugin.get_port_chain( + context, + portchain_id) + flow_classifier_list = self._get_portchain_fcs(port_chain) + return flow_classifier_list + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) + + def update_flowrule_status(self, context, id, status): + try: + flowrule_status = dict(status=status) + self.update_path_node(id, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("update_flowrule_status failed")) + + def _update_src_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + add_fc_ids, + del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, + filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + flow_rule['host'] = agent['host'] + self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( + self.admin_context, + flow_rule) + + def _delete_src_node_flowrules(self, node, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + None, del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, filters={ + 'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + self._update_portchain_group_reference_count(flow_rule, + agent['host']) + self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( + self.admin_context, + flow_rule) + + def get_all_src_node_flowrules(self, context): + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + try: + frs = [] + port_chains = sfc_plugin.get_port_chains(context) + + for port_chain in port_chains: + # get the first node of this chain + node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) + portchain_node = self.get_path_node_by_filter(node_filters) + if not portchain_node: + continue + flow_rule = self._get_portchain_src_node_flowrule( + portchain_node, + port_chain['flow_classifiers'] + ) + if not flow_rule: + continue + frs.append(flow_rule) + return frs + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_all_src_node_flowrules failed")) + + def _get_portchain_src_node_flowrule(self, node, + add_fc_ids=None, del_fc_ids=None): + try: + add_fc_rt = [] + del_fc_rt = [] + + if add_fc_ids: + for fc in self._get_fcs_by_ids(add_fc_ids): + if not fc.get('logical_source_port', None): + add_fc_rt.append(fc) + + if del_fc_ids: + for fc in self._get_fcs_by_ids(del_fc_ids): + if not fc.get('logical_source_port', None): + del_fc_rt.append(fc) + + if not add_fc_rt and not del_fc_rt: + return None + + return self._build_portchain_flowrule_body_without_port( + node, add_fc_rt, del_fc_rt) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("_get_portchain_src_node_flowrule failed")) + + def _update_portchain_group_reference_count(self, flow_rule, host): + group_refcnt = 0 + flow_rule['host'] = host + + if flow_rule['next_group_id'] is not None: + all_nodes = self.get_path_nodes_by_filter( + filters={'next_group_id': flow_rule['next_group_id'], + 'nsi': 0xff}) + if all_nodes is not None: + for node in all_nodes: + if not node['portpair_details']: + group_refcnt += 1 + + port_details = self.get_port_details_by_filter( + dict(host_id=flow_rule['host'])) + if port_details is not None: + for pd in port_details: + for path in pd['path_nodes']: + path_node = self.get_path_node(path['pathnode_id']) + if ( + path_node['next_group_id'] == + flow_rule['next_group_id'] + ): + group_refcnt += 1 + + flow_rule['group_refcnt'] = group_refcnt + + return group_refcnt + + def _build_portchain_flowrule_body_without_port(self, + node, + add_fcs=None, + del_fcs=None): + flow_rule = node.copy() + flow_rule.pop('tenant_id') + flow_rule.pop('portpair_details') + + # according to the first sf node get network information + if not node['next_hop']: + return None + + next_hops = jsonutils.loads(node['next_hop']) + if not next_hops: + return None + + port_detail = self.get_port_detail_by_filter( + dict(id=next_hops[0]['portpair_id'])) + if not port_detail: + return None + + flow_rule['ingress'] = None + flow_rule['egress'] = None + flow_rule['network_type'] = port_detail['network_type'] + flow_rule['segment_id'] = port_detail['segment_id'] + + flow_rule['add_fcs'] = add_fcs + flow_rule['del_fcs'] = del_fcs + + # update next hop info + self._update_path_node_next_hops(flow_rule) + return flow_rule diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc.py b/networking_sfc/services/sfc/drivers/ovs/rpc.py new file mode 100644 index 0000000..a5ac0bc --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/rpc.py @@ -0,0 +1,112 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics +from neutron.common import rpc as n_rpc +from neutron.common import topics +from neutron.i18n import _LI + +from oslo_log import log as logging +import oslo_messaging + +LOG = logging.getLogger(__name__) + + +class SfcRpcCallback(object): + """Sfc RPC server.""" + + def __init__(self, driver): + self.target = oslo_messaging.Target(version='1.0') + self.driver = driver + + def get_flowrules_by_host_portid(self, context, **kwargs): + host = kwargs.get('host') + port_id = kwargs.get('port_id') + LOG.debug('from port-chain service plugin') + pcfrs = self.driver.get_flowrules_by_host_portid( + context, host, port_id) + LOG.debug('host: %s, port_id: %s', host, port_id) + return pcfrs + + def get_flow_classifier_by_portchain_id(self, context, **kwargs): + portchain_id = kwargs.get('portchain_id') + pcfcs = self.driver.get_flow_classifier_by_portchain_id( + context, + portchain_id) + LOG.debug('portchain id: %s', portchain_id) + return pcfcs + + def get_all_src_node_flowrules(self, context, **kwargs): + host = kwargs.get('host') + pcfcs = self.driver.get_all_src_node_flowrules( + context) + LOG.debug('portchain get_src_node_flowrules, host: %s', host) + return pcfcs + + def update_flowrules_status(self, context, **kwargs): + flowrules_status = kwargs.get('flowrules_status') + LOG.info(_LI('update_flowrules_status: %s'), flowrules_status) + for flowrule_dict in flowrules_status: + self.driver.update_flowrule_status( + context, flowrule_dict['id'], flowrule_dict['status']) + + +class SfcAgentRpcClient(object): + """RPC client for ovs sfc agent.""" + + def __init__(self, topic=sfc_topics.SFC_AGENT): + self.topic = topic + target = oslo_messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) + + def ask_agent_to_update_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to update flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.UPDATE), + server=host) + cctxt.cast(context, 'update_flow_rules', flowrule_entries=flows) + + def ask_agent_to_delete_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to delete flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.DELETE), + server=host) + cctxt.cast(context, 'delete_flow_rules', flowrule_entries=flows) + + def ask_agent_to_update_src_node_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to update src node flows ') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.UPDATE), + server=host) + cctxt.cast(context, 'update_src_node_flow_rules', + flowrule_entries=flows) + + def ask_agent_to_delete_src_node_flow_rules(self, context, flows): + LOG.debug('Ask agent on the specific host to delete src node flows') + LOG.debug('flows: %s', flows) + host = flows.get('host') + cctxt = self.client.prepare( + topic=topics.get_topic_name( + self.topic, sfc_topics.PORTFLOW, topics.DELETE), + server=host) + cctxt.cast(context, 'delete_src_node_flow_rules', + flowrule_entries=flows) diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py new file mode 100644 index 0000000..a35ff4f --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py @@ -0,0 +1,21 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +AGENT = 'q-agent-notifier' + +SFC_PLUGIN = 'q-sfc-plugin' +SFC_AGENT = 'q-sfc-agent' +SFC_FLOW = 'q-sfc-flow' + +PORTFLOW = 'portflowrule' diff --git a/networking_sfc/services/sfc/plugin.py b/networking_sfc/services/sfc/plugin.py new file mode 100644 index 0000000..f41c8e1 --- /dev/null +++ b/networking_sfc/services/sfc/plugin.py @@ -0,0 +1,200 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_utils import excutils + +from neutron.i18n import _LE + +from networking_sfc.db import sfc_db +from networking_sfc.extensions import sfc as sfc_ext +from networking_sfc.services.sfc.common import context as sfc_ctx +from networking_sfc.services.sfc.common import exceptions as sfc_exc +from networking_sfc.services.sfc import driver_manager as sfc_driver + + +LOG = logging.getLogger(__name__) + + +class SfcPlugin(sfc_db.SfcDbPlugin): + """SFC plugin implementation.""" + + supported_extension_aliases = [sfc_ext.SFC_EXT] + path_prefix = sfc_ext.SFC_PREFIX + + def __init__(self): + self.driver_manager = sfc_driver.SfcDriverManager() + super(SfcPlugin, self).__init__() + self.driver_manager.initialize() + + @log_helpers.log_method_call + def create_port_chain(self, context, port_chain): + port_chain_db = super(SfcPlugin, self).create_port_chain( + context, port_chain) + portchain_db_context = sfc_ctx.PortChainContext( + self, context, port_chain_db) + try: + self.driver_manager.create_port_chain(portchain_db_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Create port chain failed, " + "deleting port_chain '%s'"), + port_chain_db['id']) + self.delete_port_chain(context, port_chain_db['id']) + + return port_chain_db + + @log_helpers.log_method_call + def update_port_chain(self, context, portchain_id, port_chain): + original_portchain = self.get_port_chain(context, portchain_id) + updated_portchain = super(SfcPlugin, self).update_port_chain( + context, portchain_id, port_chain) + portchain_db_context = sfc_ctx.PortChainContext( + self, context, updated_portchain, + original_portchain=original_portchain) + + try: + self.driver_manager.update_port_chain(portchain_db_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Update port chain failed, port_chain '%s'"), + updated_portchain['id']) + + # TODO(qijing): should we rollback the database update here? + return updated_portchain + + @log_helpers.log_method_call + def delete_port_chain(self, context, portchain_id): + pc = self.get_port_chain(context, portchain_id) + pc_context = sfc_ctx.PortChainContext(self, context, pc) + try: + self.driver_manager.delete_port_chain(pc_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Delete port chain failed, portchain '%s'"), + portchain_id) + # TODO(qijing): unsync in case deleted in driver but fail in database + super(SfcPlugin, self).delete_port_chain(context, portchain_id) + + @log_helpers.log_method_call + def create_port_pair(self, context, port_pair): + portpair_db = super(SfcPlugin, self).create_port_pair( + context, port_pair) + portpair_context = sfc_ctx.PortPairContext( + self, context, portpair_db) + try: + self.driver_manager.create_port_pair(portpair_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Create port pair failed, " + "deleting port_pair '%s'"), + portpair_db['id']) + self.delete_port_pair(context, portpair_db['id']) + + return portpair_db + + @log_helpers.log_method_call + def update_port_pair(self, context, portpair_id, port_pair): + original_portpair = self.get_port_pair(context, portpair_id) + updated_portpair = super(SfcPlugin, self).update_port_pair( + context, portpair_id, port_pair) + portpair_context = sfc_ctx.PortPairContext( + self, context, updated_portpair, + original_portpair=original_portpair) + try: + self.driver_manager.update_port_pair(portpair_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Update port pair failed, port_pair '%s'"), + updated_portpair['id']) + + return updated_portpair + + @log_helpers.log_method_call + def delete_port_pair(self, context, portpair_id): + portpair = self.get_port_pair(context, portpair_id) + portpair_context = sfc_ctx.PortPairContext( + self, context, portpair) + try: + self.driver_manager.delete_port_pair(portpair_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Delete port pair failed, port_pair '%s'"), + portpair_id) + + super(SfcPlugin, self).delete_port_pair(context, portpair_id) + + @log_helpers.log_method_call + def create_port_pair_group(self, context, port_pair_group): + portpairgroup_db = super(SfcPlugin, self).create_port_pair_group( + context, port_pair_group) + portpairgroup_context = sfc_ctx.PortPairGroupContext( + self, context, portpairgroup_db) + try: + self.driver_manager.create_port_pair_group(portpairgroup_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Create port pair group failed, " + "deleting port_pair_group '%s'"), + portpairgroup_db['id']) + self.delete_port_pair_group(context, portpairgroup_db['id']) + + return portpairgroup_db + + @log_helpers.log_method_call + def update_port_pair_group( + self, context, portpairgroup_id, port_pair_group + ): + original_portpairgroup = self.get_port_pair_group( + context, portpairgroup_id) + updated_portpairgroup = super(SfcPlugin, self).update_port_pair_group( + context, portpairgroup_id, port_pair_group) + portpairgroup_context = sfc_ctx.PortPairGroupContext( + self, context, updated_portpairgroup, + original_portpairgroup=original_portpairgroup) + try: + self.driver_manager.update_port_pair_group(portpairgroup_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Update port pair group failed, " + "port_pair_group '%s'"), + updated_portpairgroup['id']) + + return updated_portpairgroup + + @log_helpers.log_method_call + def delete_port_pair_group(self, context, portpairgroup_id): + portpairgroup = self.get_port_pair_group(context, portpairgroup_id) + portpairgroup_context = sfc_ctx.PortPairGroupContext( + self, context, portpairgroup) + try: + self.driver_manager.delete_port_pair_group(portpairgroup_context) + except sfc_exc.SfcDriverError as e: + LOG.exception(e) + with excutils.save_and_reraise_exception(): + LOG.error(_LE("Delete port pair group failed, " + "port_pair_group '%s'"), + portpairgroup_id) + + super(SfcPlugin, self).delete_port_pair_group(context, + portpairgroup_id) |