diff options
author | Morgan Richomme <morgan.richomme@orange.com> | 2017-08-28 07:06:09 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@opnfv.org> | 2017-08-28 07:06:09 +0000 |
commit | 4e39ca947fcb30ab176bf3a2405a5b71965497f7 (patch) | |
tree | ec00fe5909568c0b7d602bf30077a198c9cc251a /functest/opnfv_tests/sdn/onos/sfc | |
parent | af86e67845ff7d074d389aa54cd4c3744d037348 (diff) | |
parent | c9ef93de81e802bd01ec7381b243c724c38af34f (diff) |
Merge "Remove Onos in Functest"
Diffstat (limited to 'functest/opnfv_tests/sdn/onos/sfc')
-rw-r--r-- | functest/opnfv_tests/sdn/onos/sfc/README.md | 21 | ||||
-rw-r--r-- | functest/opnfv_tests/sdn/onos/sfc/sfc.py | 174 | ||||
-rw-r--r-- | functest/opnfv_tests/sdn/onos/sfc/sfc_onos.py | 887 |
3 files changed, 0 insertions, 1082 deletions
diff --git a/functest/opnfv_tests/sdn/onos/sfc/README.md b/functest/opnfv_tests/sdn/onos/sfc/README.md deleted file mode 100644 index ae63ee21..00000000 --- a/functest/opnfv_tests/sdn/onos/sfc/README.md +++ /dev/null @@ -1,21 +0,0 @@ -SFC Script ReadMe File -********************** - -Topology ---------- - -Validated with the Fuel Enviroment. - - -Things to Remember : --------------------- - -1] This Script basically Tests the SFC functionality with ONOS controller. -2] Ip address of Openstack and ONOS are got dynamically. -3] Initally this sfc script can be used for ONOS and on Request , if need will modify for other controllers. - - -Contact Details : ------------------ - -email-id : antonysilvester@gmail.com diff --git a/functest/opnfv_tests/sdn/onos/sfc/sfc.py b/functest/opnfv_tests/sdn/onos/sfc/sfc.py deleted file mode 100644 index 2bb9082c..00000000 --- a/functest/opnfv_tests/sdn/onos/sfc/sfc.py +++ /dev/null @@ -1,174 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (c) CREATED5 All rights reserved -# This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# ########################################################################### -# OPNFV SFC Script -# **** Scripted by Antony Silvester - antony.silvester@huawei.com ****** -# ########################################################################### - -# Testcase 1 : Prerequisites configuration for SFC -# Testcase 2 : Creation of 3 VNF Nodes and Attaching Ports -# Testcase 3 : Configure SFC [Port pair,Port Group ,Flow classifer -# Testcase 4 : Configure Port Chain and verify the flows are added. -# Testcase 5 : Verify traffic with VNF node. -# Testcase 6 : Remove the Port Chain and Verify the traffic. -# Testcase 7 : Cleanup -# ########################################################################### -# -"""Script to Test the SFC scenarios in ONOS.""" - -import logging -import time -import functest.utils.functest_utils as ft_utils -from sfc_onos import SfcOnos - -logger = logging.getLogger(__name__) -Sfc_obj = SfcOnos() - -OK = 200 -CREATED = 201 -ACCEPTED = 202 -NO_CONTENT = 204 - -start_time = time.time() - - -def PreConfig(): - logger.info("Testcase 1 : Prerequisites configuration for SFC") - logger.info("1.1 Creation of Auth-Token") - check(Sfc_obj.getToken, OK, "Creation of Token") - logger.info("1.2 Creation of Network") - check(Sfc_obj.createNetworks, CREATED, "Creation of network") - logger.info("1.3 Creation of Subnetwork") - check(Sfc_obj.createSubnets, CREATED, "Creation of Subnetwork") - - -def CreateNodes(): - logger.info("Testcase 2 : Creation of 3 VNF Nodes and Attaching Ports") - logger.info("2.1 Creation of Ports") - check(Sfc_obj.createPorts, CREATED, "Creation of Port") - logger.info("2.2 Creation of VM-Compute-Node") - check(Sfc_obj.createVm, ACCEPTED, "Creation of VM") - logger.info("2.3 Check VM Status") - check(Sfc_obj.checkVmState, OK, "Vm statue check") - logger.info("2.4 Router Creation") - check(Sfc_obj.createRouter, CREATED, "Creation of Router") - logger.info("2.5 Attachement of Interface to VM") - check(Sfc_obj.attachInterface, OK, "Interface attached to VM") - logger.info("2.6 Attachement of FLoating Ip to VM") - check(Sfc_obj.addFloatingIp, ACCEPTED, "Floating Ip attached to VM") - - -def ConfigSfc(): - logger.info( - "Testcase 3 : Configure SFC [Portair,PortGroup,Flow classifer]") - logger.info("3.1 Creation of Port Pair") - check(Sfc_obj.createPortPair, CREATED, "Creation of Port Pair") - logger.info("3.2 Getting the Port Pair ID") - check(Sfc_obj.getPortPair, OK, "Getting Port Pair ID") - logger.info("3.3 Creation of Port Pair Group") - check(Sfc_obj.createPortGroup, CREATED, "Creation of Port Pair Group") - logger.info("3.4 Getting Port Pair Group ID ") - check(Sfc_obj.getPortGroup, OK, "Getting Port Pair Group ID") - logger.info("3.5 Creation of Flow Classifier") - check(Sfc_obj.createFlowClassifier, CREATED, "Creation of Flow Classifier") - logger.info( - "Testcase 4 : Configure Port Chain and verify flows are added") - logger.info("4.1 Creation of Port Chain") - check(Sfc_obj.createPortChain, CREATED, "Creation of Port Chain") - - -def VerifySfcTraffic(): - status = "PASS" - logger.info("Testcase 5 : Verify traffic with VNF node.") - if (Sfc_obj.loginToVM() == "1"): - logger.info("SFC function Working") - else: - logger.error("SFC function not working") - status = "FAIL" - - logger.info("Testcase 6 : Remove the Port Chain and Verify the traffic") - if (Sfc_obj.deletePortChain() == NO_CONTENT): - if (Sfc_obj.loginToVM() == "0"): - logger.info("SFC function is removed Successfully") - else: - logger.error("SFC function not Removed. Have some problem") - status = "FAIL" - if (Sfc_obj.deleteFlowClassifier() == NO_CONTENT): - if (Sfc_obj.deletePortGroup() == NO_CONTENT): - if (Sfc_obj.deletePortPair() == NO_CONTENT): - logger.info( - "SFC configuration is deleted successfully") - else: - logger.error("Port pair is deleted successfully") - status = "FAIL" - else: - logger.error("Port Group is NOT deleted successfully") - status = "FAIL" - else: - logger.error("Flow classifier is NOT deleted successfully") - status = "FAIL" - else: - logger.error("PortChain configuration is NOT deleted successfully") - status = "FAIL" - if (status == "FAIL"): - fail("Traffic for SFC is NOT verified successfully") - - -def CleanUp(): - logger.info("Testcase 7 : Cleanup") - if (Sfc_obj.cleanup() == NO_CONTENT): - logger.info("CleanUp is successfull") - else: - logger.error("CleanUp is NOT successfull") - - -def check(method, criteria, msg): - if (method() == criteria): - logger.info(msg + 'is Successful') - else: - fail(msg + 'is not successful') - - -def fail(fail_info): - logger.error(fail_info) - CleanUp() - PushDB("FAIL", fail_info) - exit(-1) - - -def PushDB(status, info): - logger.info("Summary :") - try: - logger.debug("Push ONOS SFC results into DB") - stop_time = time.time() - - # ONOS SFC success criteria = all tests OK - duration = round(stop_time - start_time, 1) - logger.info("Result is " + status) - ft_utils.push_results_to_db("functest", - "onos_sfc", - start_time, - stop_time, - status, - details={'duration': duration, - 'error': info}) - except: - logger.error("Error pushing results into Database") - - -def main(): - """Script to Test the SFC scenarios in ONOS.""" - logging.basicConfig() - PreConfig() - CreateNodes() - ConfigSfc() - VerifySfcTraffic() - CleanUp() - PushDB("PASS", "") diff --git a/functest/opnfv_tests/sdn/onos/sfc/sfc_onos.py b/functest/opnfv_tests/sdn/onos/sfc/sfc_onos.py deleted file mode 100644 index 4e93c133..00000000 --- a/functest/opnfv_tests/sdn/onos/sfc/sfc_onos.py +++ /dev/null @@ -1,887 +0,0 @@ -import logging -import os -import re -import time -import json -import requests - -from multiprocessing import Process -from multiprocessing import Queue -from pexpect import pxssh - -from functest.utils.constants import CONST - -OK = 200 -CREATED = 201 -ACCEPTED = 202 -NO_CONTENT = 204 - - -class SfcOnos(object): - """Defines all the def function of SFC.""" - - def __init__(self): - """Initialization of variables.""" - self.logger = logging.getLogger(__name__) - self.osver = "v2.0" - self.token_id = 0 - self.net_id = 0 - self.image_id = 0 - self.keystone_hostname = 'keystone_ip' - self.neutron_hostname = 'neutron_ip' - self.nova_hostname = 'nova_ip' - self.glance_hostname = 'glance_ip' - self.onos_hostname = 'onos_ip' - # Network variables ####### - self.netname = "test_nw" - self.admin_state_up = True - self.tenant_id = 0 - self.subnetId = 0 - # ######################### - # SubNet variables######### - self.ip_version = 4 - self.cidr = "20.20.20.0/24" - self.subnetname = "test_nw_subnets" - # ############################### - # Port variable - self.port = "port" - self.port_num = [] - self.vm_id = 0 - self.port_ip = [] - self.count = 0 - self.i = 0 - self.numTerms = 3 - self.security_groups = [] - self.port_security_enabled = False - # ############################### - # VM creation variable - self.container_format = "bare" - self.disk_format = "qcow2" - self.imagename = "TestSfcVm" - self.createImage = ("/home/root1/devstack/files/images/" - "firewall_block_image.img") - - self.vm_name = "vm" - self.imageRef = "test" - self.flavorRef = "1" - self.max_count = "1" - self.min_count = "1" - self.org_nw_port = [] - self.image_id = 0 - self.routername = "router1" - self.router_id = 0 - # ##################################### - # Port pair - self.port_pair_ingress = 0 - self.port_pair_egress = 0 - self.port_pair_name = "PP" - self.port_pair_id = [] - # #################################### - # Port Group - self.port_group_name = "PG" - self.port_grp_id = [] - # #################################### - # FlowClassifier - self.source_ip_prefix = "20.20.20.0/24" - self.destination_ip_prefix = "20.20.20.0/24" - self.logical_source_port = 0 - self.fcname = "FC" - self.ethertype = "IPv4" - # ##################################### - self.flow_class_if = 0 - # ##################################### - # Port Chain variables - self.pcname = 'PC' - self.PC_id = 0 - # ##################################### - # Port Chain variables - self.flowadd = '' - # ##################################### - self.ip_pool = 0 - self.vm_public_ip = [] - self.vm_public_id = [] - self.cirros_username = CONST.__getattribute__( - 'openstack_image_username') - self.cirros_password = CONST.__getattribute__( - 'openstack_image_password') - self.net_id1 = 0 - self.vm = [] - self.address = 0 - self.value = 0 - self.pub_net_id = 0 - - def getToken(self): - """Get the keystone token value from Openstack .""" - url = 'http://%s:5000/%s/tokens' % (self.keystone_hostname, - self.osver) - data = ('{"auth": {"tenantName": "admin", "passwordCredentials":' - '{ "username": "admin", "password": "console"}}}') - headers = {"Accept": "application/json"} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == OK): - json1_data = json.loads(response.content) - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.debug(json1_data) - self.token_id = json1_data['access']['token']['id'] - self.tenant_id = json1_data['access']['token']['tenant']['id'] - return(response.status_code) - else: - return(response.status_code) - - def createNetworks(self): - """Creation of networks.""" - Dicdata = {} - if self.netname != '': - Dicdata['name'] = self.netname - if self.admin_state_up != '': - Dicdata['admin_state_up'] = self.admin_state_up - Dicdata = {'network': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/networks' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.net_id = json1_data['network']['id'] - return(response.status_code) - else: - return(response.status_code) - - def createSubnets(self): - """Creation of SubNets.""" - Dicdata = {} - if self.net_id != 0: - Dicdata['network_id'] = self.net_id - if self.ip_version != '': - Dicdata['ip_version'] = self.ip_version - if self.cidr != '': - Dicdata['cidr'] = self.cidr - if self.subnetname != '': - Dicdata['name'] = self.subnetname - - Dicdata = {'subnet': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/subnets' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - - if (response.status_code == CREATED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.subnetId = json1_data['subnet']['id'] - return(response.status_code) - else: - return(response.status_code) - - def createPorts(self): - """Creation of Ports.""" - for x in range(self.i, self.numTerms): - Dicdata = {} - if self.net_id != '': - Dicdata['network_id'] = self.net_id - if self.port != '': - Dicdata['name'] = "port" + str(x) - if self.admin_state_up != '': - Dicdata['admin_state_up'] = self.admin_state_up - if self.security_groups != '': - Dicdata['security_groups'] = self.security_groups - # if self.port_security_enabled != '': - # Dicdata['port_security_enabled'] = self.port_security_enabled - - Dicdata = {'port': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/ports' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - - if (response.status_code == CREATED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.port_num.append(json1_data['port']['id']) - self.port_ip.append(json1_data['port']['fixed_ips'][0] - ['ip_address']) - else: - return(response.status_code) - return(response.status_code) - - def createVm(self): - """Creation of Instance, using firewall image.""" - url = ("http://%s:9292/v2/images?" - "name=TestSfcVm" % (self.glance_hostname)) - headers = {"Accept": "application/json", - "Content-Type": "application/octet-stream", - "X-Auth-Token": self.token_id} - response = requests.get(url, headers=headers) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("FireWall Image is available") - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.image_id = json1_data['images'][0]['id'] - else: - return(response.status_code) - - url = ("http://%s:8774/v2.1/%s/flavors?" - "name=m1.tiny" % (self.nova_hostname, self.tenant_id)) - headers = {"Accept": "application/json", "Content-Type": - "application/json", "X-Auth-Token": self.token_id} - response = requests.get(url, headers=headers) - - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("Flavor is available") - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.flavorRef = json1_data['flavors'][0]['id'] - else: - return(response.status_code) - - for y in range(0, 3): - Dicdata = {} - org_nw_port = [] - org_nw_port.append({'port': self.port_num[y]}) - if self.vm_name != '': - Dicdata['name'] = "vm" + str(y) - if self.imageRef != '': - Dicdata['imageRef'] = self.image_id - if self.flavorRef != '': - Dicdata['flavorRef'] = self.flavorRef - if self.max_count != '': - Dicdata['max_count'] = self.max_count - if self.min_count != '': - Dicdata['min_count'] = self.min_count - if self.org_nw_port != '': - Dicdata['networks'] = org_nw_port - Dicdata = {'server': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:8774/v2.1/%s/servers' % (self.nova_hostname, - self.tenant_id) - headers = {"Accept": "application/json", "Content-Type": - "application/json", "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == ACCEPTED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - info = "Creation of VM" + str(y) + " is successfull" - self.logger.debug(info) - - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.vm_id = json1_data['server']['id'] - self.vm.append(json1_data['server']['id']) - else: - return(response.status_code) - - return(response.status_code) - - def checkVmState(self): - """Checking the Status of the Instance.""" - time.sleep(10) - for y in range(0, 3): - url = ("http://%s:8774/v2.1/servers/" - "detail?name=vm" + str(y)) % (self.neutron_hostname) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.get(url, headers=headers) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.vm_active = json1_data['servers'][0]['status'] - if (self.vm_active == "ACTIVE"): - info = "VM" + str(y) + " is Active : " + self.vm_active - else: - info = "VM" + str(y) + " is NOT Active : " + self.vm_active - self.logger.debug(info) - else: - return(response.status_code) - return(response.status_code) - time.sleep(10) - - def createPortPair(self): - """Creation of Port Pair.""" - for p in range(1, 2): - Dicdata = {} - if self.port_pair_ingress != '': - Dicdata['ingress'] = self.port_num[p] - if self.port_pair_egress != '': - egress = p - Dicdata['egress'] = self.port_num[egress] - if self.port_pair_name != '': - Dicdata['name'] = "PP" + str(p) - - Dicdata = {'port_pair': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/sfc/port_pairs' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - info = ("Creation of Port Pair PP" + str(p) + - " is successful") - self.logger.debug(info) - else: - return(response.status_code) - - return(response.status_code) - - def getPortPair(self): - """Query the Portpair id value.""" - for p in range(0, 1): - url = ("http://%s:9696/%s/" - "sfc/port_pairs?name=PP1" % (self.neutron_hostname, - self.osver)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.get(url, headers=headers) - - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.port_pair_id.append(json1_data['port_pairs'][0]['id']) - else: - return(response.status_code) - return(response.status_code) - - def createPortGroup(self): - """Creation of PortGroup.""" - for p in range(0, 1): - Dicdata = {} - port_pair_list = [] - port_pair_list.append(self.port_pair_id[p]) - if self.port_group_name != '': - Dicdata['name'] = "PG" + str(p) - if self.port_pair_id != '': - Dicdata['port_pairs'] = port_pair_list - - Dicdata = {'port_pair_group': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = ("http://%s:9696/%s/" - "sfc/port_pair_groups" % (self.neutron_hostname, - self.osver)) - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - info = ("Creation of Port Group PG" + str(p) + - "is successful") - self.logger.debug(info) - else: - return(response.status_code) - - return(response.status_code) - - def getPortGroup(self): - """Query the PortGroup id.""" - for p in range(0, 1): - url = ("http://%s:9696/%s/sfc/port_pair_groups" - "?name=PG" + str(p)) % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.get(url, headers=headers) - - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - json1_data = json.loads(response.content) - self.port_grp_id.append(json1_data['port_pair_groups'] - [0]['id']) - else: - return(response.status_code) - return(response.status_code) - - def createFlowClassifier(self): - """Creation of Flow Classifier.""" - Dicdata = {} - if self.source_ip_prefix != '': - Dicdata['source_ip_prefix'] = self.source_ip_prefix - if self.destination_ip_prefix != '': - Dicdata['destination_ip_prefix'] = self.destination_ip_prefix - if self.logical_source_port != '': - Dicdata['logical_source_port'] = self.port_num[0] - if self.fcname != '': - Dicdata['name'] = "FC1" - if self.ethertype != '': - Dicdata['ethertype'] = self.ethertype - - Dicdata = {'flow_classifier': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = ("http://%s:9696/%s/" - "sfc/flow_classifiers" % (self.neutron_hostname, - self.osver)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - json1_data = json.loads(response.content) - self.flow_class_if = json1_data['flow_classifier']['id'] - self.logger.debug("Creation of Flow Classifier is successful") - return(response.status_code) - else: - return(response.status_code) - - def createPortChain(self): - """Creation of PortChain.""" - Dicdata = {} - flow_class_list = [] - flow_class_list.append(self.flow_class_if) - port_pair_groups_list = [] - port_pair_groups_list.append(self.port_grp_id[0]) - - if flow_class_list != '': - Dicdata['flow_classifiers'] = flow_class_list - if self.pcname != '': - Dicdata['name'] = "PC1" - if port_pair_groups_list != '': - Dicdata['port_pair_groups'] = port_pair_groups_list - - Dicdata = {'port_chain': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/sfc/port_chains' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", - "Content-Type": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - self.logger.debug("Creation of PORT CHAIN is successful") - json1_data = json.loads(response.content) - self.PC_id = json1_data['port_chain']['id'] - return(response.status_code) - else: - return(response.status_code) - - def checkFlowAdded(self): - """Check whether the Flows are downloaded successfully.""" - time.sleep(5) - response = requests.get('http://' + self.onos_hostname + - ':8181/onos/v1/flows', - auth=("karaf", "karaf")) - if (response.status_code == OK): - self.logger.debug("Flow is successfully Queries") - json1_data = json.loads(response.content) - self.flowadd = json1_data['flows'][0]['state'] - - if (self.flowadd == "ADDED"): - self.logger.info("Flow is successfully added to OVS") - return(response.status_code) - else: - return(404) - else: - return(response.status_code) -#################################################################### - - def createRouter(self): - """Creation of Router.""" - Dicdata = {} - if self.routername != '': - Dicdata['name'] = "router1" - if self.admin_state_up != '': - Dicdata['admin_state_up'] = self.admin_state_up - - Dicdata = {'router': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = 'http://%s:9696/%s/routers.json' % (self.neutron_hostname, - self.osver) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == CREATED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.debug("Creation of Router is successfull") - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.router_id = json1_data['router']['id'] - return(response.status_code) - else: - return(response.status_code) - - def attachInterface(self): - """Attachment of instance ports to the Router.""" - url = ("http://%s:9696/%s/networks" - "?name=admin_floating_net" % (self.neutron_hostname, - self.osver)) - headers = {"Accept": "application/json", "X-Auth-Token": self.token_id} - response = requests.get(url, headers=headers) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.net_name = json1_data['networks'][0]['name'] - if (self.net_name == "admin_floating_net"): - self.pub_net_id = json1_data['networks'][0]['id'] - else: - return(response.status_code) - ############################################################ - - self.logger.info("Attachment of Instance interface to Router") - Dicdata = {} - if self.subnetId != '': - Dicdata['subnet_id'] = self.subnetId - - data = json.dumps(Dicdata, indent=4) - url = ("http://%s:9696/%s/routers" - "/%s/add_router_interface" % (self.neutron_hostname, - self.osver, - self.router_id)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.put(url, headers=headers, data=data) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("Interface attached successfull") - else: - return(response.status_code) - ############################################################ - self.logger.info("Attachment of Gateway to Router") - - Dicdata1 = {} - if self.pub_net_id != 0: - Dicdata1['network_id'] = self.pub_net_id - - Dicdata1 = {'external_gateway_info': Dicdata1} - Dicdata1 = {'router': Dicdata1} - data = json.dumps(Dicdata1, indent=4) - url = 'http://%s:9696/%s/routers/%s' % (self.neutron_hostname, - self.osver, - self.router_id) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.put(url, headers=headers, data=data) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("Gateway Interface attached successfull") - return(response.status_code) - else: - return(response.status_code) - - def addFloatingIp(self): - """Attachment of Floating Ip to the Router.""" - for ip_num in range(0, 2): - Dicdata = {} - Dicdata['pool'] = "admin_floating_net" - - data = json.dumps(Dicdata, indent=4) - url = ("http://%s:8774/v2.1/" - "os-floating-ips" % (self.nova_hostname)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("Floating ip created successfully") - json1_data = json.loads(response.content) - self.logger.debug(json1_data) - self.vm_public_ip.append(json1_data['floating_ip']['ip']) - self.vm_public_id.append(json1_data['floating_ip']['id']) - else: - self.logger.error("Floating ip NOT created successfully") - - Dicdata1 = {} - if self.address != '': - Dicdata1['address'] = self.vm_public_ip[ip_num] - - Dicdata1 = {'addFloatingIp': Dicdata1} - data = json.dumps(Dicdata1, indent=4) - url = ("http://%s:8774/v2.1/" - "servers/%s/action" % (self.nova_hostname, - self.vm[ip_num])) - - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.post(url, headers=headers, data=data) - if(response.status_code == ACCEPTED): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.info("Public Ip successfully added to VM") - else: - return(response.status_code) - return(response.status_code) - - def loginToVM(self): - """Login to the VM to check NSH packets are received.""" - queue1 = "0" - - def vm0(): - - s = pxssh.pxssh() - hostname = self.vm_public_ip[0] - s.login(hostname, self.cirros_username, self.cirros_password) - s.sendline("ping -c 5 " + str(self.port_ip[2])) - s.prompt() # match the prompt - - ping_re = re.search("transmitted.*received", s.before).group() - x = re.split('\s+', ping_re) - if (x[1] >= "1"): - self.logger.info("Ping is Successfull") - else: - self.logger.info("Ping is NOT Successfull") - - def vm1(queue1): - s = pxssh.pxssh() - hostname = self.vm_public_ip[1] - s.login(hostname, self.cirros_username, self.cirros_password) - s.sendline('sudo ./firewall') - s.prompt() - output_pack = s.before - - if(output_pack.find("nshc") != -1): - self.logger.info("The packet has reached VM2 Instance") - queue1.put("1") - else: - self.logger.info("Packet not received in Instance") - queue1.put("0") - - def ping(ip, timeout=300): - while True: - time.sleep(1) - self.logger.debug("Pinging %s. Waiting for response..." % ip) - response = os.system("ping -c 1 " + ip + " >/dev/null 2>&1") - if response == 0: - self.logger.info("Ping " + ip + " detected!") - return 0 - - elif timeout == 0: - self.logger.info("Ping " + ip + " timeout reached.") - return 1 - timeout -= 1 - - result0 = ping(self.vm_public_ip[0]) - result1 = ping(self.vm_public_ip[1]) - if result0 == 0 and result1 == 0: - time.sleep(300) - queue1 = Queue() - p1 = Process(target=vm1, args=(queue1, )) - p1.start() - p2 = Process(target=vm0) - p2.start() - p1.join(10) - return (queue1.get()) - else: - self.logger.error("Thread didnt run") - - """##################################################################""" - """ ######################## Stats Functions ################# #####""" - - def portChainDeviceMap(self): - """Check the PC Device Stats in the ONOS.""" - response = requests.get('http://' + self.onos_hostname + - ':8181/onos/vtn/portChainDeviceMap/' + - self.PC_id, auth=("karaf", "karaf")) - if (response.status_code == OK): - self.logger.info("PortChainDeviceMap is successfully Queries") - return(response.status_code) - else: - return(response.status_code) - - def portChainSfMap(self): - """Check the PC SF Map Stats in the ONOS.""" - response = requests.get('http://' + self.onos_hostname + - ':8181/onos/vtn/portChainSfMap/' + - self.PC_id, auth=("karaf", "karaf")) - if (response.status_code == OK): - self.logger.info("portChainSfMap is successfully Queries") - return(response.status_code) - else: - return(response.status_code) - - """###################################################################""" - - def deletePortChain(self): - """Deletion of PortChain.""" - url = ('http://' + self.neutron_hostname + ':9696/' + - self.osver + '/sfc/port_chains/' + self.PC_id) - headers = {"Accept": "application/json", "Content-Type": - "application/json", "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - return(response.status_code) - else: - return(response.status_code) - - def deleteFlowClassifier(self): - """Deletion of Flow Classifier.""" - url = ("http://%s:9696/%s/sfc/" - "flow_classifiers/%s" % (self.neutron_hostname, - self.osver, - self.flow_class_if)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - return(response.status_code) - else: - return(response.status_code) - - def deletePortGroup(self): - """Deletion of PortGroup.""" - for p in range(0, 1): - url = ("http://%s:9696/%s/sfc/" - "port_pair_groups/%s" % (self.neutron_hostname, - self.osver, - self.port_grp_id[p])) - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - else: - return(response.status_code) - return(response.status_code) - - def deletePortPair(self): - """Deletion of Portpair.""" - for p in range(1, 2): - url = ("http://%s:9696/%s/sfc/" - "port_pairs/%s" % (self.neutron_hostname, - self.osver, - self.port_pair_id[0])) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - else: - return(response.status_code) - return(response.status_code) - - def cleanup(self): - """Cleanup.""" - self.logger.info("Deleting VMs") - for y in range(0, 3): - url = ("http://%s:8774/v2.1/" - "/servers/%s" % (self.nova_hostname, - self.vm[y])) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.debug("VM" + str(y) + " is Deleted : ") - time.sleep(10) - else: - return(response.status_code) - self.logger.info("Deleting Ports") - for x in range(self.i, self.numTerms): - url = ('http://' + self.neutron_hostname + ':9696/' + - self.osver + '/ports/' + self.port_num[x]) - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.delete(url, headers=headers) - - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - self.logger.debug("Port" + str(x) + " Deleted") - else: - return(response.status_code) - self.logger.info("Deleting Router") - - Dicdata = {} - Dicdata['external_gateway_info'] = {} - Dicdata = {'router': Dicdata} - data = json.dumps(Dicdata, indent=4) - url = ("http://%s:9696/%s/" - "/routers/%s" % (self.neutron_hostname, - self.osver, - self.router_id)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.put(url, headers=headers, data=data) - if (response.status_code == OK): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - Dicdata1 = {} - if self.subnetId != '': - Dicdata1['subnet_id'] = self.subnetId - data = json.dumps(Dicdata1, indent=4) - url = ("http://%s:9696/%s/routers/%s" - "/remove_router_interface.json" % (self.neutron_hostname, - self.osver, - self.router_id)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.put(url, headers=headers, data=data) - if (response.status_code == OK): - url = ("http://%s:9696/%s/" - "routers/%s" % (self.neutron_hostname, - self.osver, - self.router_id)) - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - else: - return(response.status_code) - else: - return(response.status_code) - else: - return(response.status_code) - - self.logger.info("Deleting Network") - url = "http://%s:9696/%s/networks/%s" % (self.neutron_hostname, - self.osver, - self.net_id) - - headers = {"Accept": "application/json", - "X-Auth-Token": self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - else: - return(response.status_code) - - self.logger.info("Deleting Floating ip") - for ip_num in range(0, 2): - url = ("http://%s:9696/%s/floatingips" - "/%s" % (self.neutron_hostname, - self.osver, - self.vm_public_id[ip_num])) - - headers = {"Accept": "application/json", "X-Auth-Token": - self.token_id} - response = requests.delete(url, headers=headers) - if (response.status_code == NO_CONTENT): - self.logger.debug(response.status_code) - self.logger.debug(response.content) - else: - return(response.status_code) - return(response.status_code) |