From 4cd8b20676cf12bbbf6ab281700220fd61174e7a Mon Sep 17 00:00:00 2001 From: "jose.lausuch" Date: Wed, 20 Jul 2016 13:37:41 +0200 Subject: Remove logger as input parameter of functions in openstack_utils JIRA: FUNCTEST-376 Also some fixes to get rid of flake8 violations in onos-sfc Change-Id: I52c2fd30a6b81af20ea779db2d73b9386524f7ce Signed-off-by: jose.lausuch --- testcases/Controllers/ONOS/Sfc/Sfc_fun.py | 143 ++++++++++++++++-------------- 1 file changed, 76 insertions(+), 67 deletions(-) (limited to 'testcases/Controllers/ONOS/Sfc') diff --git a/testcases/Controllers/ONOS/Sfc/Sfc_fun.py b/testcases/Controllers/ONOS/Sfc/Sfc_fun.py index 28d83bbfd..b94eedcaf 100644 --- a/testcases/Controllers/ONOS/Sfc/Sfc_fun.py +++ b/testcases/Controllers/ONOS/Sfc/Sfc_fun.py @@ -1,13 +1,14 @@ """SFC functions.""" -import functest.utils.functest_logger as ft_logger import json from multiprocessing import Process from multiprocessing import Queue -from pexpect import pxssh +import os import re -import requests import time -import os + +import functest.utils.functest_logger as ft_logger +from pexpect import pxssh +import requests class Sfc_fun: @@ -101,7 +102,8 @@ class Sfc_fun: def getToken(self): """Get the keystone token value from Openstack .""" - url = 'http://'+self.keystone_hostname+':5000/'+self.osver+'/tokens' + url = 'http://' + self.keystone_hostname + \ + ':5000/' + self.osver + '/tokens' data = '{"auth": {"tenantName": "admin", "passwordCredentials":\ { "username": "admin", "password": "console"}}}' headers = {"Accept": "application/json"} @@ -126,7 +128,8 @@ class Sfc_fun: Dicdata['admin_state_up'] = self.admin_state_up Dicdata = {'network': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/networks' + url = 'http://' + self.neutron_hostname + \ + ':9696/' + self.osver + '/networks' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -155,7 +158,8 @@ class Sfc_fun: Dicdata = {'subnet': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/subnets' + url = 'http://' + self.neutron_hostname + \ + ':9696/' + self.osver + '/subnets' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -177,7 +181,7 @@ class Sfc_fun: if self.net_id != '': Dicdata['network_id'] = self.net_id if self.port != '': - Dicdata['name'] = "port"+str(x) + Dicdata['name'] = "port" + str(x) if self.admin_state_up != '': Dicdata['admin_state_up'] = self.admin_state_up if self.security_groups != '': @@ -187,7 +191,8 @@ class Sfc_fun: Dicdata = {'port': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/ports' + url = 'http://' + self.neutron_hostname + \ + ':9696/' + self.osver + '/ports' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -207,7 +212,8 @@ class Sfc_fun: def createVm(self): """Creation of Instance, using firewall image.""" - url = 'http://'+self.glance_hostname+':9292/v2/images?name=TestSfcVm' + url = 'http://' + self.glance_hostname + \ + ':9292/v2/images?name=TestSfcVm' headers = {"Accept": "application/json", "Content-Type": "application/\ octet-stream", "X-Auth-Token": self.token_id} response = requests.get(url, headers=headers) @@ -226,7 +232,7 @@ class Sfc_fun: org_nw_port = [] org_nw_port.append({'port': self.port_num[y]}) if self.vm_name != '': - Dicdata['name'] = "vm"+str(y) + Dicdata['name'] = "vm" + str(y) if self.imageRef != '': Dicdata['imageRef'] = self.image_id if self.flavorRef != '': @@ -240,8 +246,8 @@ class Sfc_fun: Dicdata = {'server': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.nova_hostname+':8774/v2.1/'+self.tenant_id + \ - '/servers' + url = ('http://' + self.nova_hostname + ':8774/v2.1/' + + self.tenant_id + '/servers') headers = {"Accept": "application/json", "Content-Type": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -264,7 +270,8 @@ class Sfc_fun: time.sleep(10) for y in range(0, 3): url = 'http://' + \ - self.nova_hostname+':8774/v2.1/servers/detail?name=vm'+str(y) + self.nova_hostname + \ + ':8774/v2.1/servers/detail?name=vm' + str(y) headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.get(url, headers=headers) @@ -275,11 +282,11 @@ class Sfc_fun: self.logger.debug(json1_data) self.vm_active = json1_data['servers'][0]['status'] if (self.vm_active == "ACTIVE"): - print ("\t\t\t\t\t\tVM"+str(y)+" is Active : " - + self.vm_active) + print ("\t\t\t\t\t\tVM" + str(y) + " is Active : " + + self.vm_active) else: - print ("\t\t\t\t\t\tVM"+str(y)+" is NOT Active : " - + self.vm_active) + print ("\t\t\t\t\t\tVM" + str(y) + " is NOT Active : " + + self.vm_active) else: return(response.status_code) return(response.status_code) @@ -295,18 +302,18 @@ class Sfc_fun: egress = p Dicdata['egress'] = self.port_num[egress] if self.port_pair_name != '': - Dicdata['name'] = "PP"+str(p) + Dicdata['name'] = "PP" + str(p) Dicdata = {'port_pair': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/sfc/port_pairs' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) if (response.status_code == 201): - print ("\t\t\t\tCreation of Port Pair PP"+str(p) + + print ("\t\t\t\tCreation of Port Pair PP" + str(p) + " is successful") else: return(response.status_code) @@ -316,7 +323,7 @@ class Sfc_fun: def getPortPair(self): """Query the Portpair id value.""" for p in range(0, 1): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/sfc/port_pairs?name=PP1' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} @@ -339,19 +346,19 @@ class Sfc_fun: port_pair_list = [] port_pair_list.append(self.port_pair_id[p]) if self.port_group_name != '': - Dicdata['name'] = "PG"+str(p) + Dicdata['name'] = "PG" + str(p) if self.port_pair_id != '': Dicdata['port_pairs'] = port_pair_list Dicdata = {'port_pair_group': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/sfc/port_pair_groups' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) if (response.status_code == 201): - print ("\t\t\t\tCreation of Port Group PG"+str(p) + + print ("\t\t\t\tCreation of Port Group PG" + str(p) + "is successful") else: return(response.status_code) @@ -361,8 +368,8 @@ class Sfc_fun: def getPortGroup(self): """Query the PortGroup id.""" for p in range(0, 1): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/sfc/port_pair_groups?name=PG'+str(p) + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/sfc/port_pair_groups?name=PG' + str(p) headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.get(url, headers=headers) @@ -393,7 +400,7 @@ class Sfc_fun: Dicdata = {'flow_classifier': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/sfc/flow_classifiers' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} @@ -423,7 +430,7 @@ class Sfc_fun: Dicdata = {'port_chain': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/sfc/port_chains' headers = {"Accept": "application/json", "Content-Type": "application/json", @@ -467,8 +474,8 @@ class Sfc_fun: Dicdata = {'router': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/' + \ - self.osver+'/routers.json' + url = 'http://' + self.neutron_hostname + ':9696/' + \ + self.osver + '/routers.json' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -485,7 +492,7 @@ class Sfc_fun: def attachInterface(self): """Attachment of instance ports to the Router.""" - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/networks?name=admin_floating_net' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} @@ -508,8 +515,8 @@ class Sfc_fun: Dicdata['subnet_id'] = self.subnetId data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/routers/'+self.router_id+'/add_router_interface' + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/routers/' + self.router_id + '/add_router_interface' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.put(url, headers=headers, data=data) @@ -529,7 +536,7 @@ class Sfc_fun: Dicdata1 = {'external_gateway_info': Dicdata1} Dicdata1 = {'router': Dicdata1} data = json.dumps(Dicdata1, indent=4) - url = 'http://' + self.neutron_hostname+':9696/'+self.osver + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ '/routers/' + self.router_id headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} @@ -549,7 +556,7 @@ class Sfc_fun: Dicdata['pool'] = "admin_floating_net" data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.nova_hostname+':8774/v2.1/os-floating-ips' + url = 'http://' + self.nova_hostname + ':8774/v2.1/os-floating-ips' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -571,7 +578,7 @@ class Sfc_fun: Dicdata1 = {'addFloatingIp': Dicdata1} data = json.dumps(Dicdata1, indent=4) url = 'http://' + self.nova_hostname + ':8774/v2.1/servers/' + \ - self.vm[ip_num]+'/action' + self.vm[ip_num] + '/action' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.post(url, headers=headers, data=data) @@ -594,7 +601,7 @@ class Sfc_fun: username = "cirros" password = "cubswin:)" s.login(hostname, username, password) - s.sendline("ping -c 5 "+str(self.port_ip[2])) + s.sendline("ping -c 5 " + str(self.port_ip[2])) s.prompt() # match the prompt ping_re = re.search("transmitted.*received", s.before).group() @@ -651,6 +658,7 @@ class Sfc_fun: """##################################################################""" """ ######################## Stats Functions ################# #####""" + def portChainDeviceMap(self): """Check the PC Device Stats in the ONOS.""" response = requests.get('http://' + self.onos_hostname + @@ -664,7 +672,7 @@ class Sfc_fun: def portChainSfMap(self): """Check the PC SF Map Stats in the ONOS.""" - response = requests.get('http://'+self.onos_hostname + + response = requests.get('http://' + self.onos_hostname + ':8181/onos/vtn/portChainSfMap/' + self.PC_id, auth=("karaf", "karaf")) if (response.status_code == 200): @@ -677,8 +685,8 @@ class Sfc_fun: def deletePortChain(self): """Deletion of PortChain.""" - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/sfc/port_chains/'+self.PC_id + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/sfc/port_chains/' + self.PC_id headers = {"Accept": "application/json", "Content-Type": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -691,8 +699,8 @@ class Sfc_fun: def deleteFlowClassifier(self): """Deletion of Flow Classifier.""" - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/sfc/flow_classifiers/'+self.flow_class_if + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/sfc/flow_classifiers/' + self.flow_class_if headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -706,8 +714,8 @@ class Sfc_fun: def deletePortGroup(self): """Deletion of PortGroup.""" for p in range(0, 1): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/sfc/port_pair_groups/'+self.port_grp_id[p] + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/sfc/port_pair_groups/' + self.port_grp_id[p] headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -722,8 +730,8 @@ class Sfc_fun: def deletePortPair(self): """Deletion of Portpair.""" for p in range(1, 2): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/sfc/port_pairs/'+self.port_pair_id[0] + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/sfc/port_pairs/' + self.port_pair_id[0] headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -738,21 +746,22 @@ class Sfc_fun: """Cleanup.""" print ("\n\t\tDeleting the VMs") for y in range(0, 3): - url = 'http://'+self.nova_hostname+':8774/v2.1/servers/'+self.vm[y] + url = 'http://' + self.nova_hostname + \ + ':8774/v2.1/servers/' + self.vm[y] headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) if (response.status_code == 204): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - print ("\n\t\tVM"+str(y)+" is Deleted : ") - time.sleep(10) + self.logger.debug(response.status_code) + self.logger.debug(response.content) + print ("\n\t\tVM" + str(y) + " is Deleted : ") + time.sleep(10) else: return(response.status_code) print ("\n\t\tDeletion of Ports") for x in range(self.i, self.numTerms): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/ports/'+self.port_num[x] + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/ports/' + self.port_num[x] headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -769,8 +778,8 @@ class Sfc_fun: Dicdata['external_gateway_info'] = {} Dicdata = {'router': Dicdata} data = json.dumps(Dicdata, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/routers/'+self.router_id + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/routers/' + self.router_id headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.put(url, headers=headers, data=data) @@ -781,15 +790,15 @@ class Sfc_fun: if self.subnetId != '': Dicdata1['subnet_id'] = self.subnetId data = json.dumps(Dicdata1, indent=4) - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/routers/'+self.router_id + \ + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/routers/' + self.router_id + \ '/remove_router_interface.json' headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.put(url, headers=headers, data=data) if (response.status_code == 200): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/routers/'+self.router_id + url = ('http://' + self.neutron_hostname + ':9696/' + + self.osver + '/routers/' + self.router_id) headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -805,8 +814,8 @@ class Sfc_fun: return(response.status_code) print ("\n\t\tDeletion of Network") - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/networks/'+self.net_id + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/networks/' + self.net_id headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) @@ -819,14 +828,14 @@ class Sfc_fun: print ("\n\t\tDeletion of Floating ip") for ip_num in range(0, 2): - url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \ - '/floatingips/'+self.vm_public_id[ip_num] + url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \ + '/floatingips/' + self.vm_public_id[ip_num] headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} response = requests.delete(url, headers=headers) if (response.status_code == 204): - self.logger.debug(response.status_code) - self.logger.debug(response.content) + self.logger.debug(response.status_code) + self.logger.debug(response.content) else: - return(response.status_code) + return(response.status_code) return(response.status_code) -- cgit 1.2.3-korg