aboutsummaryrefslogtreecommitdiffstats
path: root/functest/opnfv_tests/Controllers
diff options
context:
space:
mode:
authorMorgan Richomme <morgan.richomme@orange.com>2016-11-24 18:25:52 +0100
committerMorgan Richomme <morgan.richomme@orange.com>2016-11-25 15:09:35 +0100
commit165ff17205d99b18df36e5ac6f34ede858e3da17 (patch)
tree3199a34b651204cd62bdc78b5c5fc19c73ca3f02 /functest/opnfv_tests/Controllers
parent96aef41089d2c62913320028dd9618197d0e75c5 (diff)
file/dir renaming for consistency
JIRA: FUNCTEST-579 Change-Id: Iaa545db70bfb76770df0a3d17871e29ce518ff2d Signed-off-by: Morgan Richomme <morgan.richomme@orange.com>
Diffstat (limited to 'functest/opnfv_tests/Controllers')
-rwxr-xr-xfunctest/opnfv_tests/Controllers/ODL/OpenDaylightTesting.py241
-rw-r--r--functest/opnfv_tests/Controllers/ODL/__init__.py0
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Sfc/README.md21
-rwxr-xr-xfunctest/opnfv_tests/Controllers/ONOS/Sfc/Sfc.py177
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc_fun.py863
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/Readme.txt5
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/__init__.py0
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/adapters/__init__.py0
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/adapters/client.py92
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/adapters/connection.py200
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/adapters/environment.py286
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/adapters/foundation.py93
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/dependencies/onos29
-rw-r--r--functest/opnfv_tests/Controllers/ONOS/Teston/log/gitignore0
-rwxr-xr-xfunctest/opnfv_tests/Controllers/ONOS/Teston/onosfunctest.py261
-rw-r--r--functest/opnfv_tests/Controllers/__init__.py0
16 files changed, 0 insertions, 2268 deletions
diff --git a/functest/opnfv_tests/Controllers/ODL/OpenDaylightTesting.py b/functest/opnfv_tests/Controllers/ODL/OpenDaylightTesting.py
deleted file mode 100755
index b78db8b1..00000000
--- a/functest/opnfv_tests/Controllers/ODL/OpenDaylightTesting.py
+++ /dev/null
@@ -1,241 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2016 Orange and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-
-import argparse
-import errno
-import fileinput
-import os
-import re
-import sys
-import urlparse
-
-from robot.api import ExecutionResult, ResultVisitor
-from robot.errors import RobotError
-import robot.run
-from robot.utils.robottime import timestamp_to_secs
-
-from functest.core import TestCasesBase
-import functest.utils.functest_logger as ft_logger
-import functest.utils.openstack_utils as op_utils
-import functest.utils.functest_constants as ft_constants
-
-
-class ODLResultVisitor(ResultVisitor):
-
- def __init__(self):
- self._data = []
-
- def visit_test(self, test):
- output = {}
- output['name'] = test.name
- output['parent'] = test.parent.name
- output['status'] = test.status
- output['startime'] = test.starttime
- output['endtime'] = test.endtime
- output['critical'] = test.critical
- output['text'] = test.message
- output['elapsedtime'] = test.elapsedtime
- self._data.append(output)
-
- def get_data(self):
- return self._data
-
-
-class ODLTestCases(TestCasesBase.TestCasesBase):
-
- repos = ft_constants.REPOS_DIR
- odl_test_repo = os.path.join(repos, "odl_test")
- neutron_suite_dir = os.path.join(odl_test_repo,
- "csit/suites/openstack/neutron")
- basic_suite_dir = os.path.join(odl_test_repo,
- "csit/suites/integration/basic")
- res_dir = os.path.join(ft_constants.FUNCTEST_RESULTS_DIR, "odl")
-
- logger = ft_logger.Logger("opendaylight").getLogger()
-
- def __init__(self):
- self.case_name = "odl"
-
- @classmethod
- def set_robotframework_vars(cls, odlusername="admin", odlpassword="admin"):
- odl_variables_files = os.path.join(cls.odl_test_repo,
- 'csit/variables/Variables.py')
- try:
- for line in fileinput.input(odl_variables_files,
- inplace=True):
- print re.sub("AUTH = .*",
- ("AUTH = [u'" + odlusername + "', u'" +
- odlpassword + "']"),
- line.rstrip())
- return True
- except Exception as e:
- cls.logger.error("Cannot set ODL creds: %s" % str(e))
- return False
-
- def parse_results(self):
- output_dir = os.path.join(self.res_dir, 'output.xml')
- result = ExecutionResult(output_dir)
- visitor = ODLResultVisitor()
- result.visit(visitor)
- self.criteria = result.suite.status
- self.start_time = timestamp_to_secs(result.suite.starttime)
- self.stop_time = timestamp_to_secs(result.suite.endtime)
- self.details = {}
- self.details['description'] = result.suite.name
- self.details['tests'] = visitor.get_data()
- return self.criteria
-
- def main(self, **kwargs):
- dirs = [self.basic_suite_dir, self.neutron_suite_dir]
- try:
- odlusername = kwargs['odlusername']
- odlpassword = kwargs['odlpassword']
- variables = ['KEYSTONE:' + kwargs['keystoneip'],
- 'NEUTRON:' + kwargs['neutronip'],
- 'OSUSERNAME:"' + kwargs['osusername'] + '"',
- 'OSTENANTNAME:"' + kwargs['ostenantname'] + '"',
- 'OSPASSWORD:"' + kwargs['ospassword'] + '"',
- 'ODL_SYSTEM_IP:' + kwargs['odlip'],
- 'PORT:' + kwargs['odlwebport'],
- 'RESTCONFPORT:' + kwargs['odlrestconfport']]
- except KeyError as e:
- self.logger.error("Cannot run ODL testcases. Please check "
- "%s" % str(e))
- return self.EX_RUN_ERROR
- if self.set_robotframework_vars(odlusername, odlpassword):
- try:
- os.makedirs(self.res_dir)
- except OSError as e:
- if e.errno != errno.EEXIST:
- self.logger.exception(
- "Cannot create {}".format(self.res_dir))
- return self.EX_RUN_ERROR
- stdout_file = os.path.join(self.res_dir, 'stdout.txt')
- output_dir = os.path.join(self.res_dir, 'output.xml')
- with open(stdout_file, 'w+') as stdout:
- robot.run(*dirs, variable=variables,
- output=output_dir,
- log='NONE',
- report='NONE',
- stdout=stdout)
- stdout.seek(0, 0)
- self.logger.info("\n" + stdout.read())
- self.logger.info("ODL results were successfully generated")
- try:
- test_res = self.parse_results()
- self.logger.info("ODL results were successfully parsed")
- if test_res is not "PASS":
- return self.EX_RUN_ERROR
- except RobotError as e:
- self.logger.error("Run tests before publishing: %s" %
- e.message)
- return self.EX_RUN_ERROR
- try:
- os.remove(stdout_file)
- except OSError:
- self.logger.warning("Cannot remove {}".format(stdout_file))
- return self.EX_OK
- else:
- return self.EX_RUN_ERROR
-
- def run(self):
- try:
- kclient = op_utils.get_keystone_client()
- keystone_url = kclient.service_catalog.url_for(
- service_type='identity', endpoint_type='publicURL')
- neutron_url = kclient.service_catalog.url_for(
- service_type='network', endpoint_type='publicURL')
- kwargs = {'keystoneip': urlparse.urlparse(keystone_url).hostname}
- kwargs['neutronip'] = urlparse.urlparse(neutron_url).hostname
- kwargs['odlip'] = kwargs['neutronip']
- kwargs['odlwebport'] = '8080'
- kwargs['odlrestconfport'] = '8181'
- kwargs['odlusername'] = 'admin'
- kwargs['odlpassword'] = 'admin'
-
- installer_type = ft_constants.CI_INSTALLER_TYPE
- kwargs['osusername'] = ft_constants.OS_USERNAME
- kwargs['ostenantname'] = ft_constants.OS_TENANT_NAME
- kwargs['ospassword'] = ft_constants.OS_PASSWORD
-
- if installer_type == 'fuel':
- kwargs['odlwebport'] = '8282'
- elif installer_type == 'apex':
- if ft_constants.SDN_CONTROLLER_IP is None:
- return self.EX_RUN_ERROR
- kwargs['odlip'] = ft_constants.SDN_CONTROLLER_IP
- kwargs['odlwebport'] = '8181'
- elif installer_type == 'joid':
- if ft_constants.SDN_CONTROLLER is None:
- return self.EX_RUN_ERROR
- kwargs['odlip'] = ft_constants.SDN_CONTROLLER
- elif installer_type == 'compass':
- kwargs['odlwebport'] = '8181'
- else:
- if ft_constants.SDN_CONTROLLER_IP is None:
- return self.EX_RUN_ERROR
- kwargs['odlip'] = ft_constants.SDN_CONTROLLER_IP
- except KeyError as e:
- self.logger.error("Cannot run ODL testcases. "
- "Please check env var: "
- "%s" % str(e))
- return self.EX_RUN_ERROR
- except Exception:
- self.logger.exception("Cannot run ODL testcases.")
- return self.EX_RUN_ERROR
-
- return self.main(**kwargs)
-
-
-if __name__ == '__main__':
- parser = argparse.ArgumentParser()
- parser.add_argument('-k', '--keystoneip',
- help='Keystone IP',
- default='127.0.0.1')
- parser.add_argument('-n', '--neutronip',
- help='Neutron IP',
- default='127.0.0.1')
- parser.add_argument('-a', '--osusername',
- help='Username for OpenStack',
- default='admin')
- parser.add_argument('-b', '--ostenantname',
- help='Tenantname for OpenStack',
- default='admin')
- parser.add_argument('-c', '--ospassword',
- help='Password for OpenStack',
- default='admin')
- parser.add_argument('-o', '--odlip',
- help='OpenDaylight IP',
- default='127.0.0.1')
- parser.add_argument('-w', '--odlwebport',
- help='OpenDaylight Web Portal Port',
- default='8080')
- parser.add_argument('-r', '--odlrestconfport',
- help='OpenDaylight RESTConf Port',
- default='8181')
- parser.add_argument('-d', '--odlusername',
- help='Username for ODL',
- default='admin')
- parser.add_argument('-e', '--odlpassword',
- help='Password for ODL',
- default='admin')
- parser.add_argument('-p', '--pushtodb',
- help='Push results to DB',
- action='store_true')
-
- args = vars(parser.parse_args())
- odl = ODLTestCases()
- try:
- result = odl.main(**args)
- if result != TestCasesBase.TestCasesBase.EX_OK:
- sys.exit(result)
- if args['pushtodb']:
- sys.exit(odl.push_to_db())
- except Exception:
- sys.exit(TestCasesBase.TestCasesBase.EX_RUN_ERROR)
diff --git a/functest/opnfv_tests/Controllers/ODL/__init__.py b/functest/opnfv_tests/Controllers/ODL/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/Controllers/ODL/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/Controllers/ONOS/Sfc/README.md b/functest/opnfv_tests/Controllers/ONOS/Sfc/README.md
deleted file mode 100644
index ae63ee21..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Sfc/README.md
+++ /dev/null
@@ -1,21 +0,0 @@
-SFC Script ReadMe File
-**********************
-
-Topology
----------
-
-Validated with the Fuel Enviroment.
-
-
-Things to Remember :
---------------------
-
-1] This Script basically Tests the SFC functionality with ONOS controller.
-2] Ip address of Openstack and ONOS are got dynamically.
-3] Initally this sfc script can be used for ONOS and on Request , if need will modify for other controllers.
-
-
-Contact Details :
------------------
-
-email-id : antonysilvester@gmail.com
diff --git a/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc.py b/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc.py
deleted file mode 100755
index e3f08041..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc.py
+++ /dev/null
@@ -1,177 +0,0 @@
-"""Script to Test the SFC scenarios in ONOS."""
-# !/usr/bin/python
-#
-# Copyright (c) CREATED5 All rights reserved
-# This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# ###########################################################################
-# OPNFV SFC Script
-# **** Scripted by Antony Silvester - antony.silvester@huawei.com ******
-# ###########################################################################
-
-# Testcase 1 : Prerequisites configuration for SFC
-# Testcase 2 : Creation of 3 VNF Nodes and Attaching Ports
-# Testcase 3 : Configure SFC [Port pair,Port Group ,Flow classifer
-# Testcase 4 : Configure Port Chain and verify the flows are added.
-# Testcase 5 : Verify traffic with VNF node.
-# Testcase 6 : Remove the Port Chain and Verify the traffic.
-# Testcase 7 : Cleanup
-# ###########################################################################
-#
-
-import time
-import functest.utils.functest_logger as ft_logger
-import functest.utils.functest_utils as ft_utils
-from Sfc_fun import Sfc_fun
-
-logger = ft_logger.Logger("sfc").getLogger()
-Sfc_obj = Sfc_fun()
-
-OK = 200
-CREATED = 201
-ACCEPTED = 202
-NO_CONTENT = 204
-
-start_time = time.time()
-
-
-def PreConfig():
- logger.info("Testcase 1 : Prerequisites configuration for SFC")
- logger.info("1.1 Creation of Auth-Token")
- check(Sfc_obj.getToken, OK, "Creation of Token")
- logger.info("1.2 Creation of Network")
- check(Sfc_obj.createNetworks, CREATED, "Creation of network")
- logger.info("1.3 Creation of Subnetwork")
- check(Sfc_obj.createSubnets, CREATED, "Creation of Subnetwork")
-
-
-def CreateNodes():
- logger.info("Testcase 2 : Creation of 3 VNF Nodes and Attaching Ports")
- logger.info("2.1 Creation of Ports")
- check(Sfc_obj.createPorts, CREATED, "Creation of Port")
- logger.info("2.2 Creation of VM-Compute-Node")
- check(Sfc_obj.createVm, ACCEPTED, "Creation of VM")
- logger.info("2.3 Check VM Status")
- check(Sfc_obj.checkVmState, OK, "Vm statue check")
- logger.info("2.4 Router Creation")
- check(Sfc_obj.createRouter, CREATED, "Creation of Router")
- logger.info("2.5 Attachement of Interface to VM")
- check(Sfc_obj.attachInterface, OK, "Interface attached to VM")
- logger.info("2.6 Attachement of FLoating Ip to VM")
- check(Sfc_obj.addFloatingIp, ACCEPTED, "Floating Ip attached to VM")
-
-
-def ConfigSfc():
- logger.info(
- "Testcase 3 : Configure SFC [Portair,PortGroup,Flow classifer]")
- logger.info("3.1 Creation of Port Pair")
- check(Sfc_obj.createPortPair, CREATED, "Creation of Port Pair")
- logger.info("3.2 Getting the Port Pair ID")
- check(Sfc_obj.getPortPair, OK, "Getting Port Pair ID")
- logger.info("3.3 Creation of Port Pair Group")
- check(Sfc_obj.createPortGroup, CREATED, "Creation of Port Pair Group")
- logger.info("3.4 Getting Port Pair Group ID ")
- check(Sfc_obj.getPortGroup, OK, "Getting Port Pair Group ID")
- logger.info("3.5 Creation of Flow Classifier")
- check(Sfc_obj.createFlowClassifier, CREATED, "Creation of Flow Classifier")
- logger.info(
- "Testcase 4 : Configure Port Chain and verify flows are added")
- logger.info("4.1 Creation of Port Chain")
- check(Sfc_obj.createPortChain, CREATED, "Creation of Port Chain")
-
-
-def VerifySfcTraffic():
- status = "PASS"
- logger.info("Testcase 5 : Verify traffic with VNF node.")
- if (Sfc_obj.loginToVM() == "1"):
- logger.info("SFC function Working")
- else:
- logger.error("SFC function not working")
- status = "FAIL"
-
- logger.info("Testcase 6 : Remove the Port Chain and Verify the traffic")
- if (Sfc_obj.deletePortChain() == NO_CONTENT):
- if (Sfc_obj.loginToVM() == "0"):
- logger.info("SFC function is removed Successfully")
- else:
- logger.error("SFC function not Removed. Have some problem")
- status = "FAIL"
- if (Sfc_obj.deleteFlowClassifier() == NO_CONTENT):
- if (Sfc_obj.deletePortGroup() == NO_CONTENT):
- if (Sfc_obj.deletePortPair() == NO_CONTENT):
- logger.info(
- "SFC configuration is deleted successfully")
- else:
- logger.error("Port pair is deleted successfully")
- status = "FAIL"
- else:
- logger.error("Port Group is NOT deleted successfully")
- status = "FAIL"
- else:
- logger.error("Flow classifier is NOT deleted successfully")
- status = "FAIL"
- else:
- logger.error("PortChain configuration is NOT deleted successfully")
- status = "FAIL"
- if (status == "FAIL"):
- fail("Traffic for SFC is NOT verified successfully")
-
-
-def CleanUp():
- logger.info("Testcase 7 : Cleanup")
- if (Sfc_obj.cleanup() == NO_CONTENT):
- logger.info("CleanUp is successfull")
- else:
- logger.error("CleanUp is NOT successfull")
-
-
-def check(method, criteria, msg):
- if (method() == criteria):
- logger.info(msg + 'is Successful')
- else:
- fail(msg + 'is not successful')
-
-
-def fail(fail_info):
- logger.error(fail_info)
- CleanUp()
- PushDB("FAIL", fail_info)
- exit(-1)
-
-
-def PushDB(status, info):
- logger.info("Summary :")
- try:
- logger.debug("Push ONOS SFC results into DB")
- stop_time = time.time()
-
- # ONOS SFC success criteria = all tests OK
- duration = round(stop_time - start_time, 1)
- logger.info("Result is " + status)
- ft_utils.push_results_to_db("functest",
- "onos_sfc",
- start_time,
- stop_time,
- status,
- details={'duration': duration,
- 'error': info})
- except:
- logger.error("Error pushing results into Database")
-
-
-def main():
- """Script to Test the SFC scenarios in ONOS."""
- PreConfig()
- CreateNodes()
- ConfigSfc()
- VerifySfcTraffic()
- CleanUp()
- PushDB("PASS", "")
-
-
-if __name__ == '__main__':
- main()
diff --git a/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc_fun.py b/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc_fun.py
deleted file mode 100644
index 69e076d0..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Sfc/Sfc_fun.py
+++ /dev/null
@@ -1,863 +0,0 @@
-import os
-import re
-import time
-import json
-import requests
-
-from multiprocessing import Process
-from multiprocessing import Queue
-from pexpect import pxssh
-
-import functest.utils.functest_logger as ft_logger
-
-OK = 200
-CREATED = 201
-ACCEPTED = 202
-NO_CONTENT = 204
-
-
-class Sfc_fun:
- """Defines all the def function of SFC."""
-
- def __init__(self):
- """Initialization of variables."""
- self.logger = ft_logger.Logger("sfc_fun").getLogger()
- self.osver = "v2.0"
- self.token_id = 0
- self.net_id = 0
- self.image_id = 0
- self.keystone_hostname = 'keystone_ip'
- self.neutron_hostname = 'neutron_ip'
- self.nova_hostname = 'nova_ip'
- self.glance_hostname = 'glance_ip'
- self.onos_hostname = 'onos_ip'
- # Network variables #######
- self.netname = "test_nw"
- self.admin_state_up = True
- self.tenant_id = 0
- self.subnetId = 0
- # #########################
- # SubNet variables#########
- self.ip_version = 4
- self.cidr = "20.20.20.0/24"
- self.subnetname = "test_nw_subnets"
- # ###############################
- # Port variable
- self.port = "port"
- self.port_num = []
- self.vm_id = 0
- self.port_ip = []
- self.count = 0
- self.i = 0
- self.numTerms = 3
- self.security_groups = []
- self.port_security_enabled = False
- # ###############################
- # VM creation variable
- self.container_format = "bare"
- self.disk_format = "qcow2"
- self.imagename = "TestSfcVm"
- self.createImage = "/home/root1/devstack/files/images/\
- firewall_block_image.img"
-
- self.vm_name = "vm"
- self.imageRef = "test"
- self.flavorRef = "1"
- self.max_count = "1"
- self.min_count = "1"
- self.org_nw_port = []
- self.image_id = 0
- self.routername = "router1"
- self.router_id = 0
- # #####################################
- # Port pair
- self.port_pair_ingress = 0
- self.port_pair_egress = 0
- self.port_pair_name = "PP"
- self.port_pair_id = []
- # ####################################
- # Port Group
- self.port_group_name = "PG"
- self.port_grp_id = []
- # ####################################
- # FlowClassifier
- self.source_ip_prefix = "20.20.20.0/24"
- self.destination_ip_prefix = "20.20.20.0/24"
- self.logical_source_port = 0
- self.fcname = "FC"
- self.ethertype = "IPv4"
- # #####################################
- self.flow_class_if = 0
- # #####################################
- # Port Chain variables
- self.pcname = 'PC'
- self.PC_id = 0
- # #####################################
- # Port Chain variables
- self.flowadd = ''
- # #####################################
- self.ip_pool = 0
- self.vm_public_ip = []
- self.vm_public_id = []
- self.net_id1 = 0
- self.vm = []
- self.address = 0
- self.value = 0
- self.pub_net_id = 0
-
- def getToken(self):
- """Get the keystone token value from Openstack ."""
- url = 'http://' + self.keystone_hostname + \
- ':5000/' + self.osver + '/tokens'
- data = '{"auth": {"tenantName": "admin", "passwordCredentials":\
- { "username": "admin", "password": "console"}}}'
- headers = {"Accept": "application/json"}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == OK):
- json1_data = json.loads(response.content)
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.debug(json1_data)
- self.token_id = json1_data['access']['token']['id']
- self.tenant_id = json1_data['access']['token']['tenant']['id']
- return(response.status_code)
- else:
- return(response.status_code)
-
- def createNetworks(self):
- """Creation of networks."""
- Dicdata = {}
- if self.netname != '':
- Dicdata['name'] = self.netname
- if self.admin_state_up != '':
- Dicdata['admin_state_up'] = self.admin_state_up
- Dicdata = {'network': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/networks'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
-
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.net_id = json1_data['network']['id']
- return(response.status_code)
- else:
- return(response.status_code)
-
- def createSubnets(self):
- """Creation of SubNets."""
- Dicdata = {}
- if self.net_id != 0:
- Dicdata['network_id'] = self.net_id
- if self.ip_version != '':
- Dicdata['ip_version'] = self.ip_version
- if self.cidr != '':
- Dicdata['cidr'] = self.cidr
- if self.subnetname != '':
- Dicdata['name'] = self.subnetname
-
- Dicdata = {'subnet': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/subnets'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
-
- if (response.status_code == CREATED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.subnetId = json1_data['subnet']['id']
- return(response.status_code)
- else:
- return(response.status_code)
-
- def createPorts(self):
- """Creation of Ports."""
- for x in range(self.i, self.numTerms):
- Dicdata = {}
- if self.net_id != '':
- Dicdata['network_id'] = self.net_id
- if self.port != '':
- Dicdata['name'] = "port" + str(x)
- if self.admin_state_up != '':
- Dicdata['admin_state_up'] = self.admin_state_up
- if self.security_groups != '':
- Dicdata['security_groups'] = self.security_groups
- # if self.port_security_enabled != '':
- # Dicdata['port_security_enabled'] = self.port_security_enabled
-
- Dicdata = {'port': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + \
- ':9696/' + self.osver + '/ports'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
-
- if (response.status_code == CREATED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
-
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.port_num.append(json1_data['port']['id'])
- self.port_ip.append(json1_data['port']['fixed_ips'][0]
- ['ip_address'])
- else:
- return(response.status_code)
- return(response.status_code)
-
- def createVm(self):
- """Creation of Instance, using firewall image."""
- url = 'http://' + self.glance_hostname + \
- ':9292/v2/images?name=TestSfcVm'
- headers = {"Accept": "application/json", "Content-Type": "application/\
- octet-stream", "X-Auth-Token": self.token_id}
- response = requests.get(url, headers=headers)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("FireWall Image is available")
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.image_id = json1_data['images'][0]['id']
- else:
- return(response.status_code)
-
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/' + self.tenant_id + '/flavors?name=m1.tiny'
- headers = {"Accept": "application/json", "Content-Type":
- "application/json", "X-Auth-Token": self.token_id}
- response = requests.get(url, headers=headers)
-
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("Flavor is available")
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.flavorRef = json1_data['flavors'][0]['id']
- else:
- return(response.status_code)
-
- for y in range(0, 3):
- Dicdata = {}
- org_nw_port = []
- org_nw_port.append({'port': self.port_num[y]})
- if self.vm_name != '':
- Dicdata['name'] = "vm" + str(y)
- if self.imageRef != '':
- Dicdata['imageRef'] = self.image_id
- if self.flavorRef != '':
- Dicdata['flavorRef'] = self.flavorRef
- if self.max_count != '':
- Dicdata['max_count'] = self.max_count
- if self.min_count != '':
- Dicdata['min_count'] = self.min_count
- if self.org_nw_port != '':
- Dicdata['networks'] = org_nw_port
- Dicdata = {'server': Dicdata}
- data = json.dumps(Dicdata, indent=4)
-
- url = ('http://' + self.nova_hostname + ':8774/v2.1/' +
- self.tenant_id + '/servers')
- headers = {"Accept": "application/json", "Content-Type":
- "application/json", "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == ACCEPTED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- info = "Creation of VM" + str(y) + " is successfull"
- self.logger.debug(info)
-
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.vm_id = json1_data['server']['id']
- self.vm.append(json1_data['server']['id'])
- else:
- return(response.status_code)
-
- return(response.status_code)
-
- def checkVmState(self):
- """Checking the Status of the Instance."""
- time.sleep(10)
- for y in range(0, 3):
- url = 'http://' + \
- self.nova_hostname + \
- ':8774/v2.1/servers/detail?name=vm' + str(y)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.get(url, headers=headers)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.vm_active = json1_data['servers'][0]['status']
- if (self.vm_active == "ACTIVE"):
- info = "VM" + str(y) + \
- " is Active : " + self.vm_active
- else:
- info = "VM" + str(y) + " is NOT Active : " + \
- self.vm_active
- self.logger.debug(info)
- else:
- return(response.status_code)
- return(response.status_code)
- time.sleep(10)
-
- def createPortPair(self):
- """Creation of Port Pair."""
- for p in range(1, 2):
- Dicdata = {}
- if self.port_pair_ingress != '':
- Dicdata['ingress'] = self.port_num[p]
- if self.port_pair_egress != '':
- egress = p
- Dicdata['egress'] = self.port_num[egress]
- if self.port_pair_name != '':
- Dicdata['name'] = "PP" + str(p)
-
- Dicdata = {'port_pair': Dicdata}
- data = json.dumps(Dicdata, indent=4)
-
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs'
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- info = "Creation of Port Pair PP" + str(p) + \
- " is successful"
- self.logger.debug(info)
- else:
- return(response.status_code)
-
- return(response.status_code)
-
- def getPortPair(self):
- """Query the Portpair id value."""
- for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs?name=PP1'
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.get(url, headers=headers)
-
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.port_pair_id.append(json1_data['port_pairs'][0]['id'])
- else:
- return(response.status_code)
- return(response.status_code)
-
- def createPortGroup(self):
- """Creation of PortGroup."""
- for p in range(0, 1):
- Dicdata = {}
- port_pair_list = []
- port_pair_list.append(self.port_pair_id[p])
- if self.port_group_name != '':
- Dicdata['name'] = "PG" + str(p)
- if self.port_pair_id != '':
- Dicdata['port_pairs'] = port_pair_list
-
- Dicdata = {'port_pair_group': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups'
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- info = "Creation of Port Group PG" + str(p) + \
- "is successful"
- self.logger.debug(info)
- else:
- return(response.status_code)
-
- return(response.status_code)
-
- def getPortGroup(self):
- """Query the PortGroup id."""
- for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups?name=PG' + str(p)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.get(url, headers=headers)
-
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- json1_data = json.loads(response.content)
- self.port_grp_id.append(json1_data['port_pair_groups']
- [0]['id'])
- else:
- return(response.status_code)
- return(response.status_code)
-
- def createFlowClassifier(self):
- """Creation of Flow Classifier."""
- Dicdata = {}
- if self.source_ip_prefix != '':
- Dicdata['source_ip_prefix'] = self.source_ip_prefix
- if self.destination_ip_prefix != '':
- Dicdata['destination_ip_prefix'] = self.destination_ip_prefix
- if self.logical_source_port != '':
- Dicdata['logical_source_port'] = self.port_num[0]
- if self.fcname != '':
- Dicdata['name'] = "FC1"
- if self.ethertype != '':
- Dicdata['ethertype'] = self.ethertype
-
- Dicdata = {'flow_classifier': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- json1_data = json.loads(response.content)
- self.flow_class_if = json1_data['flow_classifier']['id']
- self.logger.debug("Creation of Flow Classifier is successful")
- return(response.status_code)
- else:
- return(response.status_code)
-
- def createPortChain(self):
- """Creation of PortChain."""
- Dicdata = {}
- flow_class_list = []
- flow_class_list.append(self.flow_class_if)
- port_pair_groups_list = []
- port_pair_groups_list.append(self.port_grp_id[0])
-
- if flow_class_list != '':
- Dicdata['flow_classifiers'] = flow_class_list
- if self.pcname != '':
- Dicdata['name'] = "PC1"
- if port_pair_groups_list != '':
- Dicdata['port_pair_groups'] = port_pair_groups_list
-
- Dicdata = {'port_chain': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains'
- headers = {"Accept": "application/json",
- "Content-Type": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- self.logger.debug("Creation of PORT CHAIN is successful")
- json1_data = json.loads(response.content)
- self.PC_id = json1_data['port_chain']['id']
- return(response.status_code)
- else:
- return(response.status_code)
-
- def checkFlowAdded(self):
- """Check whether the Flows are downloaded successfully."""
- time.sleep(5)
- response = requests.get('http://' + self.onos_hostname +
- ':8181/onos/v1/flows',
- auth=("karaf", "karaf"))
- if (response.status_code == OK):
- self.logger.debug("Flow is successfully Queries")
- json1_data = json.loads(response.content)
- self.flowadd = json1_data['flows'][0]['state']
-
- if (self.flowadd == "ADDED"):
- self.logger.info("Flow is successfully added to OVS")
- return(response.status_code)
- else:
- return(404)
- else:
- return(response.status_code)
-####################################################################
-
- def createRouter(self):
- """Creation of Router."""
- Dicdata = {}
- if self.routername != '':
- Dicdata['name'] = "router1"
- if self.admin_state_up != '':
- Dicdata['admin_state_up'] = self.admin_state_up
-
- Dicdata = {'router': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + \
- self.osver + '/routers.json'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == CREATED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.debug("Creation of Router is successfull")
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.router_id = json1_data['router']['id']
- return(response.status_code)
- else:
- return(response.status_code)
-
- def attachInterface(self):
- """Attachment of instance ports to the Router."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks?name=admin_floating_net'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.get(url, headers=headers)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.net_name = json1_data['networks'][0]['name']
- if (self.net_name == "admin_floating_net"):
- self.pub_net_id = json1_data['networks'][0]['id']
- else:
- return(response.status_code)
- ############################################################
-
- self.logger.info("Attachment of Instance interface to Router")
- Dicdata = {}
- if self.subnetId != '':
- Dicdata['subnet_id'] = self.subnetId
-
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + '/add_router_interface'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("Interface attached successfull")
- else:
- return(response.status_code)
- ############################################################
- self.logger.info("Attachment of Gateway to Router")
-
- Dicdata1 = {}
- if self.pub_net_id != 0:
- Dicdata1['network_id'] = self.pub_net_id
-
- Dicdata1 = {'external_gateway_info': Dicdata1}
- Dicdata1 = {'router': Dicdata1}
- data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("Gateway Interface attached successfull")
- return(response.status_code)
- else:
- return(response.status_code)
-
- def addFloatingIp(self):
- """Attachment of Floating Ip to the Router."""
- for ip_num in range(0, 2):
- Dicdata = {}
- Dicdata['pool'] = "admin_floating_net"
-
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/os-floating-ips'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("Floating ip created successfully")
- json1_data = json.loads(response.content)
- self.logger.debug(json1_data)
- self.vm_public_ip.append(json1_data['floating_ip']['ip'])
- self.vm_public_id.append(json1_data['floating_ip']['id'])
- else:
- self.logger.error("Floating ip NOT created successfully")
-
- Dicdata1 = {}
- if self.address != '':
- Dicdata1['address'] = self.vm_public_ip[ip_num]
-
- Dicdata1 = {'addFloatingIp': Dicdata1}
- data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.nova_hostname + ':8774/v2.1/servers/' + \
- self.vm[ip_num] + '/action'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.post(url, headers=headers, data=data)
- if(response.status_code == ACCEPTED):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.info("Public Ip successfully added to VM")
- else:
- return(response.status_code)
- return(response.status_code)
-
- def loginToVM(self):
- """Login to the VM to check NSH packets are received."""
- queue1 = "0"
-
- def vm0():
-
- s = pxssh.pxssh()
- hostname = self.vm_public_ip[0]
- username = "cirros"
- password = "cubswin:)"
- s.login(hostname, username, password)
- s.sendline("ping -c 5 " + str(self.port_ip[2]))
- s.prompt() # match the prompt
-
- ping_re = re.search("transmitted.*received", s.before).group()
- x = re.split('\s+', ping_re)
- if (x[1] >= "1"):
- self.logger.info("Ping is Successfull")
- else:
- self.logger.info("Ping is NOT Successfull")
-
- def vm1(queue1):
- s = pxssh.pxssh()
- hostname = self.vm_public_ip[1]
- username = "cirros"
- password = "cubswin:)"
- s.login(hostname, username, password)
- s.sendline('sudo ./firewall')
- s.prompt()
- output_pack = s.before
-
- if(output_pack.find("nshc") != -1):
- self.logger.info("The packet has reached VM2 Instance")
- queue1.put("1")
- else:
- self.logger.info("Packet not received in Instance")
- queue1.put("0")
-
- def ping(ip, timeout=300):
- while True:
- time.sleep(1)
- self.logger.debug("Pinging %s. Waiting for response..." % ip)
- response = os.system("ping -c 1 " + ip + " >/dev/null 2>&1")
- if response == 0:
- self.logger.info("Ping " + ip + " detected!")
- return 0
-
- elif timeout == 0:
- self.logger.info("Ping " + ip + " timeout reached.")
- return 1
- timeout -= 1
-
- result0 = ping(self.vm_public_ip[0])
- result1 = ping(self.vm_public_ip[1])
- if result0 == 0 and result1 == 0:
- time.sleep(300)
- queue1 = Queue()
- p1 = Process(target=vm1, args=(queue1, ))
- p1.start()
- p2 = Process(target=vm0)
- p2.start()
- p1.join(10)
- return (queue1.get())
- else:
- self.logger.error("Thread didnt run")
-
- """##################################################################"""
- """ ######################## Stats Functions ################# #####"""
-
- def portChainDeviceMap(self):
- """Check the PC Device Stats in the ONOS."""
- response = requests.get('http://' + self.onos_hostname +
- ':8181/onos/vtn/portChainDeviceMap/' +
- self.PC_id, auth=("karaf", "karaf"))
- if (response.status_code == OK):
- self.logger.info("PortChainDeviceMap is successfully Queries")
- return(response.status_code)
- else:
- return(response.status_code)
-
- def portChainSfMap(self):
- """Check the PC SF Map Stats in the ONOS."""
- response = requests.get('http://' + self.onos_hostname +
- ':8181/onos/vtn/portChainSfMap/' +
- self.PC_id, auth=("karaf", "karaf"))
- if (response.status_code == OK):
- self.logger.info("portChainSfMap is successfully Queries")
- return(response.status_code)
- else:
- return(response.status_code)
-
- """###################################################################"""
-
- def deletePortChain(self):
- """Deletion of PortChain."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_chains/' + self.PC_id
- headers = {"Accept": "application/json", "Content-Type":
- "application/json", "X-Auth-Token": self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- return(response.status_code)
- else:
- return(response.status_code)
-
- def deleteFlowClassifier(self):
- """Deletion of Flow Classifier."""
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/flow_classifiers/' + self.flow_class_if
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- return(response.status_code)
- else:
- return(response.status_code)
-
- def deletePortGroup(self):
- """Deletion of PortGroup."""
- for p in range(0, 1):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pair_groups/' + self.port_grp_id[p]
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- else:
- return(response.status_code)
- return(response.status_code)
-
- def deletePortPair(self):
- """Deletion of Portpair."""
- for p in range(1, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/sfc/port_pairs/' + self.port_pair_id[0]
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- else:
- return(response.status_code)
- return(response.status_code)
-
- def cleanup(self):
- """Cleanup."""
- self.logger.info("Deleting VMs")
- for y in range(0, 3):
- url = 'http://' + self.nova_hostname + \
- ':8774/v2.1/servers/' + self.vm[y]
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.debug("VM" + str(y) + " is Deleted : ")
- time.sleep(10)
- else:
- return(response.status_code)
- self.logger.info("Deleting Ports")
- for x in range(self.i, self.numTerms):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/ports/' + self.port_num[x]
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.delete(url, headers=headers)
-
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- self.logger.debug("Port" + str(x) + " Deleted")
- else:
- return(response.status_code)
- self.logger.info("Deleting Router")
-
- Dicdata = {}
- Dicdata['external_gateway_info'] = {}
- Dicdata = {'router': Dicdata}
- data = json.dumps(Dicdata, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
- if (response.status_code == OK):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- Dicdata1 = {}
- if self.subnetId != '':
- Dicdata1['subnet_id'] = self.subnetId
- data = json.dumps(Dicdata1, indent=4)
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/routers/' + self.router_id + \
- '/remove_router_interface.json'
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.put(url, headers=headers, data=data)
- if (response.status_code == OK):
- url = ('http://' + self.neutron_hostname + ':9696/' +
- self.osver + '/routers/' + self.router_id)
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- else:
- return(response.status_code)
- else:
- return(response.status_code)
- else:
- return(response.status_code)
-
- self.logger.info("Deleting Network")
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/networks/' + self.net_id
- headers = {"Accept": "application/json",
- "X-Auth-Token": self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- else:
- return(response.status_code)
-
- self.logger.info("Deleting Floating ip")
- for ip_num in range(0, 2):
- url = 'http://' + self.neutron_hostname + ':9696/' + self.osver + \
- '/floatingips/' + self.vm_public_id[ip_num]
- headers = {"Accept": "application/json", "X-Auth-Token":
- self.token_id}
- response = requests.delete(url, headers=headers)
- if (response.status_code == NO_CONTENT):
- self.logger.debug(response.status_code)
- self.logger.debug(response.content)
- else:
- return(response.status_code)
- return(response.status_code)
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/Readme.txt b/functest/opnfv_tests/Controllers/ONOS/Teston/Readme.txt
deleted file mode 100644
index 7393f59a..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/Readme.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-1.This is a basic test run about onos,we will make them better and better
-2.This test include two suites:
-(1)Test northbound(network/subnet/ports create/update/delete)
-(2)Ovsdb test,default configuration,openflow connection,vm go onlines.
-3.Later we will make a framework to do this test \ No newline at end of file
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/__init__.py b/functest/opnfv_tests/Controllers/ONOS/Teston/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/__init__.py b/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/__init__.py
+++ /dev/null
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/client.py b/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/client.py
deleted file mode 100644
index 6b3285e5..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/client.py
+++ /dev/null
@@ -1,92 +0,0 @@
-"""
-Description:
- This file is used to run testcase
- lanqinglong@huawei.com
-
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-"""
-import json
-import pexpect
-import requests
-import time
-
-from environment import environment
-import functest.utils.functest_logger as ft_logger
-
-
-class client(environment):
-
- logger = ft_logger.Logger("client").getLogger()
-
- def __init__(self):
- environment.__init__(self)
- self.loginfo = environment()
- self.testcase = ''
-
- def RunScript(self, handle, testname, timeout=300):
- """
- Run ONOS Test Script
- Parameters:
- testname: ONOS Testcase Name
- masterusername: The server username of running ONOS
- masterpassword: The server password of running ONOS
- """
- self.testcase = testname
- self.ChangeTestCasePara(testname, self.masterusername,
- self.masterpassword)
- runhandle = handle
- runtest = (self.home + "/OnosSystemTest/TestON/bin/cli.py run " +
- testname)
- runhandle.sendline(runtest)
- circletime = 0
- lastshowscreeninfo = ''
- while True:
- Result = runhandle.expect(["PEXPECT]#", pexpect.EOF,
- pexpect.TIMEOUT])
- curshowscreeninfo = runhandle.before
- if(len(lastshowscreeninfo) != len(curshowscreeninfo)):
- self.loginfo.log(str(curshowscreeninfo)
- [len(lastshowscreeninfo)::])
- lastshowscreeninfo = curshowscreeninfo
- if Result == 0:
- self.logger.info("Done!")
- return
- time.sleep(1)
- circletime += 1
- if circletime > timeout:
- break
- self.loginfo.log("Timeout when running the test, please check!")
-
- def onosstart(self):
- # This is the compass run machine user&pass,you need to modify
-
- self.logger.info("Test Begin.....")
- self.OnosConnectionSet()
- masterhandle = self.SSHlogin(self.localhost, self.masterusername,
- self.masterpassword)
- self.OnosEnvSetup(masterhandle)
- return masterhandle
-
- def onosclean(self, handle):
- self.SSHRelease(handle)
- self.loginfo.log('Release onos handle Successful')
-
- def push_results_to_db(self, payload, pushornot=1):
- if pushornot != 1:
- return 1
- url = self.Result_DB + "/results"
- params = {"project_name": "functest", "case_name": "ONOS-" +
- self.testcase, "pod_name": 'huawei-build-2',
- "details": payload}
-
- headers = {'Content-Type': 'application/json'}
- try:
- r = requests.post(url, data=json.dumps(params), headers=headers)
- self.loginfo.log(r)
- except:
- self.loginfo.log('Error pushing results into Database')
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/connection.py b/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/connection.py
deleted file mode 100644
index b2a2e3d8..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/connection.py
+++ /dev/null
@@ -1,200 +0,0 @@
-"""
-Description:
- This file is used to make connections
- Include ssh & exchange public-key to each other so that
- it can run without password
-
- lanqinglong@huawei.com
-
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-"""
-import os
-import pexpect
-import re
-
-from foundation import foundation
-import functest.utils.functest_logger as ft_logger
-
-
-class connection(foundation):
-
- logger = ft_logger.Logger("connection").getLogger()
-
- def __init__(self):
- foundation.__init__(self)
- self.loginfo = foundation()
-
- def AddKnownHost(self, handle, ipaddr, username, password):
- """
- Add an user to known host,so that onos can login in with onos $ipaddr.
- parameters:
- ipaddr: ip address
- username: login user name
- password: login password
- """
- self.logger.info("Now Adding an user to known hosts " + ipaddr)
- login = handle
- login.sendline("ssh -l %s -p 8101 %s" % (username, ipaddr))
- index = 0
- while index != 2:
- index = login.expect(['assword:', 'yes/no', pexpect.EOF,
- pexpect.TIMEOUT])
- if index == 0:
- login.sendline(password)
- login.sendline("logout")
- index = login.expect(["closed", pexpect.EOF])
- if index == 0:
- self.loginfo.log("Add SSH Known Host Success!")
- break
- else:
- self.loginfo.log("Add SSH Known Host Failed! "
- "Please Check!")
- break
- login.prompt()
-
- if index == 1:
- login.sendline('yes')
-
- def GetEnvValue(self, handle, envname):
- """
- os.getenv only returns current user value
- GetEnvValue returns a environment value of
- current handle
- eg: GetEnvValue(handle,'HOME')
- """
- envhandle = handle
- envhandle.sendline('echo $' + envname)
- envhandle.prompt()
- reg = envname + '\r\n(.*)\r'
- envaluereg = re.compile(reg)
- envalue = envaluereg.search(envhandle.before)
- if envalue:
- return envalue.groups()[0]
- else:
- return None
-
- def Gensshkey(self, handle):
- """
- Generate ssh keys, used for some server have no sshkey.
- """
- self.logger.info("Now Generating SSH keys...")
- # Here file name may be id_rsa or id_ecdsa or others
- # So here will have a judgement
- keysub = handle
- filepath = self.GetEnvValue(keysub, 'HOME') + '/.ssh'
- filelist = os.listdir(filepath)
- for item in filelist:
- if 'id' in item:
- self.loginfo.log("SSH keys are exsit in ssh directory.")
- return True
- keysub.sendline("ssh-keygen -t rsa")
- Result = 0
- while Result != 2:
- Result = keysub.expect(["Overwrite", "Enter", pexpect.EOF,
- 'PEXPECT]#', pexpect.TIMEOUT])
- if Result == 0:
- keysub.sendline("y")
- if Result == 1 or Result == 2:
- keysub.sendline("\n")
- if Result == 3:
- self.loginfo.log("Generate SSH key success.")
- keysub.prompt()
- break
- if Result == 4:
- self.loginfo.log("Generate SSH key failed.")
- keysub.prompt()
- break
-
- def GetRootAuth(self, password):
- """
- Get root user
- parameters:
- password: root login password
- """
- self.logger.info("Now changing to user root")
- login = pexpect.spawn("su - root")
- index = 0
- while index != 2:
- index = login.expect(['assword:', "failure",
- pexpect.EOF, pexpect.TIMEOUT])
- if index == 0:
- login.sendline(password)
- if index == 1:
- self.loginfo.log("Change user to root failed.")
-
- login.interact()
-
- def ReleaseRootAuth(self):
- """
- Exit root user.
- """
- self.logger.info("Now Release user root")
- login = pexpect.spawn("exit")
- index = login.expect(['logout', pexpect.EOF, pexpect.TIMEOUT])
- if index == 0:
- self.loginfo.log("Release root user success.")
- if index == 1:
- self.loginfo.log("Release root user failed.")
-
- login.interact()
-
- def AddEnvIntoBashrc(self, envalue):
- """
- Add Env var into /etc/profile.
- parameters:
- envalue: environment value to add
- """
- self.logger.info("Now Adding bash environment")
- fileopen = open("/etc/profile", 'r')
- findContext = 1
- while findContext:
- findContext = fileopen.readline()
- result = findContext.find(envalue)
- if result != -1:
- break
- fileopen.close
- if result == -1:
- envAdd = open("/etc/profile", 'a+')
- envAdd.writelines("\n" + envalue)
- envAdd.close()
- self.loginfo.log("Add env to bashrc success!")
-
- def OnosRootPathChange(self, onospath):
- """
- Change ONOS root path in file:bash_profile
- onospath: path of onos root
- """
- self.logger.info("Now Changing ONOS Root Path")
- filepath = onospath + 'onos/tools/dev/bash_profile'
- line = open(filepath, 'r').readlines()
- lenall = len(line) - 1
- for i in range(lenall):
- if "export ONOS_ROOT" in line[i]:
- line[i] = 'export ONOS_ROOT=' + onospath + 'onos\n'
- NewFile = open(filepath, 'w')
- NewFile.writelines(line)
- NewFile.close
- self.logger.info("Done!")
-
- def OnosConnectionSet(self):
- """
- Intergrate for ONOS connection setup
- """
- if self.masterusername == 'root':
- filepath = '/root/'
- else:
- filepath = '/home/' + self.masterusername + '/'
- filepath = os.path.join(filepath, "onos/tools/dev/bash_profile")
- self.AddEnvIntoBashrc("source " + filepath + "\n")
- self.AddEnvIntoBashrc("export OCT=" + self.OCT)
- self.AddEnvIntoBashrc("export OC1=" + self.OC1)
- self.AddEnvIntoBashrc("export OC2=" + self.OC2)
- self.AddEnvIntoBashrc("export OC3=" + self.OC3)
- self.AddEnvIntoBashrc("export OCN=" + self.OCN)
- self.AddEnvIntoBashrc("export OCN2=" + self.OCN2)
- self.AddEnvIntoBashrc("export localhost=" + self.localhost)
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/environment.py b/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/environment.py
deleted file mode 100644
index 01f70b85..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/environment.py
+++ /dev/null
@@ -1,286 +0,0 @@
-"""
-Description:
- This file is used to setup the running environment
- Include Download code,setup environment variable
- Set onos running config
- Set user name/password
- Onos-push-keys and so on
- lanqinglong@huawei.com
-
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-"""
-
-import pexpect
-import pxssh
-import re
-import os
-import sys
-import time
-
-from connection import connection
-import functest.utils.functest_logger as ft_logger
-
-
-class environment(connection):
-
- logger = ft_logger.Logger("environment").getLogger()
-
- def __init__(self):
- connection.__init__(self)
- self.loginfo = connection()
- self.masterhandle = ''
- self.home = ''
-
- def DownLoadCode(self, handle, codeurl):
- """
- Download Code use 'git clone'
- parameters:
- handle: current working handle
- codeurl: clone code url
- """
- self.logger.info("Now loading test codes! Please wait in patient...")
- originalfolder = sys.path[0]
- self.logger.info(originalfolder)
- gitclone = handle
- gitclone.sendline("git clone " + codeurl)
- index = 0
- # increment = 0
- while index != 1 or index != 4:
- index = gitclone.expect(['already exists',
- 'esolving deltas: 100%',
- 'eceiving objects',
- 'Already up-to-date',
- 'npacking objects: 100%', pexpect.EOF])
-
- filefolder = self.home + '/' + codeurl.split('/')[-1].split('.')[0]
- if index == 0:
- os.chdir(filefolder)
- os.system('git pull')
- os.chdir(originalfolder)
- self.loginfo.log('Download code success!')
- break
- elif index == 1 or index == 4:
- self.loginfo.log('Download code success!')
- gitclone.sendline("mkdir onos")
- gitclone.prompt()
- gitclone.sendline("cp -rf " + filefolder + "/tools onos/")
- gitclone.prompt()
- break
- elif index == 2:
- os.write(1, gitclone.before)
- sys.stdout.flush()
- else:
- self.loginfo.log('Download code failed!')
- self.loginfo.log('Information before' + gitclone.before)
- break
- gitclone.prompt()
-
- def InstallDefaultSoftware(self, handle):
- """
- Install default software
- parameters:
- handle(input): current working handle
- """
- self.logger.info("Now Cleaning test environment")
- handle.sendline("sudo apt-get install -y mininet")
- handle.prompt()
- handle.sendline("sudo pip install configobj")
- handle.prompt()
- handle.sendline("sudo apt-get install -y sshpass")
- handle.prompt()
- handle.sendline("OnosSystemTest/TestON/bin/cleanup.sh")
- handle.prompt()
- time.sleep(5)
- self.loginfo.log('Clean environment success!')
-
- def OnosPushKeys(self, handle, cmd, password):
- """
- Using onos-push-keys to make ssh device without password
- parameters:
- handle(input): working handle
- cmd(input): onos-push-keys xxx(xxx is device)
- password(input): login in password
- """
- self.logger.info("Now Pushing Onos Keys:" + cmd)
- Pushkeys = handle
- Pushkeys.sendline(cmd)
- Result = 0
- while Result != 2:
- Result = Pushkeys.expect(["(yes/no)", "assword:", "PEXPECT]#",
- pexpect.EOF, pexpect.TIMEOUT])
- if(Result == 0):
- Pushkeys.sendline("yes")
- if(Result == 1):
- Pushkeys.sendline(password)
- if(Result == 2):
- self.loginfo.log("ONOS Push keys Success!")
- break
- if(Result == 3):
- self.loginfo.log("ONOS Push keys Error!")
- break
- time.sleep(2)
- Pushkeys.prompt()
- self.logger.info("Done!")
-
- def SetOnosEnvVar(self, handle, masterpass, agentpass):
- """
- Setup onos pushkeys to all devices(3+2)
- parameters:
- handle(input): current working handle
- masterpass: scripts running server's password
- agentpass: onos cluster&compute node password
- """
- self.logger.info("Now Setting test environment")
- for host in self.hosts:
- self.logger.info("try to connect " + str(host))
- result = self.CheckSshNoPasswd(host)
- if not result:
- self.logger.info(
- "ssh login failed,try to copy master publickey" +
- "to agent " + str(host))
- self.CopyPublicKey(host)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OCT, masterpass)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OC1, agentpass)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OC2, agentpass)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OC3, agentpass)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OCN, agentpass)
- self.OnosPushKeys(handle, "onos-push-keys " + self.OCN2, agentpass)
-
- def CheckSshNoPasswd(self, host):
- """
- Check master can connect agent with no password
- """
- login = pexpect.spawn("ssh " + str(host))
- index = 4
- while index == 4:
- index = login.expect(['(yes/no)', '>|#|\$',
- pexpect.EOF, pexpect.TIMEOUT])
- if index == 0:
- login.sendline("yes")
- index = 4
- if index == 1:
- self.loginfo.log("ssh connect to " + str(host) +
- " success,no need to copy ssh public key")
- return True
- login.interact()
- return False
-
- def ChangeOnosName(self, user, password):
- """
- Change onos name in envDefault file
- Because some command depend on this
- parameters:
- user: onos&compute node user
- password: onos&compute node password
- """
- self.logger.info("Now Changing ONOS name&password")
- filepath = self.home + '/onos/tools/build/envDefaults'
- line = open(filepath, 'r').readlines()
- lenall = len(line) - 1
- for i in range(lenall):
- if "ONOS_USER=" in line[i]:
- line[i] = line[i].replace("sdn", user)
- if "ONOS_GROUP" in line[i]:
- line[i] = line[i].replace("sdn", user)
- if "ONOS_PWD" in line[i]:
- line[i] = line[i].replace("rocks", password)
- NewFile = open(filepath, 'w')
- NewFile.writelines(line)
- NewFile.close
- self.logger.info("Done!")
-
- def ChangeTestCasePara(self, testcase, user, password):
- """
- When running test script, there's something need \
- to change in every test folder's *.param & *.topo files
- user: onos&compute node user
- password: onos&compute node password
- """
- self.logger.info("Now Changing " + testcase + " name&password")
- if self.masterusername == 'root':
- filepath = '/root/'
- else:
- filepath = '/home/' + self.masterusername + '/'
- filepath = (filepath + "OnosSystemTest/TestON/tests/" +
- testcase + "/" + testcase + ".topo")
- line = open(filepath, 'r').readlines()
- lenall = len(line) - 1
- for i in range(lenall - 2):
- if("localhost" in line[i]) or ("OCT" in line[i]):
- line[i + 1] = re.sub(">\w+", ">" + user, line[i + 1])
- line[i + 2] = re.sub(">\w+", ">" + password, line[i + 2])
- if ("OC1" in line[i] or "OC2" in line[i] or "OC3" in line[i] or
- "OCN" in line[i] or "OCN2" in line[i]):
- line[i + 1] = re.sub(">\w+", ">root", line[i + 1])
- line[i + 2] = re.sub(">\w+", ">root", line[i + 2])
- NewFile = open(filepath, 'w')
- NewFile.writelines(line)
- NewFile.close
-
- def SSHlogin(self, ipaddr, username, password):
- """
- SSH login provide a connection to destination.
- parameters:
- ipaddr: ip address
- username: login user name
- password: login password
- return: handle
- """
- login = pxssh.pxssh()
- login.login(ipaddr, username, password, original_prompt='[$#>]')
- # send command ls -l
- login.sendline('ls -l')
- # match prompt
- login.prompt()
- self.logger.info("SSH login " + ipaddr + " success!")
- return login
-
- def SSHRelease(self, handle):
- # Release ssh
- handle.logout()
-
- def CopyOnostoTestbin(self):
- sourcefile = self.cipath + '/dependencies/onos'
- destifile = self.home + '/onos/tools/test/bin/'
- os.system('pwd')
- runcommand = 'cp ' + sourcefile + ' ' + destifile
- os.system(runcommand)
-
- def CopyPublicKey(self, host):
- output = os.popen('cat /root/.ssh/id_rsa.pub')
- publickey = output.read().strip('\n')
- tmphandle = self.SSHlogin(self.installer_master,
- self.installer_master_username,
- self.installer_master_password)
- tmphandle.sendline("ssh " + host + " -T \'echo " +
- str(publickey) + ">>/root/.ssh/authorized_keys\'")
- tmphandle.prompt()
- self.SSHRelease(tmphandle)
- self.logger.info("Add OCT PublicKey to " + host + " success")
-
- def OnosEnvSetup(self, handle):
- """
- Onos Environment Setup function
- """
- self.Gensshkey(handle)
- self.home = self.GetEnvValue(handle, 'HOME')
- self.AddKnownHost(handle, self.OC1, "karaf", "karaf")
- self.AddKnownHost(handle, self.OC2, "karaf", "karaf")
- self.AddKnownHost(handle, self.OC3, "karaf", "karaf")
- self.DownLoadCode(handle,
- 'https://github.com/wuwenbin2/OnosSystemTest.git')
- # self.DownLoadCode(handle, 'https://gerrit.onosproject.org/onos')
- if self.masterusername == 'root':
- filepath = '/root/'
- else:
- filepath = '/home/' + self.masterusername + '/'
- self.OnosRootPathChange(filepath)
- self.CopyOnostoTestbin()
- self.ChangeOnosName(self.agentusername, self.agentpassword)
- self.InstallDefaultSoftware(handle)
- self.SetOnosEnvVar(handle, self.masterpassword, self.agentpassword)
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/foundation.py b/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/foundation.py
deleted file mode 100644
index 6f7a40ff..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/adapters/foundation.py
+++ /dev/null
@@ -1,93 +0,0 @@
-"""
-Description:
- This file include basis functions
- lanqinglong@huawei.com
-
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-"""
-
-import datetime
-import logging
-import os
-import re
-import time
-
-import functest.utils.functest_constants as ft_constants
-import functest.utils.functest_utils as ft_utils
-
-
-class foundation:
-
- def __init__(self):
-
- # currentpath = os.getcwd()
- currentpath = \
- ft_constants.FUNCTEST_TEST_DIR + '/Controllers/ONOS/Teston/CI'
- self.cipath = currentpath
- self.logdir = os.path.join(currentpath, 'log')
- self.workhome = currentpath[0: currentpath.rfind('opnfv_tests') - 1]
- self.Result_DB = ''
- filename = time.strftime('%Y-%m-%d-%H-%M-%S') + '.log'
- self.logfilepath = os.path.join(self.logdir, filename)
- self.starttime = datetime.datetime.now()
-
- def log(self, loginfo):
- """
- Record log in log directory for deploying test environment
- parameters:
- loginfo(input): record info
- """
- logging.basicConfig(level=logging.INFO,
- format='%(asctime)s %(filename)s:%(message)s',
- datefmt='%d %b %Y %H:%M:%S',
- filename=self.logfilepath,
- filemode='w')
- filelog = logging.FileHandler(self.logfilepath)
- logging.getLogger('Functest').addHandler(filelog)
- logging.info(loginfo)
-
- def getdefaultpara(self):
- """
- Get Default Parameters value
- """
- self.Result_DB = str(ft_utils.get_db_url())
- self.masterusername = str(ft_constants.ONOSBENCH_USERNAME)
- self.masterpassword = str(ft_constants.ONOSBENCH_PASSWORD)
- self.agentusername = str(ft_constants.ONOSCLI_USERNAME)
- self.agentpassword = str(ft_constants.ONOSCLI_PASSWORD)
- self.runtimeout = ft_constants.ONOS_RUNTIMEOUT
- self.OCT = str(ft_constants.ONOS_OCT)
- self.OC1 = str(ft_constants.ONOS_OC1)
- self.OC2 = str(ft_constants.ONOS_OC2)
- self.OC3 = str(ft_constants.ONOS_OC3)
- self.OCN = str(ft_constants.ONOS_OCN)
- self.OCN2 = str(ft_constants.ONOS_OCN2)
- self.installer_master = str(ft_constants.ONOS_INSTALLER_MASTER)
- self.installer_master_username = \
- str(ft_constants.ONOS_INSTALLER_MASTER_USERNAME)
- self.installer_master_password = \
- ft_constants.ONOS_INSTALLER_MASTER_PASSWORD
- self.hosts = [self.OC1, self.OCN, self.OCN2]
- self.localhost = self.OCT
-
- def GetResult(self):
- cmd = "cat " + self.logfilepath + " | grep Fail"
- Resultbuffer = os.popen(cmd).read()
- duration = datetime.datetime.now() - self.starttime
- time.sleep(2)
-
- if re.search("[1-9]+", Resultbuffer):
- self.log("Testcase Fails\n" + Resultbuffer)
- Result = "POK"
- else:
- self.log("Testcases Pass")
- Result = "OK"
- payload = {'timestart': str(self.starttime),
- 'duration': str(duration), 'status': Result}
-
- return payload
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/dependencies/onos b/functest/opnfv_tests/Controllers/ONOS/Teston/dependencies/onos
deleted file mode 100644
index bb02fa89..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/dependencies/onos
+++ /dev/null
@@ -1,29 +0,0 @@
-#!/bin/bash
-# -----------------------------------------------------------------------------
-# ONOS remote command-line client.
-# -----------------------------------------------------------------------------
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-
-[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
-. /root/.bashrc
-. $ONOS_ROOT/tools/build/envDefaults
-. $ONOS_ROOT/tools/test/bin/find-node.sh
-
-[ "$1" = "-w" ] && shift && onos-wait-for-start $1
-
-[ -n "$1" ] && OCI=$(find_node $1) && shift
-
-if which client 1>/dev/null 2>&1 && [ -z "$ONOS_USE_SSH" ]; then
- # Use Karaf client only if we can and are allowed to
- unset KARAF_HOME
- client -h $OCI -u karaf "$@" 2>/dev/null
-else
- # Otherwise use raw ssh; strict checking is off for dev environments only
- #ssh -p 8101 -o StrictHostKeyChecking=no $OCI "$@"
- sshpass -p karaf ssh -l karaf -p 8101 $OCI "$@"
-fi
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/log/gitignore b/functest/opnfv_tests/Controllers/ONOS/Teston/log/gitignore
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/log/gitignore
+++ /dev/null
diff --git a/functest/opnfv_tests/Controllers/ONOS/Teston/onosfunctest.py b/functest/opnfv_tests/Controllers/ONOS/Teston/onosfunctest.py
deleted file mode 100755
index 300f56d1..00000000
--- a/functest/opnfv_tests/Controllers/ONOS/Teston/onosfunctest.py
+++ /dev/null
@@ -1,261 +0,0 @@
-"""
-Description: This test is to run onos Teston VTN scripts
-
-List of test cases:
-CASE1 - Northbound NBI test network/subnet/ports
-CASE2 - Ovsdb test&Default configuration&Vm go online
-
-lanqinglong@huawei.com
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-"""
-
-import datetime
-import os
-import re
-import time
-
-import argparse
-from neutronclient.v2_0 import client as neutronclient
-
-import functest.utils.functest_logger as ft_logger
-import functest.utils.functest_utils as ft_utils
-import functest.utils.openstack_utils as openstack_utils
-import functest.utils.functest_constants as ft_constants
-
-
-parser = argparse.ArgumentParser()
-parser.add_argument("-t", "--testcase", help="Testcase name")
-args = parser.parse_args()
-
-
-""" logging configuration """
-logger = ft_logger.Logger("onos").getLogger()
-
-# onos parameters
-ONOSCI_PATH = ft_constants.REPOS_DIR + "/"
-starttime = datetime.datetime.now()
-
-INSTALLER_TYPE = ft_constants.CI_INSTALLER_TYPE
-ONOS_SFC_IMAGE_NAME = ft_constants.ONOS_SFC_IMAGE_NAME
-ONOS_SFC_IMAGE_PATH = os.path.join(ft_constants.FUNCTEST_DATA_DIR,
- ft_constants.ONOS_SFC_IMAGE_FILENAME)
-ONOS_SFC_PATH = os.path.join(ft_constants.FUNCTEST_REPO_DIR,
- ft_constants.ONOS_SFC_RELATIVE_PATH)
-
-
-def RunScript(testname):
- """
- Run ONOS Test Script
- Parameters:
- testname: ONOS Testcase Name
- """
- runtest = ONOSCI_PATH + "onos/TestON/bin/cli.py run " + testname
- logger.debug("Run script " + testname)
- os.system(runtest)
-
-
-def DownloadCodes(url="https://github.com/wuwenbin2/OnosSystemTest.git"):
- """
- Download Onos Teston codes
- Parameters:
- url: github url
- """
- downloadcode = "git clone " + url + " " + ONOSCI_PATH + "OnosSystemTest"
- logger.debug("Download Onos Teston codes " + url)
- os.system(downloadcode)
-
-
-def GetResult():
- LOGPATH = ONOSCI_PATH + "onos/TestON/logs"
- cmd = "grep -rnh " + "Fail" + " " + LOGPATH
- Resultbuffer = os.popen(cmd).read()
- # duration = datetime.datetime.now() - starttime
- time.sleep(2)
-
- if re.search("\s+[1-9]+\s+", Resultbuffer):
- logger.debug("Testcase Fails\n" + Resultbuffer)
- # Result = "Failed"
- else:
- logger.debug("Testcases Success")
- # Result = "Success"
- # payload={'timestart': str(starttime),
- # 'duration': str(duration),
- # 'status': Result}
- cmd = "grep -rnh 'Execution Time' " + LOGPATH
- Resultbuffer = os.popen(cmd).read()
- time1 = Resultbuffer[114:128]
- time2 = Resultbuffer[28:42]
- cmd = "grep -rnh 'Success Percentage' " + LOGPATH + "/FUNCvirNetNB_*"
- Resultbuffer = os.popen(cmd).read()
- if Resultbuffer.find('100%') >= 0:
- result1 = 'Success'
- else:
- result1 = 'Failed'
- cmd = "grep -rnh 'Success Percentage' " + LOGPATH + "/FUNCvirNetNBL3*"
- Resultbuffer = os.popen(cmd).read()
- if Resultbuffer.find('100%') >= 0:
- result2 = 'Success'
- else:
- result2 = 'Failed'
- status1 = []
- status2 = []
- cmd = "grep -rnh 'h3' " + LOGPATH + "/FUNCvirNetNB_*"
- Resultbuffer = os.popen(cmd).read()
- pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
- # res = pattern.search(Resultbuffer).groups()
- res = pattern.findall(Resultbuffer)
- i = 0
- for index in range(len(res)):
- status1.append({'Case name:': res[i][0] + res[i][1],
- 'Case result': res[i][2]})
- i = i + 1
- cmd = "grep -rnh 'h3' " + LOGPATH + "/FUNCvirNetNBL3*"
- Resultbuffer = os.popen(cmd).read()
- pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
- # res = pattern.search(Resultbuffer).groups()
- res = pattern.findall(Resultbuffer)
- i = 0
- for index in range(len(res)):
- status2.append({'Case name:': res[i][0] + res[i][1],
- 'Case result': res[i][2]})
- i = i + 1
- payload = {'timestart': str(starttime),
- 'FUNCvirNet': {'duration': time1,
- 'result': result1,
- 'status': status1},
- 'FUNCvirNetL3': {'duration': time2,
- 'result': result2,
- 'status': status2}}
- return payload
-
-
-def SetOnosIp():
- cmd = "openstack catalog show network | grep publicURL"
- cmd_output = os.popen(cmd).read()
- OC1 = re.search(r"\d+\.\d+\.\d+\.\d+", cmd_output).group()
- os.environ['OC1'] = OC1
- time.sleep(2)
- logger.debug("ONOS IP is " + OC1)
-
-
-def SetOnosIpForJoid():
- cmd = "env | grep SDN_CONTROLLER"
- cmd_output = os.popen(cmd).read()
- OC1 = re.search(r"\d+\.\d+\.\d+\.\d+", cmd_output).group()
- os.environ['OC1'] = OC1
- time.sleep(2)
- logger.debug("ONOS IP is " + OC1)
-
-
-def CleanOnosTest():
- TESTONPATH = ONOSCI_PATH + "onos/"
- cmd = "rm -rf " + TESTONPATH
- os.system(cmd)
- time.sleep(2)
- logger.debug("Clean ONOS Teston")
-
-
-def CreateImage():
- glance_client = openstack_utils.get_glance_client()
- image_id = openstack_utils.create_glance_image(glance_client,
- ONOS_SFC_IMAGE_NAME,
- ONOS_SFC_IMAGE_PATH)
- EXIT_CODE = -1
- if not image_id:
- logger.error("Failed to create a Glance image...")
- return(EXIT_CODE)
- logger.debug("Image '%s' with ID=%s created successfully."
- % (ONOS_SFC_IMAGE_NAME, image_id))
-
-
-def SfcTest():
- cmd = "python " + ONOS_SFC_PATH + "/Sfc.py"
- logger.debug("Run sfc tests")
- os.system(cmd)
-
-
-def GetIp(type):
- cmd = "openstack catalog show " + type + " | grep publicURL"
- cmd_output = os.popen(cmd).read()
- ip = re.search(r"\d+\.\d+\.\d+\.\d+", cmd_output).group()
- return ip
-
-
-def Replace(before, after):
- file = "/Sfc_fun.py"
- cmd = "sed -i 's/" + before + "/" + after + "/g' " + ONOS_SFC_PATH + file
- os.system(cmd)
-
-
-def SetSfcConf():
- Replace("keystone_ip", GetIp("keystone"))
- Replace("neutron_ip", GetIp("neutron"))
- Replace("nova_ip", GetIp("nova"))
- Replace("glance_ip", GetIp("glance"))
- pwd = ft_constants.OS_PASSWORD
- Replace("console", pwd)
- creds_neutron = openstack_utils.get_credentials("neutron")
- neutron_client = neutronclient.Client(**creds_neutron)
- ext_net = openstack_utils.get_external_net(neutron_client)
- Replace("admin_floating_net", ext_net)
- logger.info("Modify configuration for SFC")
-
-
-def OnosTest():
- start_time = time.time()
- stop_time = start_time
- if INSTALLER_TYPE == "joid":
- logger.debug("Installer is Joid")
- SetOnosIpForJoid()
- else:
- SetOnosIp()
- RunScript("FUNCvirNetNB")
- RunScript("FUNCvirNetNBL3")
- try:
- logger.debug("Push ONOS results into DB")
- # TODO check path result for the file
- result = GetResult()
- stop_time = time.time()
-
- # ONOS success criteria = all tests OK
- # i.e. FUNCvirNet & FUNCvirNetL3
- status = "FAIL"
- try:
- if (result['FUNCvirNet']['result'] == "Success" and
- result['FUNCvirNetL3']['result'] == "Success"):
- status = "PASS"
- except:
- logger.error("Unable to set ONOS criteria")
-
- ft_utils.push_results_to_db("functest",
- "onos",
- start_time,
- stop_time,
- status,
- result)
-
- except:
- logger.error("Error pushing results into Database")
-
- if status == "FAIL":
- EXIT_CODE = -1
- exit(EXIT_CODE)
-
-
-def main():
-
- if args.testcase == "sfc":
- CreateImage()
- SetSfcConf()
- SfcTest()
- else:
- OnosTest()
-
-
-if __name__ == '__main__':
- main()
diff --git a/functest/opnfv_tests/Controllers/__init__.py b/functest/opnfv_tests/Controllers/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/functest/opnfv_tests/Controllers/__init__.py
+++ /dev/null