diff options
author | lfourie <louis.fourie@huawei.com> | 2017-02-28 10:40:28 -0800 |
---|---|---|
committer | lfourie <louis.fourie@huawei.com> | 2017-02-28 10:40:28 -0800 |
commit | 85499ecb3034bd1a7ccf228ca0880456d4d46c71 (patch) | |
tree | 25d49d849c26e129e968bb6a7fa0af4fa778f135 /networking_sfc/services/sfc/drivers/ovs/driver.py | |
parent | 4b48bf9e4a988b9fd1c6f43b8bb5e26542679fd3 (diff) |
Add VNFFG release documentation
Change-Id: Ib72fadba154e054c8c819f96d0ac5674fa7967fe
Signed-off-by: lfourie <louis.fourie@huawei.com>
Diffstat (limited to 'networking_sfc/services/sfc/drivers/ovs/driver.py')
-rw-r--r-- | networking_sfc/services/sfc/drivers/ovs/driver.py | 1076 |
1 files changed, 0 insertions, 1076 deletions
diff --git a/networking_sfc/services/sfc/drivers/ovs/driver.py b/networking_sfc/services/sfc/drivers/ovs/driver.py deleted file mode 100644 index 9dfc40d..0000000 --- a/networking_sfc/services/sfc/drivers/ovs/driver.py +++ /dev/null @@ -1,1076 +0,0 @@ -# Copyright 2015 Futurewei. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import netaddr - -# from eventlet import greenthread - -from neutron.common import constants as nc_const -from neutron.common import rpc as n_rpc - -from neutron import manager - -from neutron.i18n import _LE -from neutron.i18n import _LW - -from neutron.plugins.common import constants as np_const - - -from oslo_log import helpers as log_helpers -from oslo_log import log as logging -from oslo_serialization import jsonutils - -from networking_sfc.extensions import flowclassifier -from networking_sfc.extensions import sfc -from networking_sfc.services.sfc.common import exceptions as exc -from networking_sfc.services.sfc.drivers import base as driver_base -from networking_sfc.services.sfc.drivers.ovs import( - rpc_topics as sfc_topics) -from networking_sfc.services.sfc.drivers.ovs import( - db as ovs_sfc_db) -from networking_sfc.services.sfc.drivers.ovs import( - rpc as ovs_sfc_rpc) -from networking_sfc.services.sfc.drivers.ovs import ( - constants as ovs_const) - - -LOG = logging.getLogger(__name__) - - -class OVSSfcDriver(driver_base.SfcDriverBase, - ovs_sfc_db.OVSSfcDriverDB): - """Sfc Driver Base Class.""" - - def initialize(self): - super(OVSSfcDriver, self).initialize() - self.ovs_driver_rpc = ovs_sfc_rpc.SfcAgentRpcClient( - sfc_topics.SFC_AGENT - ) - - self.id_pool = ovs_sfc_db.IDAllocation(self.admin_context) - self._setup_rpc() - - def _setup_rpc(self): - # Setup a rpc server - self.topic = sfc_topics.SFC_PLUGIN - self.endpoints = [ovs_sfc_rpc.SfcRpcCallback(self)] - self.conn = n_rpc.create_connection(new=True) - self.conn.create_consumer(self.topic, self.endpoints, fanout=False) - self.conn.consume_in_threads() - - def _get_subnet(self, core_plugin, tenant_id, cidr): - filters = {'tenant_id': [tenant_id]} - subnets = core_plugin.get_subnets(self.admin_context, filters=filters) - cidr_set = netaddr.IPSet([cidr]) - - for subnet in subnets: - subnet_cidr_set = netaddr.IPSet([subnet['cidr']]) - if cidr_set.issubset(subnet_cidr_set): - return subnet - - def _get_fc_dst_subnet_gw_port(self, fc): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet(core_plugin, - fc['tenant_id'], - fc['destination_ip_prefix']) - - return self._get_port_subnet_gw_info(core_plugin, subnet) - - def _get_port_subnet_gw_info_by_port_id(self, id): - core_plugin = manager.NeutronManager.get_plugin() - subnet = self._get_subnet_by_port(core_plugin, id) - return self._get_port_subnet_gw_info(core_plugin, - subnet) - - def _get_port_subnet_gw_info(self, core_plugin, subnet): - filters = { - 'device_owner': - [nc_const.DEVICE_OWNER_ROUTER_INTF] - } - gw_ports = core_plugin.get_ports(self.admin_context, filters=filters) - for port in gw_ports: - for fixed_ip in port['fixed_ips']: - if subnet["id"] == fixed_ip['subnet_id']: - return (port['mac_address'], - subnet['cidr'], - subnet['network_id']) - - raise exc.NoSubnetGateway( - type='subnet gateway', - cidr=subnet['cidr']) - - def _get_subnet_by_port(self, core_plugin, id): - port = core_plugin.get_port(self.admin_context, id) - for ip in port['fixed_ips']: - subnet = core_plugin.get_subnet(self.admin_context, - ip["subnet_id"]) - # currently only support one subnet for a port - break - - return subnet - - @log_helpers.log_method_call - def _get_portgroup_members(self, context, pg_id): - next_group_members = [] - group_intid = self.id_pool.get_intid_by_uuid('group', pg_id) - LOG.debug('group_intid: %s', group_intid) - pg = context._plugin.get_port_pair_group(context._plugin_context, - pg_id) - for pp_id in pg['port_pairs']: - pp = context._plugin.get_port_pair(context._plugin_context, pp_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - if pd: - next_group_members.append( - dict(portpair_id=pd['id'], weight=1)) - return group_intid, next_group_members - - def _get_port_pair_detail_by_port_pair(self, context, port_pair_id): - pp = context._plugin.get_port_pair(context._plugin_context, - port_pair_id) - filters = {} - if pp.get('ingress', None): - filters = dict(dict(ingress=pp['ingress']), **filters) - if pp.get('egress', None): - filters = dict(dict(egress=pp['egress']), **filters) - pd = self.get_port_detail_by_filter(filters) - - return pd - - @log_helpers.log_method_call - def _add_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node, dst_node, - last_sf_node=None): - dst_ports = [] - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # lookup the source port - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - src_pd = self.get_port_detail_by_filter(src_pd_filter) - - if not src_pd: - # Create source port detail - src_pd = self._create_port_detail(src_pd_filter) - LOG.debug('create src port detail: %s', src_pd) - - # Create associate relationship - assco_args = {'portpair_id': src_pd['id'], - 'pathnode_id': src_node['id'], - 'weight': 1, - } - sna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc src port with node: %s', sna) - src_node['portpair_details'].append(src_pd['id']) - - if fc.get('logical_destination_port', ''): - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - dst_pd = self.get_port_detail_by_filter(dst_pd_filter) - - if not dst_pd: - # Create dst port detail - dst_pd = self._create_port_detail(dst_pd_filter) - LOG.debug('create dst port detail: %s', dst_pd) - - # Create associate relationship - dst_assco_args = {'portpair_id': dst_pd['id'], - 'pathnode_id': dst_node['id'], - 'weight': 1, - } - dna = self.create_pathport_assoc(dst_assco_args) - LOG.debug('create assoc dst port with node: %s', dna) - dst_node['portpair_details'].append(dst_pd['id']) - - dst_ports.append(dict(portpair_id=dst_pd['id'], weight=1)) - - if last_sf_node: - if last_sf_node['next_hop']: - next_hops = jsonutils.loads(last_sf_node['next_hop']) - next_hops.extend(dst_ports) - last_sf_node['next_hop'] = jsonutils.dumps(next_hops) - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - return dst_ports - - def _remove_flowclassifier_port_assoc(self, fc_ids, tenant_id, - src_node=None, dst_node=None, - last_sf_node=None): - if not fc_ids: - return - for fc in self._get_fcs_by_ids(fc_ids): - if fc.get('logical_source_port', ''): - # delete source port detail - src_pd_filter = dict(egress=fc['logical_source_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(src_pd_filter) - if pds: - for pd in pds: - # update src_node portpair_details refence info - if src_node and pd['id'] in src_node[ - 'portpair_details' - ]: - src_node['portpair_details'].remove(pd['id']) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if fc.get('logical_destination_port', ''): - # Create dst port detail - dst_pd_filter = dict(ingress=fc['logical_destination_port'], - tenant_id=tenant_id - ) - pds = self.get_port_details_by_filter(dst_pd_filter) - if pds: - for pd in pds: - # update dst_node portpair_details refence info - if dst_node and pd['id'] in dst_node[ - 'portpair_details' - ]: - # update portpair_details of this node - dst_node['portpair_details'].remove(pd['id']) - # update last hop(SF-group) next hop info - if last_sf_node: - next_hop = dict(portpair_id=pd['id'], - weight=1) - next_hops = jsonutils.loads( - last_sf_node['next_hop']) - next_hops.remove(next_hop) - last_sf_node['next_hop'] = jsonutils.dumps( - next_hops) - if len(pd['path_nodes']) == 1: - self.delete_port_detail(pd['id']) - - if last_sf_node: - # update nexthop info of pre node - self.update_path_node(last_sf_node['id'], - last_sf_node) - - @log_helpers.log_method_call - def _create_portchain_path(self, context, port_chain): - src_node, src_pd, dst_node, dst_pd = (({}, ) * 4) - path_nodes, dst_ports = [], [] - # Create an assoc object for chain_id and path_id - # context = context._plugin_context - path_id = self.id_pool.assign_intid('portchain', port_chain['id']) - - if not path_id: - LOG.error(_LE('No path_id available for creating port chain path')) - return - - next_group_intid, next_group_members = self._get_portgroup_members( - context, port_chain['port_pair_groups'][0]) - - port_pair_groups = port_chain['port_pair_groups'] - sf_path_length = len(port_pair_groups) - # Create a head node object for port chain - src_args = {'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SRC_NODE, - 'nsp': path_id, - 'nsi': 0xff, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': jsonutils.dumps(next_group_members), - } - src_node = self.create_path_node(src_args) - LOG.debug('create src node: %s', src_node) - path_nodes.append(src_node) - - # Create a destination node object for port chain - dst_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.DST_NODE, - 'nsp': path_id, - 'nsi': 0xff - sf_path_length - 1, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': None, - 'next_hop': None - } - dst_node = self.create_path_node(dst_args) - LOG.debug('create dst node: %s', dst_node) - path_nodes.append(dst_node) - - dst_ports = self._add_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'], - src_node, - dst_node - ) - - for i in range(sf_path_length): - cur_group_members = next_group_members - # next_group for next hop - if i < sf_path_length - 1: - next_group_intid, next_group_members = ( - self._get_portgroup_members( - context, port_pair_groups[i + 1]) - ) - else: - next_group_intid = None - next_group_members = None if not dst_ports else dst_ports - - # Create a node object - node_args = { - 'tenant_id': port_chain['tenant_id'], - 'node_type': ovs_const.SF_NODE, - 'nsp': path_id, - 'nsi': 0xfe - i, - 'portchain_id': port_chain['id'], - 'status': ovs_const.STATUS_BUILDING, - 'next_group_id': next_group_intid, - 'next_hop': ( - None if not next_group_members else - jsonutils.dumps(next_group_members) - ) - } - sf_node = self.create_path_node(node_args) - LOG.debug('chain path node: %s', sf_node) - # Create the assocation objects that combine the pathnode_id with - # the ingress of the port_pairs in the current group - # when port_group does not reach tail - for member in cur_group_members: - assco_args = {'portpair_id': member['portpair_id'], - 'pathnode_id': sf_node['id'], - 'weight': member['weight'], } - sfna = self.create_pathport_assoc(assco_args) - LOG.debug('create assoc port with node: %s', sfna) - sf_node['portpair_details'].append(member['portpair_id']) - path_nodes.append(sf_node) - - return path_nodes - - def _delete_path_node_port_flowrule(self, node, port, fc_ids): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - flow_rule = self._build_portchain_flowrule_body( - node, - port, - None, - fc_ids) - - self.ovs_driver_rpc.ask_agent_to_delete_flow_rules( - self.admin_context, - flow_rule) - - def _delete_path_node_flowrule(self, node, fc_ids): - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._delete_path_node_port_flowrule( - node, port, fc_ids) - - @log_helpers.log_method_call - def _delete_portchain_path(self, context, portchain_id): - port_chain = context.current - first = self.get_path_node_by_filter( - filters={ - 'portchain_id': portchain_id, - 'nsi': 0xff - } - ) - - # delete flow rules which source port isn't assigned - # in flow classifier - if first: - self._delete_src_node_flowrules( - first, - port_chain['flow_classifiers'] - ) - - pds = self.get_path_nodes_by_filter( - dict(portchain_id=portchain_id)) - if pds: - for pd in pds: - self._delete_path_node_flowrule( - pd, - port_chain['flow_classifiers'] - ) - self.delete_path_node(pd['id']) - - # delete the ports on the traffic classifier - self._remove_flowclassifier_port_assoc( - port_chain['flow_classifiers'], - port_chain['tenant_id'] - ) - - # Delete the chainpathpair - intid = self.id_pool.get_intid_by_uuid( - 'portchain', portchain_id) - self.id_pool.release_intid('portchain', intid) - - def _update_path_node_next_hops(self, flow_rule): - node_next_hops = [] - if not flow_rule['next_hop']: - return None - next_hops = jsonutils.loads(flow_rule['next_hop']) - if not next_hops: - return None - for member in next_hops: - detail = {} - port_detail = self.get_port_detail_by_filter( - dict(id=member['portpair_id'])) - if not port_detail or not port_detail['host_id']: - continue - detail['local_endpoint'] = port_detail['local_endpoint'] - detail['weight'] = member['weight'] - detail['mac_address'] = port_detail['mac_address'] - detail['ingress'] = port_detail['ingress'] - node_next_hops.append(detail) - - mac, cidr, net_uuid = self._get_port_subnet_gw_info_by_port_id( - detail['ingress'] - ) - - detail['gw_mac'] = mac - detail['cidr'] = cidr - detail['net_uuid'] = net_uuid - - flow_rule['next_hops'] = node_next_hops - flow_rule.pop('next_hop') - - return node_next_hops - - def _build_portchain_flowrule_body(self, node, port, - add_fc_ids=None, del_fc_ids=None): - node_info = node.copy() - node_info.pop('tenant_id') - node_info.pop('portpair_details') - - port_info = port.copy() - port_info.pop('tenant_id') - port_info.pop('id') - port_info.pop('path_nodes') - port_info.pop('host_id') - - flow_rule = dict(node_info, **port_info) - # if this port is belong to NSH/MPLS-aware vm, only to - # notify the flow classifier for 1st SF. - flow_rule['add_fcs'] = self._filter_flow_classifiers( - flow_rule, add_fc_ids) - flow_rule['del_fcs'] = self._filter_flow_classifiers( - flow_rule, del_fc_ids) - - self._update_portchain_group_reference_count(flow_rule, - port['host_id']) - - # update next hop info - self._update_path_node_next_hops(flow_rule) - - return flow_rule - - def _filter_flow_classifiers(self, flow_rule, fc_ids): - """Filter flow classifiers. - - @return: list of the flow classifiers - """ - - fc_return = [] - - if not fc_ids: - return fc_return - fcs = self._get_fcs_by_ids(fc_ids) - for fc in fcs: - new_fc = fc.copy() - new_fc.pop('id') - new_fc.pop('name') - new_fc.pop('tenant_id') - new_fc.pop('description') - - if ((flow_rule['node_type'] == ovs_const.SRC_NODE and - flow_rule['egress'] == fc['logical_source_port'] - ) or - (flow_rule['node_type'] == ovs_const.DST_NODE and - flow_rule['ingress'] == fc['logical_destination_port'] - )): - fc_return.append(new_fc) - elif flow_rule['node_type'] == ovs_const.SF_NODE: - fc_return.append(new_fc) - - return fc_return - - def _update_path_node_port_flowrules(self, node, port, - add_fc_ids=None, del_fc_ids=None): - # if this port is not binding, don't to generate flow rule - if not port['host_id']: - return - - flow_rule = self._build_portchain_flowrule_body( - node, - port, - add_fc_ids, - del_fc_ids) - - self.ovs_driver_rpc.ask_agent_to_update_flow_rules( - self.admin_context, - flow_rule) - - def _update_path_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - if node['portpair_details'] is None: - return - for each in node['portpair_details']: - port = self.get_port_detail_by_filter(dict(id=each)) - if port: - self._update_path_node_port_flowrules( - node, port, add_fc_ids, del_fc_ids) - - def _thread_update_path_nodes(self, nodes, - add_fc_ids=None, del_fc_ids=None): - for node in nodes: - self._update_path_node_flowrules(node, add_fc_ids, del_fc_ids) - self._update_src_node_flowrules(nodes[0], add_fc_ids, del_fc_ids) - - def _get_portchain_fcs(self, port_chain): - return self._get_fcs_by_ids(port_chain['flow_classifiers']) - - def _get_fcs_by_ids(self, fc_ids): - flow_classifiers = [] - if not fc_ids: - return flow_classifiers - - # Get the portchain flow classifiers - fc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - flowclassifier.FLOW_CLASSIFIER_EXT) - ) - if not fc_plugin: - LOG.warn(_LW("Not found the flow classifier service plugin")) - return flow_classifiers - - for fc_id in fc_ids: - fc = fc_plugin.get_flow_classifier(self.admin_context, fc_id) - flow_classifiers.append(fc) - - return flow_classifiers - - @log_helpers.log_method_call - def create_port_chain(self, context): - port_chain = context.current - path_nodes = self._create_portchain_path(context, port_chain) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes( - path_nodes, - port_chain['flow_classifiers'], - None) - - @log_helpers.log_method_call - def delete_port_chain(self, context): - port_chain = context.current - portchain_id = port_chain['id'] - LOG.debug("to delete portchain path") - self._delete_portchain_path(context, portchain_id) - - def _get_diff_set(self, orig, cur): - orig_set = set(item for item in orig) - cur_set = set(item for item in cur) - - to_del = orig_set.difference(cur_set) - to_add = cur_set.difference(orig_set) - - return to_del, to_add - - @log_helpers.log_method_call - def update_port_chain(self, context): - port_chain = context.current - orig = context.original - - del_fc_ids, add_fc_ids = self._get_diff_set( - orig['flow_classifiers'], - port_chain['flow_classifiers'] - ) - path_nodes = self.get_path_nodes_by_filter( - dict(portchain_id=port_chain['id']) - ) - if not path_nodes: - return - - sort_path_nodes = sorted(path_nodes, - key=lambda x: x['nsi'], - reverse=True) - if del_fc_ids: - self._thread_update_path_nodes(sort_path_nodes, - None, - del_fc_ids) - self._remove_flowclassifier_port_assoc(del_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - if add_fc_ids: - self._add_flowclassifier_port_assoc(add_fc_ids, - port_chain['tenant_id'], - sort_path_nodes[0], - sort_path_nodes[-1], - sort_path_nodes[-2]) - - # notify agent with async thread - # current we don't use greenthread.spawn - self._thread_update_path_nodes(sort_path_nodes, - add_fc_ids, - None) - - @log_helpers.log_method_call - def create_port_pair_group(self, context): - group = context.current - self.id_pool.assign_intid('group', group['id']) - - @log_helpers.log_method_call - def delete_port_pair_group(self, context): - group = context.current - group_intid = self.id_pool.get_intid_by_uuid('group', group['id']) - if group_intid: - self.id_pool.release_intid('group', group_intid) - - @log_helpers.log_method_call - def update_port_pair_group(self, context): - current = context.current - original = context.original - - if set(current['port_pairs']) == set(original['port_pairs']): - return - - # Update the path_nodes and flows for each port chain that - # contains this port_pair_group - # Note: _get_port_pair_group is temporarily used here. - ppg_obj = context._plugin._get_port_pair_group(context._plugin_context, - current['id']) - port_chains = [assoc.portchain_id for assoc in - ppg_obj.chain_group_associations] - - for chain_id in port_chains: - port_chain = context._plugin.get_port_chain( - context._plugin_context, chain_id) - group_intid = self.id_pool.get_intid_by_uuid('group', - current['id']) - # Get the previous node - prev_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'next_group_id': group_intid}) - if not prev_node: - continue - - before_update_prev_node = prev_node.copy() - # Update the previous node - _, curr_group_members = self._get_portgroup_members(context, - current['id']) - prev_node['next_hop'] = ( - jsonutils.dumps(curr_group_members) - if curr_group_members else None - ) - # update next hop to database - self.update_path_node(prev_node['id'], prev_node) - if prev_node['node_type'] == ovs_const.SRC_NODE: - self._delete_src_node_flowrules( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_src_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - else: - self._delete_path_node_flowrule( - before_update_prev_node, port_chain['flow_classifiers']) - self._update_path_node_flowrules( - prev_node, port_chain['flow_classifiers'], None) - - # Update the current node - # to find the current node by using the node's next_group_id - # if this node is the last, next_group_id would be None - curr_pos = port_chain['port_pair_groups'].index(current['id']) - curr_node = self.get_path_node_by_filter( - filters={'portchain_id': chain_id, - 'nsi': 0xfe - curr_pos}) - if not curr_node: - continue - - # Add the port-pair-details into the current node - for pp_id in ( - set(current['port_pairs']) - set(original['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - - assco_args = {'portpair_id': ppd['id'], - 'pathnode_id': curr_node['id'], - 'weight': 1, } - self.create_pathport_assoc(assco_args) - self._update_path_node_port_flowrules( - curr_node, ppd, port_chain['flow_classifiers']) - - # Delete the port-pair-details from the current node - for pp_id in ( - set(original['port_pairs']) - set(current['port_pairs']) - ): - ppd = self._get_port_pair_detail_by_port_pair(context, - pp_id) - if not ppd: - LOG.debug('No port_pair_detail for the port_pair: %s', - pp_id) - LOG.debug("Failed to update port-pair-group") - return - self._delete_path_node_port_flowrule( - curr_node, ppd, port_chain['flow_classifiers']) - self.delete_pathport_assoc(curr_node['id'], ppd['id']) - - @log_helpers.log_method_call - def _get_portpair_detail_info(self, portpair_id): - """Get port detail. - - @param: portpair_id: uuid - @return: (host_id, local_ip, network_type, segment_id, - service_insert_type): tuple - """ - - core_plugin = manager.NeutronManager.get_plugin() - port_detail = core_plugin.get_port(self.admin_context, portpair_id) - host_id, local_ip, network_type, segment_id, mac_address = ( - (None, ) * 5) - - if port_detail: - host_id = port_detail['binding:host_id'] - network_id = port_detail['network_id'] - mac_address = port_detail['mac_address'] - network_info = core_plugin.get_network( - self.admin_context, network_id) - network_type = network_info['provider:network_type'] - segment_id = network_info['provider:segmentation_id'] - - if ( - host_id and - network_type in [np_const.TYPE_GRE, np_const.TYPE_VXLAN] - ): - driver = core_plugin.type_manager.drivers.get(network_type) - host_endpoint = driver.obj.get_endpoint_by_host(host_id) - local_ip = host_endpoint['ip_address'] - - return host_id, local_ip, network_type, segment_id, mac_address - - @log_helpers.log_method_call - def _create_port_detail(self, port_pair): - # since first node may not assign the ingress port, and last node may - # not assign the egress port. we use one of the - # port as the key to get the SF information. - port = None - if port_pair.get('ingress', None): - port = port_pair['ingress'] - elif port_pair.get('egress', None): - port = port_pair['egress'] - - host_id, local_endpoint, network_type, segment_id, mac_address = ( - self._get_portpair_detail_info(port)) - port_detail = { - 'ingress': port_pair.get('ingress', None), - 'egress': port_pair.get('egress', None), - 'tenant_id': port_pair['tenant_id'], - 'host_id': host_id, - 'segment_id': segment_id, - 'network_type': network_type, - 'local_endpoint': local_endpoint, - 'mac_address': mac_address - } - r = self.create_port_detail(port_detail) - LOG.debug('create port detail: %s', r) - return r - - @log_helpers.log_method_call - def create_port_pair(self, context): - port_pair = context.current - self._create_port_detail(port_pair) - - @log_helpers.log_method_call - def delete_port_pair(self, context): - port_pair = context.current - - pd_filter = dict(ingress=port_pair.get('ingress', None), - egress=port_pair.get('egress', None), - tenant_id=port_pair['tenant_id'] - ) - pds = self.get_port_details_by_filter(pd_filter) - if pds: - for pd in pds: - self.delete_port_detail(pd['id']) - - @log_helpers.log_method_call - def update_port_pair(self, context): - pass - - def get_flowrules_by_host_portid(self, context, host, port_id): - port_chain_flowrules = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return port_chain_flowrules - try: - port_detail_list = [] - # one port only may be in egress/ingress port once time. - ingress_port = self.get_port_detail_by_filter( - dict(ingress=port_id)) - egress_port = self.get_port_detail_by_filter( - dict(egress=port_id)) - if not ingress_port and not egress_port: - return None - # SF migrate to other host - if ingress_port: - port_detail_list.append(ingress_port) - if ingress_port['host_id'] != host: - ingress_port.update(dict(host_id=host)) - - if egress_port: - port_detail_list.append(egress_port) - if egress_port['host_id'] != host: - egress_port.update(dict(host_id=host)) - - # this is a SF if there are both egress and engress. - for i, ports in enumerate(port_detail_list): - nodes_assocs = ports['path_nodes'] - for assoc in nodes_assocs: - # update current path flow rule - node = self.get_path_node(assoc['pathnode_id']) - port_chain = sfc_plugin.get_port_chain( - context, - node['portchain_id']) - flow_rule = self._build_portchain_flowrule_body( - node, - ports, - add_fc_ids=port_chain['flow_classifiers'] - ) - port_chain_flowrules.append(flow_rule) - - # update the pre-path node flow rule - # if node['node_type'] != ovs_const.SRC_NODE: - # node_filter = dict(nsp=node['nsp'], - # nsi=node['nsi'] + 1 - # ) - # pre_node_list = self.get_path_nodes_by_filter( - # node_filter) - # if not pre_node_list: - # continue - # for pre_node in pre_node_list: - # self._update_path_node_flowrules( - # pre_node, - # add_fc_ids=port_chain['flow_classifiers']) - - return port_chain_flowrules - - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flowrules_by_host_portid failed")) - - def get_flow_classifier_by_portchain_id(self, context, portchain_id): - try: - flow_classifier_list = [] - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - - port_chain = sfc_plugin.get_port_chain( - context, - portchain_id) - flow_classifier_list = self._get_portchain_fcs(port_chain) - return flow_classifier_list - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_flow_classifier_by_portchain_id failed")) - - def update_flowrule_status(self, context, id, status): - try: - flowrule_status = dict(status=status) - self.update_path_node(id, flowrule_status) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("update_flowrule_status failed")) - - def _update_src_node_flowrules(self, node, - add_fc_ids=None, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - add_fc_ids, - del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, - filters={'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - flow_rule['host'] = agent['host'] - self.ovs_driver_rpc.ask_agent_to_update_src_node_flow_rules( - self.admin_context, - flow_rule) - - def _delete_src_node_flowrules(self, node, del_fc_ids=None): - flow_rule = self._get_portchain_src_node_flowrule(node, - None, del_fc_ids) - if not flow_rule: - return - - core_plugin = manager.NeutronManager.get_plugin() - pc_agents = core_plugin.get_agents( - self.admin_context, filters={ - 'agent_type': [nc_const.AGENT_TYPE_OVS]}) - if not pc_agents: - return - - for agent in pc_agents: - if agent['alive']: - # update host info to flow rule - self._update_portchain_group_reference_count(flow_rule, - agent['host']) - self.ovs_driver_rpc.ask_agent_to_delete_src_node_flow_rules( - self.admin_context, - flow_rule) - - def get_all_src_node_flowrules(self, context): - sfc_plugin = ( - manager.NeutronManager.get_service_plugins().get( - sfc.SFC_EXT - ) - ) - if not sfc_plugin: - return [] - try: - frs = [] - port_chains = sfc_plugin.get_port_chains(context) - - for port_chain in port_chains: - # get the first node of this chain - node_filters = dict(portchain_id=port_chain['id'], nsi=0xff) - portchain_node = self.get_path_node_by_filter(node_filters) - if not portchain_node: - continue - flow_rule = self._get_portchain_src_node_flowrule( - portchain_node, - port_chain['flow_classifiers'] - ) - if not flow_rule: - continue - frs.append(flow_rule) - return frs - except Exception as e: - LOG.exception(e) - LOG.error(_LE("get_all_src_node_flowrules failed")) - - def _get_portchain_src_node_flowrule(self, node, - add_fc_ids=None, del_fc_ids=None): - try: - add_fc_rt = [] - del_fc_rt = [] - - if add_fc_ids: - for fc in self._get_fcs_by_ids(add_fc_ids): - if not fc.get('logical_source_port', None): - add_fc_rt.append(fc) - - if del_fc_ids: - for fc in self._get_fcs_by_ids(del_fc_ids): - if not fc.get('logical_source_port', None): - del_fc_rt.append(fc) - - if not add_fc_rt and not del_fc_rt: - return None - - return self._build_portchain_flowrule_body_without_port( - node, add_fc_rt, del_fc_rt) - except Exception as e: - LOG.exception(e) - LOG.error(_LE("_get_portchain_src_node_flowrule failed")) - - def _update_portchain_group_reference_count(self, flow_rule, host): - group_refcnt = 0 - flow_rule['host'] = host - - if flow_rule['next_group_id'] is not None: - all_nodes = self.get_path_nodes_by_filter( - filters={'next_group_id': flow_rule['next_group_id'], - 'nsi': 0xff}) - if all_nodes is not None: - for node in all_nodes: - if not node['portpair_details']: - group_refcnt += 1 - - port_details = self.get_port_details_by_filter( - dict(host_id=flow_rule['host'])) - if port_details is not None: - for pd in port_details: - for path in pd['path_nodes']: - path_node = self.get_path_node(path['pathnode_id']) - if ( - path_node['next_group_id'] == - flow_rule['next_group_id'] - ): - group_refcnt += 1 - - flow_rule['group_refcnt'] = group_refcnt - - return group_refcnt - - def _build_portchain_flowrule_body_without_port(self, - node, - add_fcs=None, - del_fcs=None): - flow_rule = node.copy() - flow_rule.pop('tenant_id') - flow_rule.pop('portpair_details') - - # according to the first sf node get network information - if not node['next_hop']: - return None - - next_hops = jsonutils.loads(node['next_hop']) - if not next_hops: - return None - - port_detail = self.get_port_detail_by_filter( - dict(id=next_hops[0]['portpair_id'])) - if not port_detail: - return None - - flow_rule['ingress'] = None - flow_rule['egress'] = None - flow_rule['network_type'] = port_detail['network_type'] - flow_rule['segment_id'] = port_detail['segment_id'] - - flow_rule['add_fcs'] = add_fcs - flow_rule['del_fcs'] = del_fcs - - # update next hop info - self._update_path_node_next_hops(flow_rule) - return flow_rule |