diff options
Diffstat (limited to 'networking_sfc/services/sfc')
23 files changed, 0 insertions, 3481 deletions
diff --git a/networking_sfc/services/sfc/__init__.py b/networking_sfc/services/sfc/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/agent/__init__.py b/networking_sfc/services/sfc/agent/__init__.py deleted file mode 100644 index 626812b..0000000 --- a/networking_sfc/services/sfc/agent/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from neutron.common import eventlet_utils -eventlet_utils.monkey_patch() diff --git a/networking_sfc/services/sfc/agent/agent.py b/networking_sfc/services/sfc/agent/agent.py deleted file mode 100644 index 2537f9b..0000000 --- a/networking_sfc/services/sfc/agent/agent.py +++ /dev/null @@ -1,891 +0,0 @@ -# Copyright 2015 Huawei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import signal -import six -import sys - -from neutron.agent.common import config -from neutron.agent.linux import ip_lib - -from networking_sfc.services.sfc.agent import br_int -from networking_sfc.services.sfc.agent import br_phys -from networking_sfc.services.sfc.agent import br_tun -from networking_sfc.services.sfc.common import ovs_ext_lib -from networking_sfc.services.sfc.drivers.ovs import constants -from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics - -from neutron.agent import rpc as agent_rpc -from neutron.common import config as common_config -from neutron.common import constants as n_const -from neutron.common import exceptions -from neutron.common import rpc as n_rpc -from neutron.common import topics -from neutron.common import utils as q_utils -from neutron.i18n import _LE -from neutron.i18n import _LI -from neutron.i18n import _LW -from neutron.plugins.ml2.drivers.openvswitch.agent.common import ( - constants as ovs_const) -from neutron.plugins.ml2.drivers.openvswitch.agent import ovs_neutron_agent - -from oslo_config import cfg -from oslo_log import log as logging -import oslo_messaging - - -LOG = logging.getLogger(__name__) - -agent_opts = [ - cfg.StrOpt('sfc_encap_mode', default='mpls', - help=_("The encapsulation mode of sfc.")), -] - -cfg.CONF.register_opts(agent_opts, "AGENT") - -# This table is used to process the traffic across differet subnet scenario. -# Flow 1: pri=1, ip,dl_dst=nexthop_mac,nw_src=nexthop_subnet. actions= -# push_mpls:0x8847,set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port -# or resubmit to table(INGRESS_TABLE) -# Flow 2: pri=0, ip,dl_dst=nexthop_mac,, action=push_mpls:0x8847, -# set_mpls_label,set_mpls_ttl,push_vlan,output:(patch port or resubmit to -# table(INGRESS_TABLE) -ACROSS_SUBNET_TABLE = 5 - -# The table has multiple flows that steer traffic for the different chains -# to the ingress port of different service functions hosted on this Compute -# node. -INGRESS_TABLE = 10 - -# port chain default flow rule priority -PC_DEF_PRI = 20 -PC_INGRESS_PRI = 30 - - -class FeatureSupportError(exceptions.NeutronException): - message = _("Current sfc agent don't support %(feature)s ") - - -class SfcPluginApi(object): - def __init__(self, topic, host): - self.host = host - self.target = oslo_messaging.Target(topic=topic, version='1.0') - self.client = n_rpc.get_client(self.target) - - def update_flowrules_status(self, context, flowrules_status): - cctxt = self.client.prepare() - return cctxt.call( - context, 'update_flowrules_status', - flowrules_status=flowrules_status) - - def get_flowrules_by_host_portid(self, context, port_id): - cctxt = self.client.prepare() - return cctxt.call( - context, 'get_flowrules_by_host_portid', - host=self.host, port_id=port_id) - - def get_all_src_node_flowrules(self, context): - cctxt = self.client.prepare() - return cctxt.call( - context, 'get_all_src_node_flowrules', - host=self.host) - - -class OVSSfcAgent(ovs_neutron_agent.OVSNeutronAgent): - # history - # 1.0 Initial version - """This class will support MPLS frame - - Ethernet + MPLS - IPv4 Packet: - +-------------------------------+---------------+--------------------+ - |Outer Ethernet, ET=0x8847 | MPLS head, | original IP Packet | - +-------------------------------+---------------+--------------------+ - """ - - target = oslo_messaging.Target(version='1.0') - - def __init__(self, bridge_classes, integ_br, tun_br, local_ip, - bridge_mappings, polling_interval, tunnel_types=None, - veth_mtu=None, l2_population=False, - enable_distributed_routing=False, - minimize_polling=False, - ovsdb_monitor_respawn_interval=( - ovs_const.DEFAULT_OVSDBMON_RESPAWN), - arp_responder=False, - prevent_arp_spoofing=True, - use_veth_interconnection=False, - quitting_rpc_timeout=None): - - """to get network info from ovs agent.""" - super(OVSSfcAgent, self).__init__( - bridge_classes, integ_br, tun_br, - local_ip, - bridge_mappings, polling_interval, tunnel_types, - veth_mtu, l2_population, - enable_distributed_routing, - minimize_polling, - ovsdb_monitor_respawn_interval, - arp_responder, - prevent_arp_spoofing, - use_veth_interconnection, - quitting_rpc_timeout) - - self.overlay_encap_mode = cfg.CONF.AGENT.sfc_encap_mode - self._sfc_setup_rpc() - - if self.overlay_encap_mode == 'eth_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'vxlan_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'mpls': - self._clear_sfc_flow_on_int_br() - self._setup_src_node_flow_rules_with_mpls() - - def _sfc_setup_rpc(self): - self.sfc_plugin_rpc = SfcPluginApi( - sfc_topics.SFC_PLUGIN, cfg.CONF.host) - - self.topic = sfc_topics.SFC_AGENT - self.endpoints = [self] - consumers = [ - [sfc_topics.PORTFLOW, topics.UPDATE], - [sfc_topics.PORTFLOW, topics.DELETE] - ] - - # subscribe sfc plugin message - self.connection = agent_rpc.create_consumers( - self.endpoints, - self.topic, - consumers) - - def _parse_flow_classifier(self, flow_classifier): - dl_type, nw_proto, source_port_masks, destination_port_masks = ( - (None, ) * 4) - - if ( - not flow_classifier['source_port_range_min'] and - not flow_classifier['source_port_range_max'] - ): - # wildcard - source_port_masks = ['0/0x0'] - elif not flow_classifier['source_port_range_min']: - source_port_masks = ovs_ext_lib.get_port_mask( - 1, - flow_classifier['source_port_range_max']) - elif not flow_classifier['source_port_range_max']: - source_port_masks = ovs_ext_lib.get_port_mask( - flow_classifier['source_port_range_min'], - 65535) - else: - source_port_masks = ovs_ext_lib.get_port_mask( - flow_classifier['source_port_range_min'], - flow_classifier['source_port_range_max']) - - if ( - not flow_classifier['destination_port_range_min'] and - not flow_classifier['destination_port_range_max'] - ): - # wildcard - destination_port_masks = ['0/0x0'] - elif not flow_classifier['destination_port_range_min']: - destination_port_masks = ovs_ext_lib.get_port_mask( - 1, - flow_classifier['destination_port_range_max']) - elif not flow_classifier['destination_port_range_max']: - destination_port_masks = ovs_ext_lib.get_port_mask( - flow_classifier['destination_port_range_min'], - 65535) - else: - destination_port_masks = ovs_ext_lib.get_port_mask( - flow_classifier['destination_port_range_min'], - flow_classifier['destination_port_range_max']) - - if "IPv4" == flow_classifier['ethertype']: - dl_type = 0x0800 - if n_const.PROTO_NAME_TCP == flow_classifier['protocol']: - nw_proto = n_const.PROTO_NUM_TCP - elif n_const.PROTO_NAME_UDP == flow_classifier['protocol']: - nw_proto = n_const.PROTO_NUM_UDP - elif n_const.PROTO_NAME_ICMP == flow_classifier['protocol']: - nw_proto = n_const.PROTO_NUM_ICMP - else: - nw_proto = None - elif "IPv6" == flow_classifier['ethertype']: - LOG.error(_LE("Current portchain agent don't support Ipv6")) - else: - LOG.error(_LE("invalid protocol input")) - return (dl_type, nw_proto, - source_port_masks, destination_port_masks - ) - - def _clear_sfc_flow_on_int_br(self): - self.int_br.delete_group(group_id='all') - self.int_br.delete_flows(table=ACROSS_SUBNET_TABLE) - self.int_br.delete_flows(table=INGRESS_TABLE) - self.int_br.install_goto(dest_table_id=INGRESS_TABLE, - priority=PC_DEF_PRI, - dl_type=0x8847) - self.int_br.install_drop(table_id=INGRESS_TABLE) - - def _get_flow_infos_from_flow_classifier(self, flow_classifier): - flow_infos = [] - nw_src, nw_dst = ((None, ) * 2) - - if "IPv4" != flow_classifier['ethertype']: - LOG.error(_LE("Current portchain agent don't support Ipv6")) - return flow_infos - - # parse and transfer flow info to match field info - dl_type, nw_proto, source_port_masks, destination_port_masks = ( - self._parse_flow_classifier(flow_classifier)) - - if flow_classifier['source_ip_prefix']: - nw_src = flow_classifier['source_ip_prefix'] - else: - nw_src = '0.0.0.0/0.0.0.0' - if flow_classifier['destination_ip_prefix']: - nw_dst = flow_classifier['destination_ip_prefix'] - else: - nw_dst = '0.0.0.0/0.0.0.0' - - if source_port_masks and destination_port_masks: - for destination_port in destination_port_masks: - for source_port in source_port_masks: - if nw_proto is None: - flow_infos.append(dict( - dl_type=dl_type, - nw_src=nw_src, - nw_dst=nw_dst, - tp_src='%s' % source_port, - tp_dst='%s' % destination_port - )) - else: - flow_infos.append(dict( - dl_type=dl_type, - nw_proto=nw_proto, - nw_src=nw_src, - nw_dst=nw_dst, - tp_src='%s' % source_port, - tp_dst='%s' % destination_port - )) - - return flow_infos - - def _get_flow_infos_from_flow_classifier_list(self, flow_classifier_list): - flow_infos = [] - if not flow_classifier_list: - return flow_infos - for flow_classifier in flow_classifier_list: - flow_infos.extend( - self._get_flow_infos_from_flow_classifier(flow_classifier) - ) - - return flow_infos - - def _setup_local_switch_flows_on_int_br( - self, flowrule, flow_classifier_list, - actions, add_flow=True, match_inport=True - ): - inport_match = {} - priority = PC_DEF_PRI - - if match_inport is True: - egress_port = self.int_br.get_vif_port_by_id(flowrule['egress']) - if egress_port: - inport_match = dict(in_port=egress_port.ofport) - priority = PC_INGRESS_PRI - - for flow_info in self._get_flow_infos_from_flow_classifier_list( - flow_classifier_list - ): - match_info = dict(inport_match, **flow_info) - if add_flow: - self.int_br.add_flow( - table=ovs_const.LOCAL_SWITCHING, - priority=priority, - actions=actions, **match_info - ) - else: - self.int_br.delete_flows( - table=ovs_const.LOCAL_SWITCHING, - **match_info - ) - - def _update_destination_ingress_flow_rules(self, flowrule): - for flow_info in self._get_flow_infos_from_flow_classifier_list( - flowrule['del_fcs'] - ): - self.int_br.delete_flows( - table=ovs_const.LOCAL_SWITCHING, - in_port=self.patch_tun_ofport, - **flow_info - ) - for flow_info in self._get_flow_infos_from_flow_classifier_list( - flowrule['add_fcs'] - ): - inport_match = dict(in_port=self.patch_tun_ofport) - match_info = dict(inport_match, **flow_info) - self.int_br.install_normal(table_id=ovs_const.LOCAL_SWITCHING, - priority=PC_INGRESS_PRI, - **match_info) - - def _setup_src_node_flow_rules_with_mpls(self): - flow_rules = self.sfc_plugin_rpc.get_all_src_node_flowrules( - self.context) - if not flow_rules: - return - for fr in flow_rules: - self._setup_egress_flow_rules_with_mpls(fr, False) - # if the traffic is from patch port, it means the destination - # is on the this host. so implement normal forward but not - # match the traffic from the source. - # Next step need to do is check if the traffic is from vRouter - # on the local host, also need to implement same normal process. - self._update_destination_ingress_flow_rules(fr) - - def _setup_egress_flow_rules_with_mpls(self, flowrule, match_inport=True): - network_type = flowrule['network_type'] - group_id = flowrule.get('next_group_id', None) - next_hops = flowrule.get('next_hops', None) - segmentation_id = flowrule['segment_id'] - - if not next_hops: - return - - if network_type not in ovs_const.TUNNEL_NETWORK_TYPES: - LOG.warn(_LW("currently %s network is not supported," - "only support tunnel type" - ), - network_type - ) - return - - # if the group is not none, install the egress rule for this SF - if ( - (flowrule['node_type'] == constants.SRC_NODE or - flowrule['node_type'] == constants.SF_NODE) and group_id - ): - # 1st, install br-int flow rule on table ACROSS_SUBNET_TABLE - # and group table - buckets = [] - for item in next_hops: - if item['net_uuid'] not in self.local_vlan_map: - self.provision_local_vlan(item['net_uuid'], network_type, - None, segmentation_id) - lvm = self.local_vlan_map[item['net_uuid']] - bucket = ( - 'bucket=weight=%d, mod_dl_dst:%s,' - 'resubmit(,%d)' % ( - item['weight'], - item['mac_address'], - ACROSS_SUBNET_TABLE - ) - ) - buckets.append(bucket) - - no_across_subnet_actions_list = [] - across_subnet_actions_list = [] - - push_mpls = ( - "push_mpls:0x8847," - "set_mpls_label:%d," - "set_mpls_ttl:%d," - "mod_vlan_vid:%d," % - ((flowrule['nsp'] << 8) | flowrule['nsi'], - flowrule['nsi'], lvm.vlan)) - - no_across_subnet_actions_list.append(push_mpls) - across_subnet_actions_list.append(push_mpls) - - if item['local_endpoint'] == self.local_ip: - no_across_subnet_actions = ( - "resubmit(,%d)" % INGRESS_TABLE) - across_subnet_actions = ( - "mod_dl_src:%s, resubmit(,%d)" % - (item['gw_mac'], INGRESS_TABLE)) - else: - # same subnet with next hop - no_across_subnet_actions = ("output:%s" % - self.patch_tun_ofport) - across_subnet_actions = ("mod_dl_src:%s, output:%s" % - (item['gw_mac'], - self.patch_tun_ofport)) - no_across_subnet_actions_list.append(no_across_subnet_actions) - across_subnet_actions_list.append(across_subnet_actions) - - self.int_br.add_flow( - table=ACROSS_SUBNET_TABLE, - priority=1, - dl_dst=item['mac_address'], - dl_type=0x0800, - nw_src=item['cidr'], - actions="%s" % - (','.join(no_across_subnet_actions_list))) - # different subnet with next hop - self.int_br.add_flow( - table=ACROSS_SUBNET_TABLE, - priority=0, - dl_dst=item['mac_address'], - actions="%s" % - (','.join(across_subnet_actions_list))) - - buckets = ','.join(buckets) - group_content = self.int_br.dump_group_for_id(group_id) - if group_content.find('group_id=%d' % group_id) == -1: - self.int_br.add_group(group_id=group_id, - type='select', buckets=buckets) - else: - self.int_br.mod_group(group_id=group_id, - type='select', buckets=buckets) - - # 2nd, install br-int flow rule on table 0 for egress traffic - # for egress traffic - enc_actions = ("group:%d" % group_id) - # to uninstall the removed flow classifiers - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['del_fcs'], - None, - add_flow=False, - match_inport=match_inport) - # to install the added flow classifiers - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['add_fcs'], - enc_actions, - add_flow=True, - match_inport=match_inport) - - def _get_network_by_port(self, port_id): - for key, val in six.iteritems(self.network_ports): - if port_id in val: - return key - - return None - - def _setup_ingress_flow_rules_with_mpls(self, flowrule): - network_id = self._get_network_by_port(flowrule['ingress']) - if network_id: - # install br-int flow rule on table 0 for ingress traffic - lvm = self.local_vlan_map[network_id] - vif_port = lvm.vif_ports[flowrule['ingress']] - match_field = {} - - actions = ("strip_vlan, pop_mpls:0x0800," - "output:%s" % vif_port.ofport) - match_field = dict( - table=INGRESS_TABLE, - priority=1, - dl_dst=vif_port.vif_mac, - dl_vlan=lvm.vlan, - dl_type=0x8847, - mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1), - actions=actions) - - self.int_br.add_flow(**match_field) - - def _setup_last_egress_flow_rules_with_mpls(self, flowrule): - group_id = flowrule.get('next_group_id', None) - - # check whether user assign the destination neutron port. - if ( - constants.SF_NODE == flowrule['node_type'] and - not group_id and - flowrule['egress'] is not None - ): - # to uninstall the new removed flow classifiers - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['del_fcs'], - None, - add_flow=False, - match_inport=True - ) - - # to install the added flow classifiers - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['add_fcs'], - actions='normal', - add_flow=True, - match_inport=True) - - def _get_flow_classifier_dest_port_info(self, - logical_destination_port, - flowrule): - for next_hop in flowrule['next_hops']: - # this flow classifier's destination port should match - # with the nexthop's ingress port id - if logical_destination_port in next_hop.values(): - return next_hop - - return None - - def _update_flow_rules_with_mpls_enc(self, flowrule, flowrule_status): - try: - if flowrule.get('egress', None): - self._setup_egress_flow_rules_with_mpls(flowrule) - self._setup_last_egress_flow_rules_with_mpls(flowrule) - if flowrule.get('ingress', None): - self._setup_ingress_flow_rules_with_mpls(flowrule) - - flowrule_status_temp = {} - flowrule_status_temp['id'] = flowrule['id'] - flowrule_status_temp['status'] = constants.STATUS_ACTIVE - flowrule_status.append(flowrule_status_temp) - except Exception as e: - flowrule_status_temp = {} - flowrule_status_temp['id'] = flowrule['id'] - flowrule_status_temp['status'] = constants.STATUS_ERROR - flowrule_status.append(flowrule_status_temp) - LOG.exception(e) - LOG.error(_LE("_update_flow_rules_with_mpls_enc failed")) - - def _delete_ports_flowrules_by_id(self, ports_id): - flowrule_status = [] - try: - LOG.debug("delete_port_id_flows received, ports_id= %s", ports_id) - count = 0 - if ports_id: - for port_id in ports_id: - flowrule = ( - self.sfc_plugin_rpc.get_flowrules_by_host_portid( - self.context, port_id - ) - ) - if flowrule: - self._treat_delete_flow_rules( - flowrule, flowrule_status) - LOG.debug( - "_delete_ports_flowrules_by_id received, count= %s", count) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("delete_port_id_flows failed")) - if flowrule_status: - self.sfc_plugin_rpc.update_flowrules_status( - self.context, flowrule_status) - - def _delete_flow_rule_with_mpls_enc(self, flowrule, flowrule_status): - try: - LOG.debug("_delete_flow_rule_with_mpls_enc, flowrule = %s", - flowrule) - group_id = flowrule.get('next_group_id', None) - - # delete tunnel table flow rule on br-int(egress match) - if flowrule['egress'] is not None: - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['del_fcs'], - None, - add_flow=False, - match_inport=True - ) - - # delete table INGRESS_TABLE ingress match flow rule - # on br-int(ingress match) - network_id = self._get_network_by_port(flowrule['ingress']) - if network_id: - # third, install br-int flow rule on table INGRESS_TABLE - # for ingress traffic - lvm = self.local_vlan_map[network_id] - vif_port = lvm.vif_ports[flowrule['ingress']] - self.int_br.delete_flows( - table=INGRESS_TABLE, - dl_type=0x8847, - dl_dst=vif_port.vif_mac, - mpls_label=flowrule['nsp'] << 8 | (flowrule['nsi'] + 1) - ) - - # delete group table, need to check again - if group_id and flowrule.get('group_refcnt', None) <= 1: - self.int_br.delete_group(group_id=group_id) - for item in flowrule['next_hops']: - self.int_br.delete_flows( - table=ACROSS_SUBNET_TABLE, - dl_dst=item['mac_address']) - elif (not group_id and - flowrule['egress'] is not None): - # to delete last hop flow rule - for each in flowrule['del_fcs']: - if each.get('logical_destination_port', None): - ldp = self._get_flow_classifier_dest_port_info( - each['logical_destination_port'], - flowrule - ) - if ldp: - self.int_br.delete_flows( - table=ACROSS_SUBNET_TABLE, - dl_dst=ldp['mac_address']) - - except Exception as e: - flowrule_status_temp = {} - flowrule_status_temp['id'] = flowrule['id'] - flowrule_status_temp['status'] = constants.STATUS_ERROR - flowrule_status.append(flowrule_status_temp) - LOG.exception(e) - LOG.error(_LE("_delete_flow_rule_with_mpls_enc failed")) - - def _treat_update_flow_rules(self, flowrule, flowrule_status): - if self.overlay_encap_mode == 'eth_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'vxlan_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'mpls': - self._update_flow_rules_with_mpls_enc(flowrule, flowrule_status) - - def _treat_delete_flow_rules(self, flowrule, flowrule_status): - if self.overlay_encap_mode == 'eth_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'vxlan_nsh': - raise FeatureSupportError(feature=self.overlay_encap_mode) - elif self.overlay_encap_mode == 'mpls': - self._delete_flow_rule_with_mpls_enc( - flowrule, flowrule_status) - - def update_flow_rules(self, context, **kwargs): - try: - flowrule_status = [] - flowrules = kwargs['flowrule_entries'] - LOG.debug("update_flow_rules received, flowrules = %s", - flowrules) - - if flowrules: - self._treat_update_flow_rules(flowrules, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("update_flow_rules failed")) - - if flowrule_status: - self.sfc_plugin_rpc.update_flowrules_status( - self.context, flowrule_status) - - def delete_flow_rules(self, context, **kwargs): - try: - flowrule_status = [] - flowrules = kwargs['flowrule_entries'] - LOG.debug("delete_flow_rules received, flowrules= %s", flowrules) - if flowrules: - self._treat_delete_flow_rules(flowrules, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("delete_flow_rules failed")) - - if flowrule_status: - self.sfc_plugin_rpc.update_flowrules_status( - self.context, flowrule_status) - - def update_src_node_flow_rules(self, context, **kwargs): - flowrule = kwargs['flowrule_entries'] - if self.overlay_encap_mode == 'mpls': - self._setup_egress_flow_rules_with_mpls(flowrule, - match_inport=False) - self._update_destination_ingress_flow_rules(flowrule) - - def _delete_src_node_flow_rules_with_mpls(self, flowrule, - match_inport=False): - LOG.debug("_delete_src_node_flow_rules_with_mpls, flowrule = %s", - flowrule) - group_id = flowrule.get('next_group_id', None) - - # delete br-int table 0 full match flow - self._setup_local_switch_flows_on_int_br( - flowrule, - flowrule['del_fcs'], - None, - add_flow=False, - match_inport=False) - - # delete group table, need to check again - if None != group_id and flowrule.get('group_refcnt', None) <= 1: - self.int_br.delete_group(group_id=group_id) - for item in flowrule['next_hops']: - self.int_br.delete_flows( - table=ACROSS_SUBNET_TABLE, - dl_dst=item['mac_address']) - - def delete_src_node_flow_rules(self, context, **kwargs): - flowrule = kwargs['flowrule_entries'] - if self.overlay_encap_mode == 'mpls': - self._delete_src_node_flow_rules_with_mpls(flowrule, - match_inport=False) - self._update_destination_ingress_flow_rules(flowrule) - - def sfc_treat_devices_added_updated(self, port_id): - resync = False - flowrule_status = [] - try: - LOG.debug("a new device %s is found", port_id) - flows_list = ( - self.sfc_plugin_rpc.get_flowrules_by_host_portid( - self.context, port_id - ) - ) - if flows_list: - for flow in flows_list: - self._treat_update_flow_rules(flow, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("portchain_treat_devices_added_updated failed")) - resync = True - - if flowrule_status: - self.sfc_plugin_rpc.update_flowrules_status( - self.context, flowrule_status) - - return resync - - def sfc_treat_devices_removed(self, port_ids): - resync = False - for port_id in port_ids: - LOG.info(_LI("a device %s is removed"), port_id) - try: - self._delete_ports_flowrules_by_id(port_id) - except Exception as e: - LOG.exception(e) - LOG.error( - _LE("delete port flow rule failed for %(port_id)s"), - {'port_id': port_id} - ) - resync = True - - return resync - - def treat_devices_added_or_updated(self, devices, ovs_restarted): - skipped_devices = [] - need_binding_devices = [] - security_disabled_devices = [] - devices_details_list = ( - self.plugin_rpc.get_devices_details_list_and_failed_devices( - self.context, - devices, - self.agent_id, - self.conf.host - ) - ) - if devices_details_list.get('failed_devices'): - # TODO(rossella_s): handle better the resync in next patches, - # this is just to preserve the current behavior - raise ovs_neutron_agent.DeviceListRetrievalError(devices=devices) - - devices = devices_details_list.get('devices') - vif_by_id = self.int_br.get_vifs_by_ids( - [vif['device'] for vif in devices]) - for details in devices: - device = details['device'] - LOG.debug("Processing port: %s", device) - port = vif_by_id.get(device) - if not port: - # The port disappeared and cannot be processed - LOG.info(_LI("Port %s was not found on the integration bridge " - "and will therefore not be processed"), device) - skipped_devices.append(device) - continue - - if 'port_id' in details: - LOG.info(_LI("Port %(device)s updated. Details: %(details)s"), - {'device': device, 'details': details}) - details['vif_port'] = port - need_binding = self.treat_vif_port(port, details['port_id'], - details['network_id'], - details['network_type'], - details['physical_network'], - details['segmentation_id'], - details['admin_state_up'], - details['fixed_ips'], - details['device_owner'], - ovs_restarted) - if need_binding: - need_binding_devices.append(details) - - port_security = details['port_security_enabled'] - has_sgs = 'security_groups' in details - if not port_security or not has_sgs: - security_disabled_devices.append(device) - self._update_port_network(details['port_id'], - details['network_id']) - self.ext_manager.handle_port(self.context, details) - self.sfc_treat_devices_added_updated(details['port_id']) - else: - LOG.warn(_LW("Device %s not defined on plugin"), device) - if (port and port.ofport != -1): - self.port_dead(port) - return skipped_devices, need_binding_devices, security_disabled_devices - - def process_deleted_ports(self, port_info): - # don't try to process removed ports as deleted ports since - # they are already gone - if 'removed' in port_info: - self.deleted_ports -= port_info['removed'] - deleted_ports = list(self.deleted_ports) - while self.deleted_ports: - port_id = self.deleted_ports.pop() - port = self.int_br.get_vif_port_by_id(port_id) - self._clean_network_ports(port_id) - self.ext_manager.delete_port(self.context, - {"vif_port": port, - "port_id": port_id}) - self.sfc_treat_devices_removed(port_id) - # move to dead VLAN so deleted ports no - # longer have access to the network - if port: - # don't log errors since there is a chance someone will be - # removing the port from the bridge at the same time - self.port_dead(port, log_errors=False) - self.port_unbound(port_id) - # Flush firewall rules after ports are put on dead VLAN to be - # more secure - self.sg_agent.remove_devices_filter(deleted_ports) - - -def main(): - cfg.CONF.register_opts(ip_lib.OPTS) - config.register_root_helper(cfg.CONF) - common_config.init(sys.argv[1:]) - common_config.setup_logging() - q_utils.log_opt_values(LOG) - - try: - agent_config = ovs_neutron_agent.create_agent_config_map(cfg.CONF) - except ValueError as e: - LOG.exception(e) - LOG.error(_LE('Agent terminated!')) - sys.exit(1) - - is_xen_compute_host = 'rootwrap-xen-dom0' in cfg.CONF.AGENT.root_helper - if is_xen_compute_host: - # Force ip_lib to always use the root helper to ensure that ip - # commands target xen dom0 rather than domU. - cfg.CONF.set_default('ip_lib_force_root', True) - - bridge_classes = { - 'br_int': br_int.OVSIntegrationBridge, - 'br_phys': br_phys.OVSPhysicalBridge, - 'br_tun': br_tun.OVSTunnelBridge, - } - try: - agent = OVSSfcAgent(bridge_classes, **agent_config) - except RuntimeError as e: - LOG.exception(e) - LOG.error(_LE("Agent terminated!")) - sys.exit(1) - signal.signal(signal.SIGTERM, agent._handle_sigterm) - - # Start everything. - LOG.info(_LI("Agent initialized successfully, now running... ")) - agent.daemon_loop() - - -if __name__ == "__main__": - main() diff --git a/networking_sfc/services/sfc/agent/br_int.py b/networking_sfc/services/sfc/agent/br_int.py deleted file mode 100644 index 1f88c01..0000000 --- a/networking_sfc/services/sfc/agent/br_int.py +++ /dev/null @@ -1,48 +0,0 @@ -# Copyright 2015 Huawei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -* references -** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic -""" - -from networking_sfc.services.sfc.common import ovs_ext_lib -from neutron.plugins.ml2.drivers.openvswitch.agent.common import constants -from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( - br_int) - - -class OVSIntegrationBridge( - br_int.OVSIntegrationBridge, - ovs_ext_lib.OVSBridgeExt -): - def setup_controllers(self, conf): - self.set_protocols("[]") - self.del_controller() - - def delete_arp_spoofing_protection(self, port): - # there is an issue to delete icmp6, it will not effect and cause - # other flow rule get deleted. - # Raofei will raise a bug to neutron community. - self.delete_flows(table_id=constants.LOCAL_SWITCHING, - in_port=port, proto='arp') - self.delete_flows(table_id=constants.ARP_SPOOF_TABLE, - in_port=port) - - def mod_flow(self, **kwargs): - ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) - - def run_ofctl(self, cmd, args, process_input=None): - return ovs_ext_lib.OVSBridgeExt.run_ofctl( - self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/agent/br_phys.py b/networking_sfc/services/sfc/agent/br_phys.py deleted file mode 100644 index e9666e9..0000000 --- a/networking_sfc/services/sfc/agent/br_phys.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2015 Huawei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -* references -** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic -""" -from networking_sfc.services.sfc.common import ovs_ext_lib -from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( - br_phys) - - -class OVSPhysicalBridge(br_phys.OVSPhysicalBridge, ovs_ext_lib.OVSBridgeExt): - def setup_controllers(self, conf): - self.set_protocols("[]") - self.del_controller() - - def mod_flow(self, **kwargs): - ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) - - def run_ofctl(self, cmd, args, process_input=None): - return ovs_ext_lib.OVSBridgeExt.run_ofctl( - self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/agent/br_tun.py b/networking_sfc/services/sfc/agent/br_tun.py deleted file mode 100644 index 47a7cf9..0000000 --- a/networking_sfc/services/sfc/agent/br_tun.py +++ /dev/null @@ -1,35 +0,0 @@ -# Copyright 2015 Huawei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -* references -** OVS agent https://wiki.openstack.org/wiki/Ovs-flow-logic -""" - -from networking_sfc.services.sfc.common import ovs_ext_lib -from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( - br_tun) - - -class OVSTunnelBridge(br_tun.OVSTunnelBridge, ovs_ext_lib.OVSBridgeExt): - def setup_controllers(self, conf): - self.set_protocols("[]") - self.del_controller() - - def mod_flow(self, **kwargs): - ovs_ext_lib.OVSBridgeExt.mod_flow(self, **kwargs) - - def run_ofctl(self, cmd, args, process_input=None): - return ovs_ext_lib.OVSBridgeExt.run_ofctl( - self, cmd, args, process_input=process_input) diff --git a/networking_sfc/services/sfc/common/__init__.py b/networking_sfc/services/sfc/common/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/common/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/common/config.py b/networking_sfc/services/sfc/common/config.py deleted file mode 100644 index 29acd1c..0000000 --- a/networking_sfc/services/sfc/common/config.py +++ /dev/null @@ -1,27 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_config import cfg - - -SFC_DRIVER_OPTS = [ - cfg.ListOpt('drivers', - default=['dummy'], - help=_("An ordered list of service chain drivers " - "entrypoints to be loaded from the " - "networking_sfc.sfc.drivers namespace.")), -] - - -cfg.CONF.register_opts(SFC_DRIVER_OPTS, "sfc") diff --git a/networking_sfc/services/sfc/common/context.py b/networking_sfc/services/sfc/common/context.py deleted file mode 100644 index 7d3b451..0000000 --- a/networking_sfc/services/sfc/common/context.py +++ /dev/null @@ -1,85 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -class SfcPluginContext(object): - """SFC context base class.""" - def __init__(self, plugin, plugin_context): - self._plugin = plugin - self._plugin_context = plugin_context - - -class PortChainContext(SfcPluginContext): - - def __init__(self, plugin, plugin_context, portchain, - original_portchain=None): - super(PortChainContext, self).__init__(plugin, plugin_context) - self._portchain = portchain - self._original_portchain = original_portchain - - @property - def current(self): - return self._portchain - - @property - def original(self): - return self._original_portchain - - -class FlowClassifierContext(SfcPluginContext): - def __init__(self, plugin, plugin_context, flowclassifier, - original_flowclassifier=None): - super(FlowClassifierContext, self).__init__(plugin, plugin_context) - self._flowclassifier = flowclassifier - self._original_flowclassifier = original_flowclassifier - - @property - def current(self): - return self._flowclassifier - - @property - def original(self): - return self._original_flowclassifier - - -class PortPairContext(SfcPluginContext): - def __init__(self, plugin, plugin_context, portpair, - original_portpair=None): - super(PortPairContext, self).__init__(plugin, plugin_context) - self._portpair = portpair - self._original_portpair = original_portpair - - @property - def current(self): - return self._portpair - - @property - def original(self): - return self._original_portpair - - -class PortPairGroupContext(SfcPluginContext): - def __init__(self, plugin, plugin_context, portpairgroup, - original_portpairgroup=None): - super(PortPairGroupContext, self).__init__(plugin, plugin_context) - self._portpairgroup = portpairgroup - self._original_portpairgroup = original_portpairgroup - - @property - def current(self): - return self._portpairgroup - - @property - def original(self): - return self._original_portpairgroup diff --git a/networking_sfc/services/sfc/common/exceptions.py b/networking_sfc/services/sfc/common/exceptions.py deleted file mode 100644 index 7d1b9d9..0000000 --- a/networking_sfc/services/sfc/common/exceptions.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -"""Exceptions used by SFC plugin and drivers.""" - -from neutron.common import exceptions - - -class SfcDriverError(exceptions.NeutronException): - """SFC driver call failed.""" - message = _("%(method)s failed.") - - -class SfcException(exceptions.NeutronException): - """Base for SFC driver exceptions returned to user.""" - pass - - -class SfcBadRequest(exceptions.BadRequest, SfcException): - """Base for SFC driver bad request exceptions returned to user.""" - pass - - -class SfcNoSubnetGateway(SfcDriverError): - """No subnet gateway.""" - message = _("There is no %(type)s of ip prefix %(cidr)s.") - - -class SfcNoSuchSubnet(SfcDriverError): - """No such subnet.""" - message = _("There is no %(type)s of %(cidr)s.") - - -class FlowClassifierInvalid(SfcDriverError): - """Invalid flow classifier.""" - message = _("There is no %(type)s assigned.") diff --git a/networking_sfc/services/sfc/common/ovs_ext_lib.py b/networking_sfc/services/sfc/common/ovs_ext_lib.py deleted file mode 100644 index 01fbd04..0000000 --- a/networking_sfc/services/sfc/common/ovs_ext_lib.py +++ /dev/null @@ -1,187 +0,0 @@ -# Copyright 2015 Huawei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -from neutron.agent.common import ovs_lib -from neutron.agent.common import utils -from neutron.common import exceptions -from neutron.i18n import _LE -from neutron.plugins.common import constants -from neutron.plugins.ml2.drivers.openvswitch.agent.openflow.ovs_ofctl import ( - ovs_bridge) -from oslo_log import log as logging -import six - - -# Special return value for an invalid OVS ofport -INVALID_OFPORT = '-1' - -LOG = logging.getLogger(__name__) - - -def get_port_mask(min_port, max_port): - """get port/mask serial by port range.""" - if min_port < 1 or max_port > 0xffff or min_port > max_port: - msg = _("the port range is invalid") - raise exceptions.InvalidInput(error_message=msg) - masks = [] - while min_port <= max_port: - mask = 0xffff - while mask != 0: - next_mask = (mask << 1) & 0xffff - port_start = min_port & next_mask - port_end = min_port + (next_mask ^ 0xffff) - if port_start == min_port and port_end <= max_port: - mask = next_mask - else: - break - masks.append('0x%x/0x%x' % (min_port, mask)) - min_port = min_port + (mask ^ 0xffff) + 1 - - return masks - - -class OVSBridgeExt(ovs_bridge.OVSAgentBridge): - def setup_controllers(self, conf): - self.set_protocols("[]") - self.del_controller() - - def dump_flows_full_match(self, flow_str): - retval = None - flows = self.run_ofctl("dump-flows", [flow_str]) - if flows: - retval = '\n'.join(item for item in flows.splitlines() - if 'NXST' not in item and 'OFPST' not in item) - return retval - - def mod_flow(self, **kwargs): - flow_copy = kwargs.copy() - flow_copy.pop('actions') - flow_str = ovs_lib._build_flow_expr_str(flow_copy, 'del') - dump_flows = self.dump_flows_full_match(flow_str) - if dump_flows == '': - self.do_action_flows('add', [kwargs]) - else: - self.do_action_flows('mod', [kwargs]) - - def add_nsh_tunnel_port(self, port_name, remote_ip, local_ip, - tunnel_type=constants.TYPE_GRE, - vxlan_udp_port=constants.VXLAN_UDP_PORT, - dont_fragment=True, - in_nsp=None, - in_nsi=None): - attrs = [('type', tunnel_type)] - # This is an OrderedDict solely to make a test happy - options = collections.OrderedDict() - vxlan_uses_custom_udp_port = ( - tunnel_type == constants.TYPE_VXLAN and - vxlan_udp_port != constants.VXLAN_UDP_PORT - ) - if vxlan_uses_custom_udp_port: - options['dst_port'] = vxlan_udp_port - options['df_default'] = str(dont_fragment).lower() - options['remote_ip'] = 'flow' - options['local_ip'] = local_ip - options['in_key'] = 'flow' - options['out_key'] = 'flow' - if in_nsp is not None and in_nsi is not None: - options['nsp'] = str(in_nsp) - options['nsi'] = str(in_nsi) - elif in_nsp is None and in_nsi is None: - options['nsp'] = 'flow' - options['nsi'] = 'flow' - attrs.append(('options', options)) - ofport = self.add_port(port_name, *attrs) - if ( - tunnel_type == constants.TYPE_VXLAN and - ofport == INVALID_OFPORT - ): - LOG.error( - _LE('Unable to create VXLAN tunnel port for service chain. ' - 'Please ensure that an openvswitch version that supports ' - 'VXLAN for service chain is installed.') - ) - return ofport - - def run_ofctl(self, cmd, args, process_input=None): - # We need to dump-groups according to group Id, - # which is a feature of OpenFlow1.5 - full_args = [ - "ovs-ofctl", "-O openflow13", cmd, self.br_name - ] + args - try: - return utils.execute(full_args, run_as_root=True, - process_input=process_input) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("Unable to execute %(args)s."), - {'args': full_args}) - - def do_action_groups(self, action, kwargs_list): - group_strs = [_build_group_expr_str(kw, action) for kw in kwargs_list] - if action == 'add' or action == 'del': - self.run_ofctl('%s-groups' % action, ['-'], '\n'.join(group_strs)) - elif action == 'mod': - self.run_ofctl('%s-group' % action, ['-'], '\n'.join(group_strs)) - else: - msg = _("Action is illegal") - raise exceptions.InvalidInput(error_message=msg) - - def add_group(self, **kwargs): - self.do_action_groups('add', [kwargs]) - - def mod_group(self, **kwargs): - self.do_action_groups('mod', [kwargs]) - - def delete_group(self, **kwargs): - self.do_action_groups('del', [kwargs]) - - def dump_group_for_id(self, group_id): - retval = None - group_str = "%d" % group_id - group = self.run_ofctl("dump-groups", [group_str]) - if group: - retval = '\n'.join(item for item in group.splitlines() - if 'NXST' not in item) - return retval - - -def _build_group_expr_str(group_dict, cmd): - group_expr_arr = [] - buckets = None - groupId = None - - if cmd != 'del': - if "group_id" not in group_dict: - msg = _("Must specify one groupId on groupo addition" - " or modification") - raise exceptions.InvalidInput(error_message=msg) - groupId = "group_id=%s" % group_dict.pop('group_id') - - if "buckets" not in group_dict: - msg = _("Must specify one or more buckets on group addition" - " or modification") - raise exceptions.InvalidInput(error_message=msg) - buckets = "%s" % group_dict.pop('buckets') - - if groupId: - group_expr_arr.append(groupId) - - for key, value in six.iteritems(group_dict): - group_expr_arr.append("%s=%s" % (key, value)) - - if buckets: - group_expr_arr.append(buckets) - - return ','.join(group_expr_arr) diff --git a/networking_sfc/services/sfc/driver_manager.py b/networking_sfc/services/sfc/driver_manager.py deleted file mode 100644 index c8a212a..0000000 --- a/networking_sfc/services/sfc/driver_manager.py +++ /dev/null @@ -1,118 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_config import cfg -from oslo_log import log -import stevedore - -from neutron.i18n import _LE -from neutron.i18n import _LI - -from networking_sfc.services.sfc.common import exceptions as sfc_exc - - -LOG = log.getLogger(__name__) -cfg.CONF.import_opt('drivers', - 'networking_sfc.services.sfc.common.config', - group='sfc') - - -class SfcDriverManager(stevedore.named.NamedExtensionManager): - """Implementation of SFC drivers.""" - - def __init__(self): - # Registered sfc drivers, keyed by name. - self.drivers = {} - # Ordered list of sfc drivers, defining - # the order in which the drivers are called. - self.ordered_drivers = [] - LOG.info(_LI("Configured SFC drivers: %s"), - cfg.CONF.sfc.drivers) - super(SfcDriverManager, self).__init__('networking_sfc.sfc.drivers', - cfg.CONF.sfc.drivers, - invoke_on_load=True, - name_order=True) - LOG.info(_LI("Loaded SFC drivers: %s"), self.names()) - self._register_drivers() - - def _register_drivers(self): - """Register all SFC drivers. - - This method should only be called once in the SfcDriverManager - constructor. - """ - for ext in self: - self.drivers[ext.name] = ext - self.ordered_drivers.append(ext) - LOG.info(_LI("Registered SFC drivers: %s"), - [driver.name for driver in self.ordered_drivers]) - - def initialize(self): - # ServiceChain bulk operations requires each driver to support them - self.native_bulk_support = True - for driver in self.ordered_drivers: - LOG.info(_LI("Initializing SFC driver '%s'"), driver.name) - driver.obj.initialize() - self.native_bulk_support &= getattr(driver.obj, - 'native_bulk_support', True) - - def _call_drivers(self, method_name, context): - """Helper method for calling a method across all SFC drivers. - - :param method_name: name of the method to call - :param context: context parameter to pass to each method call - :param continue_on_failure: whether or not to continue to call - all SFC drivers once one has raised an exception - if any SFC driver call fails. - """ - for driver in self.ordered_drivers: - try: - getattr(driver.obj, method_name)(context) - except Exception as e: - # This is an internal failure. - LOG.exception(e) - LOG.error( - _LE("SFC driver '%(name)s' failed in %(method)s"), - {'name': driver.name, 'method': method_name} - ) - raise sfc_exc.SfcDriverError( - method=method_name - ) - - def create_port_chain(self, context): - self._call_drivers("create_port_chain", context) - - def update_port_chain(self, context): - self._call_drivers("update_port_chain", context) - - def delete_port_chain(self, context): - self._call_drivers("delete_port_chain", context) - - def create_port_pair(self, context): - self._call_drivers("create_port_pair", context) - - def update_port_pair(self, context): - self._call_drivers("update_port_pair", context) - - def delete_port_pair(self, context): - self._call_drivers("delete_port_pair", context) - - def create_port_pair_group(self, context): - self._call_drivers("create_port_pair_group", context) - - def update_port_pair_group(self, context): - self._call_drivers("update_port_pair_group", context) - - def delete_port_pair_group(self, context): - self._call_drivers("delete_port_pair_group", context) diff --git a/networking_sfc/services/sfc/drivers/__init__.py b/networking_sfc/services/sfc/drivers/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/base.py b/networking_sfc/services/sfc/drivers/base.py deleted file mode 100644 index 0816789..0000000 --- a/networking_sfc/services/sfc/drivers/base.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import six - - -@six.add_metaclass(abc.ABCMeta) -class SfcDriverBase(object): - """SFC Driver Base Class.""" - - @abc.abstractmethod - def create_port_chain(self, context): - pass - - @abc.abstractmethod - def delete_port_chain(self, context): - pass - - @abc.abstractmethod - def update_port_chain(self, context): - pass - - @abc.abstractmethod - def create_port_pair(self, context): - pass - - @abc.abstractmethod - def delete_port_pair(self, context): - pass - - @abc.abstractmethod - def update_port_pair(self, context): - pass - - @abc.abstractmethod - def create_port_pair_group(self, context): - pass - - @abc.abstractmethod - def delete_port_pair_group(self, context): - pass - - @abc.abstractmethod - def update_port_pair_group(self, context): - pass diff --git a/networking_sfc/services/sfc/drivers/dummy/__init__.py b/networking_sfc/services/sfc/drivers/dummy/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/dummy/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/dummy/dummy.py b/networking_sfc/services/sfc/drivers/dummy/dummy.py deleted file mode 100644 index 1ddd7d0..0000000 --- a/networking_sfc/services/sfc/drivers/dummy/dummy.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import helpers as log_helpers - -from networking_sfc.services.sfc.drivers import base as sfc_driver - - -class DummyDriver(sfc_driver.SfcDriverBase): - """SFC Driver Dummy Class.""" - def initialize(self): - pass - - @log_helpers.log_method_call - def create_port_chain(self, context): - pass - - @log_helpers.log_method_call - def delete_port_chain(self, context): - pass - - @log_helpers.log_method_call - def update_port_chain(self, context): - pass - - @log_helpers.log_method_call - def create_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def delete_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def update_port_pair_group(self, context): - pass - - @log_helpers.log_method_call - def create_port_pair(self, context): - pass - - @log_helpers.log_method_call - def delete_port_pair(self, context): - pass - - @log_helpers.log_method_call - def update_port_pair(self, context): - pass diff --git a/networking_sfc/services/sfc/drivers/ovs/__init__.py b/networking_sfc/services/sfc/drivers/ovs/__init__.py deleted file mode 100644 index e69de29..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/__init__.py +++ /dev/null diff --git a/networking_sfc/services/sfc/drivers/ovs/constants.py b/networking_sfc/services/sfc/drivers/ovs/constants.py deleted file mode 100644 index 30e2c37..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/constants.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from neutron.common import constants as n_const - - -INGRESS_DIR = 'ingress' -EGRESS_DIR = 'egress' - -STATUS_BUILDING = 'building' -STATUS_ACTIVE = 'active' -STATUS_ERROR = 'error' -STATUS_DELETING = 'deleting' - - -PORTFLOW_OPT_ADD = 'add-flows' -PROTFLOW_OPT_DELETE = 'delete-flows' -PROTFLOW_OPT_UPDATE = 'update-flows' - - -SRC_NODE = 'src_node' -DST_NODE = 'dst_node' -SF_NODE = 'sf_node' - -RES_TYPE_GROUP = 'group' -RES_TYPE_NSP = 'nsp' - -INSERTION_TYPE_L2 = 'l2' -INSERTION_TYPE_L3 = 'l3' -INSERTION_TYPE_BITW = 'bitw' -INSERTION_TYPE_TAP = 'tap' - -MAX_HASH = 16 - -INSERTION_TYPE_DICT = { - n_const.DEVICE_OWNER_ROUTER_HA_INTF: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_INTF: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_GW: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_FLOATINGIP: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_DHCP: INSERTION_TYPE_TAP, - n_const.DEVICE_OWNER_DVR_INTERFACE: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_AGENT_GW: INSERTION_TYPE_L3, - n_const.DEVICE_OWNER_ROUTER_SNAT: INSERTION_TYPE_TAP, - n_const.DEVICE_OWNER_LOADBALANCER: INSERTION_TYPE_TAP, - 'compute': INSERTION_TYPE_L2 -} diff --git a/networking_sfc/services/sfc/drivers/ovs/db.py b/networking_sfc/services/sfc/drivers/ovs/db.py deleted file mode 100644 index 8d3c87d..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/db.py +++ /dev/null @@ -1,426 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -# - -import six - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_utils import uuidutils - -from neutron.common import exceptions as n_exc -from neutron import context as n_context -from neutron.db import common_db_mixin -from neutron.db import model_base -from neutron.db import models_v2 - -import sqlalchemy as sa -from sqlalchemy import orm -from sqlalchemy.orm import exc -from sqlalchemy import sql - - -LOG = logging.getLogger(__name__) - - -class PortPairDetailNotFound(n_exc.NotFound): - message = _("Portchain port brief %(port_id)s could not be found") - - -class NodeNotFound(n_exc.NotFound): - message = _("Portchain node %(node_id)s could not be found") - - -# name changed to ChainPathId -class UuidIntidAssoc(model_base.BASEV2, models_v2.HasId): - __tablename__ = 'sfc_uuid_intid_associations' - uuid = sa.Column(sa.String(36), primary_key=True) - intid = sa.Column(sa.Integer, unique=True, nullable=False) - type_ = sa.Column(sa.String(32), nullable=False) - - def __init__(self, uuid, intid, type_): - self.uuid = uuid - self.intid = intid - self.type_ = type_ - - -def singleton(class_): - instances = {} - - def getinstance(*args, **kwargs): - if class_ not in instances: - instances[class_] = class_(*args, **kwargs) - return instances[class_] - return getinstance - - -@singleton -class IDAllocation(object): - def __init__(self, context): - # Get the inital range from conf file. - conf_obj = {'group': [1, 255], 'portchain': [256, 65536]} - self.conf_obj = conf_obj - self.session = context.session - - @log_helpers.log_method_call - def assign_intid(self, type_, uuid): - query = self.session.query(UuidIntidAssoc).filter_by( - type_=type_).order_by(UuidIntidAssoc.intid) - - allocated_int_ids = {obj.intid for obj in query.all()} - - # Find the first one from the available range that - # is not in allocated_int_ids - start, end = self.conf_obj[type_][0], self.conf_obj[type_][1]+1 - for init_id in six.moves.range(start, end): - if init_id not in allocated_int_ids: - with self.session.begin(subtransactions=True): - uuid_intid = UuidIntidAssoc( - uuid, init_id, type_) - self.session.add(uuid_intid) - return init_id - else: - return None - - @log_helpers.log_method_call - def get_intid_by_uuid(self, type_, uuid): - - query_obj = self.session.query(UuidIntidAssoc).filter_by( - type_=type_, uuid=uuid).first() - if query_obj: - return query_obj.intid - else: - return None - - @log_helpers.log_method_call - def release_intid(self, type_, intid): - """Release int id. - - @param: type_: str - @param: intid: int - """ - with self.session.begin(subtransactions=True): - query_obj = self.session.query(UuidIntidAssoc).filter_by( - intid=intid, type_=type_).first() - - if query_obj: - self.session.delete(query_obj) - - -class PathPortAssoc(model_base.BASEV2): - """path port association table. - - It represents the association table which associate path_nodes with - portpair_details. - """ - __tablename__ = 'sfc_path_port_associations' - pathnode_id = sa.Column(sa.String(36), - sa.ForeignKey( - 'sfc_path_nodes.id', ondelete='CASCADE'), - primary_key=True) - portpair_id = sa.Column(sa.String(36), - sa.ForeignKey('sfc_portpair_details.id', - ondelete='CASCADE'), - primary_key=True) - weight = sa.Column(sa.Integer, nullable=False, default=1) - - -class PortPairDetail(model_base.BASEV2, models_v2.HasId, - models_v2.HasTenant): - __tablename__ = 'sfc_portpair_details' - ingress = sa.Column(sa.String(36), nullable=True) - egress = sa.Column(sa.String(36), nullable=True) - host_id = sa.Column(sa.String(255), nullable=False) - mac_address = sa.Column(sa.String(32), nullable=False) - network_type = sa.Column(sa.String(8)) - segment_id = sa.Column(sa.Integer) - local_endpoint = sa.Column(sa.String(64), nullable=False) - path_nodes = orm.relationship(PathPortAssoc, - backref='port_pair_detail', - lazy="joined", - cascade='all,delete') - - -class PathNode(model_base.BASEV2, models_v2.HasId, models_v2.HasTenant): - __tablename__ = 'sfc_path_nodes' - nsp = sa.Column(sa.Integer, nullable=False) - nsi = sa.Column(sa.Integer, nullable=False) - node_type = sa.Column(sa.String(32)) - portchain_id = sa.Column( - sa.String(255), - sa.ForeignKey('sfc_port_chains.id', ondelete='CASCADE')) - status = sa.Column(sa.String(32)) - portpair_details = orm.relationship(PathPortAssoc, - backref='path_nodes', - lazy="joined", - cascade='all,delete') - next_group_id = sa.Column(sa.Integer) - next_hop = sa.Column(sa.String(512)) - - -class OVSSfcDriverDB(common_db_mixin.CommonDbMixin): - def initialize(self): - self.admin_context = n_context.get_admin_context() - - def _make_pathnode_dict(self, node, fields=None): - res = {'id': node['id'], - 'tenant_id': node['tenant_id'], - 'node_type': node['node_type'], - 'nsp': node['nsp'], - 'nsi': node['nsi'], - 'next_group_id': node['next_group_id'], - 'next_hop': node['next_hop'], - 'portchain_id': node['portchain_id'], - 'status': node['status'], - 'portpair_details': [pair_detail['portpair_id'] - for pair_detail in node['portpair_details'] - ] - } - - return self._fields(res, fields) - - def _make_port_detail_dict(self, port, fields=None): - res = {'id': port['id'], - 'tenant_id': port['tenant_id'], - 'host_id': port['host_id'], - 'ingress': port.get('ingress', None), - 'egress': port.get('egress', None), - 'segment_id': port['segment_id'], - 'local_endpoint': port['local_endpoint'], - 'mac_address': port['mac_address'], - 'network_type': port['network_type'], - 'path_nodes': [{'pathnode_id': node['pathnode_id'], - 'weight': node['weight']} - for node in port['path_nodes']] - } - - return self._fields(res, fields) - - def _make_pathport_assoc_dict(self, assoc, fields=None): - res = {'pathnode_id': assoc['pathnode_id'], - 'portpair_id': assoc['portpair_id'], - 'weight': assoc['weight'], - } - - return self._fields(res, fields) - - def _get_path_node(self, id): - try: - node = self._get_by_id(self.admin_context, PathNode, id) - except exc.NoResultFound: - raise NodeNotFound(node_id=id) - return node - - def _get_port_detail(self, id): - try: - port = self._get_by_id(self.admin_context, PortPairDetail, id) - except exc.NoResultFound: - raise PortPairDetailNotFound(port_id=id) - return port - - def create_port_detail(self, port): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(port, PortPairDetail) - args['id'] = uuidutils.generate_uuid() - port_obj = PortPairDetail(**args) - self.admin_context.session.add(port_obj) - return self._make_port_detail_dict(port_obj) - - def create_path_node(self, node): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(node, PathNode) - args['id'] = uuidutils.generate_uuid() - node_obj = PathNode(**args) - self.admin_context.session.add(node_obj) - return self._make_pathnode_dict(node_obj) - - def create_pathport_assoc(self, assoc): - with self.admin_context.session.begin(subtransactions=True): - args = self._filter_non_model_columns(assoc, PathPortAssoc) - assoc_obj = PathPortAssoc(**args) - self.admin_context.session.add(assoc_obj) - return self._make_pathport_assoc_dict(assoc_obj) - - def delete_pathport_assoc(self, pathnode_id, portdetail_id): - with self.admin_context.session.begin(subtransactions=True): - self.admin_context.session.query(PathPortAssoc).filter_by( - pathnode_id=pathnode_id, - portpair_id=portdetail_id).delete() - - def update_port_detail(self, id, port): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - for key, value in six.iteritems(port): - if key == 'path_nodes': - pns = [] - for pn in value: - pn_id = pn['pathnode_id'] - self._get_path_node(pn_id) - query = self._model_query( - self.admin_context, PathPortAssoc) - pn_association = query.filter_by( - pathnode_id=pn_id, - portpair_id=id - ).first() - if not pn_association: - pn_association = PathPortAssoc( - pathnode_id=pn_id, - portpair_id=id, - weight=pn.get('weight', 1) - ) - pns.append(pn_association) - port_obj[key] = pns - else: - port_obj[key] = value - port_obj.update(port) - return self._make_port_detail_dict(port_obj) - - def update_path_node(self, id, node): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - for key, value in six.iteritems(node): - if key == 'portpair_details': - pds = [] - for pd_id in value: - self._get_port_detail(pd_id) - query = self._model_query( - self.admin_context, PathPortAssoc) - pd_association = query.filter_by( - pathnode_id=id, - portpair_id=pd_id - ).first() - if not pd_association: - pd_association = PathPortAssoc( - pathnode_id=id, - portpair_id=pd_id - ) - pds.append(pd_association) - node_obj[key] = pds - else: - node_obj[key] = value - return self._make_pathnode_dict(node_obj) - - def delete_port_detail(self, id): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - self.admin_context.session.delete(port_obj) - - def delete_path_node(self, id): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - self.admin_context.session.delete(node_obj) - - def get_port_detail(self, id): - with self.admin_context.session.begin(subtransactions=True): - port_obj = self._get_port_detail(id) - return self._make_port_detail_dict(port_obj) - - def get_port_detail_without_exception(self, id): - with self.admin_context.session.begin(subtransactions=True): - try: - port = self._get_by_id( - self.admin_context, PortPairDetail, id) - except exc.NoResultFound: - return None - return self._make_port_detail_dict(port) - - def get_path_node(self, id): - with self.admin_context.session.begin(subtransactions=True): - node_obj = self._get_path_node(id) - return self._make_pathnode_dict(node_obj) - - def get_path_nodes_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_path_nodes_by_filter( - filters, fields, sorts, limit, - marker, page_reverse - ) - all_items = qry.all() - if all_items: - return [self._make_pathnode_dict(item) for item in all_items] - - return None - - def get_path_node_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_path_nodes_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - first = qry.first() - if first: - return self._make_pathnode_dict(first) - - return None - - def _get_path_nodes_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - qry = self.admin_context.session.query(PathNode) - if filters: - for key, value in six.iteritems(filters): - column = getattr(PathNode, key, None) - if column: - if not value: - qry = qry.filter(sql.false()) - else: - qry = qry.filter(column == value) - return qry - - def get_port_details_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_port_details_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - all_items = qry.all() - if all_items: - return [ - self._make_port_detail_dict(item) - for item in all_items - ] - - return None - - def get_port_detail_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - with self.admin_context.session.begin(subtransactions=True): - qry = self._get_port_details_by_filter( - filters, fields, sorts, limit, - marker, page_reverse) - first = qry.first() - if first: - return self._make_port_detail_dict(first) - - return None - - def _get_port_details_by_filter(self, filters=None, fields=None, - sorts=None, limit=None, marker=None, - page_reverse=False): - qry = self.admin_context.session.query(PortPairDetail) - if filters: - for key, value in six.iteritems(filters): - column = getattr(PortPairDetail, key, None) - if column: - if not value: - qry = qry.filter(sql.false()) - else: - qry = qry.filter(column == value) - - return qry diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py deleted file mode 100644 index 9dfc40d..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/driver.py +++ /dev/null @@ -1,1076 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import netaddr - -# from eventlet import greenthread - -from neutron.common import constants as nc_const -from neutron.common import rpc as n_rpc - -from neutron import manager - -from neutron.i18n import _LE -from neutron.i18n import _LW - -from neutron.plugins.common import constants as np_const - - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_serialization import jsonutils - -from networking_sfc.extensions import flowclassifier -from networking_sfc.extensions import sfc -from networking_sfc.services.sfc.common import exceptions as exc -from networking_sfc.services.sfc.drivers import base as driver_base -from networking_sfc.services.sfc.drivers.ovs import( - rpc_topics as sfc_topics) -from networking_sfc.services.sfc.drivers.ovs import( - db as ovs_sfc_db) -from networking_sfc.services.sfc.drivers.ovs import( - rpc as ovs_sfc_rpc) -from networking_sfc.services.sfc.drivers.ovs import ( - constants as ovs_const) - - -LOG = logging.getLogger(__name__) - - -class OVSSfcDriver(driver_base.SfcDriverBase, - ovs_sfc_db.OVSSfcDriverDB): - """Sfc Driver Base Class.""" - - def initialize(self): - super(OVSSfcDriver, self).initialize() - self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( - sfc_topics.SFC_AGENT - ) - - self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) - self._setup_rpc() - - def _setup_rpc(self): - # Setup a rpc server - self.topic = sfc_topics.SFC_PLUGIN - self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] - self.conn = n_rpc.create_connection(new=True) - self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() - - def _get_subnet(self, core_plugin, tenant_id, cidr): - filters = {'tenant_id': [tenant_id]} - subnets = core_plugin.get_subnets(self.admin_context, filters=filters) - cidr_set = netaddr.IPSet([cidr]) - - for subnet in subnets: - subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) - if cidr_set.issubset(subnet_cidr_set): - return subnet - - def _get_fc_dst_subnet_gw_port(self, fc): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet(core_plugin, - fc['tenant_id'], - fc['destination_ip_prefix']) - - return self._get_port_subnet_gw_info(core_plugin, subnet) - - def _get_port_subnet_gw_info_by_port_id(self, id): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet_by_port(core_plugin, id) - return self._get_port_subnet_gw_info(core_plugin, - subnet) - - def _get_port_subnet_gw_info(self, core_plugin, subnet): - filters = { - 'device_owner': - [nc_const.DEVICE_OWNER_ROUTER_INTF] - } - gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) - for port in gw_ports: - for fixed_ip in port['fixed_ips']: - if subnet["id"] == fixed_ip['subnet_id']: - return (port['mac_address'], - subnet['cidr'], - subnet['network_id']) - - raise exc.NoSubnetGateway( - type='subnet gateway', - cidr=subnet['cidr']) - - def _get_subnet_by_port(self, core_plugin, id): - port = core_plugin.get_port(self.admin_context, id) - for ip in port['fixed_ips']: - subnet = core_plugin.get_subnet(self.admin_context, - ip["subnet_id"]) - # currently only support one subnet for a port - break - - return subnet - - @log_helpers.log_method_call - def _get_portgroup_members(self, context, pg_id): - next_group_members = [] - group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) - LOG.debug('group_intid: %s', group_intid) - pg = context._plugin.get_port_pair_group(context._plugin_context, - pg_id) - for pp_id in pg['port_pairs']: - pp = context._plugin.get_port_pair(context._plugin_context, pp_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - if pd: - next_group_members.append( - dict(portpair_id=pd['id'], weight=1)) - return group_intid, next_group_members - - def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): - pp = context._plugin.get_port_pair(context._plugin_context, - port_pair_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - - return pd - - @log_helpers.log_method_call - def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node, dst_node, - last_sf_node=None): - dst_ports = [] - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # lookup the source port - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - src_pd = self.get_port_detail_by_filter(src_pd_filter) - - if not src_pd: - # Create source port detail - src_pd = self._create_port_detail(src_pd_filter) - LOG.debug('create src port detail: %s', src_pd) - - # Create associate relationship - assco_args = {'portpair_id': src_pd['id'], - 'pathnode_id': src_node['id'], - 'weight': 1, - } - sna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc src port with node: %s', sna) - src_node['portpair_details'].append(src_pd['id']) - - if fc.get('logical_destination_port', ''): - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - dst_pd = self.get_port_detail_by_filter(dst_pd_filter) - - if not dst_pd: - # Create dst port detail - dst_pd = self._create_port_detail(dst_pd_filter) - LOG.debug('create dst port detail: %s', dst_pd) - - # Create associate relationship - dst_assco_args = {'portpair_id': dst_pd['id'], - 'pathnode_id': dst_node['id'], - 'weight': 1, - } - dna = self.create_pathport_assoc(dst_assco_args) - LOG.debug('create assoc dst port with node: %s', dna) - dst_node['portpair_details'].append(dst_pd['id']) - - dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) - - if last_sf_node: - if last_sf_node['next_hop']: - next_hops = jsonutils.loads(last_sf_node['next_hop']) - next_hops.extend(dst_ports) - last_sf_node['next_hop'] = jsonutils.dumps(next_hops) - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - return dst_ports - - def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node=None, dst_node=None, - last_sf_node=None): - if not fc_ids: - return - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # delete source port detail - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(src_pd_filter) - if pds: - for pd in pds: - # update src_node portpair_details refence info - if src_node and pd['id'] in src_node[ - 'portpair_details' - ]: - src_node['portpair_details'].remove(pd['id']) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if fc.get('logical_destination_port', ''): - # Create dst port detail - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(dst_pd_filter) - if pds: - for pd in pds: - # update dst_node portpair_details refence info - if dst_node and pd['id'] in dst_node[ - 'portpair_details' - ]: - # update portpair_details of this node - dst_node['portpair_details'].remove(pd['id']) - # update last hop(SF-group) next hop info - if last_sf_node: - next_hop = dict(portpair_id=pd['id'], - weight=1) - next_hops = jsonutils.loads( - last_sf_node['next_hop']) - next_hops.remove(next_hop) - last_sf_node['next_hop'] = jsonutils.dumps( - next_hops) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if last_sf_node: - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - - @log_helpers.log_method_call - def _create_portchain_path(self, context, port_chain): - src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) - path_nodes, dst_ports = [], [] - # Create an assoc object for chain_id and path_id - # context = context._plugin_context - path_id = self.id_pool.assign_intid('portchain', port_chain['id']) - - if not path_id: - LOG.error(_LE('No path_id available for creating port chain path')) - return - - next_group_intid, next_group_members = self._get_portgroup_members( - context, port_chain['port_pair_groups'][0]) - - port_pair_groups = port_chain['port_pair_groups'] - sf_path_length = len(port_pair_groups) - # Create a head node object for port chain - src_args = {'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SRC_NODE, - 'nsp': path_id, - 'nsi': 0xff, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': jsonutils.dumps(next_group_members), - } - src_node = self.create_path_node(src_args) - LOG.debug('create src node: %s', src_node) - path_nodes.append(src_node) - - # Create a destination node object for port chain - dst_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.DST_NODE, - 'nsp': path_id, - 'nsi': 0xff - sf_path_length - 1, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': None, - 'next_hop': None - } - dst_node = self.create_path_node(dst_args) - LOG.debug('create dst node: %s', dst_node) - path_nodes.append(dst_node) - - dst_ports = self._add_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'], - src_node, - dst_node - ) - - for i in range(sf_path_length): - cur_group_members = next_group_members - # next_group for next hop - if i < sf_path_length - 1: - next_group_intid, next_group_members = ( - self._get_portgroup_members( - context, port_pair_groups[i + 1]) - ) - else: - next_group_intid = None - next_group_members = None if not dst_ports else dst_ports - - # Create a node object - node_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SF_NODE, - 'nsp': path_id, - 'nsi': 0xfe - i, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': ( - None if not next_group_members else - jsonutils.dumps(next_group_members) - ) - } - sf_node = self.create_path_node(node_args) - LOG.debug('chain path node: %s', sf_node) - # Create the assocation objects that combine the pathnode_id with - # the ingress of the port_pairs in the current group - # when port_group does not reach tail - for member in cur_group_members: - assco_args = {'portpair_id': member['portpair_id'], - 'pathnode_id': sf_node['id'], - 'weight': member['weight'], } - sfna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc port with node: %s', sfna) - sf_node['portpair_details'].append(member['portpair_id']) - path_nodes.append(sf_node) - - return path_nodes - - def _delete_path_node_port_flowrule(self, node, port, fc_ids): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - flow_rule = self._build_portchain_flowrule_body( - node, - port, - None, - fc_ids) - - self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( - self.admin_context, - flow_rule) - - def _delete_path_node_flowrule(self, node, fc_ids): - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._delete_path_node_port_flowrule( - node, port, fc_ids) - - @log_helpers.log_method_call - def _delete_portchain_path(self, context, portchain_id): - port_chain = context.current - first = self.get_path_node_by_filter( - filters={ - 'portchain_id': portchain_id, - 'nsi': 0xff - } - ) - - # delete flow rules which source port isn't assigned - # in flow classifier - if first: - self._delete_src_node_flowrules( - first, - port_chain['flow_classifiers'] - ) - - pds = self.get_path_nodes_by_filter( - dict(portchain_id=portchain_id)) - if pds: - for pd in pds: - self._delete_path_node_flowrule( - pd, - port_chain['flow_classifiers'] - ) - self.delete_path_node(pd['id']) - - # delete the ports on the traffic classifier - self._remove_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'] - ) - - # Delete the chainpathpair - intid = self.id_pool.get_intid_by_uuid( - 'portchain', portchain_id) - self.id_pool.release_intid('portchain', intid) - - def _update_path_node_next_hops(self, flow_rule): - node_next_hops = [] - if not flow_rule['next_hop']: - return None - next_hops = jsonutils.loads(flow_rule['next_hop']) - if not next_hops: - return None - for member in next_hops: - detail = {} - port_detail = self.get_port_detail_by_filter( - dict(id=member['portpair_id'])) - if not port_detail or not port_detail['host_id']: - continue - detail['local_endpoint'] = port_detail['local_endpoint'] - detail['weight'] = member['weight'] - detail['mac_address'] = port_detail['mac_address'] - detail['ingress'] = port_detail['ingress'] - node_next_hops.append(detail) - - mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( - detail['ingress'] - ) - - detail['gw_mac'] = mac - detail['cidr'] = cidr - detail['net_uuid'] = net_uuid - - flow_rule['next_hops'] = node_next_hops - flow_rule.pop('next_hop') - - return node_next_hops - - def _build_portchain_flowrule_body(self, node, port, - add_fc_ids=None, del_fc_ids=None): - node_info = node.copy() - node_info.pop('tenant_id') - node_info.pop('portpair_details') - - port_info = port.copy() - port_info.pop('tenant_id') - port_info.pop('id') - port_info.pop('path_nodes') - port_info.pop('host_id') - - flow_rule = dict(node_info, **port_info) - # if this port is belong to NSH/MPLS-aware vm, only to - # notify the flow classifier for 1st SF. - flow_rule['add_fcs'] = self._filter_flow_classifiers( - flow_rule, add_fc_ids) - flow_rule['del_fcs'] = self._filter_flow_classifiers( - flow_rule, del_fc_ids) - - self._update_portchain_group_reference_count(flow_rule, - port['host_id']) - - # update next hop info - self._update_path_node_next_hops(flow_rule) - - return flow_rule - - def _filter_flow_classifiers(self, flow_rule, fc_ids): - """Filter flow classifiers. - - @return: list of the flow classifiers - """ - - fc_return = [] - - if not fc_ids: - return fc_return - fcs = self._get_fcs_by_ids(fc_ids) - for fc in fcs: - new_fc = fc.copy() - new_fc.pop('id') - new_fc.pop('name') - new_fc.pop('tenant_id') - new_fc.pop('description') - - if ((flow_rule['node_type'] == ovs_const.SRC_NODE and - flow_rule['egress'] == fc['logical_source_port'] - ) or - (flow_rule['node_type'] == ovs_const.DST_NODE and - flow_rule['ingress'] == fc['logical_destination_port'] - )): - fc_return.append(new_fc) - elif flow_rule['node_type'] == ovs_const.SF_NODE: - fc_return.append(new_fc) - - return fc_return - - def _update_path_node_port_flowrules(self, node, port, - add_fc_ids=None, del_fc_ids=None): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - - flow_rule = self._build_portchain_flowrule_body( - node, - port, - add_fc_ids, - del_fc_ids) - - self.ovs_driver_rpc.ask_agent_to_update_flow_rules( - self.admin_context, - flow_rule) - - def _update_path_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - if node['portpair_details'] is None: - return - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._update_path_node_port_flowrules( - node, port, add_fc_ids, del_fc_ids) - - def _thread_update_path_nodes(self, nodes, - add_fc_ids=None, del_fc_ids=None): - for node in nodes: - self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) - self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) - - def _get_portchain_fcs(self, port_chain): - return self._get_fcs_by_ids(port_chain['flow_classifiers']) - - def _get_fcs_by_ids(self, fc_ids): - flow_classifiers = [] - if not fc_ids: - return flow_classifiers - - # Get the portchain flow classifiers - fc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - flowclassifier.FLOW_CLASSIFIER_EXT) - ) - if not fc_plugin: - LOG.warn(_LW("Not found the flow classifier service plugin")) - return flow_classifiers - - for fc_id in fc_ids: - fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) - flow_classifiers.append(fc) - - return flow_classifiers - - @log_helpers.log_method_call - def create_port_chain(self, context): - port_chain = context.current - path_nodes = self._create_portchain_path(context, port_chain) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes( - path_nodes, - port_chain['flow_classifiers'], - None) - - @log_helpers.log_method_call - def delete_port_chain(self, context): - port_chain = context.current - portchain_id = port_chain['id'] - LOG.debug("to delete portchain path") - self._delete_portchain_path(context, portchain_id) - - def _get_diff_set(self, orig, cur): - orig_set = set(item for item in orig) - cur_set = set(item for item in cur) - - to_del = orig_set.difference(cur_set) - to_add = cur_set.difference(orig_set) - - return to_del, to_add - - @log_helpers.log_method_call - def update_port_chain(self, context): - port_chain = context.current - orig = context.original - - del_fc_ids, add_fc_ids = self._get_diff_set( - orig['flow_classifiers'], - port_chain['flow_classifiers'] - ) - path_nodes = self.get_path_nodes_by_filter( - dict(portchain_id=port_chain['id']) - ) - if not path_nodes: - return - - sort_path_nodes = sorted(path_nodes, - key=lambda x: x['nsi'], - reverse=True) - if del_fc_ids: - self._thread_update_path_nodes(sort_path_nodes, - None, - del_fc_ids) - self._remove_flowclassifier_port_assoc(del_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - if add_fc_ids: - self._add_flowclassifier_port_assoc(add_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes(sort_path_nodes, - add_fc_ids, - None) - - @log_helpers.log_method_call - def create_port_pair_group(self, context): - group = context.current - self.id_pool.assign_intid('group', group['id']) - - @log_helpers.log_method_call - def delete_port_pair_group(self, context): - group = context.current - group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) - if group_intid: - self.id_pool.release_intid('group', group_intid) - - @log_helpers.log_method_call - def update_port_pair_group(self, context): - current = context.current - original = context.original - - if set(current['port_pairs']) == set(original['port_pairs']): - return - - # Update the path_nodes and flows for each port chain that - # contains this port_pair_group - # Note: _get_port_pair_group is temporarily used here. - ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, - current['id']) - port_chains = [assoc.portchain_id for assoc in - ppg_obj.chain_group_associations] - - for chain_id in port_chains: - port_chain = context._plugin.get_port_chain( - context._plugin_context, chain_id) - group_intid = self.id_pool.get_intid_by_uuid('group', - current['id']) - # Get the previous node - prev_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'next_group_id': group_intid}) - if not prev_node: - continue - - before_update_prev_node = prev_node.copy() - # Update the previous node - _, curr_group_members = self._get_portgroup_members(context, - current['id']) - prev_node['next_hop'] = ( - jsonutils.dumps(curr_group_members) - if curr_group_members else None - ) - # update next hop to database - self.update_path_node(prev_node['id'], prev_node) - if prev_node['node_type'] == ovs_const.SRC_NODE: - self._delete_src_node_flowrules( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_src_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - else: - self._delete_path_node_flowrule( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_path_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - - # Update the current node - # to find the current node by using the node's next_group_id - # if this node is the last, next_group_id would be None - curr_pos = port_chain['port_pair_groups'].index(current['id']) - curr_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'nsi': 0xfe - curr_pos}) - if not curr_node: - continue - - # Add the port-pair-details into the current node - for pp_id in ( - set(current['port_pairs']) - set(original['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - - assco_args = {'portpair_id': ppd['id'], - 'pathnode_id': curr_node['id'], - 'weight': 1, } - self.create_pathport_assoc(assco_args) - self._update_path_node_port_flowrules( - curr_node, ppd, port_chain['flow_classifiers']) - - # Delete the port-pair-details from the current node - for pp_id in ( - set(original['port_pairs']) - set(current['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - self._delete_path_node_port_flowrule( - curr_node, ppd, port_chain['flow_classifiers']) - self.delete_pathport_assoc(curr_node['id'], ppd['id']) - - @log_helpers.log_method_call - def _get_portpair_detail_info(self, portpair_id): - """Get port detail. - - @param: portpair_id: uuid - @return: (host_id, local_ip, network_type, segment_id, - service_insert_type): tuple - """ - - core_plugin = manager.NeutronManager.get_plugin() - port_detail = core_plugin.get_port(self.admin_context, portpair_id) - host_id, local_ip, network_type, segment_id, mac_address = ( - (None, ) * 5) - - if port_detail: - host_id = port_detail['binding:host_id'] - network_id = port_detail['network_id'] - mac_address = port_detail['mac_address'] - network_info = core_plugin.get_network( - self.admin_context, network_id) - network_type = network_info['provider:network_type'] - segment_id = network_info['provider:segmentation_id'] - - if ( - host_id and - network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] - ): - driver = core_plugin.type_manager.drivers.get(network_type) - host_endpoint = driver.obj.get_endpoint_by_host(host_id) - local_ip = host_endpoint['ip_address'] - - return host_id, local_ip, network_type, segment_id, mac_address - - @log_helpers.log_method_call - def _create_port_detail(self, port_pair): - # since first node may not assign the ingress port, and last node may - # not assign the egress port. we use one of the - # port as the key to get the SF information. - port = None - if port_pair.get('ingress', None): - port = port_pair['ingress'] - elif port_pair.get('egress', None): - port = port_pair['egress'] - - host_id, local_endpoint, network_type, segment_id, mac_address = ( - self._get_portpair_detail_info(port)) - port_detail = { - 'ingress': port_pair.get('ingress', None), - 'egress': port_pair.get('egress', None), - 'tenant_id': port_pair['tenant_id'], - 'host_id': host_id, - 'segment_id': segment_id, - 'network_type': network_type, - 'local_endpoint': local_endpoint, - 'mac_address': mac_address - } - r = self.create_port_detail(port_detail) - LOG.debug('create port detail: %s', r) - return r - - @log_helpers.log_method_call - def create_port_pair(self, context): - port_pair = context.current - self._create_port_detail(port_pair) - - @log_helpers.log_method_call - def delete_port_pair(self, context): - port_pair = context.current - - pd_filter = dict(ingress=port_pair.get('ingress', None), - egress=port_pair.get('egress', None), - tenant_id=port_pair['tenant_id'] - ) - pds = self.get_port_details_by_filter(pd_filter) - if pds: - for pd in pds: - self.delete_port_detail(pd['id']) - - @log_helpers.log_method_call - def update_port_pair(self, context): - pass - - def get_flowrules_by_host_portid(self, context, host, port_id): - port_chain_flowrules = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return port_chain_flowrules - try: - port_detail_list = [] - # one port only may be in egress/ingress port once time. - ingress_port = self.get_port_detail_by_filter( - dict(ingress=port_id)) - egress_port = self.get_port_detail_by_filter( - dict(egress=port_id)) - if not ingress_port and not egress_port: - return None - # SF migrate to other host - if ingress_port: - port_detail_list.append(ingress_port) - if ingress_port['host_id'] != host: - ingress_port.update(dict(host_id=host)) - - if egress_port: - port_detail_list.append(egress_port) - if egress_port['host_id'] != host: - egress_port.update(dict(host_id=host)) - - # this is a SF if there are both egress and engress. - for i, ports in enumerate(port_detail_list): - nodes_assocs = ports['path_nodes'] - for assoc in nodes_assocs: - # update current path flow rule - node = self.get_path_node(assoc['pathnode_id']) - port_chain = sfc_plugin.get_port_chain( - context, - node['portchain_id']) - flow_rule = self._build_portchain_flowrule_body( - node, - ports, - add_fc_ids=port_chain['flow_classifiers'] - ) - port_chain_flowrules.append(flow_rule) - - # update the pre-path node flow rule - # if node['node_type'] != ovs_const.SRC_NODE: - # node_filter = dict(nsp=node['nsp'], - # nsi=node['nsi'] + 1 - # ) - # pre_node_list = self.get_path_nodes_by_filter( - # node_filter) - # if not pre_node_list: - # continue - # for pre_node in pre_node_list: - # self._update_path_node_flowrules( - # pre_node, - # add_fc_ids=port_chain['flow_classifiers']) - - return port_chain_flowrules - - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flowrules_by_host_portid failed")) - - def get_flow_classifier_by_portchain_id(self, context, portchain_id): - try: - flow_classifier_list = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - - port_chain = sfc_plugin.get_port_chain( - context, - portchain_id) - flow_classifier_list = self._get_portchain_fcs(port_chain) - return flow_classifier_list - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) - - def update_flowrule_status(self, context, id, status): - try: - flowrule_status = dict(status=status) - self.update_path_node(id, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("update_flowrule_status failed")) - - def _update_src_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - add_fc_ids, - del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, - filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - flow_rule['host'] = agent['host'] - self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( - self.admin_context, - flow_rule) - - def _delete_src_node_flowrules(self, node, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - None, del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, filters={ - 'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - self._update_portchain_group_reference_count(flow_rule, - agent['host']) - self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( - self.admin_context, - flow_rule) - - def get_all_src_node_flowrules(self, context): - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - try: - frs = [] - port_chains = sfc_plugin.get_port_chains(context) - - for port_chain in port_chains: - # get the first node of this chain - node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) - portchain_node = self.get_path_node_by_filter(node_filters) - if not portchain_node: - continue - flow_rule = self._get_portchain_src_node_flowrule( - portchain_node, - port_chain['flow_classifiers'] - ) - if not flow_rule: - continue - frs.append(flow_rule) - return frs - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_all_src_node_flowrules failed")) - - def _get_portchain_src_node_flowrule(self, node, - add_fc_ids=None, del_fc_ids=None): - try: - add_fc_rt = [] - del_fc_rt = [] - - if add_fc_ids: - for fc in self._get_fcs_by_ids(add_fc_ids): - if not fc.get('logical_source_port', None): - add_fc_rt.append(fc) - - if del_fc_ids: - for fc in self._get_fcs_by_ids(del_fc_ids): - if not fc.get('logical_source_port', None): - del_fc_rt.append(fc) - - if not add_fc_rt and not del_fc_rt: - return None - - return self._build_portchain_flowrule_body_without_port( - node, add_fc_rt, del_fc_rt) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("_get_portchain_src_node_flowrule failed")) - - def _update_portchain_group_reference_count(self, flow_rule, host): - group_refcnt = 0 - flow_rule['host'] = host - - if flow_rule['next_group_id'] is not None: - all_nodes = self.get_path_nodes_by_filter( - filters={'next_group_id': flow_rule['next_group_id'], - 'nsi': 0xff}) - if all_nodes is not None: - for node in all_nodes: - if not node['portpair_details']: - group_refcnt += 1 - - port_details = self.get_port_details_by_filter( - dict(host_id=flow_rule['host'])) - if port_details is not None: - for pd in port_details: - for path in pd['path_nodes']: - path_node = self.get_path_node(path['pathnode_id']) - if ( - path_node['next_group_id'] == - flow_rule['next_group_id'] - ): - group_refcnt += 1 - - flow_rule['group_refcnt'] = group_refcnt - - return group_refcnt - - def _build_portchain_flowrule_body_without_port(self, - node, - add_fcs=None, - del_fcs=None): - flow_rule = node.copy() - flow_rule.pop('tenant_id') - flow_rule.pop('portpair_details') - - # according to the first sf node get network information - if not node['next_hop']: - return None - - next_hops = jsonutils.loads(node['next_hop']) - if not next_hops: - return None - - port_detail = self.get_port_detail_by_filter( - dict(id=next_hops[0]['portpair_id'])) - if not port_detail: - return None - - flow_rule['ingress'] = None - flow_rule['egress'] = None - flow_rule['network_type'] = port_detail['network_type'] - flow_rule['segment_id'] = port_detail['segment_id'] - - flow_rule['add_fcs'] = add_fcs - flow_rule['del_fcs'] = del_fcs - - # update next hop info - self._update_path_node_next_hops(flow_rule) - return flow_rule diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc.py b/networking_sfc/services/sfc/drivers/ovs/rpc.py deleted file mode 100644 index a5ac0bc..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/rpc.py +++ /dev/null @@ -1,112 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -from networking_sfc.services.sfc.drivers.ovs import rpc_topics as sfc_topics -from neutron.common import rpc as n_rpc -from neutron.common import topics -from neutron.i18n import _LI - -from oslo_log import log as logging -import oslo_messaging - -LOG = logging.getLogger(__name__) - - -class SfcRpcCallback(object): - """Sfc RPC server.""" - - def __init__(self, driver): - self.target = oslo_messaging.Target(version='1.0') - self.driver = driver - - def get_flowrules_by_host_portid(self, context, **kwargs): - host = kwargs.get('host') - port_id = kwargs.get('port_id') - LOG.debug('from port-chain service plugin') - pcfrs = self.driver.get_flowrules_by_host_portid( - context, host, port_id) - LOG.debug('host: %s, port_id: %s', host, port_id) - return pcfrs - - def get_flow_classifier_by_portchain_id(self, context, **kwargs): - portchain_id = kwargs.get('portchain_id') - pcfcs = self.driver.get_flow_classifier_by_portchain_id( - context, - portchain_id) - LOG.debug('portchain id: %s', portchain_id) - return pcfcs - - def get_all_src_node_flowrules(self, context, **kwargs): - host = kwargs.get('host') - pcfcs = self.driver.get_all_src_node_flowrules( - context) - LOG.debug('portchain get_src_node_flowrules, host: %s', host) - return pcfcs - - def update_flowrules_status(self, context, **kwargs): - flowrules_status = kwargs.get('flowrules_status') - LOG.info(_LI('update_flowrules_status: %s'), flowrules_status) - for flowrule_dict in flowrules_status: - self.driver.update_flowrule_status( - context, flowrule_dict['id'], flowrule_dict['status']) - - -class SfcAgentRpcClient(object): - """RPC client for ovs sfc agent.""" - - def __init__(self, topic=sfc_topics.SFC_AGENT): - self.topic = topic - target = oslo_messaging.Target(topic=topic, version='1.0') - self.client = n_rpc.get_client(target) - - def ask_agent_to_update_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to update flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.UPDATE), - server=host) - cctxt.cast(context, 'update_flow_rules', flowrule_entries=flows) - - def ask_agent_to_delete_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to delete flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.DELETE), - server=host) - cctxt.cast(context, 'delete_flow_rules', flowrule_entries=flows) - - def ask_agent_to_update_src_node_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to update src node flows ') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.UPDATE), - server=host) - cctxt.cast(context, 'update_src_node_flow_rules', - flowrule_entries=flows) - - def ask_agent_to_delete_src_node_flow_rules(self, context, flows): - LOG.debug('Ask agent on the specific host to delete src node flows') - LOG.debug('flows: %s', flows) - host = flows.get('host') - cctxt = self.client.prepare( - topic=topics.get_topic_name( - self.topic, sfc_topics.PORTFLOW, topics.DELETE), - server=host) - cctxt.cast(context, 'delete_src_node_flow_rules', - flowrule_entries=flows) diff --git a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py b/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py deleted file mode 100644 index a35ff4f..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/rpc_topics.py +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -AGENT = 'q-agent-notifier' - -SFC_PLUGIN = 'q-sfc-plugin' -SFC_AGENT = 'q-sfc-agent' -SFC_FLOW = 'q-sfc-flow' - -PORTFLOW = 'portflowrule' diff --git a/networking_sfc/services/sfc/plugin.py b/networking_sfc/services/sfc/plugin.py deleted file mode 100644 index f41c8e1..0000000 --- a/networking_sfc/services/sfc/plugin.py +++ /dev/null @@ -1,200 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_utils import excutils - -from neutron.i18n import _LE - -from networking_sfc.db import sfc_db -from networking_sfc.extensions import sfc as sfc_ext -from networking_sfc.services.sfc.common import context as sfc_ctx -from networking_sfc.services.sfc.common import exceptions as sfc_exc -from networking_sfc.services.sfc import driver_manager as sfc_driver - - -LOG = logging.getLogger(__name__) - - -class SfcPlugin(sfc_db.SfcDbPlugin): - """SFC plugin implementation.""" - - supported_extension_aliases = [sfc_ext.SFC_EXT] - path_prefix = sfc_ext.SFC_PREFIX - - def __init__(self): - self.driver_manager = sfc_driver.SfcDriverManager() - super(SfcPlugin, self).__init__() - self.driver_manager.initialize() - - @log_helpers.log_method_call - def create_port_chain(self, context, port_chain): - port_chain_db = super(SfcPlugin, self).create_port_chain( - context, port_chain) - portchain_db_context = sfc_ctx.PortChainContext( - self, context, port_chain_db) - try: - self.driver_manager.create_port_chain(portchain_db_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Create port chain failed, " - "deleting port_chain '%s'"), - port_chain_db['id']) - self.delete_port_chain(context, port_chain_db['id']) - - return port_chain_db - - @log_helpers.log_method_call - def update_port_chain(self, context, portchain_id, port_chain): - original_portchain = self.get_port_chain(context, portchain_id) - updated_portchain = super(SfcPlugin, self).update_port_chain( - context, portchain_id, port_chain) - portchain_db_context = sfc_ctx.PortChainContext( - self, context, updated_portchain, - original_portchain=original_portchain) - - try: - self.driver_manager.update_port_chain(portchain_db_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Update port chain failed, port_chain '%s'"), - updated_portchain['id']) - - # TODO(qijing): should we rollback the database update here? - return updated_portchain - - @log_helpers.log_method_call - def delete_port_chain(self, context, portchain_id): - pc = self.get_port_chain(context, portchain_id) - pc_context = sfc_ctx.PortChainContext(self, context, pc) - try: - self.driver_manager.delete_port_chain(pc_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Delete port chain failed, portchain '%s'"), - portchain_id) - # TODO(qijing): unsync in case deleted in driver but fail in database - super(SfcPlugin, self).delete_port_chain(context, portchain_id) - - @log_helpers.log_method_call - def create_port_pair(self, context, port_pair): - portpair_db = super(SfcPlugin, self).create_port_pair( - context, port_pair) - portpair_context = sfc_ctx.PortPairContext( - self, context, portpair_db) - try: - self.driver_manager.create_port_pair(portpair_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Create port pair failed, " - "deleting port_pair '%s'"), - portpair_db['id']) - self.delete_port_pair(context, portpair_db['id']) - - return portpair_db - - @log_helpers.log_method_call - def update_port_pair(self, context, portpair_id, port_pair): - original_portpair = self.get_port_pair(context, portpair_id) - updated_portpair = super(SfcPlugin, self).update_port_pair( - context, portpair_id, port_pair) - portpair_context = sfc_ctx.PortPairContext( - self, context, updated_portpair, - original_portpair=original_portpair) - try: - self.driver_manager.update_port_pair(portpair_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Update port pair failed, port_pair '%s'"), - updated_portpair['id']) - - return updated_portpair - - @log_helpers.log_method_call - def delete_port_pair(self, context, portpair_id): - portpair = self.get_port_pair(context, portpair_id) - portpair_context = sfc_ctx.PortPairContext( - self, context, portpair) - try: - self.driver_manager.delete_port_pair(portpair_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Delete port pair failed, port_pair '%s'"), - portpair_id) - - super(SfcPlugin, self).delete_port_pair(context, portpair_id) - - @log_helpers.log_method_call - def create_port_pair_group(self, context, port_pair_group): - portpairgroup_db = super(SfcPlugin, self).create_port_pair_group( - context, port_pair_group) - portpairgroup_context = sfc_ctx.PortPairGroupContext( - self, context, portpairgroup_db) - try: - self.driver_manager.create_port_pair_group(portpairgroup_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Create port pair group failed, " - "deleting port_pair_group '%s'"), - portpairgroup_db['id']) - self.delete_port_pair_group(context, portpairgroup_db['id']) - - return portpairgroup_db - - @log_helpers.log_method_call - def update_port_pair_group( - self, context, portpairgroup_id, port_pair_group - ): - original_portpairgroup = self.get_port_pair_group( - context, portpairgroup_id) - updated_portpairgroup = super(SfcPlugin, self).update_port_pair_group( - context, portpairgroup_id, port_pair_group) - portpairgroup_context = sfc_ctx.PortPairGroupContext( - self, context, updated_portpairgroup, - original_portpairgroup=original_portpairgroup) - try: - self.driver_manager.update_port_pair_group(portpairgroup_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Update port pair group failed, " - "port_pair_group '%s'"), - updated_portpairgroup['id']) - - return updated_portpairgroup - - @log_helpers.log_method_call - def delete_port_pair_group(self, context, portpairgroup_id): - portpairgroup = self.get_port_pair_group(context, portpairgroup_id) - portpairgroup_context = sfc_ctx.PortPairGroupContext( - self, context, portpairgroup) - try: - self.driver_manager.delete_port_pair_group(portpairgroup_context) - except sfc_exc.SfcDriverError as e: - LOG.exception(e) - with excutils.save_and_reraise_exception(): - LOG.error(_LE("Delete port pair group failed, " - "port_pair_group '%s'"), - portpairgroup_id) - - super(SfcPlugin, self).delete_port_pair_group(context, - portpairgroup_id) |