aboutsummaryrefslogtreecommitdiffstats
path: root/testcases/Controllers/ONOS/Sfc
diff options
context:
space:
mode:
Diffstat (limited to 'testcases/Controllers/ONOS/Sfc')
-rw-r--r--testcases/Controllers/ONOS/Sfc/Sfc_fun.py143
1 files changed, 76 insertions, 67 deletions
diff --git a/testcases/Controllers/ONOS/Sfc/Sfc_fun.py b/testcases/Controllers/ONOS/Sfc/Sfc_fun.py
index 28d83bbfd..b94eedcaf 100644
--- a/testcases/Controllers/ONOS/Sfc/Sfc_fun.py
+++ b/testcases/Controllers/ONOS/Sfc/Sfc_fun.py
@@ -1,13 +1,14 @@
"""SFC functions."""
-import functest.utils.functest_logger as ft_logger
import json
from multiprocessing import Process
from multiprocessing import Queue
-from pexpect import pxssh
+import os
import re
-import requests
import time
-import os
+
+import functest.utils.functest_logger as ft_logger
+from pexpect import pxssh
+import requests
class Sfc_fun:
@@ -101,7 +102,8 @@ class Sfc_fun:
def getToken(self):
"""Get the keystone token value from Openstack ."""
- url = 'http://'+self.keystone_hostname+':5000/'+self.osver+'/tokens'
+ url = 'http://' + self.keystone_hostname + \
+ ':5000/' + self.osver + '/tokens'
data = '{"auth": {"tenantName": "admin", "passwordCredentials":\
{ "username": "admin", "password": "console"}}}'
headers = {"Accept": "application/json"}
@@ -126,7 +128,8 @@ class Sfc_fun:
Dicdata['admin_state_up'] = self.admin_state_up
Dicdata = {'network': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/networks'
+ url = 'http://' + self.neutron_hostname + \
+ ':9696/' + self.osver + '/networks'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -155,7 +158,8 @@ class Sfc_fun:
Dicdata = {'subnet': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/subnets'
+ url = 'http://' + self.neutron_hostname + \
+ ':9696/' + self.osver + '/subnets'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -177,7 +181,7 @@ class Sfc_fun:
if self.net_id != '':
Dicdata['network_id'] = self.net_id
if self.port != '':
- Dicdata['name'] = "port"+str(x)
+ Dicdata['name'] = "port" + str(x)
if self.admin_state_up != '':
Dicdata['admin_state_up'] = self.admin_state_up
if self.security_groups != '':
@@ -187,7 +191,8 @@ class Sfc_fun:
Dicdata = {'port': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver+'/ports'
+ url = 'http://' + self.neutron_hostname + \
+ ':9696/' + self.osver + '/ports'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -207,7 +212,8 @@ class Sfc_fun:
def createVm(self):
"""Creation of Instance, using firewall image."""
- url = 'http://'+self.glance_hostname+':9292/v2/images?name=TestSfcVm'
+ url = 'http://' + self.glance_hostname + \
+ ':9292/v2/images?name=TestSfcVm'
headers = {"Accept": "application/json", "Content-Type": "application/\
octet-stream", "X-Auth-Token": self.token_id}
response = requests.get(url, headers=headers)
@@ -226,7 +232,7 @@ class Sfc_fun:
org_nw_port = []
org_nw_port.append({'port': self.port_num[y]})
if self.vm_name != '':
- Dicdata['name'] = "vm"+str(y)
+ Dicdata['name'] = "vm" + str(y)
if self.imageRef != '':
Dicdata['imageRef'] = self.image_id
if self.flavorRef != '':
@@ -240,8 +246,8 @@ class Sfc_fun:
Dicdata = {'server': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.nova_hostname+':8774/v2.1/'+self.tenant_id + \
- '/servers'
+ url = ('http://' + self.nova_hostname + ':8774/v2.1/' +
+ self.tenant_id + '/servers')
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -264,7 +270,8 @@ class Sfc_fun:
time.sleep(10)
for y in range(0, 3):
url = 'http://' + \
- self.nova_hostname+':8774/v2.1/servers/detail?name=vm'+str(y)
+ self.nova_hostname + \
+ ':8774/v2.1/servers/detail?name=vm' + str(y)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.get(url, headers=headers)
@@ -275,11 +282,11 @@ class Sfc_fun:
self.logger.debug(json1_data)
self.vm_active = json1_data['servers'][0]['status']
if (self.vm_active == "ACTIVE"):
- print ("\t\t\t\t\t\tVM"+str(y)+" is Active : "
- + self.vm_active)
+ print ("\t\t\t\t\t\tVM" + str(y) + " is Active : " +
+ self.vm_active)
else:
- print ("\t\t\t\t\t\tVM"+str(y)+" is NOT Active : "
- + self.vm_active)
+ print ("\t\t\t\t\t\tVM" + str(y) + " is NOT Active : " +
+ self.vm_active)
else:
return(response.status_code)
return(response.status_code)
@@ -295,18 +302,18 @@ class Sfc_fun:
egress = p
Dicdata['egress'] = self.port_num[egress]
if self.port_pair_name != '':
- Dicdata['name'] = "PP"+str(p)
+ Dicdata['name'] = "PP" + str(p)
Dicdata = {'port_pair': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/sfc/port_pairs'
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.post(url, headers=headers, data=data)
if (response.status_code == 201):
- print ("\t\t\t\tCreation of Port Pair PP"+str(p) +
+ print ("\t\t\t\tCreation of Port Pair PP" + str(p) +
" is successful")
else:
return(response.status_code)
@@ -316,7 +323,7 @@ class Sfc_fun:
def getPortPair(self):
"""Query the Portpair id value."""
for p in range(0, 1):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/sfc/port_pairs?name=PP1'
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
@@ -339,19 +346,19 @@ class Sfc_fun:
port_pair_list = []
port_pair_list.append(self.port_pair_id[p])
if self.port_group_name != '':
- Dicdata['name'] = "PG"+str(p)
+ Dicdata['name'] = "PG" + str(p)
if self.port_pair_id != '':
Dicdata['port_pairs'] = port_pair_list
Dicdata = {'port_pair_group': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/sfc/port_pair_groups'
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.post(url, headers=headers, data=data)
if (response.status_code == 201):
- print ("\t\t\t\tCreation of Port Group PG"+str(p) +
+ print ("\t\t\t\tCreation of Port Group PG" + str(p) +
"is successful")
else:
return(response.status_code)
@@ -361,8 +368,8 @@ class Sfc_fun:
def getPortGroup(self):
"""Query the PortGroup id."""
for p in range(0, 1):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/sfc/port_pair_groups?name=PG'+str(p)
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/sfc/port_pair_groups?name=PG' + str(p)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.get(url, headers=headers)
@@ -393,7 +400,7 @@ class Sfc_fun:
Dicdata = {'flow_classifier': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/sfc/flow_classifiers'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
@@ -423,7 +430,7 @@ class Sfc_fun:
Dicdata = {'port_chain': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/sfc/port_chains'
headers = {"Accept": "application/json",
"Content-Type": "application/json",
@@ -467,8 +474,8 @@ class Sfc_fun:
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/' + \
- self.osver+'/routers.json'
+ url = 'http://' + self.neutron_hostname + ':9696/' + \
+ self.osver + '/routers.json'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -485,7 +492,7 @@ class Sfc_fun:
def attachInterface(self):
"""Attachment of instance ports to the Router."""
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/networks?name=admin_floating_net'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
@@ -508,8 +515,8 @@ class Sfc_fun:
Dicdata['subnet_id'] = self.subnetId
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/routers/'+self.router_id+'/add_router_interface'
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/routers/' + self.router_id + '/add_router_interface'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
@@ -529,7 +536,7 @@ class Sfc_fun:
Dicdata1 = {'external_gateway_info': Dicdata1}
Dicdata1 = {'router': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname+':9696/'+self.osver + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
'/routers/' + self.router_id
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
@@ -549,7 +556,7 @@ class Sfc_fun:
Dicdata['pool'] = "admin_floating_net"
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.nova_hostname+':8774/v2.1/os-floating-ips'
+ url = 'http://' + self.nova_hostname + ':8774/v2.1/os-floating-ips'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -571,7 +578,7 @@ class Sfc_fun:
Dicdata1 = {'addFloatingIp': Dicdata1}
data = json.dumps(Dicdata1, indent=4)
url = 'http://' + self.nova_hostname + ':8774/v2.1/servers/' + \
- self.vm[ip_num]+'/action'
+ self.vm[ip_num] + '/action'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.post(url, headers=headers, data=data)
@@ -594,7 +601,7 @@ class Sfc_fun:
username = "cirros"
password = "cubswin:)"
s.login(hostname, username, password)
- s.sendline("ping -c 5 "+str(self.port_ip[2]))
+ s.sendline("ping -c 5 " + str(self.port_ip[2]))
s.prompt() # match the prompt
ping_re = re.search("transmitted.*received", s.before).group()
@@ -651,6 +658,7 @@ class Sfc_fun:
"""##################################################################"""
""" ######################## Stats Functions ################# #####"""
+
def portChainDeviceMap(self):
"""Check the PC Device Stats in the ONOS."""
response = requests.get('http://' + self.onos_hostname +
@@ -664,7 +672,7 @@ class Sfc_fun:
def portChainSfMap(self):
"""Check the PC SF Map Stats in the ONOS."""
- response = requests.get('http://'+self.onos_hostname +
+ response = requests.get('http://' + self.onos_hostname +
':8181/onos/vtn/portChainSfMap/' +
self.PC_id, auth=("karaf", "karaf"))
if (response.status_code == 200):
@@ -677,8 +685,8 @@ class Sfc_fun:
def deletePortChain(self):
"""Deletion of PortChain."""
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/sfc/port_chains/'+self.PC_id
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/sfc/port_chains/' + self.PC_id
headers = {"Accept": "application/json", "Content-Type":
"application/json", "X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
@@ -691,8 +699,8 @@ class Sfc_fun:
def deleteFlowClassifier(self):
"""Deletion of Flow Classifier."""
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/sfc/flow_classifiers/'+self.flow_class_if
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/sfc/flow_classifiers/' + self.flow_class_if
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
@@ -706,8 +714,8 @@ class Sfc_fun:
def deletePortGroup(self):
"""Deletion of PortGroup."""
for p in range(0, 1):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/sfc/port_pair_groups/'+self.port_grp_id[p]
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/sfc/port_pair_groups/' + self.port_grp_id[p]
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
@@ -722,8 +730,8 @@ class Sfc_fun:
def deletePortPair(self):
"""Deletion of Portpair."""
for p in range(1, 2):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/sfc/port_pairs/'+self.port_pair_id[0]
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/sfc/port_pairs/' + self.port_pair_id[0]
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
@@ -738,21 +746,22 @@ class Sfc_fun:
"""Cleanup."""
print ("\n\t\tDeleting the VMs")
for y in range(0, 3):
- url = 'http://'+self.nova_hostname+':8774/v2.1/servers/'+self.vm[y]
+ url = 'http://' + self.nova_hostname + \
+ ':8774/v2.1/servers/' + self.vm[y]
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
if (response.status_code == 204):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- print ("\n\t\tVM"+str(y)+" is Deleted : ")
- time.sleep(10)
+ self.logger.debug(response.status_code)
+ self.logger.debug(response.content)
+ print ("\n\t\tVM" + str(y) + " is Deleted : ")
+ time.sleep(10)
else:
return(response.status_code)
print ("\n\t\tDeletion of Ports")
for x in range(self.i, self.numTerms):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/ports/'+self.port_num[x]
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/ports/' + self.port_num[x]
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
@@ -769,8 +778,8 @@ class Sfc_fun:
Dicdata['external_gateway_info'] = {}
Dicdata = {'router': Dicdata}
data = json.dumps(Dicdata, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/routers/'+self.router_id
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/routers/' + self.router_id
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
@@ -781,15 +790,15 @@ class Sfc_fun:
if self.subnetId != '':
Dicdata1['subnet_id'] = self.subnetId
data = json.dumps(Dicdata1, indent=4)
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/routers/'+self.router_id + \
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/routers/' + self.router_id + \
'/remove_router_interface.json'
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.put(url, headers=headers, data=data)
if (response.status_code == 200):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/routers/'+self.router_id
+ url = ('http://' + self.neutron_hostname + ':9696/' +
+ self.osver + '/routers/' + self.router_id)
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
@@ -805,8 +814,8 @@ class Sfc_fun:
return(response.status_code)
print ("\n\t\tDeletion of Network")
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/networks/'+self.net_id
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/networks/' + self.net_id
headers = {"Accept": "application/json",
"X-Auth-Token": self.token_id}
response = requests.delete(url, headers=headers)
@@ -819,14 +828,14 @@ class Sfc_fun:
print ("\n\t\tDeletion of Floating ip")
for ip_num in range(0, 2):
- url = 'http://'+self.neutron_hostname+':9696/'+self.osver + \
- '/floatingips/'+self.vm_public_id[ip_num]
+ url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
+ '/floatingips/' + self.vm_public_id[ip_num]
headers = {"Accept": "application/json", "X-Auth-Token":
self.token_id}
response = requests.delete(url, headers=headers)
if (response.status_code == 204):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
+ self.logger.debug(response.status_code)
+ self.logger.debug(response.content)
else:
- return(response.status_code)
+ return(response.status_code)
return(response.status_code)