From c772a1dbc7ace58d099570d41a889adf851c8ba8 Mon Sep 17 00:00:00 2001 From: Ulas Kozat Date: Mon, 28 Dec 2015 16:05:13 -0800 Subject: Added networking-sfc from openstack project with merge date Dec 23 2015 Added patch 13 for subject "add missing db migration files" Change-Id: Id51a160335a14870c1dd816a44baf9b1958b9ac6 --- networking_sfc/services/sfc/drivers/ovs/driver.py | 1076 +++++++++++++++++++++ 1 file changed, 1076 insertions(+) create mode 100644 networking_sfc/services/sfc/drivers/ovs/driver.py (limited to 'networking_sfc/services/sfc/drivers/ovs/driver.py') diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py new file mode 100644 index 0000000..9dfc40d --- /dev/null +++ b/networking_sfc/services/sfc/drivers/ovs/driver.py @@ -0,0 +1,1076 @@ +# Copyright 2015 Futurewei. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import netaddr + +# from eventlet import greenthread + +from neutron.common import constants as nc_const +from neutron.common import rpc as n_rpc + +from neutron import manager + +from neutron.i18n import _LE +from neutron.i18n import _LW + +from neutron.plugins.common import constants as np_const + + +from oslo_log import helpers as log_helpers +from oslo_log import log as logging +from oslo_serialization import jsonutils + +from networking_sfc.extensions import flowclassifier +from networking_sfc.extensions import sfc +from networking_sfc.services.sfc.common import exceptions as exc +from networking_sfc.services.sfc.drivers import base as driver_base +from networking_sfc.services.sfc.drivers.ovs import( + rpc_topics as sfc_topics) +from networking_sfc.services.sfc.drivers.ovs import( + db as ovs_sfc_db) +from networking_sfc.services.sfc.drivers.ovs import( + rpc as ovs_sfc_rpc) +from networking_sfc.services.sfc.drivers.ovs import ( + constants as ovs_const) + + +LOG = logging.getLogger(__name__) + + +class OVSSfcDriver(driver_base.SfcDriverBase, + ovs_sfc_db.OVSSfcDriverDB): + """Sfc Driver Base Class.""" + + def initialize(self): + super(OVSSfcDriver, self).initialize() + self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( + sfc_topics.SFC_AGENT + ) + + self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) + self._setup_rpc() + + def _setup_rpc(self): + # Setup a rpc server + self.topic = sfc_topics.SFC_PLUGIN + self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] + self.conn = n_rpc.create_connection(new=True) + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) + self.conn.consume_in_threads() + + def _get_subnet(self, core_plugin, tenant_id, cidr): + filters = {'tenant_id': [tenant_id]} + subnets = core_plugin.get_subnets(self.admin_context, filters=filters) + cidr_set = netaddr.IPSet([cidr]) + + for subnet in subnets: + subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) + if cidr_set.issubset(subnet_cidr_set): + return subnet + + def _get_fc_dst_subnet_gw_port(self, fc): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet(core_plugin, + fc['tenant_id'], + fc['destination_ip_prefix']) + + return self._get_port_subnet_gw_info(core_plugin, subnet) + + def _get_port_subnet_gw_info_by_port_id(self, id): + core_plugin = manager.NeutronManager.get_plugin() + subnet = self._get_subnet_by_port(core_plugin, id) + return self._get_port_subnet_gw_info(core_plugin, + subnet) + + def _get_port_subnet_gw_info(self, core_plugin, subnet): + filters = { + 'device_owner': + [nc_const.DEVICE_OWNER_ROUTER_INTF] + } + gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) + for port in gw_ports: + for fixed_ip in port['fixed_ips']: + if subnet["id"] == fixed_ip['subnet_id']: + return (port['mac_address'], + subnet['cidr'], + subnet['network_id']) + + raise exc.NoSubnetGateway( + type='subnet gateway', + cidr=subnet['cidr']) + + def _get_subnet_by_port(self, core_plugin, id): + port = core_plugin.get_port(self.admin_context, id) + for ip in port['fixed_ips']: + subnet = core_plugin.get_subnet(self.admin_context, + ip["subnet_id"]) + # currently only support one subnet for a port + break + + return subnet + + @log_helpers.log_method_call + def _get_portgroup_members(self, context, pg_id): + next_group_members = [] + group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) + LOG.debug('group_intid: %s', group_intid) + pg = context._plugin.get_port_pair_group(context._plugin_context, + pg_id) + for pp_id in pg['port_pairs']: + pp = context._plugin.get_port_pair(context._plugin_context, pp_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + if pd: + next_group_members.append( + dict(portpair_id=pd['id'], weight=1)) + return group_intid, next_group_members + + def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): + pp = context._plugin.get_port_pair(context._plugin_context, + port_pair_id) + filters = {} + if pp.get('ingress', None): + filters = dict(dict(ingress=pp['ingress']), **filters) + if pp.get('egress', None): + filters = dict(dict(egress=pp['egress']), **filters) + pd = self.get_port_detail_by_filter(filters) + + return pd + + @log_helpers.log_method_call + def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node, dst_node, + last_sf_node=None): + dst_ports = [] + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # lookup the source port + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + src_pd = self.get_port_detail_by_filter(src_pd_filter) + + if not src_pd: + # Create source port detail + src_pd = self._create_port_detail(src_pd_filter) + LOG.debug('create src port detail: %s', src_pd) + + # Create associate relationship + assco_args = {'portpair_id': src_pd['id'], + 'pathnode_id': src_node['id'], + 'weight': 1, + } + sna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc src port with node: %s', sna) + src_node['portpair_details'].append(src_pd['id']) + + if fc.get('logical_destination_port', ''): + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + dst_pd = self.get_port_detail_by_filter(dst_pd_filter) + + if not dst_pd: + # Create dst port detail + dst_pd = self._create_port_detail(dst_pd_filter) + LOG.debug('create dst port detail: %s', dst_pd) + + # Create associate relationship + dst_assco_args = {'portpair_id': dst_pd['id'], + 'pathnode_id': dst_node['id'], + 'weight': 1, + } + dna = self.create_pathport_assoc(dst_assco_args) + LOG.debug('create assoc dst port with node: %s', dna) + dst_node['portpair_details'].append(dst_pd['id']) + + dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) + + if last_sf_node: + if last_sf_node['next_hop']: + next_hops = jsonutils.loads(last_sf_node['next_hop']) + next_hops.extend(dst_ports) + last_sf_node['next_hop'] = jsonutils.dumps(next_hops) + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + return dst_ports + + def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, + src_node=None, dst_node=None, + last_sf_node=None): + if not fc_ids: + return + for fc in self._get_fcs_by_ids(fc_ids): + if fc.get('logical_source_port', ''): + # delete source port detail + src_pd_filter = dict(egress=fc['logical_source_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(src_pd_filter) + if pds: + for pd in pds: + # update src_node portpair_details refence info + if src_node and pd['id'] in src_node[ + 'portpair_details' + ]: + src_node['portpair_details'].remove(pd['id']) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if fc.get('logical_destination_port', ''): + # Create dst port detail + dst_pd_filter = dict(ingress=fc['logical_destination_port'], + tenant_id=tenant_id + ) + pds = self.get_port_details_by_filter(dst_pd_filter) + if pds: + for pd in pds: + # update dst_node portpair_details refence info + if dst_node and pd['id'] in dst_node[ + 'portpair_details' + ]: + # update portpair_details of this node + dst_node['portpair_details'].remove(pd['id']) + # update last hop(SF-group) next hop info + if last_sf_node: + next_hop = dict(portpair_id=pd['id'], + weight=1) + next_hops = jsonutils.loads( + last_sf_node['next_hop']) + next_hops.remove(next_hop) + last_sf_node['next_hop'] = jsonutils.dumps( + next_hops) + if len(pd['path_nodes']) == 1: + self.delete_port_detail(pd['id']) + + if last_sf_node: + # update nexthop info of pre node + self.update_path_node(last_sf_node['id'], + last_sf_node) + + @log_helpers.log_method_call + def _create_portchain_path(self, context, port_chain): + src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) + path_nodes, dst_ports = [], [] + # Create an assoc object for chain_id and path_id + # context = context._plugin_context + path_id = self.id_pool.assign_intid('portchain', port_chain['id']) + + if not path_id: + LOG.error(_LE('No path_id available for creating port chain path')) + return + + next_group_intid, next_group_members = self._get_portgroup_members( + context, port_chain['port_pair_groups'][0]) + + port_pair_groups = port_chain['port_pair_groups'] + sf_path_length = len(port_pair_groups) + # Create a head node object for port chain + src_args = {'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SRC_NODE, + 'nsp': path_id, + 'nsi': 0xff, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': jsonutils.dumps(next_group_members), + } + src_node = self.create_path_node(src_args) + LOG.debug('create src node: %s', src_node) + path_nodes.append(src_node) + + # Create a destination node object for port chain + dst_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.DST_NODE, + 'nsp': path_id, + 'nsi': 0xff - sf_path_length - 1, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': None, + 'next_hop': None + } + dst_node = self.create_path_node(dst_args) + LOG.debug('create dst node: %s', dst_node) + path_nodes.append(dst_node) + + dst_ports = self._add_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'], + src_node, + dst_node + ) + + for i in range(sf_path_length): + cur_group_members = next_group_members + # next_group for next hop + if i < sf_path_length - 1: + next_group_intid, next_group_members = ( + self._get_portgroup_members( + context, port_pair_groups[i + 1]) + ) + else: + next_group_intid = None + next_group_members = None if not dst_ports else dst_ports + + # Create a node object + node_args = { + 'tenant_id': port_chain['tenant_id'], + 'node_type': ovs_const.SF_NODE, + 'nsp': path_id, + 'nsi': 0xfe - i, + 'portchain_id': port_chain['id'], + 'status': ovs_const.STATUS_BUILDING, + 'next_group_id': next_group_intid, + 'next_hop': ( + None if not next_group_members else + jsonutils.dumps(next_group_members) + ) + } + sf_node = self.create_path_node(node_args) + LOG.debug('chain path node: %s', sf_node) + # Create the assocation objects that combine the pathnode_id with + # the ingress of the port_pairs in the current group + # when port_group does not reach tail + for member in cur_group_members: + assco_args = {'portpair_id': member['portpair_id'], + 'pathnode_id': sf_node['id'], + 'weight': member['weight'], } + sfna = self.create_pathport_assoc(assco_args) + LOG.debug('create assoc port with node: %s', sfna) + sf_node['portpair_details'].append(member['portpair_id']) + path_nodes.append(sf_node) + + return path_nodes + + def _delete_path_node_port_flowrule(self, node, port, fc_ids): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + flow_rule = self._build_portchain_flowrule_body( + node, + port, + None, + fc_ids) + + self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( + self.admin_context, + flow_rule) + + def _delete_path_node_flowrule(self, node, fc_ids): + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._delete_path_node_port_flowrule( + node, port, fc_ids) + + @log_helpers.log_method_call + def _delete_portchain_path(self, context, portchain_id): + port_chain = context.current + first = self.get_path_node_by_filter( + filters={ + 'portchain_id': portchain_id, + 'nsi': 0xff + } + ) + + # delete flow rules which source port isn't assigned + # in flow classifier + if first: + self._delete_src_node_flowrules( + first, + port_chain['flow_classifiers'] + ) + + pds = self.get_path_nodes_by_filter( + dict(portchain_id=portchain_id)) + if pds: + for pd in pds: + self._delete_path_node_flowrule( + pd, + port_chain['flow_classifiers'] + ) + self.delete_path_node(pd['id']) + + # delete the ports on the traffic classifier + self._remove_flowclassifier_port_assoc( + port_chain['flow_classifiers'], + port_chain['tenant_id'] + ) + + # Delete the chainpathpair + intid = self.id_pool.get_intid_by_uuid( + 'portchain', portchain_id) + self.id_pool.release_intid('portchain', intid) + + def _update_path_node_next_hops(self, flow_rule): + node_next_hops = [] + if not flow_rule['next_hop']: + return None + next_hops = jsonutils.loads(flow_rule['next_hop']) + if not next_hops: + return None + for member in next_hops: + detail = {} + port_detail = self.get_port_detail_by_filter( + dict(id=member['portpair_id'])) + if not port_detail or not port_detail['host_id']: + continue + detail['local_endpoint'] = port_detail['local_endpoint'] + detail['weight'] = member['weight'] + detail['mac_address'] = port_detail['mac_address'] + detail['ingress'] = port_detail['ingress'] + node_next_hops.append(detail) + + mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( + detail['ingress'] + ) + + detail['gw_mac'] = mac + detail['cidr'] = cidr + detail['net_uuid'] = net_uuid + + flow_rule['next_hops'] = node_next_hops + flow_rule.pop('next_hop') + + return node_next_hops + + def _build_portchain_flowrule_body(self, node, port, + add_fc_ids=None, del_fc_ids=None): + node_info = node.copy() + node_info.pop('tenant_id') + node_info.pop('portpair_details') + + port_info = port.copy() + port_info.pop('tenant_id') + port_info.pop('id') + port_info.pop('path_nodes') + port_info.pop('host_id') + + flow_rule = dict(node_info, **port_info) + # if this port is belong to NSH/MPLS-aware vm, only to + # notify the flow classifier for 1st SF. + flow_rule['add_fcs'] = self._filter_flow_classifiers( + flow_rule, add_fc_ids) + flow_rule['del_fcs'] = self._filter_flow_classifiers( + flow_rule, del_fc_ids) + + self._update_portchain_group_reference_count(flow_rule, + port['host_id']) + + # update next hop info + self._update_path_node_next_hops(flow_rule) + + return flow_rule + + def _filter_flow_classifiers(self, flow_rule, fc_ids): + """Filter flow classifiers. + + @return: list of the flow classifiers + """ + + fc_return = [] + + if not fc_ids: + return fc_return + fcs = self._get_fcs_by_ids(fc_ids) + for fc in fcs: + new_fc = fc.copy() + new_fc.pop('id') + new_fc.pop('name') + new_fc.pop('tenant_id') + new_fc.pop('description') + + if ((flow_rule['node_type'] == ovs_const.SRC_NODE and + flow_rule['egress'] == fc['logical_source_port'] + ) or + (flow_rule['node_type'] == ovs_const.DST_NODE and + flow_rule['ingress'] == fc['logical_destination_port'] + )): + fc_return.append(new_fc) + elif flow_rule['node_type'] == ovs_const.SF_NODE: + fc_return.append(new_fc) + + return fc_return + + def _update_path_node_port_flowrules(self, node, port, + add_fc_ids=None, del_fc_ids=None): + # if this port is not binding, don't to generate flow rule + if not port['host_id']: + return + + flow_rule = self._build_portchain_flowrule_body( + node, + port, + add_fc_ids, + del_fc_ids) + + self.ovs_driver_rpc.ask_agent_to_update_flow_rules( + self.admin_context, + flow_rule) + + def _update_path_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + if node['portpair_details'] is None: + return + for each in node['portpair_details']: + port = self.get_port_detail_by_filter(dict(id=each)) + if port: + self._update_path_node_port_flowrules( + node, port, add_fc_ids, del_fc_ids) + + def _thread_update_path_nodes(self, nodes, + add_fc_ids=None, del_fc_ids=None): + for node in nodes: + self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) + self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) + + def _get_portchain_fcs(self, port_chain): + return self._get_fcs_by_ids(port_chain['flow_classifiers']) + + def _get_fcs_by_ids(self, fc_ids): + flow_classifiers = [] + if not fc_ids: + return flow_classifiers + + # Get the portchain flow classifiers + fc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + flowclassifier.FLOW_CLASSIFIER_EXT) + ) + if not fc_plugin: + LOG.warn(_LW("Not found the flow classifier service plugin")) + return flow_classifiers + + for fc_id in fc_ids: + fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) + flow_classifiers.append(fc) + + return flow_classifiers + + @log_helpers.log_method_call + def create_port_chain(self, context): + port_chain = context.current + path_nodes = self._create_portchain_path(context, port_chain) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes( + path_nodes, + port_chain['flow_classifiers'], + None) + + @log_helpers.log_method_call + def delete_port_chain(self, context): + port_chain = context.current + portchain_id = port_chain['id'] + LOG.debug("to delete portchain path") + self._delete_portchain_path(context, portchain_id) + + def _get_diff_set(self, orig, cur): + orig_set = set(item for item in orig) + cur_set = set(item for item in cur) + + to_del = orig_set.difference(cur_set) + to_add = cur_set.difference(orig_set) + + return to_del, to_add + + @log_helpers.log_method_call + def update_port_chain(self, context): + port_chain = context.current + orig = context.original + + del_fc_ids, add_fc_ids = self._get_diff_set( + orig['flow_classifiers'], + port_chain['flow_classifiers'] + ) + path_nodes = self.get_path_nodes_by_filter( + dict(portchain_id=port_chain['id']) + ) + if not path_nodes: + return + + sort_path_nodes = sorted(path_nodes, + key=lambda x: x['nsi'], + reverse=True) + if del_fc_ids: + self._thread_update_path_nodes(sort_path_nodes, + None, + del_fc_ids) + self._remove_flowclassifier_port_assoc(del_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + if add_fc_ids: + self._add_flowclassifier_port_assoc(add_fc_ids, + port_chain['tenant_id'], + sort_path_nodes[0], + sort_path_nodes[-1], + sort_path_nodes[-2]) + + # notify agent with async thread + # current we don't use greenthread.spawn + self._thread_update_path_nodes(sort_path_nodes, + add_fc_ids, + None) + + @log_helpers.log_method_call + def create_port_pair_group(self, context): + group = context.current + self.id_pool.assign_intid('group', group['id']) + + @log_helpers.log_method_call + def delete_port_pair_group(self, context): + group = context.current + group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) + if group_intid: + self.id_pool.release_intid('group', group_intid) + + @log_helpers.log_method_call + def update_port_pair_group(self, context): + current = context.current + original = context.original + + if set(current['port_pairs']) == set(original['port_pairs']): + return + + # Update the path_nodes and flows for each port chain that + # contains this port_pair_group + # Note: _get_port_pair_group is temporarily used here. + ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, + current['id']) + port_chains = [assoc.portchain_id for assoc in + ppg_obj.chain_group_associations] + + for chain_id in port_chains: + port_chain = context._plugin.get_port_chain( + context._plugin_context, chain_id) + group_intid = self.id_pool.get_intid_by_uuid('group', + current['id']) + # Get the previous node + prev_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'next_group_id': group_intid}) + if not prev_node: + continue + + before_update_prev_node = prev_node.copy() + # Update the previous node + _, curr_group_members = self._get_portgroup_members(context, + current['id']) + prev_node['next_hop'] = ( + jsonutils.dumps(curr_group_members) + if curr_group_members else None + ) + # update next hop to database + self.update_path_node(prev_node['id'], prev_node) + if prev_node['node_type'] == ovs_const.SRC_NODE: + self._delete_src_node_flowrules( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_src_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + else: + self._delete_path_node_flowrule( + before_update_prev_node, port_chain['flow_classifiers']) + self._update_path_node_flowrules( + prev_node, port_chain['flow_classifiers'], None) + + # Update the current node + # to find the current node by using the node's next_group_id + # if this node is the last, next_group_id would be None + curr_pos = port_chain['port_pair_groups'].index(current['id']) + curr_node = self.get_path_node_by_filter( + filters={'portchain_id': chain_id, + 'nsi': 0xfe - curr_pos}) + if not curr_node: + continue + + # Add the port-pair-details into the current node + for pp_id in ( + set(current['port_pairs']) - set(original['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + + assco_args = {'portpair_id': ppd['id'], + 'pathnode_id': curr_node['id'], + 'weight': 1, } + self.create_pathport_assoc(assco_args) + self._update_path_node_port_flowrules( + curr_node, ppd, port_chain['flow_classifiers']) + + # Delete the port-pair-details from the current node + for pp_id in ( + set(original['port_pairs']) - set(current['port_pairs']) + ): + ppd = self._get_port_pair_detail_by_port_pair(context, + pp_id) + if not ppd: + LOG.debug('No port_pair_detail for the port_pair: %s', + pp_id) + LOG.debug("Failed to update port-pair-group") + return + self._delete_path_node_port_flowrule( + curr_node, ppd, port_chain['flow_classifiers']) + self.delete_pathport_assoc(curr_node['id'], ppd['id']) + + @log_helpers.log_method_call + def _get_portpair_detail_info(self, portpair_id): + """Get port detail. + + @param: portpair_id: uuid + @return: (host_id, local_ip, network_type, segment_id, + service_insert_type): tuple + """ + + core_plugin = manager.NeutronManager.get_plugin() + port_detail = core_plugin.get_port(self.admin_context, portpair_id) + host_id, local_ip, network_type, segment_id, mac_address = ( + (None, ) * 5) + + if port_detail: + host_id = port_detail['binding:host_id'] + network_id = port_detail['network_id'] + mac_address = port_detail['mac_address'] + network_info = core_plugin.get_network( + self.admin_context, network_id) + network_type = network_info['provider:network_type'] + segment_id = network_info['provider:segmentation_id'] + + if ( + host_id and + network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] + ): + driver = core_plugin.type_manager.drivers.get(network_type) + host_endpoint = driver.obj.get_endpoint_by_host(host_id) + local_ip = host_endpoint['ip_address'] + + return host_id, local_ip, network_type, segment_id, mac_address + + @log_helpers.log_method_call + def _create_port_detail(self, port_pair): + # since first node may not assign the ingress port, and last node may + # not assign the egress port. we use one of the + # port as the key to get the SF information. + port = None + if port_pair.get('ingress', None): + port = port_pair['ingress'] + elif port_pair.get('egress', None): + port = port_pair['egress'] + + host_id, local_endpoint, network_type, segment_id, mac_address = ( + self._get_portpair_detail_info(port)) + port_detail = { + 'ingress': port_pair.get('ingress', None), + 'egress': port_pair.get('egress', None), + 'tenant_id': port_pair['tenant_id'], + 'host_id': host_id, + 'segment_id': segment_id, + 'network_type': network_type, + 'local_endpoint': local_endpoint, + 'mac_address': mac_address + } + r = self.create_port_detail(port_detail) + LOG.debug('create port detail: %s', r) + return r + + @log_helpers.log_method_call + def create_port_pair(self, context): + port_pair = context.current + self._create_port_detail(port_pair) + + @log_helpers.log_method_call + def delete_port_pair(self, context): + port_pair = context.current + + pd_filter = dict(ingress=port_pair.get('ingress', None), + egress=port_pair.get('egress', None), + tenant_id=port_pair['tenant_id'] + ) + pds = self.get_port_details_by_filter(pd_filter) + if pds: + for pd in pds: + self.delete_port_detail(pd['id']) + + @log_helpers.log_method_call + def update_port_pair(self, context): + pass + + def get_flowrules_by_host_portid(self, context, host, port_id): + port_chain_flowrules = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return port_chain_flowrules + try: + port_detail_list = [] + # one port only may be in egress/ingress port once time. + ingress_port = self.get_port_detail_by_filter( + dict(ingress=port_id)) + egress_port = self.get_port_detail_by_filter( + dict(egress=port_id)) + if not ingress_port and not egress_port: + return None + # SF migrate to other host + if ingress_port: + port_detail_list.append(ingress_port) + if ingress_port['host_id'] != host: + ingress_port.update(dict(host_id=host)) + + if egress_port: + port_detail_list.append(egress_port) + if egress_port['host_id'] != host: + egress_port.update(dict(host_id=host)) + + # this is a SF if there are both egress and engress. + for i, ports in enumerate(port_detail_list): + nodes_assocs = ports['path_nodes'] + for assoc in nodes_assocs: + # update current path flow rule + node = self.get_path_node(assoc['pathnode_id']) + port_chain = sfc_plugin.get_port_chain( + context, + node['portchain_id']) + flow_rule = self._build_portchain_flowrule_body( + node, + ports, + add_fc_ids=port_chain['flow_classifiers'] + ) + port_chain_flowrules.append(flow_rule) + + # update the pre-path node flow rule + # if node['node_type'] != ovs_const.SRC_NODE: + # node_filter = dict(nsp=node['nsp'], + # nsi=node['nsi'] + 1 + # ) + # pre_node_list = self.get_path_nodes_by_filter( + # node_filter) + # if not pre_node_list: + # continue + # for pre_node in pre_node_list: + # self._update_path_node_flowrules( + # pre_node, + # add_fc_ids=port_chain['flow_classifiers']) + + return port_chain_flowrules + + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flowrules_by_host_portid failed")) + + def get_flow_classifier_by_portchain_id(self, context, portchain_id): + try: + flow_classifier_list = [] + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + + port_chain = sfc_plugin.get_port_chain( + context, + portchain_id) + flow_classifier_list = self._get_portchain_fcs(port_chain) + return flow_classifier_list + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) + + def update_flowrule_status(self, context, id, status): + try: + flowrule_status = dict(status=status) + self.update_path_node(id, flowrule_status) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("update_flowrule_status failed")) + + def _update_src_node_flowrules(self, node, + add_fc_ids=None, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + add_fc_ids, + del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, + filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + flow_rule['host'] = agent['host'] + self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( + self.admin_context, + flow_rule) + + def _delete_src_node_flowrules(self, node, del_fc_ids=None): + flow_rule = self._get_portchain_src_node_flowrule(node, + None, del_fc_ids) + if not flow_rule: + return + + core_plugin = manager.NeutronManager.get_plugin() + pc_agents = core_plugin.get_agents( + self.admin_context, filters={ + 'agent_type': [nc_const.AGENT_TYPE_OVS]}) + if not pc_agents: + return + + for agent in pc_agents: + if agent['alive']: + # update host info to flow rule + self._update_portchain_group_reference_count(flow_rule, + agent['host']) + self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( + self.admin_context, + flow_rule) + + def get_all_src_node_flowrules(self, context): + sfc_plugin = ( + manager.NeutronManager.get_service_plugins().get( + sfc.SFC_EXT + ) + ) + if not sfc_plugin: + return [] + try: + frs = [] + port_chains = sfc_plugin.get_port_chains(context) + + for port_chain in port_chains: + # get the first node of this chain + node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) + portchain_node = self.get_path_node_by_filter(node_filters) + if not portchain_node: + continue + flow_rule = self._get_portchain_src_node_flowrule( + portchain_node, + port_chain['flow_classifiers'] + ) + if not flow_rule: + continue + frs.append(flow_rule) + return frs + except Exception as e: + LOG.exception(e) + LOG.error(_LE("get_all_src_node_flowrules failed")) + + def _get_portchain_src_node_flowrule(self, node, + add_fc_ids=None, del_fc_ids=None): + try: + add_fc_rt = [] + del_fc_rt = [] + + if add_fc_ids: + for fc in self._get_fcs_by_ids(add_fc_ids): + if not fc.get('logical_source_port', None): + add_fc_rt.append(fc) + + if del_fc_ids: + for fc in self._get_fcs_by_ids(del_fc_ids): + if not fc.get('logical_source_port', None): + del_fc_rt.append(fc) + + if not add_fc_rt and not del_fc_rt: + return None + + return self._build_portchain_flowrule_body_without_port( + node, add_fc_rt, del_fc_rt) + except Exception as e: + LOG.exception(e) + LOG.error(_LE("_get_portchain_src_node_flowrule failed")) + + def _update_portchain_group_reference_count(self, flow_rule, host): + group_refcnt = 0 + flow_rule['host'] = host + + if flow_rule['next_group_id'] is not None: + all_nodes = self.get_path_nodes_by_filter( + filters={'next_group_id': flow_rule['next_group_id'], + 'nsi': 0xff}) + if all_nodes is not None: + for node in all_nodes: + if not node['portpair_details']: + group_refcnt += 1 + + port_details = self.get_port_details_by_filter( + dict(host_id=flow_rule['host'])) + if port_details is not None: + for pd in port_details: + for path in pd['path_nodes']: + path_node = self.get_path_node(path['pathnode_id']) + if ( + path_node['next_group_id'] == + flow_rule['next_group_id'] + ): + group_refcnt += 1 + + flow_rule['group_refcnt'] = group_refcnt + + return group_refcnt + + def _build_portchain_flowrule_body_without_port(self, + node, + add_fcs=None, + del_fcs=None): + flow_rule = node.copy() + flow_rule.pop('tenant_id') + flow_rule.pop('portpair_details') + + # according to the first sf node get network information + if not node['next_hop']: + return None + + next_hops = jsonutils.loads(node['next_hop']) + if not next_hops: + return None + + port_detail = self.get_port_detail_by_filter( + dict(id=next_hops[0]['portpair_id'])) + if not port_detail: + return None + + flow_rule['ingress'] = None + flow_rule['egress'] = None + flow_rule['network_type'] = port_detail['network_type'] + flow_rule['segment_id'] = port_detail['segment_id'] + + flow_rule['add_fcs'] = add_fcs + flow_rule['del_fcs'] = del_fcs + + # update next hop info + self._update_path_node_next_hops(flow_rule) + return flow_rule -- cgit 1.2.3-korg