From b23c753e79172a2f865804854f80fb3b963d6f8e Mon Sep 17 00:00:00 2001 From: George Paraskevopoulos Date: Wed, 22 Mar 2017 16:15:56 +0200 Subject: Rewrite classification rules wait loop This is a candidate replacement for the wait_for_classification_rules function. The functionality is as follows: - Query ODL operational datastore for the RSPs for which we should expect to see flows in the computes (RSPs for which ACL rules exist) and associate them with a list of vnfs. Also get the tp_dst from the ACL match rules. - Use the known topology to associate the RSPs to a set of compute nodes (at this point we know that ODL promised us that we should see classification rules for a particular rsp in the nodes C1, C2 ...) - Gather the installed flows in table=11 in each compute - Verify that the RSPs are installed as promised by ODL for the correct tp_dst Change-Id: I79747ad1df3eb0c67f783167601b5ea99fb43f16 Signed-off-by: George Paraskevopoulos --- sfc/lib/utils.py | 259 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 168 insertions(+), 91 deletions(-) (limited to 'sfc/lib/utils.py') diff --git a/sfc/lib/utils.py b/sfc/lib/utils.py index a480f2c2..fc5ed025 100644 --- a/sfc/lib/utils.py +++ b/sfc/lib/utils.py @@ -379,91 +379,155 @@ def check_ssh(ips, retries=100): return False -def ofctl_time_counter(ovs_logger, ssh_conn, max_duration=None): - try: - # We get the flows from table 11 - table = 11 - br = "br-int" - output = ovs_logger.ofctl_dump_flows(ssh_conn, br, table) - pattern = "NXM_NX_NSP" - rsps = [] - lines = output.split(",") - for line in lines: - if max_duration is not None: - pattern2 = "duration" - is_there2 = re.findall(pattern2, line) - if is_there2: - value = line.split("=")[1].split(".")[0] - value_int = int(value) - if value_int < max_duration: - # The RSP is new, no need to store the RSP in first_RSP - return rsps - else: - continue - is_there = re.findall(pattern, line) - if is_there: - value = line.split(":")[1].split("-")[0] - rsps.append(value) - return rsps - except Exception, e: - logger.error('Error when countering %s' % e) - return None +def actual_rsps_in_compute(ovs_logger, compute_ssh): + ''' + Example flows that match the regex (line wrapped because of flake8) + cookie=0x1110010002280255, duration=4366.745s, table=11, n_packets=14, + n_bytes=980, tcp,reg0=0x1,tp_dst=22 actions=move:NXM_NX_TUN_ID[0..31]-> + NXM_NX_NSH_C2[],push_nsh,load:0x1->NXM_NX_NSH_MDTYPE[],load:0x3-> + NXM_NX_NSH_NP[],load:0xc0a80005->NXM_NX_NSH_C1[],load:0xe4-> + NXM_NX_NSP[0..23],load:0xff->NXM_NX_NSI[],load:0xc0a80005-> + NXM_NX_TUN_IPV4_DST[],load:0xe4->NXM_NX_TUN_ID[0..31],output:26 + ''' + match_rsp = re.compile( + r'.+tp_dst=([0-9]+).+load:(0x[0-9a-f]+)->NXM_NX_NSP\[0\.\.23\].+') + # First line is OFPST_FLOW reply (OF1.3) (xid=0x2): + # This is not a flow so ignore + flows = (ovs_logger.ofctl_dump_flows(compute_ssh, 'br-int', '11') + .strip().split('\n')[1:]) + matching_flows = [match_rsp.match(f) for f in flows] + # group(1) = 22 (tp_dst value) | group(2) = 0xff (rsp value) + rsps_in_compute = ['{0}_{1}'.format(mf.group(2), mf.group(1)) + for mf in matching_flows if mf is not None] + return rsps_in_compute + + +def get_active_rsps(odl_ip, odl_port): + ''' + Queries operational datastore and returns the RSPs for which we have + created a classifier (ACL). These are considered as active RSPs + for which classification rules should exist in the compute nodes + + This function enhances the returned dictionary with the + destination port of the ACL. + ''' + + acls = get_odl_acl_list(odl_ip, odl_port) + rsps = [] + for acl in acls['access-lists']['acl']: + try: + # We get the first ace. ODL creates a new ACL + # with one ace for each classifier + ace = acl['access-list-entries']['ace'][0] + except: + logger.warn('ACL {0} does not have an ACE'.format( + acl['acl-name'])) + continue + rsp_name = ace['actions']['netvirt-sfc-acl:rsp-name'] + rsp = get_odl_resource_elem(odl_ip, + odl_port, + 'rendered-service-path', + rsp_name, + datastore='operational') + ''' + Rsps are returned in the format: + { + "rendered-service-path": [ + { + "name": "Path-red-Path-83", + "path-id": 83, + ... + "rendered-service-path-hop": [ + { + ... + "service-function-name": "testVNF1", + "service-index": 255 + ... + 'rendered-service-path' Is returned as a list with one + element (we select by name and the names are unique) + ''' + rsp_port = rsp['rendered-service-path'][0] + rsp_port['dst-port'] = (ace['matches'] + ['destination-port-range']['lower-port']) + rsps.append(rsp_port) + return rsps + + +def promised_rsps_in_computes( + odl_ip, odl_port, topology, all_compute_av_zones): + ''' + And the computes in the topology where we should expect to see them. + The returned object is in the format 'path-id': [ch1_availability_zone, + ch2_availability_zone, ...] This means we should expect to see table=11 + (classification) flow with path_id in ch1, ch2, ... + ''' + rsps = get_active_rsps(odl_ip, odl_port) + rsps_in_computes = {} + # A classification rule should be installed for all (rsp, tp_dst) pairs + # to every compute that has at least one SF + computes_with_sf = list(set(topology.values())) + if 'nova' in computes_with_sf: + # this does a glorified time.sleep(timeout) for now + # TODO: find a better way to do this + computes_with_sf = all_compute_av_zones + for rsp in rsps: + key = '{0}_{1}'.format(hex(rsp['path-id']), rsp['dst-port']) + rsps_in_computes[key] = computes_with_sf + return rsps_in_computes @ft_utils.timethis -def wait_for_classification_rules(ovs_logger, compute_clients, - num_chains, timeout=200): - # 10 sec. is the threshold to consider a flow from an old deployment - for compute_client in compute_clients: - max_duration = 10 - rsps = ofctl_time_counter(ovs_logger, compute_client, max_duration) - # first_RSP saves a potential RSP from an old deployment. - # ODL may take quite some time to implement the new flow - # and an old flow may be there - if compute_client == compute_clients[0]: - first_RSP = rsps[0] if len(rsps) > 0 else '' - else: - first_RSP = '' - rsps = '' - logger.info("This is the first_RSP: %s" % first_RSP) - if num_chains == 1: - while not ((len(rsps) == 1) and (first_RSP != rsps[0])): - rsps = ofctl_time_counter(ovs_logger, compute_client) - logger.info("These are the rsps: %s" % rsps) - timeout -= 1 - if timeout == 10: - output = ovs_logger.ofctl_dump_flows(compute_client) - logger.info("output ofctl: %s" % output) - output2 = ovs_logger.vsctl_show(compute_client) - logger.info("output vsctl: %s" % output2) - _, stdout, _ = compute_client.exec_command('ip a') - output3 = ''.join(stdout.readlines()) - logger.info("The interfaces: %s" % output3) - if timeout == 0: - logger.error( - "Timeout but classification rules are not updated") - return - time.sleep(1) - elif num_chains == 2: - while not ((len(rsps) > 1) and (first_RSP != rsps[0]) and - (rsps[0] == rsps[1])): - rsps = ofctl_time_counter(ovs_logger, compute_client) - logger.info("This is the rsps: %s" % rsps) - timeout -= 1 - if timeout == 10: - output = ovs_logger.ofctl_dump_flows(compute_client) - logger.info("output ofctl: %s" % output) - output2 = ovs_logger.vsctl_show(compute_client) - logger.info("output vsctl: %s" % output2) - _, stdout, _ = compute_client.exec_command('ip a') - output3 = ''.join(stdout.readlines()) - logger.info("The interfaces: %s" % output3) - if timeout == 0: - logger.error( - "Timeout but classification rules are not updated") - return - time.sleep(1) - logger.info("classification rules updated") +def wait_for_classification_rules(ovs_logger, compute_nodes, odl_ip, odl_port, + topology, timeout=200): + try: + compute_av_zones = { + node.id: 'nova::node-{0}.domain.tld'.format(node.id) + for node in compute_nodes + } + + # keep only vnfs + topology = { + key: host for key, host in topology.items() + if key not in ['client', 'server', 'id', 'description'] + } + + promised_rsps = promised_rsps_in_computes( + odl_ip, odl_port, topology, compute_av_zones.values()) + + while timeout > 0: + logger.info("RSPs in ODL Operational DataStore:") + logger.info("{0}".format(promised_rsps)) + + actual_rsps_in_computes = {} + for node in compute_nodes: + av_zone = compute_av_zones[node.id] + actual_rsps_in_computes[av_zone] = actual_rsps_in_compute( + ovs_logger, node.ssh_client) + + logger.info("RSPs in compute nodes:") + logger.info("{0}".format(actual_rsps_in_computes)) + + promises_fulfilled = [] + for rsp, computes in promised_rsps.items(): + computes_have_rsp = [rsp + in actual_rsps_in_computes[compute] + for compute in computes] + promises_fulfilled.append(all(computes_have_rsp)) + + if all(promises_fulfilled): + # OVS state is consistent with ODL + logger.info("Classification rules were updated") + return + + timeout -= 1 + time.sleep(1) + + if timeout <= 0: + logger.error( + "Timeout but classification rules are not updated") + + except Exception, e: + logger.error('Error when waiting for classification rules: %s' % e) def setup_compute_node(cidr, compute_nodes): @@ -513,14 +577,17 @@ def pluralize(s): def format_odl_resource_list_url(odl_ip, odl_port, resource, - odl_user='admin', odl_pwd='admin'): - return ('http://{usr}:{pwd}@{ip}:{port}/restconf/config/{rsrc}:{rsrcs}' + datastore='config', odl_user='admin', + odl_pwd='admin'): + return ('http://{usr}:{pwd}@{ip}:{port}/restconf/{ds}/{rsrc}:{rsrcs}' .format(usr=odl_user, pwd=odl_pwd, ip=odl_ip, port=odl_port, - rsrc=resource, rsrcs=pluralize(resource))) + ds=datastore, rsrc=resource, rsrcs=pluralize(resource))) -def format_odl_resource_elem_url(odl_ip, odl_port, resource, elem_name): - list_url = format_odl_resource_list_url(odl_ip, odl_port, resource) +def format_odl_resource_elem_url(odl_ip, odl_port, resource, + elem_name, datastore='config'): + list_url = format_odl_resource_list_url( + odl_ip, odl_port, resource, datastore=datastore) return ('{0}/{1}/{2}'.format(list_url, resource, elem_name)) @@ -530,13 +597,23 @@ def odl_resource_list_names(resource, resource_json): return [r['name'] for r in resource_json[pluralize(resource)][resource]] -def get_odl_resource_list(odl_ip, odl_port, resource): - url = format_odl_resource_list_url(odl_ip, odl_port, resource) +def get_odl_resource_list(odl_ip, odl_port, resource, datastore='config'): + url = format_odl_resource_list_url( + odl_ip, odl_port, resource, datastore=datastore) + return requests.get(url).json() + + +def get_odl_resource_elem(odl_ip, odl_port, resource, + elem_name, datastore='config'): + url = format_odl_resource_elem_url( + odl_ip, odl_port, resource, elem_name, datastore=datastore) return requests.get(url).json() -def delete_odl_resource_elem(odl_ip, odl_port, resource, elem_name): - url = format_odl_resource_elem_url(odl_ip, odl_port, resource, elem_name) +def delete_odl_resource_elem(odl_ip, odl_port, resource, elem_name, + datastore='config'): + url = format_odl_resource_elem_url( + odl_ip, odl_port, resource, elem_name, datastore=datastore) requests.delete(url) -- cgit 1.2.3-korg