summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjose.lausuch <jose.lausuch@ericsson.com>2015-05-29 17:09:22 +0200
committerjose.lausuch <jose.lausuch@ericsson.com>2015-05-30 19:56:34 +0200
commit73e6984c977287b263f7cbd81bdee0bf7cd8abb5 (patch)
tree061e4b3715685dc7545a808c7ca69989eb55ca26
parent486dfd57300d74d0a9f058951112393bf6bc210a (diff)
Improvements
- Added: functest_utils.py for common util functions - Moved: a lot of functions from config_functest.py to functest_utils.py - Moved: create of functest-net from config_functest.py to vPing.py - Improvements and code cleaned JIRA: FUNCTEST-10 Change-Id: I0d3381576cbb24a999ea69e107baafea896a36e7 Signed-off-by: jose.lausuch <jose.lausuch@ericsson.com>
-rw-r--r--testcases/__init__.py0
-rw-r--r--testcases/config_functest.py261
-rw-r--r--testcases/functest_utils.py231
-rw-r--r--testcases/vPing/CI/libraries/vPing.py228
4 files changed, 397 insertions, 323 deletions
diff --git a/testcases/__init__.py b/testcases/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testcases/__init__.py
diff --git a/testcases/config_functest.py b/testcases/config_functest.py
index 5a3fcbb3f..b862ba72d 100644
--- a/testcases/config_functest.py
+++ b/testcases/config_functest.py
@@ -9,10 +9,9 @@
#
import re, json, os, urllib2, argparse, logging, shutil, subprocess, yaml, sys
+import functest_utils
from git import Repo
-from neutronclient.v2_0 import client
-
actions = ['start', 'check', 'clean']
parser = argparse.ArgumentParser()
parser.add_argument("repo_path", help="Path to the repository")
@@ -57,11 +56,6 @@ RALLY_INSTALLATION_DIR = HOME + functest_yaml.get("general").get("directories").
VPING_DIR = REPO_PATH + functest_yaml.get("general").get("directories").get("dir_vping")
ODL_DIR = REPO_PATH + functest_yaml.get("general").get("directories").get("dir_odl")
-# NEUTRON Private Network parameters
-NEUTRON_PRIVATE_NET_NAME = functest_yaml.get("general").get("openstack").get("neutron_private_net_name")
-NEUTRON_PRIVATE_SUBNET_NAME = functest_yaml.get("general").get("openstack").get("neutron_private_subnet_name")
-NEUTRON_PRIVATE_SUBNET_CIDR = functest_yaml.get("general").get("openstack").get("neutron_private_subnet_cidr")
-ROUTER_NAME = functest_yaml.get("general").get("openstack").get("neutron_router_name")
#GLANCE image parameters
IMAGE_URL = functest_yaml.get("general").get("openstack").get("image_url")
@@ -72,14 +66,11 @@ IMAGE_DIR = HOME + functest_yaml.get("general").get("openstack").get("image_down
IMAGE_PATH = IMAGE_DIR + IMAGE_FILE_NAME
-credentials = None
-neutron_client = None
-
def action_start():
"""
Start the functest environment installation
"""
- if not check_internet_connectivity():
+ if not functest_utils.check_internet_connectivity():
logger.error("There is no Internet connectivity. Please check the network configuration.")
exit(-1)
@@ -105,25 +96,10 @@ def action_start():
action_clean()
exit(-1)
- credentials = get_credentials()
- neutron_client = client.Client(**credentials)
-
- logger.info("Configuring Neutron...")
- logger.info("Checking if private network '%s' exists..." % NEUTRON_PRIVATE_NET_NAME)
- #Now: if exists we don't create it again (the clean command does not clean the neutron networks yet)
- if check_neutron_net(neutron_client, NEUTRON_PRIVATE_NET_NAME):
- logger.info("Private network '%s' found. No need to create another one." % NEUTRON_PRIVATE_NET_NAME)
- else:
- logger.info("Private network '%s' not found. Creating..." % NEUTRON_PRIVATE_NET_NAME)
- if not create_private_neutron_net(neutron_client):
- logger.error("There has been a problem while creating the Neutron network.")
- #action_clean()
- exit(-1)
-
- logger.info("Donwloading image...")
- if not download_url(IMAGE_URL, IMAGE_DIR):
- logger.error("There has been a problem while downloading the image.")
+ logger.info("Downloading image...")
+ if not functest_utils.download_url(IMAGE_URL, IMAGE_DIR):
+ logger.error("There has been a problem downloading the image '%s'." %IMAGE_URL)
action_clean()
exit(-1)
@@ -143,10 +119,8 @@ def action_check():
errors_all = False
logger.info("Checking current functest configuration...")
- credentials = get_credentials()
- neutron_client = client.Client(**credentials)
- logger.debug("Checking directories...")
+ logger.debug("Checking script directories...")
errors = False
dirs = [RALLY_DIR, RALLY_INSTALLATION_DIR, VPING_DIR, ODL_DIR]
for dir in dirs:
@@ -164,23 +138,12 @@ def action_check():
logger.debug("Checking Rally deployment...")
if not check_rally():
- logger.debug(" Rally deployment NOT found.")
+ logger.debug(" Rally deployment NOT installed.")
errors_all = True
logger.debug("...FAIL")
else:
logger.debug("...OK")
-
- logger.debug("Checking Neutron...")
- if not check_neutron_net(neutron_client, NEUTRON_PRIVATE_NET_NAME):
- logger.debug(" Private network '%s' NOT found." % NEUTRON_PRIVATE_NET_NAME)
- logger.debug("...FAIL")
- errors_all = True
- else:
- logger.debug(" Private network '%s' found." % NEUTRON_PRIVATE_NET_NAME)
- logger.debug("...OK")
-
-
logger.debug("Checking Image...")
errors = False
if not os.path.isfile(IMAGE_PATH):
@@ -229,13 +192,10 @@ def action_clean():
if os.path.exists(RALLY_REPO_DIR):
logger.debug("Removing Rally repository %s" % RALLY_REPO_DIR)
cmd = "sudo rm -rf " + RALLY_REPO_DIR #need to be sudo, not possible with rmtree
- execute_command(cmd)
- #logger.debug("Deleting Neutron network %s" % NEUTRON_PRIVATE_NET_NAME)
- #if not delete_neutron_net() :
- # logger.error("Error deleting the network. Remove it manually.")
+ functest_utils.execute_command(cmd,logger)
- logger.debug("Deleting glance images")
if os.path.exists(IMAGE_PATH):
+ logger.debug("Deleting image")
os.remove(IMAGE_PATH)
cmd = "glance image-list | grep "+IMAGE_NAME+" | cut -c3-38"
@@ -244,9 +204,9 @@ def action_clean():
#while image_id = p.readline()
for image_id in p.readlines():
cmd = "glance image-delete " + image_id
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
- return True
+ logger.info("Functest environment clean!")
@@ -263,26 +223,26 @@ def install_rally():
logger.debug("Executing %s./install_rally.sh..." %RALLY_REPO_DIR)
install_script = RALLY_REPO_DIR + "install_rally.sh"
cmd = 'sudo ' + install_script
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
#subprocess.call(['sudo', install_script])
logger.debug("Creating Rally environment...")
cmd = "rally deployment create --fromenv --name=opnfv-arno-rally"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
logger.debug("Installing tempest...")
cmd = "rally-manage tempest install"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
cmd = "rally deployment check"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
#TODO: check that everything is 'Available' and warn if not
cmd = "rally show images"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
cmd = "rally show flavors"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
return True
@@ -301,208 +261,35 @@ def check_rally():
#if the command does not exist or there is no deployment
line = p.stdout.readline()
if line == "":
- logger.debug(" Rally deployment not found")
+ logger.debug(" Rally deployment NOT found")
return False
logger.debug(" Rally deployment found")
return True
else:
- logger.debug(" Rally installation directory not found")
return False
def install_odl():
cmd = "chmod +x " + ODL_DIR + "start_tests.sh"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
cmd = "chmod +x " + ODL_DIR + "create_venv.sh"
- execute_command(cmd)
+ functest_utils.execute_command(cmd,logger)
cmd = ODL_DIR + "create_venv.sh"
- execute_command(cmd)
- return True
-
-
-def check_credentials():
- """
- Check if the OpenStack credentials (openrc) are sourced
- """
- #TODO: there must be a short way to do this, doing if os.environ["something"] == "" throws an error
- try:
- os.environ['OS_AUTH_URL']
- except KeyError:
- return False
- try:
- os.environ['OS_USERNAME']
- except KeyError:
- return False
- try:
- os.environ['OS_PASSWORD']
- except KeyError:
- return False
- try:
- os.environ['OS_TENANT_NAME']
- except KeyError:
- return False
- try:
- os.environ['OS_REGION_NAME']
- except KeyError:
- return False
+ functest_utils.execute_command(cmd,logger)
return True
-def get_credentials():
- d = {}
- d['username'] = os.environ['OS_USERNAME']
- d['password'] = os.environ['OS_PASSWORD']
- d['auth_url'] = os.environ['OS_AUTH_URL']
- d['tenant_name'] = os.environ['OS_TENANT_NAME']
- return d
-
-def get_nova_credentials():
- d = {}
- d['username'] = os.environ['OS_USERNAME']
- d['api_key'] = os.environ['OS_PASSWORD']
- d['auth_url'] = os.environ['OS_AUTH_URL']
- d['project_id'] = os.environ['OS_TENANT_NAME']
- return d
-
-
-
-def create_private_neutron_net(neutron):
- try:
- neutron.format = 'json'
- logger.debug('Creating Neutron network %s...' % NEUTRON_PRIVATE_NET_NAME)
- json_body = {'network': {'name': NEUTRON_PRIVATE_NET_NAME,
- 'admin_state_up': True}}
- netw = neutron.create_network(body=json_body)
- net_dict = netw['network']
- network_id = net_dict['id']
- logger.debug("Network '%s' created successfully" % network_id)
-
- logger.debug('Creating Subnet....')
- json_body = {'subnets': [{'name': NEUTRON_PRIVATE_SUBNET_NAME, 'cidr': NEUTRON_PRIVATE_SUBNET_CIDR,
- 'ip_version': 4, 'network_id': network_id}]}
-
- subnet = neutron.create_subnet(body=json_body)
- subnet_id = subnet['subnets'][0]['id']
- logger.debug("Subnet '%s' created successfully" % subnet_id)
-
-
- logger.debug('Creating Router...')
- json_body = {'router': {'name': ROUTER_NAME, 'admin_state_up': True}}
- router = neutron.create_router(json_body)
- router_id = router['router']['id']
- logger.debug("Router '%s' created successfully" % router_id)
-
- logger.debug('Adding router to subnet...')
- json_body = {"subnet_id": subnet_id}
- neutron.add_interface_router(router=router_id, body=json_body)
- logger.debug("Interface added successfully.")
-
- except:
- print "Error:", sys.exc_info()[0]
- return False
-
- logger.info("Private Neutron network created successfully.")
- return True
-
-def get_network_id(neutron, network_name):
- networks = neutron.list_networks()['networks']
- id = ''
- for n in networks:
- if n['name'] == network_name:
- id = n['id']
- break
- return id
-
-def check_neutron_net(neutron, net_name):
- for network in neutron.list_networks()['networks']:
- if network['name'] == net_name :
- for subnet in network['subnets']:
- return True
- return False
-
-def delete_neutron_net(neutron):
- #TODO: remove router, ports
- try:
- #https://github.com/isginf/openstack_tools/blob/master/openstack_remove_tenant.py
- for network in neutron.list_networks()['networks']:
- if network['name'] == NEUTRON_PRIVATE_NET_NAME :
- for subnet in network['subnets']:
- print "Deleting subnet " + subnet
- neutron.delete_subnet(subnet)
- print "Deleting network " + network['name']
- neutron.delete_neutron_net(network['id'])
- finally:
- return True
- return False
-
-
-
-
def create_glance_image(path,name,disk_format):
"""
Create a glance image given the absolute path of the image, its name and the disk format
"""
cmd = "glance image-create --name "+name+" --is-public true --disk-format "+disk_format+" --container-format bare --file "+path
- execute_command(cmd)
- return True
-
-
-
-
-
-def download_url(url, dest_path):
- """
- Download a file to a destination path given a URL
- """
- name = url.rsplit('/')[-1]
- dest = dest_path + name
- try:
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
- logger.error("Error in fetching %s" %url)
- return False
-
- with open(dest, 'wb') as f:
- f.write(response.read())
+ functest_utils.execute_command(cmd,logger)
return True
-def check_internet_connectivity(url='http://www.google.com/'):
- """
- Check if there is access to the internet
- """
- try:
- urllib2.urlopen(url, timeout=5)
- return True
- except urllib.request.URLError:
- return False
-
-def execute_command(cmd):
- """
- Execute Linux command
- """
- logger.debug('Executing command : {}'.format(cmd))
- #p = os.popen(cmd,"r")
- #logger.debug(p.read())
- output_file = "output.txt"
- f = open(output_file, 'w+')
- p = subprocess.call(cmd,shell=True, stdout=f, stderr=subprocess.STDOUT)
- f.close()
- f = open(output_file, 'r')
- result = f.read()
- if result != "":
- logger.debug(result)
- #p = subprocess.call(cmd,shell=True);
- if p == 0 :
- return True
- else:
- logger.error("Error when executing command %s" %cmd)
- exit(-1)
-
-
-
def main():
if not (args.action in actions):
@@ -510,7 +297,7 @@ def main():
exit(-1)
- if not check_credentials():
+ if not functest_utils.check_credentials():
logger.error("Please source the openrc credentials and run the script again.")
#TODO: source the credentials in this script
exit(-1)
@@ -522,7 +309,7 @@ def main():
if action_check():
logger.info("Functest environment correctly installed")
else:
- logger.info("Functest environment faulty")
+ logger.info("Functest environment not found or faulty")
if args.action == "clean":
if args.force :
diff --git a/testcases/functest_utils.py b/testcases/functest_utils.py
new file mode 100644
index 000000000..7b61198d1
--- /dev/null
+++ b/testcases/functest_utils.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+#
+# jose.lausuch@ericsson.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+import re, json, os, urllib2, shutil, subprocess, sys
+
+
+def check_credentials():
+ """
+ Check if the OpenStack credentials (openrc) are sourced
+ """
+ #TODO: there must be a short way to do this, doing if os.environ["something"] == "" throws an error
+ try:
+ os.environ['OS_AUTH_URL']
+ except KeyError:
+ return False
+ try:
+ os.environ['OS_USERNAME']
+ except KeyError:
+ return False
+ try:
+ os.environ['OS_PASSWORD']
+ except KeyError:
+ return False
+ try:
+ os.environ['OS_TENANT_NAME']
+ except KeyError:
+ return False
+ try:
+ os.environ['OS_REGION_NAME']
+ except KeyError:
+ return False
+ return True
+
+
+def get_credentials(service):
+ """Returns a creds dictionary filled with the following keys:
+ * username
+ * password/api_key (depending on the service)
+ * tenant_name/project_id (depending on the service)
+ * auth_url
+ :param service: a string indicating the name of the service
+ requesting the credentials.
+ """
+ #TODO: get credentials from the openrc file
+ creds = {}
+ # Unfortunately, each of the OpenStack client will request slightly
+ # different entries in their credentials dict.
+ if service.lower() in ("nova", "cinder"):
+ password = "api_key"
+ tenant = "project_id"
+ else:
+ password = "password"
+ tenant = "tenant_name"
+
+ # The most common way to pass these info to the script is to do it through
+ # environment variables.
+ creds.update({
+ "username": os.environ.get('OS_USERNAME', "admin"), # add your cloud username details
+ password: os.environ.get("OS_PASSWORD", 'admin'), # add password
+ "auth_url": os.environ.get("OS_AUTH_URL","http://192.168.20.71:5000/v2.0"), # Auth URL
+ tenant: os.environ.get("OS_TENANT_NAME", "admin"),
+ })
+
+ return creds
+
+
+
+def get_instance_status(nova_client,instance):
+ try:
+ instance = nova_client.servers.get(instance.id)
+ return instance.status
+ except:
+ return None
+
+
+def get_instance_by_name(nova_client, instance_name):
+ try:
+ instance = nova_client.servers.find(name=instance_name)
+ return instance
+ except:
+ return None
+
+
+
+def create_neutron_net(neutron_client, name):
+ json_body = {'network': {'name': name,
+ 'admin_state_up': True}}
+ try:
+ network = neutron_client.create_network(body=json_body)
+ network_dict = network['network']
+ return network_dict['id']
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def delete_neutron_net(neutron_client, network_id):
+ try:
+ neutron_client.delete_network(network_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def create_neutron_subnet(neutron_client, name, cidr, net_id):
+ json_body = {'subnets': [{'name': name, 'cidr': cidr,
+ 'ip_version': 4, 'network_id': net_id}]}
+ try:
+ subnet = neutron_client.create_subnet(body=json_body)
+ return subnet['subnets'][0]['id']
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def delete_neutron_subnet(neutron_client, subnet_id):
+ try:
+ neutron_client.delete_subnet(subnet_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def create_neutron_router(neutron_client, name):
+ json_body = {'router': {'name': name, 'admin_state_up': True}}
+ try:
+ router = neutron_client.create_router(json_body)
+ return router['router']['id']
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+def delete_neutron_router(neutron_client, router_id):
+ json_body = {'router': {'id': router_id}}
+ try:
+ neutron_client.delete_router(router=router_id)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
+def add_interface_router(neutron_client, router_id, subnet_id):
+ json_body = {"subnet_id": subnet_id}
+ try:
+ neutron_client.add_interface_router(router=router_id, body=json_body)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
+def remove_interface_router(neutron_client, router_id, subnet_id):
+ json_body = {"subnet_id": subnet_id}
+ try:
+ neutron_client.remove_interface_router(router=router_id, body=json_body)
+ return True
+ except:
+ print "Error:", sys.exc_info()[0]
+ return False
+
+
+def get_network_id(neutron_client, network_name):
+ networks = neutron_client.list_networks()['networks']
+ id = ''
+ for n in networks:
+ if n['name'] == network_name:
+ id = n['id']
+ break
+ return id
+
+def check_neutron_net(neutron_client, net_name):
+ for network in neutron_client.list_networks()['networks']:
+ if network['name'] == net_name :
+ for subnet in network['subnets']:
+ return True
+ return False
+
+
+
+def check_internet_connectivity(url='http://www.google.com/'):
+ """
+ Check if there is access to the internet
+ """
+ try:
+ urllib2.urlopen(url, timeout=5)
+ return True
+ except urllib.request.URLError:
+ return False
+
+
+def download_url(url, dest_path):
+ """
+ Download a file to a destination path given a URL
+ """
+ name = url.rsplit('/')[-1]
+ dest = dest_path + name
+ try:
+ response = urllib2.urlopen(url)
+ except (urllib2.HTTPError, urllib2.URLError):
+ return False
+
+ with open(dest, 'wb') as f:
+ f.write(response.read())
+ return True
+
+
+def execute_command(cmd, logger=None):
+ """
+ Execute Linux command
+ """
+ if logger:
+ logger.debug('Executing command : {}'.format(cmd))
+ output_file = "output.txt"
+ f = open(output_file, 'w+')
+ p = subprocess.call(cmd,shell=True, stdout=f, stderr=subprocess.STDOUT)
+ f.close()
+ f = open(output_file, 'r')
+ result = f.read()
+ if result != "" and logger:
+ logger.debug(result)
+ if p == 0 :
+ return True
+ else:
+ if loger:
+ logger.error("Error when executing command %s" %cmd)
+ exit(-1)
diff --git a/testcases/vPing/CI/libraries/vPing.py b/testcases/vPing/CI/libraries/vPing.py
index e14155357..379bf9698 100644
--- a/testcases/vPing/CI/libraries/vPing.py
+++ b/testcases/vPing/CI/libraries/vPing.py
@@ -13,9 +13,10 @@
# Note: this is script works only with Ubuntu image, not with Cirros image
#
-import os, time, subprocess, logging, argparse, yaml
-import pprint
+import os, time, subprocess, logging, argparse, yaml, pprint, sys
import novaclient.v2.client as novaclient
+from neutronclient.v2_0 import client as neutronclient
+
pp = pprint.PrettyPrinter(indent=4)
@@ -24,6 +25,9 @@ parser.add_argument("repo_path", help="Path to the repository")
parser.add_argument("-d", "--debug", help="Debug mode", action="store_true")
args = parser.parse_args()
+sys.path.append(args.repo_path + "testcases/")
+import functest_utils
+
""" logging configuration """
logger = logging.getLogger('vPing')
logger.setLevel(logging.DEBUG)
@@ -43,66 +47,33 @@ with open(args.repo_path+"testcases/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
f.close()
+# vPing parameters
VM_BOOT_TIMEOUT = 180
+VM_DELETE_TIMEOUT = 100
PING_TIMEOUT = functest_yaml.get("vping").get("ping_timeout")
NAME_VM_1 = functest_yaml.get("vping").get("vm_name_1")
NAME_VM_2 = functest_yaml.get("vping").get("vm_name_2")
GLANCE_IMAGE_NAME = functest_yaml.get("general").get("openstack").get("image_name")
-NEUTRON_PRIVATE_NET_NAME = functest_yaml.get("general").get("openstack").get("neutron_private_net_name")
FLAVOR = functest_yaml.get("vping").get("vm_flavor")
+# NEUTRON Private Network parameters
+NEUTRON_PRIVATE_NET_NAME = functest_yaml.get("general").get("openstack").get("neutron_private_net_name")
+NEUTRON_PRIVATE_SUBNET_NAME = functest_yaml.get("general").get("openstack").get("neutron_private_subnet_name")
+NEUTRON_PRIVATE_SUBNET_CIDR = functest_yaml.get("general").get("openstack").get("neutron_private_subnet_cidr")
+NEUTRON_ROUTER_NAME = functest_yaml.get("general").get("openstack").get("neutron_router_name")
+
def pMsg(value):
"""pretty printing"""
pp.pprint(value)
-def print_title(title):
- """Print titles"""
- print "\n"+"#"*40+"\n# "+title+"\n"+"#"*40+"\n"
-
-def get_credentials(service):
- """Returns a creds dictionary filled with the following keys:
- * username
- * password/api_key (depending on the service)
- * tenant_name/project_id (depending on the service)
- * auth_url
- :param service: a string indicating the name of the service
- requesting the credentials.
- """
- #TODO: get credentials from the openrc file
- creds = {}
- # Unfortunately, each of the OpenStack client will request slightly
- # different entries in their credentials dict.
- if service.lower() in ("nova", "cinder"):
- password = "api_key"
- tenant = "project_id"
- else:
- password = "password"
- tenant = "tenant_name"
-
- # The most common way to pass these info to the script is to do it through
- # environment variables.
- creds.update({
- "username": os.environ.get('OS_USERNAME', "admin"), # add your cloud username details
- password: os.environ.get("OS_PASSWORD", 'admin'), # add password
- "auth_url": os.environ.get("OS_AUTH_URL","http://192.168.20.71:5000/v2.0"), # Auth URL
- tenant: os.environ.get("OS_TENANT_NAME", "admin"),
- })
-
- return creds
-
-
-def get_server(creds, servername):
- nova = novaclient.Client(**creds)
- return nova.servers.find(name=servername)
-
def waitVmActive(nova,vm):
# sleep and wait for VM status change
sleep_time = 3
count = VM_BOOT_TIMEOUT / sleep_time
while True:
- status = get_status(nova,vm)
+ status = functest_utils.get_instance_status(nova,vm)
logger.debug("Status: %s" % status)
if status == "ACTIVE":
return True
@@ -112,14 +83,106 @@ def waitVmActive(nova,vm):
time.sleep(sleep_time)
return False
-def get_status(nova,vm):
- vm = nova.servers.get(vm.id)
- return vm.status
+def waitVmDeleted(nova,vm):
+ # sleep and wait for VM status change
+ sleep_time = 3
+ count = VM_DELETE_TIMEOUT / sleep_time
+ while True:
+ status = functest_utils.get_instance_status(nova,vm)
+ if not status:
+ return True
+ elif count == 0:
+ logger.debug("Timeout")
+ return False
+ else:
+ #return False
+ count-=1
+ time.sleep(sleep_time)
+ return False
+
+
+def create_private_neutron_net(neutron):
+ neutron.format = 'json'
+ logger.info('Creating neutron network %s...' % NEUTRON_PRIVATE_NET_NAME)
+ network_id = functest_utils.create_neutron_net(neutron, NEUTRON_PRIVATE_NET_NAME)
+ if not network_id:
+ return False
+ logger.debug("Network '%s' created successfully" % network_id)
+
+ logger.debug('Creating Subnet....')
+ subnet_id = functest_utils.create_neutron_subnet(neutron, NEUTRON_PRIVATE_SUBNET_NAME, NEUTRON_PRIVATE_SUBNET_CIDR, network_id)
+ if not subnet_id:
+ return False
+ logger.debug("Subnet '%s' created successfully" % subnet_id)
+
+ logger.debug('Creating Router...')
+ router_id = functest_utils.create_neutron_router(neutron, NEUTRON_ROUTER_NAME)
+ if not router_id:
+ return False
+ logger.debug("Router '%s' created successfully" % router_id)
+
+ logger.debug('Adding router to subnet...')
+ result = functest_utils.add_interface_router(neutron, router_id, subnet_id)
+ if not result:
+ return False
+ logger.debug("Interface added successfully.")
+
+ network_dic = {'net_id' : network_id,
+ 'subnet_id' : subnet_id,
+ 'router_id' : router_id}
+ return network_dic
+
+
+def cleanup(nova,neutron,network_dic):
+ # delete both VMs
+ logger.info("Deleting Instances...")
+ logger.debug("Deleting '%s'..." %NAME_VM_1)
+ vm1 = nova.servers.find(name=NAME_VM_1)
+ nova.servers.delete(vm1)
+ #wait until VMs are deleted
+ if not waitVmDeleted(nova,vm1):
+ logger.error("Instance '%s' with cannot be deleted. Status is '%s'" % (NAME_VM_1,functest_utils.get_instance_status(nova_client,vm1)))
+ else:
+ logger.debug("Instance %s terminated." % NAME_VM_1)
+
+ logger.debug("Deleting '%s'..." %NAME_VM_2)
+ vm2 = nova.servers.find(name=NAME_VM_2)
+ nova.servers.delete(vm2)
+ if not waitVmDeleted(nova,vm2):
+ logger.error("Instance '%s' with cannot be deleted. Status is '%s'" % (NAME_VM_2,functest_utils.get_instance_status(nova_client,vm2)))
+ else:
+ logger.debug("Instance %s terminated." % NAME_VM_2)
+
+ # delete created network
+ logger.info("Deleting network '%s'..." % NEUTRON_PRIVATE_NET_NAME)
+ net_id=network_dic["net_id"]
+ subnet_id=network_dic["subnet_id"]
+ router_id=network_dic["router_id"]
+ if not functest_utils.remove_interface_router(neutron, router_id, subnet_id):
+ logger.error("Unable to remove subnet '%s' from router '%s'" %(subnet_id,router_id))
+ return False
+ logger.debug("Interface removed successfully")
+ if not functest_utils.delete_neutron_router(neutron, router_id):
+ logger.error("Unable to delete router '%s'" %router_id)
+ return False
+ logger.debug("Router deleted successfully")
+ if not functest_utils.delete_neutron_subnet(neutron, subnet_id):
+ logger.error("Unable to delete subnet '%s'" %subnet_id)
+ return False
+ logger.debug("Subnet '%s' deleted successfully" %NEUTRON_PRIVATE_SUBNET_NAME)
+ if not functest_utils.delete_neutron_net(neutron, net_id):
+ logger.error("Unable to delete network '%s'" %net_id)
+ return False
+ logger.debug("Network '%s' deleted successfully" %NEUTRON_PRIVATE_NET_NAME)
+ return True
+
def main():
- creds = get_credentials("nova")
- nova = novaclient.Client(**creds)
+ creds_nova = functest_utils.get_credentials("nova")
+ nova_client = novaclient.Client(**creds_nova)
+ creds_neutron = functest_utils.get_credentials("neutron")
+ neutron_client = neutronclient.Client(**creds_neutron)
EXIT_CODE = -1
image = None
network = None
@@ -127,69 +190,66 @@ def main():
# Check if the given image exists
try:
- image = nova.images.find(name = GLANCE_IMAGE_NAME)
+ image = nova_client.images.find(name = GLANCE_IMAGE_NAME)
logger.info("Glance image found '%s'" % GLANCE_IMAGE_NAME)
except:
logger.error("ERROR: Glance image '%s' not found." % GLANCE_IMAGE_NAME)
logger.info("Available images are: ")
- pMsg(nova.images.list())
+ pMsg(nova_client.images.list())
exit(-1)
- # Check if the given neutron network exists
- try:
- network = nova.networks.find(label = NEUTRON_PRIVATE_NET_NAME)
- logger.info("Network found '%s'" % NEUTRON_PRIVATE_NET_NAME)
- except:
- logger.error("Neutron network '%s' not found." % NEUTRON_PRIVATE_NET_NAME)
- logger.info("Available networks are: ")
- pMsg(nova.networks.list())
+ network_dic = create_private_neutron_net(neutron_client)
+ if not network_dic:
+ logger.error("There has been a problem when creating the neutron network")
exit(-1)
+ network_id = network_dic["net_id"]
+
# Check if the given flavor exists
try:
- flavor = nova.flavors.find(name = FLAVOR)
+ flavor = nova_client.flavors.find(name = FLAVOR)
logger.info("Flavor found '%s'" % FLAVOR)
except:
logger.error("Flavor '%s' not found." % FLAVOR)
logger.info("Available flavors are: ")
- pMsg(nova.flavor.list())
+ pMsg(nova_client.flavor.list())
exit(-1)
# Deleting instances if they exist
- servers=nova.servers.list()
+ servers=nova_client.servers.list()
for server in servers:
if server.name == NAME_VM_1 or server.name == NAME_VM_2:
logger.info("Instance %s found. Deleting..." %server.name)
server.delete()
-
# boot VM 1
# basic boot
# tune (e.g. flavor, images, network) to your specific openstack configuration here
# create VM
logger.info("Creating instance '%s'..." % NAME_VM_1)
- logger.debug("Configuration:\n name=%s \n flavor=%s \n image=%s \n network=%s \n" %(NAME_VM_1,flavor,image,network))
- vm1 = nova.servers.create(
+ logger.debug("Configuration:\n name=%s \n flavor=%s \n image=%s \n network=%s \n" %(NAME_VM_1,flavor,image,network_id))
+ vm1 = nova_client.servers.create(
name = NAME_VM_1,
flavor = flavor,
image = image,
- nics = [{"net-id": network.id}]
+ nics = [{"net-id": network_id}]
)
#wait until VM status is active
- if not waitVmActive(nova,vm1):
- logger.error("Instance '%s' cannot be booted. Status is '%s'" % (NAME_VM_1,get_status(nova,vm1)))
+ if not waitVmActive(nova_client,vm1):
+ logger.error("Instance '%s' cannot be booted. Status is '%s'" % (NAME_VM_1,functest_utils.get_instance_status(nova_client,vm1)))
+ cleanup(nova_client,neutron_client,network_dic)
return (EXIT_CODE)
else:
logger.info("Instance '%s' is ACTIVE." % NAME_VM_1)
#retrieve IP of first VM
logger.debug("Fetching IP...")
- server = get_server(creds, NAME_VM_1)
+ server = functest_utils.get_instance_by_name(nova_client, NAME_VM_1)
# theoretically there is only one IP address so we take the first element of the table
# Dangerous! To be improved!
test_ip = server.networks.get(NEUTRON_PRIVATE_NET_NAME)[0]
@@ -203,21 +263,23 @@ def main():
# create VM
logger.info("Creating instance '%s'..." % NAME_VM_2)
- logger.debug("Configuration:\n name=%s \n flavor=%s \n image=%s \n network=%s \n userdata= \n%s" %(NAME_VM_2,flavor,image,network,u))
- vm2 = nova.servers.create(
+ logger.debug("Configuration:\n name=%s \n flavor=%s \n image=%s \n network=%s \n userdata= \n%s" %(NAME_VM_2,flavor,image,network_id,u))
+ vm2 = nova_client.servers.create(
name = NAME_VM_2,
flavor = flavor,
image = image,
- nics = [{"net-id": network.id}],
+ nics = [{"net-id": network_id}],
userdata = u,
)
- if not waitVmActive(nova,vm2):
- logger.error("Instance '%s' cannot be booted. Status is '%s'" % (NAME_VM_2,get_status(nova,vm2)))
+ if not waitVmActive(nova_client,vm2):
+ logger.error("Instance '%s' cannot be booted. Status is '%s'" % (NAME_VM_2,functest_utils.get_instance_status(nova_client,vm2)))
+ cleanup(nova_client,neutron_client,network_dic)
return (EXIT_CODE)
else:
logger.info("Instance '%s' is ACTIVE." % NAME_VM_2)
+ logger.info("Waiting for ping...")
sec = 0
console_log = vm2.get_console_output()
while True:
@@ -226,23 +288,17 @@ def main():
#print "--"+console_log
# report if the test is failed
if "vPing OK" in console_log:
- logger.info("vPing is OK")
+ logger.info("vPing detected!")
EXIT_CODE = 0
break
- else:
- logger.info("No vPing detected...")
- sec+=1
- if sec == PING_TIMEOUT:
+ elif sec == PING_TIMEOUT:
logger.info("Timeout reached.")
break
+ else:
+ logger.debug("No vPing detected...")
+ sec+=1
-
- # delete both VMs
- logger.debug("Deleting Instances...")
- nova.servers.delete(vm1)
- logger.debug("Instance %s terminated." % NAME_VM_1)
- nova.servers.delete(vm2)
- logger.debug("Instance %s terminated." % NAME_VM_2)
+ cleanup(nova_client,neutron_client,network_dic)
if EXIT_CODE == 0:
logger.info("vPing OK")