summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeorge Paraskevopoulos <geopar@intracom-telecom.com>2017-03-22 16:15:56 +0200
committerGeorge Paraskevopoulos <geopar@intracom-telecom.com>2017-03-27 16:56:08 +0300
commitb23c753e79172a2f865804854f80fb3b963d6f8e (patch)
treeb396ef57265eb84287eb7ff22f0b6abc0ebf6e66
parent9ed03234419c6d6bfb6467bb0c48a45d722ac090 (diff)
Rewrite classification rules wait loop
This is a candidate replacement for the wait_for_classification_rules function. The functionality is as follows: - Query ODL operational datastore for the RSPs for which we should expect to see flows in the computes (RSPs for which ACL rules exist) and associate them with a list of vnfs. Also get the tp_dst from the ACL match rules. - Use the known topology to associate the RSPs to a set of compute nodes (at this point we know that ODL promised us that we should see classification rules for a particular rsp in the nodes C1, C2 ...) - Gather the installed flows in table=11 in each compute - Verify that the RSPs are installed as promised by ODL for the correct tp_dst Change-Id: I79747ad1df3eb0c67f783167601b5ea99fb43f16 Signed-off-by: George Paraskevopoulos <geopar@intracom-telecom.com>
-rw-r--r--sfc/lib/utils.py259
-rw-r--r--sfc/tests/functest/sfc_one_chain_two_service_functions.py7
-rw-r--r--sfc/tests/functest/sfc_symmetric_chain.py6
-rw-r--r--sfc/tests/functest/sfc_two_chains_SSH_and_HTTP.py8
4 files changed, 181 insertions, 99 deletions
diff --git a/sfc/lib/utils.py b/sfc/lib/utils.py
index a480f2c2..fc5ed025 100644
--- a/sfc/lib/utils.py
+++ b/sfc/lib/utils.py
@@ -379,91 +379,155 @@ def check_ssh(ips, retries=100):
return False
-def ofctl_time_counter(ovs_logger, ssh_conn, max_duration=None):
- try:
- # We get the flows from table 11
- table = 11
- br = "br-int"
- output = ovs_logger.ofctl_dump_flows(ssh_conn, br, table)
- pattern = "NXM_NX_NSP"
- rsps = []
- lines = output.split(",")
- for line in lines:
- if max_duration is not None:
- pattern2 = "duration"
- is_there2 = re.findall(pattern2, line)
- if is_there2:
- value = line.split("=")[1].split(".")[0]
- value_int = int(value)
- if value_int < max_duration:
- # The RSP is new, no need to store the RSP in first_RSP
- return rsps
- else:
- continue
- is_there = re.findall(pattern, line)
- if is_there:
- value = line.split(":")[1].split("-")[0]
- rsps.append(value)
- return rsps
- except Exception, e:
- logger.error('Error when countering %s' % e)
- return None
+def actual_rsps_in_compute(ovs_logger, compute_ssh):
+ '''
+ Example flows that match the regex (line wrapped because of flake8)
+ cookie=0x1110010002280255, duration=4366.745s, table=11, n_packets=14,
+ n_bytes=980, tcp,reg0=0x1,tp_dst=22 actions=move:NXM_NX_TUN_ID[0..31]->
+ NXM_NX_NSH_C2[],push_nsh,load:0x1->NXM_NX_NSH_MDTYPE[],load:0x3->
+ NXM_NX_NSH_NP[],load:0xc0a80005->NXM_NX_NSH_C1[],load:0xe4->
+ NXM_NX_NSP[0..23],load:0xff->NXM_NX_NSI[],load:0xc0a80005->
+ NXM_NX_TUN_IPV4_DST[],load:0xe4->NXM_NX_TUN_ID[0..31],output:26
+ '''
+ match_rsp = re.compile(
+ r'.+tp_dst=([0-9]+).+load:(0x[0-9a-f]+)->NXM_NX_NSP\[0\.\.23\].+')
+ # First line is OFPST_FLOW reply (OF1.3) (xid=0x2):
+ # This is not a flow so ignore
+ flows = (ovs_logger.ofctl_dump_flows(compute_ssh, 'br-int', '11')
+ .strip().split('\n')[1:])
+ matching_flows = [match_rsp.match(f) for f in flows]
+ # group(1) = 22 (tp_dst value) | group(2) = 0xff (rsp value)
+ rsps_in_compute = ['{0}_{1}'.format(mf.group(2), mf.group(1))
+ for mf in matching_flows if mf is not None]
+ return rsps_in_compute
+
+
+def get_active_rsps(odl_ip, odl_port):
+ '''
+ Queries operational datastore and returns the RSPs for which we have
+ created a classifier (ACL). These are considered as active RSPs
+ for which classification rules should exist in the compute nodes
+
+ This function enhances the returned dictionary with the
+ destination port of the ACL.
+ '''
+
+ acls = get_odl_acl_list(odl_ip, odl_port)
+ rsps = []
+ for acl in acls['access-lists']['acl']:
+ try:
+ # We get the first ace. ODL creates a new ACL
+ # with one ace for each classifier
+ ace = acl['access-list-entries']['ace'][0]
+ except:
+ logger.warn('ACL {0} does not have an ACE'.format(
+ acl['acl-name']))
+ continue
+ rsp_name = ace['actions']['netvirt-sfc-acl:rsp-name']
+ rsp = get_odl_resource_elem(odl_ip,
+ odl_port,
+ 'rendered-service-path',
+ rsp_name,
+ datastore='operational')
+ '''
+ Rsps are returned in the format:
+ {
+ "rendered-service-path": [
+ {
+ "name": "Path-red-Path-83",
+ "path-id": 83,
+ ...
+ "rendered-service-path-hop": [
+ {
+ ...
+ "service-function-name": "testVNF1",
+ "service-index": 255
+ ...
+ 'rendered-service-path' Is returned as a list with one
+ element (we select by name and the names are unique)
+ '''
+ rsp_port = rsp['rendered-service-path'][0]
+ rsp_port['dst-port'] = (ace['matches']
+ ['destination-port-range']['lower-port'])
+ rsps.append(rsp_port)
+ return rsps
+
+
+def promised_rsps_in_computes(
+ odl_ip, odl_port, topology, all_compute_av_zones):
+ '''
+ And the computes in the topology where we should expect to see them.
+ The returned object is in the format 'path-id': [ch1_availability_zone,
+ ch2_availability_zone, ...] This means we should expect to see table=11
+ (classification) flow with path_id in ch1, ch2, ...
+ '''
+ rsps = get_active_rsps(odl_ip, odl_port)
+ rsps_in_computes = {}
+ # A classification rule should be installed for all (rsp, tp_dst) pairs
+ # to every compute that has at least one SF
+ computes_with_sf = list(set(topology.values()))
+ if 'nova' in computes_with_sf:
+ # this does a glorified time.sleep(timeout) for now
+ # TODO: find a better way to do this
+ computes_with_sf = all_compute_av_zones
+ for rsp in rsps:
+ key = '{0}_{1}'.format(hex(rsp['path-id']), rsp['dst-port'])
+ rsps_in_computes[key] = computes_with_sf
+ return rsps_in_computes
@ft_utils.timethis
-def wait_for_classification_rules(ovs_logger, compute_clients,
- num_chains, timeout=200):
- # 10 sec. is the threshold to consider a flow from an old deployment
- for compute_client in compute_clients:
- max_duration = 10
- rsps = ofctl_time_counter(ovs_logger, compute_client, max_duration)
- # first_RSP saves a potential RSP from an old deployment.
- # ODL may take quite some time to implement the new flow
- # and an old flow may be there
- if compute_client == compute_clients[0]:
- first_RSP = rsps[0] if len(rsps) > 0 else ''
- else:
- first_RSP = ''
- rsps = ''
- logger.info("This is the first_RSP: %s" % first_RSP)
- if num_chains == 1:
- while not ((len(rsps) == 1) and (first_RSP != rsps[0])):
- rsps = ofctl_time_counter(ovs_logger, compute_client)
- logger.info("These are the rsps: %s" % rsps)
- timeout -= 1
- if timeout == 10:
- output = ovs_logger.ofctl_dump_flows(compute_client)
- logger.info("output ofctl: %s" % output)
- output2 = ovs_logger.vsctl_show(compute_client)
- logger.info("output vsctl: %s" % output2)
- _, stdout, _ = compute_client.exec_command('ip a')
- output3 = ''.join(stdout.readlines())
- logger.info("The interfaces: %s" % output3)
- if timeout == 0:
- logger.error(
- "Timeout but classification rules are not updated")
- return
- time.sleep(1)
- elif num_chains == 2:
- while not ((len(rsps) > 1) and (first_RSP != rsps[0]) and
- (rsps[0] == rsps[1])):
- rsps = ofctl_time_counter(ovs_logger, compute_client)
- logger.info("This is the rsps: %s" % rsps)
- timeout -= 1
- if timeout == 10:
- output = ovs_logger.ofctl_dump_flows(compute_client)
- logger.info("output ofctl: %s" % output)
- output2 = ovs_logger.vsctl_show(compute_client)
- logger.info("output vsctl: %s" % output2)
- _, stdout, _ = compute_client.exec_command('ip a')
- output3 = ''.join(stdout.readlines())
- logger.info("The interfaces: %s" % output3)
- if timeout == 0:
- logger.error(
- "Timeout but classification rules are not updated")
- return
- time.sleep(1)
- logger.info("classification rules updated")
+def wait_for_classification_rules(ovs_logger, compute_nodes, odl_ip, odl_port,
+ topology, timeout=200):
+ try:
+ compute_av_zones = {
+ node.id: 'nova::node-{0}.domain.tld'.format(node.id)
+ for node in compute_nodes
+ }
+
+ # keep only vnfs
+ topology = {
+ key: host for key, host in topology.items()
+ if key not in ['client', 'server', 'id', 'description']
+ }
+
+ promised_rsps = promised_rsps_in_computes(
+ odl_ip, odl_port, topology, compute_av_zones.values())
+
+ while timeout > 0:
+ logger.info("RSPs in ODL Operational DataStore:")
+ logger.info("{0}".format(promised_rsps))
+
+ actual_rsps_in_computes = {}
+ for node in compute_nodes:
+ av_zone = compute_av_zones[node.id]
+ actual_rsps_in_computes[av_zone] = actual_rsps_in_compute(
+ ovs_logger, node.ssh_client)
+
+ logger.info("RSPs in compute nodes:")
+ logger.info("{0}".format(actual_rsps_in_computes))
+
+ promises_fulfilled = []
+ for rsp, computes in promised_rsps.items():
+ computes_have_rsp = [rsp
+ in actual_rsps_in_computes[compute]
+ for compute in computes]
+ promises_fulfilled.append(all(computes_have_rsp))
+
+ if all(promises_fulfilled):
+ # OVS state is consistent with ODL
+ logger.info("Classification rules were updated")
+ return
+
+ timeout -= 1
+ time.sleep(1)
+
+ if timeout <= 0:
+ logger.error(
+ "Timeout but classification rules are not updated")
+
+ except Exception, e:
+ logger.error('Error when waiting for classification rules: %s' % e)
def setup_compute_node(cidr, compute_nodes):
@@ -513,14 +577,17 @@ def pluralize(s):
def format_odl_resource_list_url(odl_ip, odl_port, resource,
- odl_user='admin', odl_pwd='admin'):
- return ('http://{usr}:{pwd}@{ip}:{port}/restconf/config/{rsrc}:{rsrcs}'
+ datastore='config', odl_user='admin',
+ odl_pwd='admin'):
+ return ('http://{usr}:{pwd}@{ip}:{port}/restconf/{ds}/{rsrc}:{rsrcs}'
.format(usr=odl_user, pwd=odl_pwd, ip=odl_ip, port=odl_port,
- rsrc=resource, rsrcs=pluralize(resource)))
+ ds=datastore, rsrc=resource, rsrcs=pluralize(resource)))
-def format_odl_resource_elem_url(odl_ip, odl_port, resource, elem_name):
- list_url = format_odl_resource_list_url(odl_ip, odl_port, resource)
+def format_odl_resource_elem_url(odl_ip, odl_port, resource,
+ elem_name, datastore='config'):
+ list_url = format_odl_resource_list_url(
+ odl_ip, odl_port, resource, datastore=datastore)
return ('{0}/{1}/{2}'.format(list_url, resource, elem_name))
@@ -530,13 +597,23 @@ def odl_resource_list_names(resource, resource_json):
return [r['name'] for r in resource_json[pluralize(resource)][resource]]
-def get_odl_resource_list(odl_ip, odl_port, resource):
- url = format_odl_resource_list_url(odl_ip, odl_port, resource)
+def get_odl_resource_list(odl_ip, odl_port, resource, datastore='config'):
+ url = format_odl_resource_list_url(
+ odl_ip, odl_port, resource, datastore=datastore)
+ return requests.get(url).json()
+
+
+def get_odl_resource_elem(odl_ip, odl_port, resource,
+ elem_name, datastore='config'):
+ url = format_odl_resource_elem_url(
+ odl_ip, odl_port, resource, elem_name, datastore=datastore)
return requests.get(url).json()
-def delete_odl_resource_elem(odl_ip, odl_port, resource, elem_name):
- url = format_odl_resource_elem_url(odl_ip, odl_port, resource, elem_name)
+def delete_odl_resource_elem(odl_ip, odl_port, resource, elem_name,
+ datastore='config'):
+ url = format_odl_resource_elem_url(
+ odl_ip, odl_port, resource, elem_name, datastore=datastore)
requests.delete(url)
diff --git a/sfc/tests/functest/sfc_one_chain_two_service_functions.py b/sfc/tests/functest/sfc_one_chain_two_service_functions.py
index e55af011..bdf6f1b4 100644
--- a/sfc/tests/functest/sfc_one_chain_two_service_functions.py
+++ b/sfc/tests/functest/sfc_one_chain_two_service_functions.py
@@ -51,6 +51,8 @@ def main():
compute_nodes = [node for node in openstack_nodes
if node.is_compute()]
+ odl_ip, odl_port = test_utils.get_odl_ip_port(openstack_nodes)
+
for compute in compute_nodes:
logger.info("This is a compute: %s" % compute.info)
@@ -174,11 +176,10 @@ def main():
logger.info(test_utils.run_cmd('tacker sfc-list')[1])
logger.info(test_utils.run_cmd('tacker sfc-classifier-list')[1])
- num_chains = 1
-
# Start measuring the time it takes to implement the classification rules
t1 = threading.Thread(target=test_utils.wait_for_classification_rules,
- args=(ovs_logger, compute_clients, num_chains,))
+ args=(ovs_logger, compute_nodes, odl_ip, odl_port,
+ testTopology,))
try:
t1.start()
except Exception, e:
diff --git a/sfc/tests/functest/sfc_symmetric_chain.py b/sfc/tests/functest/sfc_symmetric_chain.py
index 568a6488..7f58f770 100644
--- a/sfc/tests/functest/sfc_symmetric_chain.py
+++ b/sfc/tests/functest/sfc_symmetric_chain.py
@@ -48,6 +48,8 @@ def main():
controller_nodes = [node for node in all_nodes if node.is_controller()]
compute_nodes = [node for node in all_nodes if node.is_compute()]
+ odl_ip, odl_port = test_utils.get_odl_ip_port(all_nodes)
+
results = Results(COMMON_CONFIG.line_length)
results.add_to_summary(0, "=")
results.add_to_summary(2, "STATUS", "SUBTEST")
@@ -180,7 +182,9 @@ def main():
# Start measuring the time it takes to implement the classification rules
t1 = threading.Thread(target=test_utils.wait_for_classification_rules,
- args=(ovs_logger, compute_clients,))
+ args=(ovs_logger, compute_nodes, odl_ip, odl_port,
+ testTopology,))
+
try:
t1.start()
except Exception, e:
diff --git a/sfc/tests/functest/sfc_two_chains_SSH_and_HTTP.py b/sfc/tests/functest/sfc_two_chains_SSH_and_HTTP.py
index 54cacbc7..2c688412 100644
--- a/sfc/tests/functest/sfc_two_chains_SSH_and_HTTP.py
+++ b/sfc/tests/functest/sfc_two_chains_SSH_and_HTTP.py
@@ -190,11 +190,10 @@ def main():
logger.info(test_utils.run_cmd('tacker sfc-list')[1])
logger.info(test_utils.run_cmd('tacker sfc-classifier-list')[1])
- num_chains = 2
-
# Start measuring the time it takes to implement the classification rules
t1 = threading.Thread(target=test_utils.wait_for_classification_rules,
- args=(ovs_logger, compute_clients, num_chains,))
+ args=(ovs_logger, compute_nodes, odl_ip, odl_port,
+ testTopology,))
try:
t1.start()
@@ -269,7 +268,8 @@ def main():
# Start measuring the time it takes to implement the classification rules
t2 = threading.Thread(target=test_utils.wait_for_classification_rules,
- args=(ovs_logger, compute_clients, num_chains,))
+ args=(ovs_logger, compute_nodes, odl_ip, odl_port,
+ testTopology,))
try:
t2.start()
except Exception, e: