summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjose.lausuch <jose.lausuch@ericsson.com>2016-04-07 14:36:43 +0200
committerJose Lausuch <jose.lausuch@ericsson.com>2016-04-11 10:02:45 +0000
commit4b1a0c48c915c0af207587489ec3e556a326be23 (patch)
treef4b5e339619b9230cf64d7778b494040c792f77e
parenta71001dc2874725b494be6a0c18a586c5d0bfbe2 (diff)
Split functest_utils.py into 2 scripts
JIRA: FUNCTEST-186 All the openstack related functions are in openstack_utils.py and the remaining functions in functest_utils.py All the scripts are adapted to this structure. Change-Id: I7f3805779741f0b085985d0d053feb429250b1ea Signed-off-by: jose.lausuch <jose.lausuch@ericsson.com>
-rw-r--r--testcases/VIM/OpenStack/CI/libraries/clean_openstack.py66
-rw-r--r--testcases/VIM/OpenStack/CI/libraries/generate_defaults.py30
-rwxr-xr-xtestcases/VIM/OpenStack/CI/libraries/run_rally-cert.py25
-rwxr-xr-xtestcases/VIM/OpenStack/CI/libraries/run_rally.py11
-rw-r--r--testcases/VIM/OpenStack/CI/libraries/run_tempest.py21
-rwxr-xr-xtestcases/config_functest.py17
-rw-r--r--testcases/features/promise.py25
-rw-r--r--testcases/functest_utils.py745
-rw-r--r--testcases/openstack_utils.py757
-rw-r--r--testcases/vIMS/CI/vIMS.py61
-rw-r--r--testcases/vPing/CI/libraries/vPing_ssh.py75
-rw-r--r--testcases/vPing/CI/libraries/vPing_userdata.py63
12 files changed, 958 insertions, 938 deletions
diff --git a/testcases/VIM/OpenStack/CI/libraries/clean_openstack.py b/testcases/VIM/OpenStack/CI/libraries/clean_openstack.py
index e47750052..8ea08b49d 100644
--- a/testcases/VIM/OpenStack/CI/libraries/clean_openstack.py
+++ b/testcases/VIM/OpenStack/CI/libraries/clean_openstack.py
@@ -57,7 +57,7 @@ if not os.path.exists(REPO_PATH):
logger.error("Functest repository directory not found '%s'" % REPO_PATH)
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
-import functest_utils
+import openstack_utils
DEFAULTS_FILE = '/home/opnfv/functest/conf/os_defaults.yaml'
@@ -85,7 +85,7 @@ def separator():
def remove_instances(nova_client):
logger.info("Removing Nova instances...")
- instances = functest_utils.get_instances(nova_client)
+ instances = openstack_utils.get_instances(nova_client)
if instances is None or len(instances) == 0:
logger.debug("No instances found.")
return
@@ -94,7 +94,7 @@ def remove_instances(nova_client):
instance_name = getattr(instance, 'name')
instance_id = getattr(instance, 'id')
logger.debug("Removing instance '%s', ID=%s ..." % (instance_name,instance_id))
- if functest_utils.delete_instance(nova_client, instance_id):
+ if openstack_utils.delete_instance(nova_client, instance_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -102,7 +102,7 @@ def remove_instances(nova_client):
timeout = 50
while timeout > 0:
- instances = functest_utils.get_instances(nova_client)
+ instances = openstack_utils.get_instances(nova_client)
if instances is None or len(instances) == 0:
break
else:
@@ -113,7 +113,7 @@ def remove_instances(nova_client):
def remove_images(nova_client):
logger.info("Removing Glance images...")
- images = functest_utils.get_images(nova_client)
+ images = openstack_utils.get_images(nova_client)
if images is None or len(images) == 0:
logger.debug("No images found.")
return
@@ -124,7 +124,7 @@ def remove_images(nova_client):
logger.debug("'%s', ID=%s " %(image_name,image_id))
if image_id not in default_images:
logger.debug("Removing image '%s', ID=%s ..." % (image_name,image_id))
- if functest_utils.delete_glance_image(nova_client, image_id):
+ if openstack_utils.delete_glance_image(nova_client, image_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the"
@@ -135,7 +135,7 @@ def remove_images(nova_client):
def remove_volumes(cinder_client):
logger.info("Removing Cinder volumes...")
- volumes = functest_utils.get_volumes(cinder_client)
+ volumes = openstack_utils.get_volumes(cinder_client)
if volumes is None or len(volumes) == 0:
logger.debug("No volumes found.")
return
@@ -146,11 +146,11 @@ def remove_volumes(cinder_client):
logger.debug("'%s', ID=%s " %(volume_name,volume_id))
if volume_id not in default_volumes:
logger.debug("Removing cinder volume %s ..." % volume_id)
- if functest_utils.delete_volume(cinder_client, volume_id):
+ if openstack_utils.delete_volume(cinder_client, volume_id):
logger.debug(" > Done!")
else:
logger.debug("Trying forced removal...")
- if functest_utils.delete_volume(cinder_client,
+ if openstack_utils.delete_volume(cinder_client,
volume_id,
forced=True):
logger.debug(" > Done!")
@@ -162,7 +162,7 @@ def remove_volumes(cinder_client):
def remove_floatingips(nova_client):
logger.info("Removing floating IPs...")
- floatingips = functest_utils.get_floating_ips(nova_client)
+ floatingips = openstack_utils.get_floating_ips(nova_client)
if floatingips is None or len(floatingips) == 0:
logger.debug("No floating IPs found.")
return
@@ -175,7 +175,7 @@ def remove_floatingips(nova_client):
logger.debug("'%s', ID=%s " %(fip_ip,fip_id))
if fip_id not in default_floatingips:
logger.debug("Removing floating IP %s ..." % fip_id)
- if functest_utils.delete_floating_ip(nova_client, fip_id):
+ if openstack_utils.delete_floating_ip(nova_client, fip_id):
logger.debug(" > Done!")
deleted += 1
else:
@@ -187,7 +187,7 @@ def remove_floatingips(nova_client):
timeout = 50
while timeout > 0:
- floatingips = functest_utils.get_floating_ips(nova_client)
+ floatingips = openstack_utils.get_floating_ips(nova_client)
if floatingips is None or len(floatingips) == (init_len - deleted):
break
else:
@@ -199,7 +199,7 @@ def remove_floatingips(nova_client):
def remove_networks(neutron_client):
logger.info("Removing Neutron objects")
network_ids = []
- networks = functest_utils.get_network_list(neutron_client)
+ networks = openstack_utils.get_network_list(neutron_client)
if networks == None:
logger.debug("There are no networks in the deployment. ")
else:
@@ -217,14 +217,14 @@ def remove_networks(neutron_client):
network_ids.append(net_id)
#delete ports
- ports = functest_utils.get_port_list(neutron_client)
+ ports = openstack_utils.get_port_list(neutron_client)
if ports is None:
logger.debug("There are no ports in the deployment. ")
else:
remove_ports(neutron_client, ports, network_ids)
#remove routers
- routers = functest_utils.get_router_list(neutron_client)
+ routers = openstack_utils.get_router_list(neutron_client)
if routers is None:
logger.debug("There are no routers in the deployment. ")
else:
@@ -234,7 +234,7 @@ def remove_networks(neutron_client):
if network_ids != None:
for net_id in network_ids:
logger.debug("Removing network %s ..." % net_id)
- if functest_utils.delete_neutron_net(neutron_client, net_id):
+ if openstack_utils.delete_neutron_net(neutron_client, net_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -253,7 +253,7 @@ def remove_ports(neutron_client, ports, network_ids):
router_id = port['device_id']
if len(port['fixed_ips']) == 0 and router_id == '':
logger.debug("Removing port %s ..." % port_id)
- if functest_utils.delete_neutron_port(neutron_client, port_id):
+ if openstack_utils.delete_neutron_port(neutron_client, port_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -263,7 +263,7 @@ def remove_ports(neutron_client, ports, network_ids):
elif port['device_owner'] == 'network:router_interface':
logger.debug("Detaching port %s (subnet %s) from router %s ..."
% (port_id,subnet_id,router_id))
- if functest_utils.remove_interface_router(neutron_client,
+ if openstack_utils.remove_interface_router(neutron_client,
router_id, subnet_id):
time.sleep(5) # leave 5 seconds to detach before doing anything else
logger.debug(" > Done!")
@@ -277,11 +277,11 @@ def remove_ports(neutron_client, ports, network_ids):
def force_remove_port(neutron_client, port_id):
logger.debug("Clearing device_owner for port %s ..." % port_id)
- functest_utils.update_neutron_port(neutron_client,
+ openstack_utils.update_neutron_port(neutron_client,
port_id,
device_owner='clear')
logger.debug("Removing port %s ..." % port_id)
- if functest_utils.delete_neutron_port(neutron_client, port_id):
+ if openstack_utils.delete_neutron_port(neutron_client, port_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing "
@@ -296,7 +296,7 @@ def remove_routers(neutron_client, routers):
logger.debug("Checking '%s' with ID=(%s) ..." % (router_name,router_id))
if router['external_gateway_info'] != None:
logger.debug("Router has gateway to external network. Removing link...")
- if functest_utils.remove_gateway_router(neutron_client, router_id):
+ if openstack_utils.remove_gateway_router(neutron_client, router_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing "
@@ -304,7 +304,7 @@ def remove_routers(neutron_client, routers):
else:
logger.debug("Router is not connected to anything. Ready to remove...")
logger.debug("Removing router %s(%s) ..." % (router_name, router_id))
- if functest_utils.delete_neutron_router(neutron_client, router_id):
+ if openstack_utils.delete_neutron_router(neutron_client, router_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -313,7 +313,7 @@ def remove_routers(neutron_client, routers):
def remove_security_groups(neutron_client):
logger.info("Removing Security groups...")
- secgroups = functest_utils.get_security_groups(neutron_client)
+ secgroups = openstack_utils.get_security_groups(neutron_client)
if secgroups is None or len(secgroups) == 0:
logger.debug("No security groups found.")
return
@@ -324,7 +324,7 @@ def remove_security_groups(neutron_client):
logger.debug("'%s', ID=%s " %(secgroup_name,secgroup_id))
if secgroup_id not in default_security_groups:
logger.debug(" Removing '%s'..." % secgroup_name)
- if functest_utils.delete_security_group(neutron_client, secgroup_id):
+ if openstack_utils.delete_security_group(neutron_client, secgroup_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -336,7 +336,7 @@ def remove_security_groups(neutron_client):
def remove_users(keystone_client):
logger.info("Removing Users...")
- users = functest_utils.get_users(keystone_client)
+ users = openstack_utils.get_users(keystone_client)
if users == None:
logger.debug("There are no users in the deployment. ")
return
@@ -347,7 +347,7 @@ def remove_users(keystone_client):
logger.debug("'%s', ID=%s " %(user_name,user_id))
if user_id not in default_users:
logger.debug(" Removing '%s'..." % user_name)
- if functest_utils.delete_user(keystone_client,user_id):
+ if openstack_utils.delete_user(keystone_client,user_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -358,7 +358,7 @@ def remove_users(keystone_client):
def remove_tenants(keystone_client):
logger.info("Removing Tenants...")
- tenants = functest_utils.get_tenants(keystone_client)
+ tenants = openstack_utils.get_tenants(keystone_client)
if tenants == None:
logger.debug("There are no tenants in the deployment. ")
return
@@ -369,7 +369,7 @@ def remove_tenants(keystone_client):
logger.debug("'%s', ID=%s " %(tenant_name,tenant_id))
if tenant_id not in default_tenants:
logger.debug(" Removing '%s'..." % tenant_name)
- if functest_utils.delete_tenant(keystone_client,tenant_id):
+ if openstack_utils.delete_tenant(keystone_client,tenant_id):
logger.debug(" > Done!")
else:
logger.error("There has been a problem removing the "
@@ -380,16 +380,16 @@ def remove_tenants(keystone_client):
def main():
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2',**creds_nova)
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
- creds_cinder = functest_utils.get_credentials("cinder")
+ creds_cinder = openstack_utils.get_credentials("cinder")
#cinder_client = cinderclient.Client(**creds_cinder)
cinder_client = cinderclient.Client('1',creds_cinder['username'],
creds_cinder['api_key'],
@@ -397,7 +397,7 @@ def main():
creds_cinder['auth_url'],
service_type="volume")
- if not functest_utils.check_credentials():
+ if not openstack_utils.check_credentials():
logger.error("Please source the openrc credentials and run the script again.")
exit(-1)
diff --git a/testcases/VIM/OpenStack/CI/libraries/generate_defaults.py b/testcases/VIM/OpenStack/CI/libraries/generate_defaults.py
index 72987ddda..731ed9e4e 100644
--- a/testcases/VIM/OpenStack/CI/libraries/generate_defaults.py
+++ b/testcases/VIM/OpenStack/CI/libraries/generate_defaults.py
@@ -57,7 +57,7 @@ if not os.path.exists(REPO_PATH):
logger.error("Functest repository directory not found '%s'" % REPO_PATH)
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
-import functest_utils
+import openstack_utils
DEFAULTS_FILE = '/home/opnfv/functest/conf/os_defaults.yaml'
@@ -68,7 +68,7 @@ def separator():
def get_instances(nova_client):
logger.debug("Getting instances...")
dic_instances = {}
- instances = functest_utils.get_instances(nova_client)
+ instances = openstack_utils.get_instances(nova_client)
if not (instances is None or len(instances) == 0):
for instance in instances:
dic_instances.update({getattr(instance, 'id'):getattr(instance, 'name')})
@@ -78,7 +78,7 @@ def get_instances(nova_client):
def get_images(nova_client):
logger.debug("Getting images...")
dic_images = {}
- images = functest_utils.get_images(nova_client)
+ images = openstack_utils.get_images(nova_client)
if not (images is None or len(images) == 0):
for image in images:
dic_images.update({getattr(image, 'id'):getattr(image, 'name')})
@@ -88,7 +88,7 @@ def get_images(nova_client):
def get_volumes(cinder_client):
logger.debug("Getting volumes...")
dic_volumes = {}
- volumes = functest_utils.get_volumes(cinder_client)
+ volumes = openstack_utils.get_volumes(cinder_client)
if volumes != None:
for volume in volumes:
dic_volumes.update({volume.id:volume.display_name})
@@ -98,7 +98,7 @@ def get_volumes(cinder_client):
def get_networks(neutron_client):
logger.debug("Getting networks")
dic_networks = {}
- networks = functest_utils.get_network_list(neutron_client)
+ networks = openstack_utils.get_network_list(neutron_client)
if networks != None:
for network in networks:
dic_networks.update({network['id']:network['name']})
@@ -108,7 +108,7 @@ def get_networks(neutron_client):
def get_routers(neutron_client):
logger.debug("Getting routers")
dic_routers = {}
- routers = functest_utils.get_router_list(neutron_client)
+ routers = openstack_utils.get_router_list(neutron_client)
if routers != None:
for router in routers:
dic_routers.update({router['id']:router['name']})
@@ -118,7 +118,7 @@ def get_routers(neutron_client):
def get_security_groups(neutron_client):
logger.debug("Getting Security groups...")
dic_secgroups = {}
- secgroups = functest_utils.get_security_groups(neutron_client)
+ secgroups = openstack_utils.get_security_groups(neutron_client)
if not (secgroups is None or len(secgroups) == 0):
for secgroup in secgroups:
dic_secgroups.update({secgroup['id']:secgroup['name']})
@@ -128,7 +128,7 @@ def get_security_groups(neutron_client):
def get_floatinips(nova_client):
logger.debug("Getting Floating IPs...")
dic_floatingips = {}
- floatingips = functest_utils.get_floating_ips(nova_client)
+ floatingips = openstack_utils.get_floating_ips(nova_client)
if not (floatingips is None or len(floatingips) == 0):
for floatingip in floatingips:
dic_floatingips.update({floatingip.id:floatingip.ip})
@@ -138,7 +138,7 @@ def get_floatinips(nova_client):
def get_users(keystone_client):
logger.debug("Getting users...")
dic_users = {}
- users = functest_utils.get_users(keystone_client)
+ users = openstack_utils.get_users(keystone_client)
if not (users is None or len(users) == 0):
for user in users:
dic_users.update({getattr(user, 'id'):getattr(user, 'name')})
@@ -148,7 +148,7 @@ def get_users(keystone_client):
def get_tenants(keystone_client):
logger.debug("Getting users...")
dic_tenants = {}
- tenants = functest_utils.get_tenants(keystone_client)
+ tenants = openstack_utils.get_tenants(keystone_client)
if not (tenants is None or len(tenants) == 0):
for tenant in tenants:
dic_tenants.update({getattr(tenant, 'id'):getattr(tenant, 'name')})
@@ -156,23 +156,23 @@ def get_tenants(keystone_client):
def main():
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2',**creds_nova)
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
- creds_cinder = functest_utils.get_credentials("cinder")
+ creds_cinder = openstack_utils.get_credentials("cinder")
cinder_client = cinderclient.Client('1',creds_cinder['username'],
creds_cinder['api_key'],
creds_cinder['project_id'],
creds_cinder['auth_url'],
service_type="volume")
- if not functest_utils.check_credentials():
+ if not openstack_utils.check_credentials():
logger.error("Please source the openrc credentials and run the script again.")
exit(-1)
diff --git a/testcases/VIM/OpenStack/CI/libraries/run_rally-cert.py b/testcases/VIM/OpenStack/CI/libraries/run_rally-cert.py
index 7e1f89abf..c62a65032 100755
--- a/testcases/VIM/OpenStack/CI/libraries/run_rally-cert.py
+++ b/testcases/VIM/OpenStack/CI/libraries/run_rally-cert.py
@@ -89,6 +89,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -210,13 +211,13 @@ def build_task_args(test_file_name):
task_args['iterations'] = ITERATIONS_AMOUNT
task_args['concurrency'] = CONCURRENCY
- ext_net = functest_utils.get_external_net(client_dict['neutron'])
+ ext_net = openstack_utils.get_external_net(client_dict['neutron'])
if ext_net:
task_args['floating_network'] = str(ext_net)
else:
task_args['floating_network'] = ''
- net_id = functest_utils.get_network_id(client_dict['neutron'],
+ net_id = openstack_utils.get_network_id(client_dict['neutron'],
PRIVATE_NETWORK)
task_args['netid'] = str(net_id)
task_args['live_migration'] = live_migration_supported()
@@ -374,17 +375,17 @@ def main():
exit(-1)
SUMMARY = []
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2', **creds_nova)
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
glance_client = glanceclient.Client(1, glance_endpoint,
token=keystone_client.auth_token)
- creds_cinder = functest_utils.get_credentials("cinder")
+ creds_cinder = openstack_utils.get_credentials("cinder")
cinder_client = cinderclient.Client('2', creds_cinder['username'],
creds_cinder['api_key'],
creds_cinder['project_id'],
@@ -393,10 +394,10 @@ def main():
client_dict['neutron'] = neutron_client
- volume_types = functest_utils.list_volume_types(cinder_client,
+ volume_types = openstack_utils.list_volume_types(cinder_client,
private=False)
if not volume_types:
- volume_type = functest_utils.create_volume_type(cinder_client,
+ volume_type = openstack_utils.create_volume_type(cinder_client,
CINDER_VOLUME_TYPE_NAME)
if not volume_type:
logger.error("Failed to create volume type...")
@@ -407,12 +408,12 @@ def main():
else:
logger.debug("Using existing volume type(s)...")
- image_id = functest_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
+ image_id = openstack_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
if image_id == '':
logger.debug("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
- image_id = functest_utils.create_glance_image(glance_client,
+ image_id = openstack_utils.create_glance_image(glance_client,
GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH)
if not image_id:
@@ -499,13 +500,13 @@ def main():
logger.debug("Deleting image '%s' with ID '%s'..." \
% (GLANCE_IMAGE_NAME, image_id))
- if not functest_utils.delete_glance_image(nova_client, image_id):
+ if not openstack_utils.delete_glance_image(nova_client, image_id):
logger.error("Error deleting the glance image")
if not volume_types:
logger.debug("Deleting volume type '%s'..." \
% CINDER_VOLUME_TYPE_NAME)
- if not functest_utils.delete_volume_type(cinder_client, volume_type):
+ if not openstack_utils.delete_volume_type(cinder_client, volume_type):
logger.error("Error in deleting volume type...")
diff --git a/testcases/VIM/OpenStack/CI/libraries/run_rally.py b/testcases/VIM/OpenStack/CI/libraries/run_rally.py
index 1ea6ca6db..a33963986 100755
--- a/testcases/VIM/OpenStack/CI/libraries/run_rally.py
+++ b/testcases/VIM/OpenStack/CI/libraries/run_rally.py
@@ -79,6 +79,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -236,21 +237,21 @@ def main():
logger.error('argument not valid')
exit(-1)
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2', **creds_nova)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
glance_client = glanceclient.Client(1, glance_endpoint,
token=keystone_client.auth_token)
- image_id = functest_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
+ image_id = openstack_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
if image_id == '':
logger.debug("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
- image_id = functest_utils.create_glance_image(glance_client,
+ image_id = openstack_utils.create_glance_image(glance_client,
GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH)
if not image_id:
@@ -281,7 +282,7 @@ def main():
logger.debug("Deleting image '%s' with ID '%s'..." \
% (GLANCE_IMAGE_NAME, image_id))
- if not functest_utils.delete_glance_image(nova_client, image_id):
+ if not openstack_utils.delete_glance_image(nova_client, image_id):
logger.error("Error deleting the glance image")
if __name__ == '__main__':
diff --git a/testcases/VIM/OpenStack/CI/libraries/run_tempest.py b/testcases/VIM/OpenStack/CI/libraries/run_tempest.py
index 58c39225f..75ce7fb9a 100644
--- a/testcases/VIM/OpenStack/CI/libraries/run_tempest.py
+++ b/testcases/VIM/OpenStack/CI/libraries/run_tempest.py
@@ -71,6 +71,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -134,38 +135,38 @@ def push_results_to_db(case, payload, criteria):
def create_tempest_resources():
- ks_creds = functest_utils.get_credentials("keystone")
+ ks_creds = openstack_utils.get_credentials("keystone")
logger.info("Creating tenant and user for Tempest suite")
keystone = ksclient.Client(**ks_creds)
- tenant_id = functest_utils.create_tenant(keystone,
+ tenant_id = openstack_utils.create_tenant(keystone,
TENANT_NAME,
TENANT_DESCRIPTION)
if tenant_id == '':
logger.error("Error : Failed to create %s tenant" % TENANT_NAME)
- user_id = functest_utils.create_user(keystone, USER_NAME, USER_PASSWORD,
+ user_id = openstack_utils.create_user(keystone, USER_NAME, USER_PASSWORD,
None, tenant_id)
if user_id == '':
logger.error("Error : Failed to create %s user" % USER_NAME)
def free_tempest_resources():
- ks_creds = functest_utils.get_credentials("keystone")
+ ks_creds = openstack_utils.get_credentials("keystone")
logger.info("Deleting tenant and user for Tempest suite)")
keystone = ksclient.Client(**ks_creds)
- user_id = functest_utils.get_user_id(keystone, USER_NAME)
+ user_id = openstack_utils.get_user_id(keystone, USER_NAME)
if user_id == '':
logger.error("Error : Failed to get id of %s user" % USER_NAME)
else:
- if not functest_utils.delete_user(keystone, user_id):
+ if not openstack_utils.delete_user(keystone, user_id):
logger.error("Error : Failed to delete %s user" % USER_NAME)
- tenant_id = functest_utils.get_tenant_id(keystone, TENANT_NAME)
+ tenant_id = openstack_utils.get_tenant_id(keystone, TENANT_NAME)
if tenant_id == '':
logger.error("Error : Failed to get id of %s tenant" % TENANT_NAME)
else:
- if not functest_utils.delete_tenant(keystone, tenant_id):
+ if not openstack_utils.delete_tenant(keystone, tenant_id):
logger.error("Error : Failed to delete %s tenant" % TENANT_NAME)
@@ -197,9 +198,9 @@ def configure_tempest():
logger.debug(" Updating fixed_network_name...")
private_net_name = ""
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- private_net = functest_utils.get_private_net(neutron_client)
+ private_net = openstack_utils.get_private_net(neutron_client)
if private_net is None:
logger.error("No shared private networks found.")
else:
diff --git a/testcases/config_functest.py b/testcases/config_functest.py
index 41bf927ff..657bf0b43 100755
--- a/testcases/config_functest.py
+++ b/testcases/config_functest.py
@@ -10,6 +10,7 @@
import re, json, os, urllib2, argparse, logging, shutil, subprocess, yaml, sys, getpass
import functest_utils
+import openstack_utils
from git import Repo
from os import stat
from pwd import getpwuid
@@ -78,7 +79,7 @@ NEUTRON_PRIVATE_SUBNET_CIDR = functest_yaml.get("general"). \
NEUTRON_ROUTER_NAME = functest_yaml.get("general"). \
get("openstack").get("neutron_router_name")
-creds_neutron = functest_utils.get_credentials("neutron")
+creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
def action_start():
@@ -99,7 +100,7 @@ def action_start():
action_clean()
logger.info("Starting installation of functest environment")
- private_net = functest_utils.get_private_net(neutron_client)
+ private_net = openstack_utils.get_private_net(neutron_client)
if private_net is None:
# If there is no private network in the deployment we create one
if not create_private_neutron_net(neutron_client):
@@ -234,7 +235,7 @@ def check_rally():
def create_private_neutron_net(neutron):
neutron.format = 'json'
logger.info("Creating network '%s'..." % NEUTRON_PRIVATE_NET_NAME)
- network_id = functest_utils. \
+ network_id = openstack_utils. \
create_neutron_net(neutron, NEUTRON_PRIVATE_NET_NAME)
if not network_id:
@@ -242,13 +243,13 @@ def create_private_neutron_net(neutron):
logger.debug("Network '%s' created successfully." % network_id)
logger.info("Updating network '%s' with shared=True..." % NEUTRON_PRIVATE_NET_NAME)
- if functest_utils.update_neutron_net(neutron, network_id, shared=True):
+ if openstack_utils.update_neutron_net(neutron, network_id, shared=True):
logger.debug("Network '%s' updated successfully." % network_id)
else:
logger.info("Updating neutron network '%s' failed" % network_id)
logger.info("Creating Subnet....")
- subnet_id = functest_utils. \
+ subnet_id = openstack_utils. \
create_neutron_subnet(neutron,
NEUTRON_PRIVATE_SUBNET_NAME,
NEUTRON_PRIVATE_SUBNET_CIDR,
@@ -257,7 +258,7 @@ def create_private_neutron_net(neutron):
return False
logger.debug("Subnet '%s' created successfully." % subnet_id)
logger.info("Creating Router...")
- router_id = functest_utils. \
+ router_id = openstack_utils. \
create_neutron_router(neutron, NEUTRON_ROUTER_NAME)
if not router_id:
@@ -266,7 +267,7 @@ def create_private_neutron_net(neutron):
logger.debug("Router '%s' created successfully." % router_id)
logger.info("Adding router to subnet...")
- result = functest_utils.add_interface_router(neutron, router_id, subnet_id)
+ result = openstack_utils.add_interface_router(neutron, router_id, subnet_id)
if not result:
return False
@@ -284,7 +285,7 @@ def main():
exit(-1)
- if not functest_utils.check_credentials():
+ if not openstack_utils.check_credentials():
logger.error("Please source the openrc credentials and run the script again.")
#TODO: source the credentials in this script
exit(-1)
diff --git a/testcases/features/promise.py b/testcases/features/promise.py
index de0418211..832083d04 100644
--- a/testcases/features/promise.py
+++ b/testcases/features/promise.py
@@ -60,6 +60,7 @@ GLANCE_IMAGE_PATH = functest_yaml.get('general'). \
sys.path.append('%s/testcases' % FUNCTEST_REPO)
import functest_utils
+import openstack_utils
""" logging configuration """
logger = logging.getLogger('Promise')
@@ -85,20 +86,20 @@ def create_image(glance_client, name):
def main():
- ks_creds = functest_utils.get_credentials("keystone")
- nv_creds = functest_utils.get_credentials("nova")
- nt_creds = functest_utils.get_credentials("neutron")
+ ks_creds = openstack_utils.get_credentials("keystone")
+ nv_creds = openstack_utils.get_credentials("nova")
+ nt_creds = openstack_utils.get_credentials("neutron")
keystone = ksclient.Client(**ks_creds)
- user_id = functest_utils.get_user_id(keystone, ks_creds['username'])
+ user_id = openstack_utils.get_user_id(keystone, ks_creds['username'])
if user_id == '':
logger.error("Error : Failed to get id of %s user" %
ks_creds['username'])
exit(-1)
logger.info("Creating tenant '%s'..." % TENANT_NAME)
- tenant_id = functest_utils.create_tenant(
+ tenant_id = openstack_utils.create_tenant(
keystone, TENANT_NAME, TENANT_DESCRIPTION)
if tenant_id == '':
logger.error("Error : Failed to create %s tenant" % TENANT_NAME)
@@ -109,21 +110,21 @@ def main():
role_id = ''
for role_name in roles_name:
if role_id == '':
- role_id = functest_utils.get_role_id(keystone, role_name)
+ role_id = openstack_utils.get_role_id(keystone, role_name)
if role_id == '':
logger.error("Error : Failed to get id for %s role" % role_name)
exit(-1)
logger.info("Adding role '%s' to tenant '%s'..." % (role_id, TENANT_NAME))
- if not functest_utils.add_role_user(keystone, user_id, role_id, tenant_id):
+ if not openstack_utils.add_role_user(keystone, user_id, role_id, tenant_id):
logger.error("Error : Failed to add %s on tenant %s" %
(ks_creds['username'], TENANT_NAME))
exit(-1)
logger.debug("Role added successfully.")
logger.info("Creating user '%s'..." % USER_NAME)
- user_id = functest_utils.create_user(
+ user_id = openstack_utils.create_user(
keystone, USER_NAME, USER_PWD, None, tenant_id)
if user_id == '':
@@ -153,7 +154,7 @@ def main():
logger.info("Creating image '%s' from '%s'..." % (IMAGE_NAME,
GLANCE_IMAGE_PATH))
- image_id = functest_utils.create_glance_image(glance,
+ image_id = openstack_utils.create_glance_image(glance,
IMAGE_NAME,
GLANCE_IMAGE_PATH)
if not image_id:
@@ -161,10 +162,10 @@ def main():
exit(-1)
logger.debug("Image '%s' with ID '%s' created successfully." % (IMAGE_NAME,
image_id))
- flavor_id = functest_utils.get_flavor_id(nova, FLAVOR_NAME)
+ flavor_id = openstack_utils.get_flavor_id(nova, FLAVOR_NAME)
if flavor_id == '':
logger.info("Creating flavor '%s'..." % FLAVOR_NAME)
- flavor_id = functest_utils.create_flavor(nova,
+ flavor_id = openstack_utils.create_flavor(nova,
FLAVOR_NAME,
FLAVOR_RAM,
FLAVOR_DISK,
@@ -179,7 +180,7 @@ def main():
flavor_id))
neutron = ntclient.Client(**nt_creds)
- private_net = functest_utils.get_private_net(neutron)
+ private_net = openstack_utils.get_private_net(neutron)
if private_net is None:
logger.error("There is no private network in the deployment. Aborting...")
exit(-1)
diff --git a/testcases/functest_utils.py b/testcases/functest_utils.py
index 5d380ab31..8111959f5 100644
--- a/testcases/functest_utils.py
+++ b/testcases/functest_utils.py
@@ -23,751 +23,6 @@ from git import Repo
# ----------------------------------------------------------
#
-# OPENSTACK UTILS
-#
-# -----------------------------------------------------------
-
-
-# *********************************************
-# CREDENTIALS
-# *********************************************
-def check_credentials():
- """
- Check if the OpenStack credentials (openrc) are sourced
- """
- env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME']
- return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
-
-
-def get_credentials(service):
- """Returns a creds dictionary filled with the following keys:
- * username
- * password/api_key (depending on the service)
- * tenant_name/project_id (depending on the service)
- * auth_url
- :param service: a string indicating the name of the service
- requesting the credentials.
- """
- creds = {}
- # Unfortunately, each of the OpenStack client will request slightly
- # different entries in their credentials dict.
- if service.lower() in ("nova", "cinder"):
- password = "api_key"
- tenant = "project_id"
- else:
- password = "password"
- tenant = "tenant_name"
-
- # The most common way to pass these info to the script is to do it through
- # environment variables.
- creds.update({
- "username": os.environ.get('OS_USERNAME', "admin"),
- password: os.environ.get("OS_PASSWORD", 'admin'),
- "auth_url": os.environ.get("OS_AUTH_URL",
- "http://192.168.20.71:5000/v2.0"),
- tenant: os.environ.get("OS_TENANT_NAME", "admin"),
- })
- cacert = os.environ.get("OS_CACERT")
- if cacert != None:
- # each openstack client uses differnt kwargs for this
- creds.update({"cacert":cacert,"ca_cert":cacert,"https_ca_cert":cacert, \
- "https_cacert":cacert,"ca_file":cacert})
- creds.update({"insecure":"True","https_insecure":"True"})
- if not os.path.isfile(cacert):
- print "WARNING: The 'OS_CACERT' environment variable is set to %s "\
- "but the file does not exist." % cacert
- return creds
-
-
-# *********************************************
-# NOVA
-# *********************************************
-def get_instances(nova_client):
- try:
- instances = nova_client.servers.list(search_opts={'all_tenants': 1})
- return instances
- except Exception, e:
- print "Error [get_instances(nova_client)]:", e
- return None
-
-
-def get_instance_status(nova_client, instance):
- try:
- instance = nova_client.servers.get(instance.id)
- return instance.status
- except:
- # print "Error [get_instance_status(nova_client, '%s')]:" % \
- # str(instance), e
- return None
-
-
-def get_instance_by_name(nova_client, instance_name):
- try:
- instance = nova_client.servers.find(name=instance_name)
- return instance
- except Exception, e:
- print "Error [get_instance_by_name(nova_client, '%s')]:" % \
- instance_name, e
- return None
-
-
-def get_flavor_id(nova_client, flavor_name):
- flavors = nova_client.flavors.list(detailed=True)
- id = ''
- for f in flavors:
- if f.name == flavor_name:
- id = f.id
- break
- return id
-
-
-def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
- flavors = nova_client.flavors.list(detailed=True)
- id = ''
- for f in flavors:
- if min_ram <= f.ram and f.ram <= max_ram:
- id = f.id
- break
- return id
-
-
-def get_floating_ips(nova_client):
- try:
- floating_ips = nova_client.floating_ips.list()
- return floating_ips
- except Exception, e:
- print "Error [get_floating_ips(nova_client)]:", e
- return None
-
-
-def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
- try:
- flavor = nova_client.flavors.create(flavor_name, ram, vcpus, disk)
- except Exception, e:
- print "Error [create_flavor(nova_client, '%s', '%s', '%s', "\
- "'%s')]:" % (flavor_name, ram, disk, vcpus), e
- return None
- return flavor.id
-
-
-def create_floating_ip(neutron_client):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id}
- try:
- ip_json = neutron_client.create_floatingip({'floatingip': props})
- fip_addr = ip_json['floatingip']['floating_ip_address']
- fip_id = ip_json['floatingip']['id']
- except Exception, e:
- print "Error [create_floating_ip(neutron_client)]:", e
- return None
- return {'fip_addr': fip_addr, 'fip_id': fip_id}
-
-
-def add_floating_ip(nova_client, server_id, floatingip_id):
- try:
- nova_client.servers.add_floating_ip(server_id, floatingip_id)
- return True
- except Exception, e:
- print "Error [add_floating_ip(nova_client, '%s', '%s')]:" % \
- (server_id, floatingip_id), e
- return False
-
-
-def delete_instance(nova_client, instance_id):
- try:
- nova_client.servers.force_delete(instance_id)
- return True
- except Exception, e:
- print "Error [delete_instance(nova_client, '%s')]:" % instance_id, e
- return False
-
-
-def delete_floating_ip(nova_client, floatingip_id):
- try:
- nova_client.floating_ips.delete(floatingip_id)
- return True
- except Exception, e:
- print "Error [delete_floating_ip(nova_client, '%s')]:" % floatingip_id, e
- return False
-
-
-# *********************************************
-# NEUTRON
-# *********************************************
-def get_network_list(neutron_client):
- network_list = neutron_client.list_networks()['networks']
- if len(network_list) == 0:
- return None
- else:
- return network_list
-
-
-def get_router_list(neutron_client):
- router_list = neutron_client.list_routers()['routers']
- if len(router_list) == 0:
- return None
- else:
- return router_list
-
-
-def get_port_list(neutron_client):
- port_list = neutron_client.list_ports()['ports']
- if len(port_list) == 0:
- return None
- else:
- return port_list
-
-
-def get_network_id(neutron_client, network_name):
- networks = neutron_client.list_networks()['networks']
- id = ''
- for n in networks:
- if n['name'] == network_name:
- id = n['id']
- break
- return id
-
-
-def get_subnet_id(neutron_client, subnet_name):
- subnets = neutron_client.list_subnets()['subnets']
- id = ''
- for s in subnets:
- if s['name'] == subnet_name:
- id = s['id']
- break
- return id
-
-
-def get_router_id(neutron_client, router_name):
- routers = neutron_client.list_routers()['routers']
- id = ''
- for r in routers:
- if r['name'] == router_name:
- id = r['id']
- break
- return id
-
-
-def get_private_net(neutron_client):
- # Checks if there is an existing shared private network
- networks = neutron_client.list_networks()['networks']
- if len(networks) == 0:
- return None
- for net in networks:
- if (net['router:external'] is False) and (net['shared'] is True):
- return net
- return None
-
-
-def get_external_net(neutron_client):
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['name']
- return False
-
-
-def get_external_net_id(neutron_client):
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['id']
- return False
-
-
-def check_neutron_net(neutron_client, net_name):
- for network in neutron_client.list_networks()['networks']:
- if network['name'] == net_name:
- for subnet in network['subnets']:
- return True
- return False
-
-
-def create_neutron_net(neutron_client, name):
- json_body = {'network': {'name': name,
- 'admin_state_up': True}}
- try:
- network = neutron_client.create_network(body=json_body)
- network_dict = network['network']
- return network_dict['id']
- except Exception, e:
- print "Error [create_neutron_net(neutron_client, '%s')]:" % name, e
- return False
-
-
-def create_neutron_subnet(neutron_client, name, cidr, net_id):
- json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id}]}
- try:
- subnet = neutron_client.create_subnet(body=json_body)
- return subnet['subnets'][0]['id']
- except Exception, e:
- print "Error [create_neutron_subnet(neutron_client, '%s', '%s', "\
- "'%s')]:" % (name, cidr, net_id), e
- return False
-
-
-def create_neutron_router(neutron_client, name):
- json_body = {'router': {'name': name, 'admin_state_up': True}}
- try:
- router = neutron_client.create_router(json_body)
- return router['router']['id']
- except Exception, e:
- print "Error [create_neutron_router(neutron_client, '%s')]:" % name, e
- return False
-
-
-def create_neutron_port(neutron_client, name, network_id, ip):
- json_body = {'port': {
- 'admin_state_up': True,
- 'name': name,
- 'network_id': network_id,
- 'fixed_ips': [{"ip_address": ip}]
- }}
- try:
- port = neutron_client.create_port(body=json_body)
- return port['port']['id']
- except Exception, e:
- print "Error [create_neutron_port(neutron_client, '%s', '%s', "\
- "'%s')]:" % (name, network_id, ip), e
- return False
-
-
-def update_neutron_net(neutron_client, network_id, shared=False):
- json_body = {'network': {'shared': shared}}
- try:
- neutron_client.update_network(network_id, body=json_body)
- return True
- except Exception, e:
- print "Error [update_neutron_net(neutron_client, '%s', '%s')]:" % \
- (network_id, str(shared)), e
- return False
-
-
-def update_neutron_port(neutron_client, port_id, device_owner):
- json_body = {'port': {
- 'device_owner': device_owner,
- }}
- try:
- port = neutron_client.update_port(port=port_id,
- body=json_body)
- return port['port']['id']
- except Exception, e:
- print "Error [update_neutron_port(neutron_client, '%s', '%s')]:" % \
- (port_id, device_owner), e
- return False
-
-
-def add_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
- try:
- neutron_client.add_interface_router(router=router_id, body=json_body)
- return True
- except Exception, e:
- print "Error [add_interface_router(neutron_client, '%s', '%s')]:" % \
- (router_id, subnet_id), e
- return False
-
-
-def add_gateway_router(neutron_client, router_id):
- ext_net_id = get_external_net_id(neutron_client)
- router_dict = {'network_id': ext_net_id}
- try:
- neutron_client.add_gateway_router(router_id, router_dict)
- return True
- except Exception, e:
- print "Error [add_gateway_router(neutron_client, '%s')]:" % router_id, e
- return False
-
-
-def delete_neutron_net(neutron_client, network_id):
- try:
- neutron_client.delete_network(network_id)
- return True
- except Exception, e:
- print "Error [delete_neutron_net(neutron_client, '%s')]:" % network_id, e
- return False
-
-
-def delete_neutron_subnet(neutron_client, subnet_id):
- try:
- neutron_client.delete_subnet(subnet_id)
- return True
- except Exception, e:
- print "Error [delete_neutron_subnet(neutron_client, '%s')]:" % subnet_id, e
- return False
-
-
-def delete_neutron_router(neutron_client, router_id):
- json_body = {'router': {'id': router_id}}
- try:
- neutron_client.delete_router(router=router_id)
- return True
- except Exception, e:
- print "Error [delete_neutron_router(neutron_client, '%s')]:" % \
- router_id, e
- return False
-
-
-def delete_neutron_port(neutron_client, port_id):
- try:
- neutron_client.delete_port(port_id)
- return True
- except Exception, e:
- print "Error [delete_neutron_port(neutron_client, '%s')]:" % port_id, e
- return False
-
-
-def remove_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
- try:
- neutron_client.remove_interface_router(router=router_id,
- body=json_body)
- return True
- except Exception, e:
- print "Error [remove_interface_router(neutron_client, '%s', '%s')]:" % \
- (router_id, subnet_id), e
- return False
-
-
-def remove_gateway_router(neutron_client, router_id):
- try:
- neutron_client.remove_gateway_router(router_id)
- return True
- except Exception, e:
- print "Error [remove_gateway_router(neutron_client, '%s')]:" % router_id, e
- return False
-
-
-# *********************************************
-# SEC GROUPS
-# *********************************************
-def get_security_groups(neutron_client):
- try:
- security_groups = neutron_client.list_security_groups()[
- 'security_groups']
- return security_groups
- except Exception, e:
- print "Error [get_security_groups(neutron_client)]:", e
- return None
-
-
-def get_security_group_id(neutron_client, sg_name):
- security_groups = get_security_groups(neutron_client)
- id = ''
- for sg in security_groups:
- if sg['name'] == sg_name:
- id = sg['id']
- break
- return id
-
-
-def create_security_group(neutron_client, sg_name, sg_description):
- json_body = {'security_group': {'name': sg_name,
- 'description': sg_description}}
- try:
- secgroup = neutron_client.create_security_group(json_body)
- return secgroup['security_group']
- except Exception, e:
- print "Error [create_security_group(neutron_client, '%s', '%s')]:" % \
- (sg_name, sg_description), e
- return False
-
-
-def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
- port_range_min=None, port_range_max=None):
- if port_range_min is None and port_range_max is None:
- json_body = {'security_group_rule': {'direction': direction,
- 'security_group_id': sg_id,
- 'protocol': protocol}}
- elif port_range_min is not None and port_range_max is not None:
- json_body = {'security_group_rule': {'direction': direction,
- 'security_group_id': sg_id,
- 'port_range_min': port_range_min,
- 'port_range_max': port_range_max,
- 'protocol': protocol}}
- else:
- print "Error [create_secgroup_rule(neutron_client, '%s', '%s', "\
- "'%s', '%s', '%s', '%s')]:" % (neutron_client, sg_id, direction,
- port_range_min, port_range_max, protocol),\
- " Invalid values for port_range_min, port_range_max"
- return False
- try:
- neutron_client.create_security_group_rule(json_body)
- return True
- except Exception, e:
- print "Error [create_secgroup_rule(neutron_client, '%s', '%s', "\
- "'%s', '%s', '%s', '%s')]:" % (neutron_client, sg_id, direction,
- port_range_min, port_range_max,
- protocol), e
- return False
-
-
-def add_secgroup_to_instance(nova_client, instance_id, secgroup_id):
- try:
- nova_client.servers.add_security_group(instance_id, secgroup_id)
- return True
- except Exception, e:
- print "Error [add_secgroup_to_instance(nova_client, '%s', '%s')]: " % \
- (instance_id, secgroup_id), e
- return False
-
-
-def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
- json_body = {"quota": {
- "security_group": sg_quota,
- "security_group_rule": sg_rule_quota
- }}
-
- try:
- quota = neutron_client.update_quota(tenant_id=tenant_id,
- body=json_body)
- return True
- except Exception, e:
- print "Error [update_sg_quota(neutron_client, '%s', '%s', "\
- "'%s')]:" % (tenant_id, sg_quota, sg_rule_quota), e
- return False
-
-
-def delete_security_group(neutron_client, secgroup_id):
- try:
- neutron_client.delete_security_group(secgroup_id)
- return True
- except Exception, e:
- print "Error [delete_security_group(neutron_client, '%s')]:" % secgroup_id, e
- return False
-
-
-# *********************************************
-# GLANCE
-# *********************************************
-def get_images(nova_client):
- try:
- images = nova_client.images.list()
- return images
- except Exception, e:
- print "Error [get_images]:", e
- return None
-
-
-def get_image_id(glance_client, image_name):
- images = glance_client.images.list()
- id = ''
- for i in images:
- if i.name == image_name:
- id = i.id
- break
- return id
-
-
-def create_glance_image(glance_client, image_name, file_path, public=True):
- if not os.path.isfile(file_path):
- print "Error: file " + file_path + " does not exist."
- return False
- try:
- with open(file_path) as fimage:
- image = glance_client.images.create(name=image_name,
- is_public=public,
- disk_format="qcow2",
- container_format="bare",
- data=fimage)
- return image.id
- except Exception, e:
- print "Error [create_glance_image(glance_client, '%s', '%s', "\
- "'%s')]:" % (image_name, file_path, str(public)), e
- return False
-
-
-def delete_glance_image(nova_client, image_id):
- try:
- nova_client.images.delete(image_id)
- return True
- except Exception, e:
- print "Error [delete_glance_image(nova_client, '%s')]:" % image_id, e
- return False
-
-
-# *********************************************
-# CINDER
-# *********************************************
-def get_volumes(cinder_client):
- try:
- volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
- return volumes
- except Exception, e:
- print "Error [get_volumes(cinder_client)]:", e
- return None
-
-
-def list_volume_types(cinder_client, public=True, private=True):
- try:
- volume_types = cinder_client.volume_types.list()
- if not public:
- volume_types = [vt for vt in volume_types if not vt.is_public]
- if not private:
- volume_types = [vt for vt in volume_types if vt.is_public]
- return volume_types
- except Exception, e:
- print "Error [list_volume_types(cinder_client)]:", e
- return None
-
-
-def create_volume_type(cinder_client, name):
- try:
- volume_type = cinder_client.volume_types.create(name)
- return volume_type
- except Exception, e:
- print "Error [create_volume_type(cinder_client, '%s')]:" % name, e
- return None
-
-
-def update_cinder_quota(cinder_client, tenant_id, vols_quota,
- snapshots_quota, gigabytes_quota):
- quotas_values = {"volumes": vols_quota,
- "snapshots": snapshots_quota,
- "gigabytes": gigabytes_quota}
-
- try:
- quotas_default = cinder_client.quotas.update(tenant_id,
- **quotas_values)
- return True
- except Exception, e:
- print "Error [update_cinder_quota(cinder_client, '%s', '%s', '%s'" \
- "'%s')]:" % (tenant_id, vols_quota,
- snapshots_quota, gigabytes_quota), e
- return False
-
-
-def delete_volume(cinder_client, volume_id, forced=False):
- try:
- if forced:
- try:
- cinder_client.volumes.detach(volume_id)
- except:
- print "Error:", sys.exc_info()[0]
- cinder_client.volumes.force_delete(volume_id)
- else:
- cinder_client.volumes.delete(volume_id)
- return True
- except Exception, e:
- print "Error [delete_volume(cinder_client, '%s', '%s')]:" % \
- (volume_id, str(forced)), e
- return False
-
-
-def delete_volume_type(cinder_client, volume_type):
- try:
- cinder_client.volume_types.delete(volume_type)
- return True
- except Exception, e:
- print "Error [delete_volume_type(cinder_client, '%s')]:" % volume_type, e
- return False
-
-
-# *********************************************
-# KEYSTONE
-# *********************************************
-def get_tenants(keystone_client):
- try:
- tenants = keystone_client.tenants.list()
- return tenants
- except Exception, e:
- print "Error [get_tenants(keystone_client)]:", e
- return None
-
-
-def get_users(keystone_client):
- try:
- users = keystone_client.users.list()
- return users
- except Exception, e:
- print "Error [get_users(keystone_client)]:", e
- return None
-
-
-def get_tenant_id(keystone_client, tenant_name):
- tenants = keystone_client.tenants.list()
- id = ''
- for t in tenants:
- if t.name == tenant_name:
- id = t.id
- break
- return id
-
-
-def get_user_id(keystone_client, user_name):
- users = keystone_client.users.list()
- id = ''
- for u in users:
- if u.name == user_name:
- id = u.id
- break
- return id
-
-
-def get_role_id(keystone_client, role_name):
- roles = keystone_client.roles.list()
- id = ''
- for r in roles:
- if r.name == role_name:
- id = r.id
- break
- return id
-
-
-def create_tenant(keystone_client, tenant_name, tenant_description):
- try:
- tenant = keystone_client.tenants.create(tenant_name,
- tenant_description,
- enabled=True)
- return tenant.id
- except Exception, e:
- print "Error [create_tenant(cinder_client, '%s', '%s')]:" % \
- (tenant_name, tenant_description), e
- return False
-
-
-def create_user(keystone_client, user_name, user_password,
- user_email, tenant_id):
- try:
- user = keystone_client.users.create(user_name, user_password,
- user_email, tenant_id,
- enabled=True)
- return user.id
- except Exception, e:
- print "Error [create_user(keystone_client, '%s', '%s', '%s'" \
- "'%s')]:" % (user_name, user_password, user_email, tenant_id), e
- return False
-
-
-def add_role_user(keystone_client, user_id, role_id, tenant_id):
- try:
- keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
- return True
- except Exception, e:
- print "Error [add_role_user(keystone_client, '%s', '%s'" \
- "'%s')]:" % (user_id, role_id, tenant_id), e
- return False
-
-
-def delete_tenant(keystone_client, tenant_id):
- try:
- tenant = keystone_client.tenants.delete(tenant_id)
- return True
- except Exception, e:
- print "Error [delete_tenant(keystone_client, '%s')]:" % tenant_id, e
- return False
-
-
-def delete_user(keystone_client, user_id):
- try:
- tenant = keystone_client.users.delete(user_id)
- return True
- except Exception, e:
- print "Error [delete_user(keystone_client, '%s')]:" % user_id, e
- return False
-
-
-# ----------------------------------------------------------
-#
# INTERNET UTILS
#
# -----------------------------------------------------------
diff --git a/testcases/openstack_utils.py b/testcases/openstack_utils.py
new file mode 100644
index 000000000..6ef6d061a
--- /dev/null
+++ b/testcases/openstack_utils.py
@@ -0,0 +1,757 @@
+#!/usr/bin/env python
+#
+# jose.lausuch@ericsson.com
+# valentin.boucher@orange.com
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+
+import os
+import os.path
+import sys
+
+# ----------------------------------------------------------
+#
+# OPENSTACK UTILS
+#
+# -----------------------------------------------------------
+
+
+# *********************************************
+# CREDENTIALS
+# *********************************************
+def check_credentials():
+ """
+ Check if the OpenStack credentials (openrc) are sourced
+ """
+ env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME']
+ return all(map(lambda v: v in os.environ and os.environ[v], env_vars))
+
+
+def get_credentials(service):
+ """Returns a creds dictionary filled with the following keys:
+ * username
+ * password/api_key (depending on the service)
+ * tenant_name/project_id (depending on the service)
+ * auth_url
+ :param service: a string indicating the name of the service
+ requesting the credentials.
+ """
+ creds = {}
+ # Unfortunately, each of the OpenStack client will request slightly
+ # different entries in their credentials dict.
+ if service.lower() in ("nova", "cinder"):
+ password = "api_key"
+ tenant = "project_id"
+ else:
+ password = "password"
+ tenant = "tenant_name"
+
+ # The most common way to pass these info to the script is to do it through
+ # environment variables.
+ creds.update({
+ "username": os.environ.get('OS_USERNAME', "admin"),
+ password: os.environ.get("OS_PASSWORD", 'admin'),
+ "auth_url": os.environ.get("OS_AUTH_URL",
+ "http://192.168.20.71:5000/v2.0"),
+ tenant: os.environ.get("OS_TENANT_NAME", "admin"),
+ })
+ cacert = os.environ.get("OS_CACERT")
+ if cacert != None:
+ # each openstack client uses differnt kwargs for this
+ creds.update({"cacert":cacert,"ca_cert":cacert,"https_ca_cert":cacert, \
+ "https_cacert":cacert,"ca_file":cacert})
+ creds.update({"insecure":"True","https_insecure":"True"})
+ if not os.path.isfile(cacert):
+ print "WARNING: The 'OS_CACERT' environment variable is set to %s "\
+ "but the file does not exist." % cacert
+ return creds
+
+
+# *********************************************
+# NOVA
+# *********************************************
+def get_instances(nova_client):
+ try:
+ instances = nova_client.servers.list(search_opts={'all_tenants': 1})
+ return instances
+ except Exception, e:
+ print "Error [get_instances(nova_client)]:", e
+ return None
+
+
+def get_instance_status(nova_client, instance):
+ try:
+ instance = nova_client.servers.get(instance.id)
+ return instance.status
+ except:
+ # print "Error [get_instance_status(nova_client, '%s')]:" % \
+ # str(instance), e
+ return None
+
+
+def get_instance_by_name(nova_client, instance_name):
+ try:
+ instance = nova_client.servers.find(name=instance_name)
+ return instance
+ except Exception, e:
+ print "Error [get_instance_by_name(nova_client, '%s')]:" % \
+ instance_name, e
+ return None
+
+
+def get_flavor_id(nova_client, flavor_name):
+ flavors = nova_client.flavors.list(detailed=True)
+ id = ''
+ for f in flavors:
+ if f.name == flavor_name:
+ id = f.id
+ break
+ return id
+
+
+def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
+ flavors = nova_client.flavors.list(detailed=True)
+ id = ''
+ for f in flavors:
+ if min_ram <= f.ram and f.ram <= max_ram:
+ id = f.id
+ break
+ return id
+
+
+def get_floating_ips(nova_client):
+ try:
+ floating_ips = nova_client.floating_ips.list()
+ return floating_ips
+ except Exception, e:
+ print "Error [get_floating_ips(nova_client)]:", e
+ return None
+
+
+def create_flavor(nova_client, flavor_name, ram, disk, vcpus):
+ try:
+ flavor = nova_client.flavors.create(flavor_name, ram, vcpus, disk)
+ except Exception, e:
+ print "Error [create_flavor(nova_client, '%s', '%s', '%s', "\
+ "'%s')]:" % (flavor_name, ram, disk, vcpus), e
+ return None
+ return flavor.id
+
+
+def create_floating_ip(neutron_client):
+ extnet_id = get_external_net_id(neutron_client)
+ props = {'floating_network_id': extnet_id}
+ try:
+ ip_json = neutron_client.create_floatingip({'floatingip': props})
+ fip_addr = ip_json['floatingip']['floating_ip_address']
+ fip_id = ip_json['floatingip']['id']
+ except Exception, e:
+ print "Error [create_floating_ip(neutron_client)]:", e
+ return None
+ return {'fip_addr': fip_addr, 'fip_id': fip_id}
+
+
+def add_floating_ip(nova_client, server_id, floatingip_id):
+ try:
+ nova_client.servers.add_floating_ip(server_id, floatingip_id)
+ return True
+ except Exception, e:
+ print "Error [add_floating_ip(nova_client, '%s', '%s')]:" % \
+ (server_id, floatingip_id), e
+ return False
+
+
+def delete_instance(nova_client, instance_id):
+ try:
+ nova_client.servers.force_delete(instance_id)
+ return True
+ except Exception, e:
+ print "Error [delete_instance(nova_client, '%s')]:" % instance_id, e
+ return False
+
+
+def delete_floating_ip(nova_client, floatingip_id):
+ try:
+ nova_client.floating_ips.delete(floatingip_id)
+ return True
+ except Exception, e:
+ print "Error [delete_floating_ip(nova_client, '%s')]:" % floatingip_id, e
+ return False
+
+
+# *********************************************
+# NEUTRON
+# *********************************************
+def get_network_list(neutron_client):
+ network_list = neutron_client.list_networks()['networks']
+ if len(network_list) == 0:
+ return None
+ else:
+ return network_list
+
+
+def get_router_list(neutron_client):
+ router_list = neutron_client.list_routers()['routers']
+ if len(router_list) == 0:
+ return None
+ else:
+ return router_list
+
+
+def get_port_list(neutron_client):
+ port_list = neutron_client.list_ports()['ports']
+ if len(port_list) == 0:
+ return None
+ else:
+ return port_list
+
+
+def get_network_id(neutron_client, network_name):
+ networks = neutron_client.list_networks()['networks']
+ id = ''
+ for n in networks:
+ if n['name'] == network_name:
+ id = n['id']
+ break
+ return id
+
+
+def get_subnet_id(neutron_client, subnet_name):
+ subnets = neutron_client.list_subnets()['subnets']
+ id = ''
+ for s in subnets:
+ if s['name'] == subnet_name:
+ id = s['id']
+ break
+ return id
+
+
+def get_router_id(neutron_client, router_name):
+ routers = neutron_client.list_routers()['routers']
+ id = ''
+ for r in routers:
+ if r['name'] == router_name:
+ id = r['id']
+ break
+ return id
+
+
+def get_private_net(neutron_client):
+ # Checks if there is an existing shared private network
+ networks = neutron_client.list_networks()['networks']
+ if len(networks) == 0:
+ return None
+ for net in networks:
+ if (net['router:external'] is False) and (net['shared'] is True):
+ return net
+ return None
+
+
+def get_external_net(neutron_client):
+ for network in neutron_client.list_networks()['networks']:
+ if network['router:external']:
+ return network['name']
+ return False
+
+
+def get_external_net_id(neutron_client):
+ for network in neutron_client.list_networks()['networks']:
+ if network['router:external']:
+ return network['id']
+ return False
+
+
+def check_neutron_net(neutron_client, net_name):
+ for network in neutron_client.list_networks()['networks']:
+ if network['name'] == net_name:
+ for subnet in network['subnets']:
+ return True
+ return False
+
+
+def create_neutron_net(neutron_client, name):
+ json_body = {'network': {'name': name,
+ 'admin_state_up': True}}
+ try:
+ network = neutron_client.create_network(body=json_body)
+ network_dict = network['network']
+ return network_dict['id']
+ except Exception, e:
+ print "Error [create_neutron_net(neutron_client, '%s')]:" % name, e
+ return False
+
+
+def create_neutron_subnet(neutron_client, name, cidr, net_id):
+ json_body = {'subnets': [{'name': name, 'cidr': cidr,
+ 'ip_version': 4, 'network_id': net_id}]}
+ try:
+ subnet = neutron_client.create_subnet(body=json_body)
+ return subnet['subnets'][0]['id']
+ except Exception, e:
+ print "Error [create_neutron_subnet(neutron_client, '%s', '%s', "\
+ "'%s')]:" % (name, cidr, net_id), e
+ return False
+
+
+def create_neutron_router(neutron_client, name):
+ json_body = {'router': {'name': name, 'admin_state_up': True}}
+ try:
+ router = neutron_client.create_router(json_body)
+ return router['router']['id']
+ except Exception, e:
+ print "Error [create_neutron_router(neutron_client, '%s')]:" % name, e
+ return False
+
+
+def create_neutron_port(neutron_client, name, network_id, ip):
+ json_body = {'port': {
+ 'admin_state_up': True,
+ 'name': name,
+ 'network_id': network_id,
+ 'fixed_ips': [{"ip_address": ip}]
+ }}
+ try:
+ port = neutron_client.create_port(body=json_body)
+ return port['port']['id']
+ except Exception, e:
+ print "Error [create_neutron_port(neutron_client, '%s', '%s', "\
+ "'%s')]:" % (name, network_id, ip), e
+ return False
+
+
+def update_neutron_net(neutron_client, network_id, shared=False):
+ json_body = {'network': {'shared': shared}}
+ try:
+ neutron_client.update_network(network_id, body=json_body)
+ return True
+ except Exception, e:
+ print "Error [update_neutron_net(neutron_client, '%s', '%s')]:" % \
+ (network_id, str(shared)), e
+ return False
+
+
+def update_neutron_port(neutron_client, port_id, device_owner):
+ json_body = {'port': {
+ 'device_owner': device_owner,
+ }}
+ try:
+ port = neutron_client.update_port(port=port_id,
+ body=json_body)
+ return port['port']['id']
+ except Exception, e:
+ print "Error [update_neutron_port(neutron_client, '%s', '%s')]:" % \
+ (port_id, device_owner), e
+ return False
+
+
+def add_interface_router(neutron_client, router_id, subnet_id):
+ json_body = {"subnet_id": subnet_id}
+ try:
+ neutron_client.add_interface_router(router=router_id, body=json_body)
+ return True
+ except Exception, e:
+ print "Error [add_interface_router(neutron_client, '%s', '%s')]:" % \
+ (router_id, subnet_id), e
+ return False
+
+
+def add_gateway_router(neutron_client, router_id):
+ ext_net_id = get_external_net_id(neutron_client)
+ router_dict = {'network_id': ext_net_id}
+ try:
+ neutron_client.add_gateway_router(router_id, router_dict)
+ return True
+ except Exception, e:
+ print "Error [add_gateway_router(neutron_client, '%s')]:" % router_id, e
+ return False
+
+
+def delete_neutron_net(neutron_client, network_id):
+ try:
+ neutron_client.delete_network(network_id)
+ return True
+ except Exception, e:
+ print "Error [delete_neutron_net(neutron_client, '%s')]:" % network_id, e
+ return False
+
+
+def delete_neutron_subnet(neutron_client, subnet_id):
+ try:
+ neutron_client.delete_subnet(subnet_id)
+ return True
+ except Exception, e:
+ print "Error [delete_neutron_subnet(neutron_client, '%s')]:" % subnet_id, e
+ return False
+
+
+def delete_neutron_router(neutron_client, router_id):
+ json_body = {'router': {'id': router_id}}
+ try:
+ neutron_client.delete_router(router=router_id)
+ return True
+ except Exception, e:
+ print "Error [delete_neutron_router(neutron_client, '%s')]:" % \
+ router_id, e
+ return False
+
+
+def delete_neutron_port(neutron_client, port_id):
+ try:
+ neutron_client.delete_port(port_id)
+ return True
+ except Exception, e:
+ print "Error [delete_neutron_port(neutron_client, '%s')]:" % port_id, e
+ return False
+
+
+def remove_interface_router(neutron_client, router_id, subnet_id):
+ json_body = {"subnet_id": subnet_id}
+ try:
+ neutron_client.remove_interface_router(router=router_id,
+ body=json_body)
+ return True
+ except Exception, e:
+ print "Error [remove_interface_router(neutron_client, '%s', '%s')]:" % \
+ (router_id, subnet_id), e
+ return False
+
+
+def remove_gateway_router(neutron_client, router_id):
+ try:
+ neutron_client.remove_gateway_router(router_id)
+ return True
+ except Exception, e:
+ print "Error [remove_gateway_router(neutron_client, '%s')]:" % router_id, e
+ return False
+
+
+# *********************************************
+# SEC GROUPS
+# *********************************************
+def get_security_groups(neutron_client):
+ try:
+ security_groups = neutron_client.list_security_groups()[
+ 'security_groups']
+ return security_groups
+ except Exception, e:
+ print "Error [get_security_groups(neutron_client)]:", e
+ return None
+
+
+def get_security_group_id(neutron_client, sg_name):
+ security_groups = get_security_groups(neutron_client)
+ id = ''
+ for sg in security_groups:
+ if sg['name'] == sg_name:
+ id = sg['id']
+ break
+ return id
+
+
+def create_security_group(neutron_client, sg_name, sg_description):
+ json_body = {'security_group': {'name': sg_name,
+ 'description': sg_description}}
+ try:
+ secgroup = neutron_client.create_security_group(json_body)
+ return secgroup['security_group']
+ except Exception, e:
+ print "Error [create_security_group(neutron_client, '%s', '%s')]:" % \
+ (sg_name, sg_description), e
+ return False
+
+
+def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
+ port_range_min=None, port_range_max=None):
+ if port_range_min is None and port_range_max is None:
+ json_body = {'security_group_rule': {'direction': direction,
+ 'security_group_id': sg_id,
+ 'protocol': protocol}}
+ elif port_range_min is not None and port_range_max is not None:
+ json_body = {'security_group_rule': {'direction': direction,
+ 'security_group_id': sg_id,
+ 'port_range_min': port_range_min,
+ 'port_range_max': port_range_max,
+ 'protocol': protocol}}
+ else:
+ print "Error [create_secgroup_rule(neutron_client, '%s', '%s', "\
+ "'%s', '%s', '%s', '%s')]:" % (neutron_client, sg_id, direction,
+ port_range_min, port_range_max, protocol),\
+ " Invalid values for port_range_min, port_range_max"
+ return False
+ try:
+ neutron_client.create_security_group_rule(json_body)
+ return True
+ except Exception, e:
+ print "Error [create_secgroup_rule(neutron_client, '%s', '%s', "\
+ "'%s', '%s', '%s', '%s')]:" % (neutron_client, sg_id, direction,
+ port_range_min, port_range_max,
+ protocol), e
+ return False
+
+
+def add_secgroup_to_instance(nova_client, instance_id, secgroup_id):
+ try:
+ nova_client.servers.add_security_group(instance_id, secgroup_id)
+ return True
+ except Exception, e:
+ print "Error [add_secgroup_to_instance(nova_client, '%s', '%s')]: " % \
+ (instance_id, secgroup_id), e
+ return False
+
+
+def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
+ json_body = {"quota": {
+ "security_group": sg_quota,
+ "security_group_rule": sg_rule_quota
+ }}
+
+ try:
+ quota = neutron_client.update_quota(tenant_id=tenant_id,
+ body=json_body)
+ return True
+ except Exception, e:
+ print "Error [update_sg_quota(neutron_client, '%s', '%s', "\
+ "'%s')]:" % (tenant_id, sg_quota, sg_rule_quota), e
+ return False
+
+
+def delete_security_group(neutron_client, secgroup_id):
+ try:
+ neutron_client.delete_security_group(secgroup_id)
+ return True
+ except Exception, e:
+ print "Error [delete_security_group(neutron_client, '%s')]:" % secgroup_id, e
+ return False
+
+
+# *********************************************
+# GLANCE
+# *********************************************
+def get_images(nova_client):
+ try:
+ images = nova_client.images.list()
+ return images
+ except Exception, e:
+ print "Error [get_images]:", e
+ return None
+
+
+def get_image_id(glance_client, image_name):
+ images = glance_client.images.list()
+ id = ''
+ for i in images:
+ if i.name == image_name:
+ id = i.id
+ break
+ return id
+
+
+def create_glance_image(glance_client, image_name, file_path, public=True):
+ if not os.path.isfile(file_path):
+ print "Error: file " + file_path + " does not exist."
+ return False
+ try:
+ with open(file_path) as fimage:
+ image = glance_client.images.create(name=image_name,
+ is_public=public,
+ disk_format="qcow2",
+ container_format="bare",
+ data=fimage)
+ return image.id
+ except Exception, e:
+ print "Error [create_glance_image(glance_client, '%s', '%s', "\
+ "'%s')]:" % (image_name, file_path, str(public)), e
+ return False
+
+
+def delete_glance_image(nova_client, image_id):
+ try:
+ nova_client.images.delete(image_id)
+ return True
+ except Exception, e:
+ print "Error [delete_glance_image(nova_client, '%s')]:" % image_id, e
+ return False
+
+
+# *********************************************
+# CINDER
+# *********************************************
+def get_volumes(cinder_client):
+ try:
+ volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
+ return volumes
+ except Exception, e:
+ print "Error [get_volumes(cinder_client)]:", e
+ return None
+
+
+def list_volume_types(cinder_client, public=True, private=True):
+ try:
+ volume_types = cinder_client.volume_types.list()
+ if not public:
+ volume_types = [vt for vt in volume_types if not vt.is_public]
+ if not private:
+ volume_types = [vt for vt in volume_types if vt.is_public]
+ return volume_types
+ except Exception, e:
+ print "Error [list_volume_types(cinder_client)]:", e
+ return None
+
+
+def create_volume_type(cinder_client, name):
+ try:
+ volume_type = cinder_client.volume_types.create(name)
+ return volume_type
+ except Exception, e:
+ print "Error [create_volume_type(cinder_client, '%s')]:" % name, e
+ return None
+
+
+def update_cinder_quota(cinder_client, tenant_id, vols_quota,
+ snapshots_quota, gigabytes_quota):
+ quotas_values = {"volumes": vols_quota,
+ "snapshots": snapshots_quota,
+ "gigabytes": gigabytes_quota}
+
+ try:
+ quotas_default = cinder_client.quotas.update(tenant_id,
+ **quotas_values)
+ return True
+ except Exception, e:
+ print "Error [update_cinder_quota(cinder_client, '%s', '%s', '%s'" \
+ "'%s')]:" % (tenant_id, vols_quota,
+ snapshots_quota, gigabytes_quota), e
+ return False
+
+
+def delete_volume(cinder_client, volume_id, forced=False):
+ try:
+ if forced:
+ try:
+ cinder_client.volumes.detach(volume_id)
+ except:
+ print "Error:", sys.exc_info()[0]
+ cinder_client.volumes.force_delete(volume_id)
+ else:
+ cinder_client.volumes.delete(volume_id)
+ return True
+ except Exception, e:
+ print "Error [delete_volume(cinder_client, '%s', '%s')]:" % \
+ (volume_id, str(forced)), e
+ return False
+
+
+def delete_volume_type(cinder_client, volume_type):
+ try:
+ cinder_client.volume_types.delete(volume_type)
+ return True
+ except Exception, e:
+ print "Error [delete_volume_type(cinder_client, '%s')]:" % volume_type, e
+ return False
+
+
+# *********************************************
+# KEYSTONE
+# *********************************************
+def get_tenants(keystone_client):
+ try:
+ tenants = keystone_client.tenants.list()
+ return tenants
+ except Exception, e:
+ print "Error [get_tenants(keystone_client)]:", e
+ return None
+
+
+def get_users(keystone_client):
+ try:
+ users = keystone_client.users.list()
+ return users
+ except Exception, e:
+ print "Error [get_users(keystone_client)]:", e
+ return None
+
+
+def get_tenant_id(keystone_client, tenant_name):
+ tenants = keystone_client.tenants.list()
+ id = ''
+ for t in tenants:
+ if t.name == tenant_name:
+ id = t.id
+ break
+ return id
+
+
+def get_user_id(keystone_client, user_name):
+ users = keystone_client.users.list()
+ id = ''
+ for u in users:
+ if u.name == user_name:
+ id = u.id
+ break
+ return id
+
+
+def get_role_id(keystone_client, role_name):
+ roles = keystone_client.roles.list()
+ id = ''
+ for r in roles:
+ if r.name == role_name:
+ id = r.id
+ break
+ return id
+
+
+def create_tenant(keystone_client, tenant_name, tenant_description):
+ try:
+ tenant = keystone_client.tenants.create(tenant_name,
+ tenant_description,
+ enabled=True)
+ return tenant.id
+ except Exception, e:
+ print "Error [create_tenant(cinder_client, '%s', '%s')]:" % \
+ (tenant_name, tenant_description), e
+ return False
+
+
+def create_user(keystone_client, user_name, user_password,
+ user_email, tenant_id):
+ try:
+ user = keystone_client.users.create(user_name, user_password,
+ user_email, tenant_id,
+ enabled=True)
+ return user.id
+ except Exception, e:
+ print "Error [create_user(keystone_client, '%s', '%s', '%s'" \
+ "'%s')]:" % (user_name, user_password, user_email, tenant_id), e
+ return False
+
+
+def add_role_user(keystone_client, user_id, role_id, tenant_id):
+ try:
+ keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
+ return True
+ except Exception, e:
+ print "Error [add_role_user(keystone_client, '%s', '%s'" \
+ "'%s')]:" % (user_id, role_id, tenant_id), e
+ return False
+
+
+def delete_tenant(keystone_client, tenant_id):
+ try:
+ tenant = keystone_client.tenants.delete(tenant_id)
+ return True
+ except Exception, e:
+ print "Error [delete_tenant(keystone_client, '%s')]:" % tenant_id, e
+ return False
+
+
+def delete_user(keystone_client, user_id):
+ try:
+ tenant = keystone_client.users.delete(user_id)
+ return True
+ except Exception, e:
+ print "Error [delete_user(keystone_client, '%s')]:" % user_id, e
+ return False
diff --git a/testcases/vIMS/CI/vIMS.py b/testcases/vIMS/CI/vIMS.py
index 290dc982d..3eef5b381 100644
--- a/testcases/vIMS/CI/vIMS.py
+++ b/testcases/vIMS/CI/vIMS.py
@@ -63,6 +63,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -112,7 +113,7 @@ def download_and_add_image_on_glance(glance, image_name, image_url):
logger.error("Failed to download image %s" % file_name)
return False
- image = functest_utils.create_glance_image(
+ image = openstack_utils.create_glance_image(
glance, image_name, dest_path + file_name)
if not image:
logger.error("Failed to upload image on glance")
@@ -271,19 +272,19 @@ def main():
if not os.path.exists(VIMS_DATA_DIR):
os.makedirs(VIMS_DATA_DIR)
- ks_creds = functest_utils.get_credentials("keystone")
- nv_creds = functest_utils.get_credentials("nova")
- nt_creds = functest_utils.get_credentials("neutron")
+ ks_creds = openstack_utils.get_credentials("keystone")
+ nv_creds = openstack_utils.get_credentials("nova")
+ nt_creds = openstack_utils.get_credentials("neutron")
logger.info("Prepare OpenStack plateform (create tenant and user)")
keystone = ksclient.Client(**ks_creds)
- user_id = functest_utils.get_user_id(keystone, ks_creds['username'])
+ user_id = openstack_utils.get_user_id(keystone, ks_creds['username'])
if user_id == '':
step_failure("init", "Error : Failed to get id of " +
ks_creds['username'])
- tenant_id = functest_utils.create_tenant(
+ tenant_id = openstack_utils.create_tenant(
keystone, TENANT_NAME, TENANT_DESCRIPTION)
if tenant_id == '':
step_failure("init", "Error : Failed to create " +
@@ -293,16 +294,16 @@ def main():
role_id = ''
for role_name in roles_name:
if role_id == '':
- role_id = functest_utils.get_role_id(keystone, role_name)
+ role_id = openstack_utils.get_role_id(keystone, role_name)
if role_id == '':
logger.error("Error : Failed to get id for %s role" % role_name)
- if not functest_utils.add_role_user(keystone, user_id, role_id, tenant_id):
+ if not openstack_utils.add_role_user(keystone, user_id, role_id, tenant_id):
logger.error("Error : Failed to add %s on tenant" %
ks_creds['username'])
- user_id = functest_utils.create_user(
+ user_id = openstack_utils.create_user(
keystone, TENANT_NAME, TENANT_NAME, None, tenant_id)
if user_id == '':
logger.error("Error : Failed to create %s user" % TENANT_NAME)
@@ -331,7 +332,7 @@ def main():
image_name = IMAGES[img]['image_name']
image_url = IMAGES[img]['image_url']
- image_id = functest_utils.get_image_id(glance, image_name)
+ image_id = openstack_utils.get_image_id(glance, image_name)
if image_id == '':
logger.info("""%s image doesn't exist on glance repository.
@@ -347,20 +348,20 @@ def main():
logger.info("Update security group quota for this tenant")
neutron = ntclient.Client(**nt_creds)
- if not functest_utils.update_sg_quota(neutron, tenant_id, 50, 100):
+ if not openstack_utils.update_sg_quota(neutron, tenant_id, 50, 100):
step_failure(
"init", "Failed to update security group quota for tenant " + TENANT_NAME)
logger.info("Update cinder quota for this tenant")
from cinderclient import client as cinderclient
- creds_cinder = functest_utils.get_credentials("cinder")
+ creds_cinder = openstack_utils.get_credentials("cinder")
cinder_client = cinderclient.Client('1', creds_cinder['username'],
creds_cinder['api_key'],
creds_cinder['project_id'],
creds_cinder['auth_url'],
service_type="volume")
- if not functest_utils.update_cinder_quota(cinder_client, tenant_id, 20, 10, 150):
+ if not openstack_utils.update_cinder_quota(cinder_client, tenant_id, 20, 10, 150):
step_failure(
"init", "Failed to update cinder quota for tenant " + TENANT_NAME)
@@ -375,16 +376,16 @@ def main():
nova = nvclient.Client("2", **nv_creds)
flavor_name = "m1.medium"
- flavor_id = functest_utils.get_flavor_id(nova, flavor_name)
+ flavor_id = openstack_utils.get_flavor_id(nova, flavor_name)
for requirement in CFY_MANAGER_REQUIERMENTS:
if requirement == 'ram_min':
- flavor_id = functest_utils.get_flavor_id_by_ram_range(
+ flavor_id = openstack_utils.get_flavor_id_by_ram_range(
nova, CFY_MANAGER_REQUIERMENTS['ram_min'], 8196)
if flavor_id == '':
logger.error(
"Failed to find %s flavor. Try with ram range default requirement !" % flavor_name)
- flavor_id = functest_utils.get_flavor_id_by_ram_range(nova, 4000, 8196)
+ flavor_id = openstack_utils.get_flavor_id_by_ram_range(nova, 4000, 8196)
if flavor_id == '':
step_failure("orchestrator",
@@ -393,10 +394,10 @@ def main():
cfy.set_flavor_id(flavor_id)
image_name = "centos_7"
- image_id = functest_utils.get_image_id(glance, image_name)
+ image_id = openstack_utils.get_image_id(glance, image_name)
for requirement in CFY_MANAGER_REQUIERMENTS:
if requirement == 'os_image':
- image_id = functest_utils.get_image_id(
+ image_id = openstack_utils.get_image_id(
glance, CFY_MANAGER_REQUIERMENTS['os_image'])
if image_id == '':
@@ -405,7 +406,7 @@ def main():
cfy.set_image_id(image_id)
- ext_net = functest_utils.get_external_net(neutron)
+ ext_net = openstack_utils.get_external_net(neutron)
if not ext_net:
step_failure("orchestrator", "Failed to get external network")
@@ -449,16 +450,16 @@ def main():
nova = nvclient.Client("2", **nv_creds)
flavor_name = "m1.small"
- flavor_id = functest_utils.get_flavor_id(nova, flavor_name)
+ flavor_id = openstack_utils.get_flavor_id(nova, flavor_name)
for requirement in CW_REQUIERMENTS:
if requirement == 'ram_min':
- flavor_id = functest_utils.get_flavor_id_by_ram_range(
+ flavor_id = openstack_utils.get_flavor_id_by_ram_range(
nova, CW_REQUIERMENTS['ram_min'], 8196)
if flavor_id == '':
logger.error(
"Failed to find %s flavor. Try with ram range default requirement !" % flavor_name)
- flavor_id = functest_utils.get_flavor_id_by_ram_range(nova, 4000, 8196)
+ flavor_id = openstack_utils.get_flavor_id_by_ram_range(nova, 4000, 8196)
if flavor_id == '':
step_failure(
@@ -467,10 +468,10 @@ def main():
cw.set_flavor_id(flavor_id)
image_name = "ubuntu_14.04"
- image_id = functest_utils.get_image_id(glance, image_name)
+ image_id = openstack_utils.get_image_id(glance, image_name)
for requirement in CW_REQUIERMENTS:
if requirement == 'os_image':
- image_id = functest_utils.get_image_id(
+ image_id = openstack_utils.get_image_id(
glance, CW_REQUIERMENTS['os_image'])
if image_id == '':
@@ -479,7 +480,7 @@ def main():
cw.set_image_id(image_id)
- ext_net = functest_utils.get_external_net(neutron)
+ ext_net = openstack_utils.get_external_net(neutron)
if not ext_net:
step_failure("vIMS", "Failed to get external network")
@@ -518,29 +519,29 @@ def main():
if args.noclean:
exit(0)
- ks_creds = functest_utils.get_credentials("keystone")
+ ks_creds = openstack_utils.get_credentials("keystone")
keystone = ksclient.Client(**ks_creds)
logger.info("Removing %s tenant .." % CFY_INPUTS['keystone_tenant_name'])
- tenant_id = functest_utils.get_tenant_id(
+ tenant_id = openstack_utils.get_tenant_id(
keystone, CFY_INPUTS['keystone_tenant_name'])
if tenant_id == '':
logger.error("Error : Failed to get id of %s tenant" %
CFY_INPUTS['keystone_tenant_name'])
else:
- if not functest_utils.delete_tenant(keystone, tenant_id):
+ if not openstack_utils.delete_tenant(keystone, tenant_id):
logger.error("Error : Failed to remove %s tenant" %
CFY_INPUTS['keystone_tenant_name'])
logger.info("Removing %s user .." % CFY_INPUTS['keystone_username'])
- user_id = functest_utils.get_user_id(
+ user_id = openstack_utils.get_user_id(
keystone, CFY_INPUTS['keystone_username'])
if user_id == '':
logger.error("Error : Failed to get id of %s user" %
CFY_INPUTS['keystone_username'])
else:
- if not functest_utils.delete_user(keystone, user_id):
+ if not openstack_utils.delete_user(keystone, user_id):
logger.error("Error : Failed to remove %s user" %
CFY_INPUTS['keystone_username'])
diff --git a/testcases/vPing/CI/libraries/vPing_ssh.py b/testcases/vPing/CI/libraries/vPing_ssh.py
index b45d23be8..7adf8a23d 100644
--- a/testcases/vPing/CI/libraries/vPing_ssh.py
+++ b/testcases/vPing/CI/libraries/vPing_ssh.py
@@ -69,6 +69,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -122,7 +123,7 @@ def waitVmActive(nova, vm):
sleep_time = 3
count = VM_BOOT_TIMEOUT / sleep_time
while True:
- status = functest_utils.get_instance_status(nova, vm)
+ status = openstack_utils.get_instance_status(nova, vm)
logger.debug("Status: %s" % status)
if status == "ACTIVE":
return True
@@ -142,7 +143,7 @@ def waitVmDeleted(nova, vm):
sleep_time = 3
count = VM_DELETE_TIMEOUT / sleep_time
while True:
- status = functest_utils.get_instance_status(nova, vm)
+ status = openstack_utils.get_instance_status(nova, vm)
if not status:
return True
elif count == 0:
@@ -158,23 +159,23 @@ def waitVmDeleted(nova, vm):
def create_private_neutron_net(neutron):
# Check if the network already exists
- network_id = functest_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
- subnet_id = functest_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
- router_id = functest_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
+ network_id = openstack_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
+ subnet_id = openstack_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
+ router_id = openstack_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
if network_id != '' and subnet_id != '' and router_id != '':
logger.info("Using existing network '%s'..." % NEUTRON_PRIVATE_NET_NAME)
else:
neutron.format = 'json'
logger.info('Creating neutron network %s...' % NEUTRON_PRIVATE_NET_NAME)
- network_id = functest_utils. \
+ network_id = openstack_utils. \
create_neutron_net(neutron, NEUTRON_PRIVATE_NET_NAME)
if not network_id:
return False
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
- subnet_id = functest_utils. \
+ subnet_id = openstack_utils. \
create_neutron_subnet(neutron,
NEUTRON_PRIVATE_SUBNET_NAME,
NEUTRON_PRIVATE_SUBNET_CIDR,
@@ -183,7 +184,7 @@ def create_private_neutron_net(neutron):
return False
logger.debug("Subnet '%s' created successfully" % subnet_id)
logger.debug('Creating Router...')
- router_id = functest_utils. \
+ router_id = openstack_utils. \
create_neutron_router(neutron, NEUTRON_ROUTER_NAME)
if not router_id:
@@ -192,12 +193,12 @@ def create_private_neutron_net(neutron):
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not functest_utils.add_interface_router(neutron, router_id, subnet_id):
+ if not openstack_utils.add_interface_router(neutron, router_id, subnet_id):
return False
logger.debug("Interface added successfully.")
logger.debug('Adding gateway to router...')
- if not functest_utils.add_gateway_router(neutron, router_id):
+ if not openstack_utils.add_gateway_router(neutron, router_id):
return False
logger.debug("Gateway added successfully.")
@@ -208,12 +209,12 @@ def create_private_neutron_net(neutron):
def create_security_group(neutron_client):
- sg_id = functest_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
+ sg_id = openstack_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
if sg_id != '':
logger.info("Using existing security group '%s'..." % SECGROUP_NAME)
else:
logger.info("Creating security group '%s'..." % SECGROUP_NAME)
- SECGROUP = functest_utils.create_security_group(neutron_client,
+ SECGROUP = openstack_utils.create_security_group(neutron_client,
SECGROUP_NAME,
SECGROUP_DESCR)
if not SECGROUP:
@@ -226,18 +227,18 @@ def create_security_group(neutron_client):
(SECGROUP['name'], sg_id))
logger.debug("Adding ICMP rules in security group '%s'..." % SECGROUP_NAME)
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return False
logger.debug("Adding SSH rules in security group '%s'..." % SECGROUP_NAME)
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'ingress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return False
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return False
@@ -253,10 +254,10 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
logger.info("Cleaning up...")
if not image_exists:
logger.debug("Deleting image...")
- if not functest_utils.delete_glance_image(nova, image_id):
+ if not openstack_utils.delete_glance_image(nova, image_id):
logger.error("Error deleting the glance image")
- vm1 = functest_utils.get_instance_by_name(nova, NAME_VM_1)
+ vm1 = openstack_utils.get_instance_by_name(nova, NAME_VM_1)
if vm1:
logger.debug("Deleting '%s'..." % NAME_VM_1)
nova.servers.delete(vm1)
@@ -264,11 +265,11 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
if not waitVmDeleted(nova, vm1):
logger.error(
"Instance '%s' with cannot be deleted. Status is '%s'" % (
- NAME_VM_1, functest_utils.get_instance_status(nova, vm1)))
+ NAME_VM_1, openstack_utils.get_instance_status(nova, vm1)))
else:
logger.debug("Instance %s terminated." % NAME_VM_1)
- vm2 = functest_utils.get_instance_by_name(nova, NAME_VM_2)
+ vm2 = openstack_utils.get_instance_by_name(nova, NAME_VM_2)
if vm2:
logger.debug("Deleting '%s'..." % NAME_VM_2)
@@ -278,7 +279,7 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
if not waitVmDeleted(nova, vm2):
logger.error(
"Instance '%s' with cannot be deleted. Status is '%s'" % (
- NAME_VM_2, functest_utils.get_instance_status(nova, vm2)))
+ NAME_VM_2, openstack_utils.get_instance_status(nova, vm2)))
else:
logger.debug("Instance %s terminated." % NAME_VM_2)
@@ -288,41 +289,41 @@ def cleanup(nova, neutron, image_id, network_dic, sg_id, floatingip):
subnet_id = network_dic["subnet_id"]
router_id = network_dic["router_id"]
- if not functest_utils.remove_interface_router(neutron, router_id,
+ if not openstack_utils.remove_interface_router(neutron, router_id,
subnet_id):
logger.error("Unable to remove subnet '%s' from router '%s'" % (
subnet_id, router_id))
return False
logger.debug("Interface removed successfully")
- if not functest_utils.delete_neutron_router(neutron, router_id):
+ if not openstack_utils.delete_neutron_router(neutron, router_id):
logger.error("Unable to delete router '%s'" % router_id)
return False
logger.debug("Router deleted successfully")
- if not functest_utils.delete_neutron_subnet(neutron, subnet_id):
+ if not openstack_utils.delete_neutron_subnet(neutron, subnet_id):
logger.error("Unable to delete subnet '%s'" % subnet_id)
return False
logger.debug(
"Subnet '%s' deleted successfully" % NEUTRON_PRIVATE_SUBNET_NAME)
- if not functest_utils.delete_neutron_net(neutron, net_id):
+ if not openstack_utils.delete_neutron_net(neutron, net_id):
logger.error("Unable to delete network '%s'" % net_id)
return False
logger.debug(
"Network '%s' deleted successfully" % NEUTRON_PRIVATE_NET_NAME)
- if not functest_utils.delete_security_group(neutron, sg_id):
+ if not openstack_utils.delete_security_group(neutron, sg_id):
logger.error("Unable to delete security group '%s'" % sg_id)
return False
logger.debug(
"Security group '%s' deleted successfully" % sg_id)
logger.debug("Releasing floating ip '%s'..." % floatingip['fip_addr'])
- if not functest_utils.delete_floating_ip(nova, floatingip['fip_id']):
+ if not openstack_utils.delete_floating_ip(nova, floatingip['fip_id']):
logger.error("Unable to delete floatingip '%s'" % floatingip['fip_addr'])
return False
logger.debug(
@@ -354,11 +355,11 @@ def push_results(start_time_ts, duration, test_status):
def main():
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2', **creds_nova)
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
@@ -370,7 +371,7 @@ def main():
flavor = None
# Check if the given image exists
- image_id = functest_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
+ image_id = openstack_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
if image_id != '':
logger.info("Using existing image '%s'..." % GLANCE_IMAGE_NAME)
global image_exists
@@ -378,7 +379,7 @@ def main():
else:
logger.info("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
- image_id = functest_utils.create_glance_image(glance_client,
+ image_id = openstack_utils.create_glance_image(glance_client,
GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH)
if not image_id:
@@ -435,7 +436,7 @@ def main():
# wait until VM status is active
if not waitVmActive(nova_client, vm1):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
- NAME_VM_1, functest_utils.get_instance_status(nova_client, vm1)))
+ NAME_VM_1, openstack_utils.get_instance_status(nova_client, vm1)))
cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatingip)
return (EXIT_CODE)
else:
@@ -446,7 +447,7 @@ def main():
logger.debug("Instance '%s' got private ip '%s'." % (NAME_VM_1, test_ip))
logger.info("Adding '%s' to security group '%s'..." % (NAME_VM_1, SECGROUP_NAME))
- functest_utils.add_secgroup_to_instance(nova_client, vm1.id, sg_id)
+ openstack_utils.add_secgroup_to_instance(nova_client, vm1.id, sg_id)
# boot VM 2
logger.info("Creating instance '%s'..." % NAME_VM_2)
@@ -462,17 +463,17 @@ def main():
if not waitVmActive(nova_client, vm2):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
- NAME_VM_2, functest_utils.get_instance_status(nova_client, vm2)))
+ NAME_VM_2, openstack_utils.get_instance_status(nova_client, vm2)))
cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
return (EXIT_CODE)
else:
logger.info("Instance '%s' is ACTIVE." % NAME_VM_2)
logger.info("Adding '%s' to security group '%s'..." % (NAME_VM_2, SECGROUP_NAME))
- functest_utils.add_secgroup_to_instance(nova_client, vm2.id, sg_id)
+ openstack_utils.add_secgroup_to_instance(nova_client, vm2.id, sg_id)
logger.info("Creating floating IP for VM '%s'..." % NAME_VM_2)
- floatip_dic = functest_utils.create_floating_ip(neutron_client)
+ floatip_dic = openstack_utils.create_floating_ip(neutron_client)
floatip = floatip_dic['fip_addr']
floatip_id = floatip_dic['fip_id']
@@ -483,7 +484,7 @@ def main():
logger.info("Floating IP created: '%s'" % floatip)
logger.info("Associating floating ip: '%s' to VM '%s' " % (floatip, NAME_VM_2))
- if not functest_utils.add_floating_ip(nova_client, vm2.id, floatip):
+ if not openstack_utils.add_floating_ip(nova_client, vm2.id, floatip):
logger.error("Cannot associate floating IP to VM.")
cleanup(nova_client, neutron_client, image_id, network_dic, sg_id, floatip_dic)
return (EXIT_CODE)
diff --git a/testcases/vPing/CI/libraries/vPing_userdata.py b/testcases/vPing/CI/libraries/vPing_userdata.py
index 3af65e348..9ffb56be3 100644
--- a/testcases/vPing/CI/libraries/vPing_userdata.py
+++ b/testcases/vPing/CI/libraries/vPing_userdata.py
@@ -66,6 +66,7 @@ if not os.path.exists(REPO_PATH):
exit(-1)
sys.path.append(REPO_PATH + "testcases/")
import functest_utils
+import openstack_utils
with open("/home/opnfv/functest/conf/config_functest.yaml") as f:
functest_yaml = yaml.safe_load(f)
@@ -119,7 +120,7 @@ def waitVmActive(nova, vm):
sleep_time = 3
count = VM_BOOT_TIMEOUT / sleep_time
while True:
- status = functest_utils.get_instance_status(nova, vm)
+ status = openstack_utils.get_instance_status(nova, vm)
logger.debug("Status: %s" % status)
if status == "ACTIVE":
return True
@@ -139,7 +140,7 @@ def waitVmDeleted(nova, vm):
sleep_time = 3
count = VM_DELETE_TIMEOUT / sleep_time
while True:
- status = functest_utils.get_instance_status(nova, vm)
+ status = openstack_utils.get_instance_status(nova, vm)
if not status:
return True
elif count == 0:
@@ -155,23 +156,23 @@ def waitVmDeleted(nova, vm):
def create_private_neutron_net(neutron):
# Check if the network already exists
- network_id = functest_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
- subnet_id = functest_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
- router_id = functest_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
+ network_id = openstack_utils.get_network_id(neutron, NEUTRON_PRIVATE_NET_NAME)
+ subnet_id = openstack_utils.get_subnet_id(neutron, NEUTRON_PRIVATE_SUBNET_NAME)
+ router_id = openstack_utils.get_router_id(neutron, NEUTRON_ROUTER_NAME)
if network_id != '' and subnet_id != '' and router_id != '':
logger.info("Using existing network '%s'.." % NEUTRON_PRIVATE_NET_NAME)
else:
neutron.format = 'json'
logger.info('Creating neutron network %s..' % NEUTRON_PRIVATE_NET_NAME)
- network_id = functest_utils. \
+ network_id = openstack_utils. \
create_neutron_net(neutron, NEUTRON_PRIVATE_NET_NAME)
if not network_id:
return False
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
- subnet_id = functest_utils. \
+ subnet_id = openstack_utils. \
create_neutron_subnet(neutron,
NEUTRON_PRIVATE_SUBNET_NAME,
NEUTRON_PRIVATE_SUBNET_CIDR,
@@ -180,7 +181,7 @@ def create_private_neutron_net(neutron):
return False
logger.debug("Subnet '%s' created successfully" % subnet_id)
logger.debug('Creating Router...')
- router_id = functest_utils. \
+ router_id = openstack_utils. \
create_neutron_router(neutron, NEUTRON_ROUTER_NAME)
if not router_id:
@@ -189,12 +190,12 @@ def create_private_neutron_net(neutron):
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not functest_utils.add_interface_router(neutron, router_id, subnet_id):
+ if not openstack_utils.add_interface_router(neutron, router_id, subnet_id):
return False
logger.debug("Interface added successfully.")
logger.debug('Adding gateway to router...')
- if not functest_utils.add_gateway_router(neutron, router_id):
+ if not openstack_utils.add_gateway_router(neutron, router_id):
return False
logger.debug("Gateway added successfully.")
@@ -205,12 +206,12 @@ def create_private_neutron_net(neutron):
def create_security_group(neutron_client):
- sg_id = functest_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
+ sg_id = openstack_utils.get_security_group_id(neutron_client, SECGROUP_NAME)
if sg_id != '':
logger.info("Using existing security group '%s'..." % SECGROUP_NAME)
else:
logger.info("Creating security group '%s'..." % SECGROUP_NAME)
- SECGROUP = functest_utils.create_security_group(neutron_client,
+ SECGROUP = openstack_utils.create_security_group(neutron_client,
SECGROUP_NAME,
SECGROUP_DESCR)
if not SECGROUP:
@@ -223,19 +224,19 @@ def create_security_group(neutron_client):
(SECGROUP['name'], sg_id))
logger.debug("Adding ICMP rules in security group '%s'..." % SECGROUP_NAME)
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return False
logger.debug("Adding SSH rules in security group '%s'..." % SECGROUP_NAME)
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'ingress', 'tcp',
'22', '22'):
logger.error("Failed to create the security group rule...")
return False
- if not functest_utils.create_secgroup_rule(neutron_client, sg_id, \
+ if not openstack_utils.create_secgroup_rule(neutron_client, sg_id, \
'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return False
@@ -251,10 +252,10 @@ def cleanup(nova, neutron, image_id, network_dic):
logger.info("Cleaning up...")
if not image_exists:
logger.debug("Deleting image...")
- if not functest_utils.delete_glance_image(nova, image_id):
+ if not openstack_utils.delete_glance_image(nova, image_id):
logger.error("Error deleting the glance image")
- vm1 = functest_utils.get_instance_by_name(nova, NAME_VM_1)
+ vm1 = openstack_utils.get_instance_by_name(nova, NAME_VM_1)
if vm1:
logger.debug("Deleting '%s'..." % NAME_VM_1)
nova.servers.delete(vm1)
@@ -262,11 +263,11 @@ def cleanup(nova, neutron, image_id, network_dic):
if not waitVmDeleted(nova, vm1):
logger.error(
"Instance '%s' with cannot be deleted. Status is '%s'" % (
- NAME_VM_1, functest_utils.get_instance_status(nova, vm1)))
+ NAME_VM_1, openstack_utils.get_instance_status(nova, vm1)))
else:
logger.debug("Instance %s terminated." % NAME_VM_1)
- vm2 = functest_utils.get_instance_by_name(nova, NAME_VM_2)
+ vm2 = openstack_utils.get_instance_by_name(nova, NAME_VM_2)
if vm2:
logger.debug("Deleting '%s'..." % NAME_VM_2)
@@ -276,7 +277,7 @@ def cleanup(nova, neutron, image_id, network_dic):
if not waitVmDeleted(nova, vm2):
logger.error(
"Instance '%s' with cannot be deleted. Status is '%s'" % (
- NAME_VM_2, functest_utils.get_instance_status(nova, vm2)))
+ NAME_VM_2, openstack_utils.get_instance_status(nova, vm2)))
else:
logger.debug("Instance %s terminated." % NAME_VM_2)
@@ -286,27 +287,27 @@ def cleanup(nova, neutron, image_id, network_dic):
subnet_id = network_dic["subnet_id"]
router_id = network_dic["router_id"]
- if not functest_utils.remove_interface_router(neutron, router_id,
+ if not openstack_utils.remove_interface_router(neutron, router_id,
subnet_id):
logger.error("Unable to remove subnet '%s' from router '%s'" % (
subnet_id, router_id))
return False
logger.debug("Interface removed successfully")
- if not functest_utils.delete_neutron_router(neutron, router_id):
+ if not openstack_utils.delete_neutron_router(neutron, router_id):
logger.error("Unable to delete router '%s'" % router_id)
return False
logger.debug("Router deleted successfully")
- if not functest_utils.delete_neutron_subnet(neutron, subnet_id):
+ if not openstack_utils.delete_neutron_subnet(neutron, subnet_id):
logger.error("Unable to delete subnet '%s'" % subnet_id)
return False
logger.debug(
"Subnet '%s' deleted successfully" % NEUTRON_PRIVATE_SUBNET_NAME)
- if not functest_utils.delete_neutron_net(neutron, net_id):
+ if not openstack_utils.delete_neutron_net(neutron, net_id):
logger.error("Unable to delete network '%s'" % net_id)
return False
@@ -340,11 +341,11 @@ def push_results(start_time_ts, duration, test_status):
def main():
- creds_nova = functest_utils.get_credentials("nova")
+ creds_nova = openstack_utils.get_credentials("nova")
nova_client = novaclient.Client('2', **creds_nova)
- creds_neutron = functest_utils.get_credentials("neutron")
+ creds_neutron = openstack_utils.get_credentials("neutron")
neutron_client = neutronclient.Client(**creds_neutron)
- creds_keystone = functest_utils.get_credentials("keystone")
+ creds_keystone = openstack_utils.get_credentials("keystone")
keystone_client = keystoneclient.Client(**creds_keystone)
glance_endpoint = keystone_client.service_catalog.url_for(service_type='image',
endpoint_type='publicURL')
@@ -356,7 +357,7 @@ def main():
flavor = None
# Check if the given image exists
- image_id = functest_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
+ image_id = openstack_utils.get_image_id(glance_client, GLANCE_IMAGE_NAME)
if image_id != '':
logger.info("Using existing image '%s'..." % GLANCE_IMAGE_NAME)
global image_exists
@@ -364,7 +365,7 @@ def main():
else:
logger.info("Creating image '%s' from '%s'..." % (GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH))
- image_id = functest_utils.create_glance_image(glance_client,
+ image_id = openstack_utils.create_glance_image(glance_client,
GLANCE_IMAGE_NAME,
GLANCE_IMAGE_PATH)
if not image_id:
@@ -427,7 +428,7 @@ def main():
if not waitVmActive(nova_client, vm1):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
- NAME_VM_1, functest_utils.get_instance_status(nova_client, vm1)))
+ NAME_VM_1, openstack_utils.get_instance_status(nova_client, vm1)))
cleanup(nova_client, neutron_client, image_id, network_dic)
return (EXIT_CODE)
else:
@@ -463,7 +464,7 @@ def main():
if not waitVmActive(nova_client, vm2):
logger.error("Instance '%s' cannot be booted. Status is '%s'" % (
- NAME_VM_2, functest_utils.get_instance_status(nova_client, vm2)))
+ NAME_VM_2, openstack_utils.get_instance_status(nova_client, vm2)))
cleanup(nova_client, neutron_client, image_id, network_dic,
port_id1, port_id2)
return (EXIT_CODE)