path: root/functest/utils/
diff options
authorMorgan Richomme <>2016-11-08 14:18:12 +0100
committerMorgan Richomme <>2016-11-09 16:55:45 +0100
commit107e61635c2ab1feb5263380ea63e21cf2e6e65b (patch)
tree4966b77605bd34a40f452b1d268868691e84d008 /functest/utils/
parente74c9b347f2623eb1a3c477921a84da4c31b364f (diff)
Repo structure modification
- create functest subdirectory - rename unit tests - adapt path in exec and config files JIRA: FUNCTEST-525 Change-Id: Ifd5c6edfb5bda1b09f82848e2269ad5fbeb84d0a Signed-off-by: Morgan Richomme <>
Diffstat (limited to 'functest/utils/')
1 files changed, 449 insertions, 0 deletions
diff --git a/functest/utils/ b/functest/utils/
new file mode 100644
index 00000000..41b6485d
--- /dev/null
+++ b/functest/utils/
@@ -0,0 +1,449 @@
+#!/usr/bin/env python
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+import json
+import os
+import re
+import shutil
+import subprocess
+import sys
+import urllib2
+from datetime import datetime as dt
+import dns.resolver
+import requests
+import yaml
+from git import Repo
+import functest.utils.functest_logger as ft_logger
+logger = ft_logger.Logger("functest_utils").getLogger()
+REPOS_DIR = os.getenv('repos_dir')
+FUNCTEST_REPO = ("%s/functest" % REPOS_DIR)
+# ----------------------------------------------------------
+# -----------------------------------------------------------
+def check_internet_connectivity(url=''):
+ """
+ Check if there is access to the internet
+ """
+ try:
+ urllib2.urlopen(url, timeout=5)
+ return True
+ except urllib2.URLError:
+ return False
+def download_url(url, dest_path):
+ """
+ Download a file to a destination path given a URL
+ """
+ name = url.rsplit('/')[-1]
+ dest = dest_path + "/" + name
+ try:
+ response = urllib2.urlopen(url)
+ except (urllib2.HTTPError, urllib2.URLError):
+ return False
+ with open(dest, 'wb') as f:
+ shutil.copyfileobj(response, f)
+ return True
+# ----------------------------------------------------------
+# -----------------------------------------------------------
+def get_git_branch(repo_path):
+ """
+ Get git branch name
+ """
+ repo = Repo(repo_path)
+ branch = repo.active_branch
+ return
+def get_installer_type():
+ """
+ Get installer type (fuel, apex, joid, compass)
+ """
+ try:
+ installer = os.environ['INSTALLER_TYPE']
+ except KeyError:
+ logger.error("Impossible to retrieve the installer type")
+ installer = "Unknown_installer"
+ return installer
+def get_scenario():
+ """
+ Get scenario
+ """
+ try:
+ scenario = os.environ['DEPLOY_SCENARIO']
+ except KeyError:
+ logger.error("Impossible to retrieve the scenario")
+ scenario = "Unknown_scenario"
+ return scenario
+def get_version():
+ """
+ Get version
+ """
+ # Use the build tag to retrieve the version
+ # By default version is unknown
+ # if launched through CI the build tag has the following format
+ # jenkins-<project>-<installer>-<pod>-<job>-<branch>-<id>
+ # e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190
+ # use regex to match branch info
+ rule = "daily-(.+?)-[0-9]*"
+ build_tag = get_build_tag()
+ m =, build_tag)
+ if m:
+ return
+ else:
+ return "unknown"
+def get_pod_name():
+ """
+ Get PoD Name from env variable NODE_NAME
+ """
+ try:
+ return os.environ['NODE_NAME']
+ except KeyError:
+ logger.error(
+ "Unable to retrieve the POD name from environment. " +
+ "Using pod name 'unknown-pod'")
+ return "unknown-pod"
+def get_build_tag():
+ """
+ Get build tag of jenkins jobs
+ """
+ try:
+ build_tag = os.environ['BUILD_TAG']
+ except KeyError:
+ logger.error("Impossible to retrieve the build tag")
+ build_tag = "unknown_build_tag"
+ return build_tag
+def get_db_url():
+ """
+ Returns DB URL
+ """
+ return get_functest_config('results.test_db_url')
+def logger_test_results(project, case_name, status, details):
+ pod_name = get_pod_name()
+ scenario = get_scenario()
+ version = get_version()
+ build_tag = get_build_tag()
+ "\n"
+ "****************************************\n"
+ "\t %(p)s/%(n)s results \n\n"
+ "****************************************\n"
+ "DB:\t%(db)s\n"
+ "pod:\t%(pod)s\n"
+ "version:\t%(v)s\n"
+ "scenario:\t%(s)s\n"
+ "status:\t%(c)s\n"
+ "build tag:\t%(b)s\n"
+ "details:\t%(d)s\n"
+ % {'p': project,
+ 'n': case_name,
+ 'db': get_db_url(),
+ 'pod': pod_name,
+ 'v': version,
+ 's': scenario,
+ 'c': status,
+ 'b': build_tag,
+ 'd': details})
+def push_results_to_db(project, case_name,
+ start_date, stop_date, criteria, details):
+ """
+ POST results to the Result target DB
+ """
+ # Retrieve params from CI and conf
+ url = get_db_url() + "/results"
+ try:
+ installer = os.environ['INSTALLER_TYPE']
+ scenario = os.environ['DEPLOY_SCENARIO']
+ pod_name = os.environ['NODE_NAME']
+ build_tag = os.environ['BUILD_TAG']
+ except KeyError as e:
+ logger.error("Please set env var: " + str(e))
+ return False
+ rule = "daily-(.+?)-[0-9]*"
+ m =, build_tag)
+ if m:
+ version =
+ else:
+ logger.error("Please fix BUILD_TAG env var: " + build_tag)
+ return False
+ test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S')
+ test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S')
+ params = {"project_name": project, "case_name": case_name,
+ "pod_name": pod_name, "installer": installer,
+ "version": version, "scenario": scenario, "criteria": criteria,
+ "build_tag": build_tag, "start_date": test_start,
+ "stop_date": test_stop, "details": details}
+ error = None
+ headers = {'Content-Type': 'application/json'}
+ try:
+ r =, data=json.dumps(params), headers=headers)
+ logger.debug(r)
+ r.raise_for_status()
+ except requests.RequestException as exc:
+ if 'r' in locals():
+ error = ("Pushing Result to DB(%s) failed: %s" %
+ (r.url, r.content))
+ else:
+ error = ("Pushing Result to DB(%s) failed: %s" % (url, exc))
+ except Exception as e:
+ error = ("Error [push_results_to_db("
+ "DB: '%(db)s', "
+ "project: '%(project)s', "
+ "case: '%(case)s', "
+ "pod: '%(pod)s', "
+ "version: '%(v)s', "
+ "scenario: '%(s)s', "
+ "criteria: '%(c)s', "
+ "build_tag: '%(t)s', "
+ "details: '%(d)s')]: "
+ "%(error)s" %
+ {
+ 'db': url,
+ 'project': project,
+ 'case': case_name,
+ 'pod': pod_name,
+ 'v': version,
+ 's': scenario,
+ 'c': criteria,
+ 't': build_tag,
+ 'd': details,
+ 'error': e
+ })
+ finally:
+ if error:
+ logger.error(error)
+ return False
+ return True
+def get_resolvconf_ns():
+ """
+ Get nameservers from current resolv.conf
+ """
+ nameservers = []
+ rconf = open("/etc/resolv.conf", "r")
+ line = rconf.readline()
+ resolver = dns.resolver.Resolver()
+ while line:
+ ip ="\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line)
+ if ip:
+ resolver.nameservers = [str(ip)]
+ try:
+ result = resolver.query('')[0]
+ if result != "":
+ nameservers.append(
+ except dns.exception.Timeout:
+ pass
+ line = rconf.readline()
+ return nameservers
+def get_ci_envvars():
+ """
+ Get the CI env variables
+ """
+ ci_env_var = {
+ "installer": os.environ.get('INSTALLER_TYPE'),
+ "scenario": os.environ.get('DEPLOY_SCENARIO')}
+ return ci_env_var
+def execute_command(cmd, info=False, error_msg="",
+ verbose=True, output_file=None):
+ if not error_msg:
+ error_msg = ("The command '%s' failed." % cmd)
+ msg_exec = ("Executing command: '%s'" % cmd)
+ if verbose:
+ if info:
+ else:
+ logger.debug(msg_exec)
+ p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ if output_file:
+ f = open(output_file, "w")
+ for line in iter(p.stdout.readline, b''):
+ if output_file:
+ f.write(line)
+ else:
+ line = line.replace('\n', '')
+ print line
+ sys.stdout.flush()
+ if output_file:
+ f.close()
+ p.stdout.close()
+ returncode = p.wait()
+ if returncode != 0:
+ if verbose:
+ logger.error(error_msg)
+ return returncode
+def get_deployment_dir():
+ """
+ Returns current Rally deployment directory
+ """
+ deployment_name = get_functest_config('rally.deployment_name')
+ rally_dir = get_functest_config('general.directories.dir_rally_inst')
+ cmd = ("rally deployment list | awk '/" + deployment_name +
+ "/ {print $2}'")
+ p = subprocess.Popen(cmd, shell=True,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT)
+ deployment_uuid = p.stdout.readline().rstrip()
+ if deployment_uuid == "":
+ logger.error("Rally deployment not found.")
+ exit(-1)
+ deployment_dir = (rally_dir + "/tempest/for-deployment-" +
+ deployment_uuid)
+ return deployment_dir
+def get_dict_by_test(testname):
+ with open(get_testcases_file()) as f:
+ testcases_yaml = yaml.safe_load(f)
+ for dic_tier in testcases_yaml.get("tiers"):
+ for dic_testcase in dic_tier['testcases']:
+ if dic_testcase['name'] == testname:
+ return dic_testcase
+ logger.error('Project %s is not defined in testcases.yaml' % testname)
+ return None
+def get_criteria_by_test(testname):
+ dict = get_dict_by_test(testname)
+ if dict:
+ return dict['criteria']
+ return None
+# ----------------------------------------------------------
+# -----------------------------------------------------------
+def get_parameter_from_yaml(parameter, file):
+ """
+ Returns the value of a given parameter in file.yaml
+ parameter must be given in string format with dots
+ Example: general.openstack.image_name
+ """
+ with open(file) as f:
+ file_yaml = yaml.safe_load(f)
+ f.close()
+ value = file_yaml
+ for element in parameter.split("."):
+ value = value.get(element)
+ if value is None:
+ raise ValueError("The parameter %s is not defined in"
+ " config_functest.yaml" % parameter)
+ return value
+def get_functest_config(parameter):
+ yaml_ = os.environ["CONFIG_FUNCTEST_YAML"]
+ return get_parameter_from_yaml(parameter, yaml_)
+def check_success_rate(case_name, success_rate):
+ success_rate = float(success_rate)
+ criteria = get_criteria_by_test(case_name)
+ def get_criteria_value(op):
+ return float(criteria.split(op)[1].rstrip('%'))
+ status = 'FAIL'
+ ops = ['==', '>=']
+ for op in ops:
+ if op in criteria:
+ c_value = get_criteria_value(op)
+ if eval("%s %s %s" % (success_rate, op, c_value)):
+ status = 'PASS'
+ break
+ return status
+def merge_dicts(dict1, dict2):
+ for k in set(dict1.keys()).union(dict2.keys()):
+ if k in dict1 and k in dict2:
+ if isinstance(dict1[k], dict) and isinstance(dict2[k], dict):
+ yield (k, dict(merge_dicts(dict1[k], dict2[k])))
+ else:
+ yield (k, dict2[k])
+ elif k in dict1:
+ yield (k, dict1[k])
+ else:
+ yield (k, dict2[k])
+def check_test_result(test_name, ret, start_time, stop_time):
+ def get_criteria_value():
+ return get_criteria_by_test(test_name).split('==')[1].strip()
+ status = 'FAIL'
+ if str(ret) == get_criteria_value():
+ status = 'PASS'
+ details = {
+ 'timestart': start_time,
+ 'duration': round(stop_time - start_time, 1),
+ 'status': status,
+ }
+ return status, details
+def get_testcases_file():
+ return FUNCTEST_REPO + "/ci/testcases.yaml"
+def get_functest_yaml():
+ with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f:
+ functest_yaml = yaml.safe_load(f)
+ f.close()
+ return functest_yaml