diff options
Diffstat (limited to 'testcases/functest_utils.py')
-rw-r--r-- | testcases/functest_utils.py | 143 |
1 files changed, 131 insertions, 12 deletions
diff --git a/testcases/functest_utils.py b/testcases/functest_utils.py index 0734ccbde..888c043fd 100644 --- a/testcases/functest_utils.py +++ b/testcases/functest_utils.py @@ -25,7 +25,7 @@ def check_credentials(): """ Check if the OpenStack credentials (openrc) are sourced """ - env_vars = ['OS_AUTH_URL','OS_USERNAME','OS_PASSWORD','OS_TENANT_NAME'] + env_vars = ['OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME'] return all(map(lambda v: v in os.environ and os.environ[v], env_vars)) @@ -69,6 +69,7 @@ def get_instances(nova_client): except: return None + def get_instance_status(nova_client, instance): try: instance = nova_client.servers.get(instance.id) @@ -76,6 +77,7 @@ def get_instance_status(nova_client, instance): except: return None + def get_instance_by_name(nova_client, instance_name): try: instance = nova_client.servers.find(name=instance_name) @@ -84,7 +86,6 @@ def get_instance_by_name(nova_client, instance_name): return None - def get_flavor_id(nova_client, flavor_name): flavors = nova_client.flavors.list(detailed=True) id = '' @@ -121,6 +122,7 @@ def get_floating_ips(nova_client): except: return None + def delete_floating_ip(nova_client, floatingip_id): try: nova_client.floating_ips.delete(floatingip_id) @@ -222,6 +224,7 @@ def remove_interface_router(neutron_client, router_id, subnet_id): print "Error:", sys.exc_info()[0] return False + def remove_gateway_router(neutron_client, router_id): try: neutron_client.remove_gateway_router(router_id) @@ -301,6 +304,7 @@ def get_router_list(neutron_client): else: return router_list + def get_port_list(neutron_client): port_list = neutron_client.list_ports()['ports'] if len(port_list) == 0: @@ -309,7 +313,6 @@ def get_port_list(neutron_client): return port_list - def get_external_net(neutron_client): for network in neutron_client.list_networks()['networks']: if network['router:external']: @@ -331,30 +334,33 @@ def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota): print "Error:", sys.exc_info()[0] return False -def update_cinder_quota(cinder_client, tenant_id, vols_quota, snapshots_quota,gigabytes_quota): - quotas_values = { - "volumes": vols_quota, - "snapshots": snapshots_quota, - "gigabytes": gigabytes_quota - } + +def update_cinder_quota(cinder_client, tenant_id, vols_quota, + snapshots_quota, gigabytes_quota): + quotas_values = {"volumes": vols_quota, + "snapshots": snapshots_quota, + "gigabytes": gigabytes_quota} try: - quotas_default=cinder_client.quotas.update(tenant_id,**quotas_values) + quotas_default = cinder_client. quotas.update(tenant_id, + **quotas_values) return True except: print "Error:", sys.exc_info()[0] return False + def get_private_net(neutron_client): # Checks if there is an existing shared private network networks = neutron_client.list_networks()['networks'] if len(networks) == 0: return None for net in networks: - if (net['router:external'] == False) and (net['shared'] == True): + if (net['router:external'] is False) and (net['shared'] is True): return net return None + # ################ GLANCE ################# def get_images(nova_client): try: @@ -390,6 +396,7 @@ def create_glance_image(glance_client, image_name, file_path, public=True): print "Error:", sys.exc_info()[0] return False + def delete_glance_image(nova_client, image_id): try: nova_client.images.delete(image_id) @@ -398,6 +405,7 @@ def delete_glance_image(nova_client, image_id): print "Error:", sys.exc_info()[0] return False + # ################ CINDER ################# def get_volumes(cinder_client): try: @@ -406,6 +414,7 @@ def get_volumes(cinder_client): except: return None + def delete_volume(cinder_client, volume_id): try: cinder_client.volumes.delete(volume_id) @@ -414,6 +423,7 @@ def delete_volume(cinder_client, volume_id): print "Error:", sys.exc_info()[0] return False + # ################ CINDER ################# def get_security_groups(neutron_client): try: @@ -422,6 +432,7 @@ def get_security_groups(neutron_client): except: return None + def delete_security_group(neutron_client, secgroup_id): try: neutron_client.delete_security_group(secgroup_id) @@ -449,6 +460,7 @@ def get_tenant_id(keystone_client, tenant_name): break return id + def get_users(keystone_client): try: users = keystone_client.users.list() @@ -456,6 +468,7 @@ def get_users(keystone_client): except: return None + def get_role_id(keystone_client, role_name): roles = keystone_client.roles.list() id = '' @@ -611,7 +624,8 @@ def get_pod_name(logger=None): return "unknown-pod" -def push_results_to_db(db_url, case_name, logger, pod_name, git_version, payload): +def push_results_to_db(db_url, case_name, logger, pod_name, + git_version, payload): url = db_url + "/results" installer = get_installer_type(logger) params = {"project_name": "functest", "case_name": case_name, @@ -626,3 +640,108 @@ def push_results_to_db(db_url, case_name, logger, pod_name, git_version, payload except: print "Error:", sys.exc_info()[0] return False + + +def getTestEnv(test, functest_yaml): + # get the config of the testcase based on functest_config.yaml + # 2 options + # - test = test project e.g; ovno + # - test = testcase e.g. functest/odl + # look for the / to see if it is a test project or a testcase + try: + TEST_ENV = functest_yaml.get("test-dependencies") + + if test.find("/") < 0: + config_test = TEST_ENV[test] + else: + test_split = test.split("/") + testproject = test_split[0] + testcase = test_split[1] + config_test = TEST_ENV[testproject][testcase] + except KeyError: + # if not defined in dependencies => no dependencies + config_test = "" + except: + print "Error getTestEnv:", sys.exc_info()[0] + + return config_test + + +def get_ci_envvars(): + """ + Get the CI env variables + """ + ci_env_var = { + "installer": os.environ.get('INSTALLER_TYPE'), + "controller": os.environ.get('SDN_CONTROLLER'), + "options": os.environ.get("OPNFV_FEATURE")} + return ci_env_var + + +def isTestRunnable(test, functest_yaml): + # check getTestEnv(test) and CI env var + # check installer, controller and options + # e.g. if test needs onos => do not run odl suite + try: + # By default we assume that all the tests are always runnable... + is_runnable = True + # Retrieve CI environment + ci_env = get_ci_envvars() + + # Retrieve test environement from config file + test_env = getTestEnv(test, functest_yaml) + + # if test_env not empty => dependencies to be checked + if test_env is not None and len(test_env) > 0: + # possible criteria = ["installer", "controller", "options"] + # consider test criteria from config file + # compare towards CI env through CI en variable + for criteria in test_env: + if test_env[criteria] != ci_env[criteria]: + # print "Test "+ test + " cannot be run on the environment" + is_runnable = False + except: + print "Error isTestRunnable:", sys.exc_info()[0] + return is_runnable + + +def generateTestcaseList(functest_yaml): + try: + test_list = "" + # Retrieve CI environment + get_ci_envvars() + + # get testcases + testcase_list = functest_yaml.get("test-dependencies") + projects = testcase_list.keys() + for project in projects: + testcases = testcase_list[project] + # 1 or 2 levels for testcases project[/case] + # if only project name without controller or scenario + # => shall be runnable on any controller/scenario + if testcases is None: + test_list += project + " " + else: + for testcase in testcases: + if testcase == "controller" or testcase == "scenario": + # project (1 level) + if isTestRunnable(project, functest_yaml): + test_list += project + " " + else: + # project/testcase (2 levels) + thetest = project + "/" + testcase + if isTestRunnable(thetest, functest_yaml): + test_list += testcase + " " + + # create a file that could be consumed by run-test.sh + file = open("testcase-list.txt", 'w') + file.write(test_list) + file.close() + + return test_list + + # test for each testcase if it is runnable + # towards the declared configuration + # generate the test config file + except: + print "Error generateTestcaseList:", sys.exc_info()[0] |