#!/usr/bin/env python # # jose.lausuch@ericsson.com # valentin.boucher@orange.com # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 # from datetime import datetime as dt import json import os import re import requests import shutil import subprocess import sys import urllib2 import functest.ci.tier_builder as tb import functest.utils.functest_logger as ft_logger import dns.resolver from git import Repo import yaml logger = ft_logger.Logger("functest_utils").getLogger() REPOS_DIR = os.getenv('repos_dir') FUNCTEST_REPO = ("%s/functest" % REPOS_DIR) # ---------------------------------------------------------- # # INTERNET UTILS # # ----------------------------------------------------------- def check_internet_connectivity(url='http://www.opnfv.org/'): """ Check if there is access to the internet """ try: urllib2.urlopen(url, timeout=5) return True except urllib2.URLError: return False def download_url(url, dest_path): """ Download a file to a destination path given a URL """ name = url.rsplit('/')[-1] dest = dest_path + "/" + name try: response = urllib2.urlopen(url) except (urllib2.HTTPError, urllib2.URLError): return False with open(dest, 'wb') as f: shutil.copyfileobj(response, f) return True # ---------------------------------------------------------- # # CI UTILS # # ----------------------------------------------------------- def get_git_branch(repo_path): """ Get git branch name """ repo = Repo(repo_path) branch = repo.active_branch return branch.name def get_installer_type(): """ Get installer type (fuel, apex, joid, compass) """ try: installer = os.environ['INSTALLER_TYPE'] except KeyError: logger.error("Impossible to retrieve the installer type") installer = "Unknown_installer" return installer def get_scenario(): """ Get scenario """ try: scenario = os.environ['DEPLOY_SCENARIO'] except KeyError: logger.error("Impossible to retrieve the scenario") scenario = "Unknown_scenario" return scenario def get_version(): """ Get version """ # Use the build tag to retrieve the version # By default version is unknown # if launched through CI the build tag has the following format # jenkins------ # e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190 # use regex to match branch info rule = "daily-(.+?)-[0-9]*" build_tag = get_build_tag() m = re.search(rule, build_tag) if m: return m.group(1) else: return "unknown" def get_pod_name(): """ Get PoD Name from env variable NODE_NAME """ try: return os.environ['NODE_NAME'] except KeyError: logger.error( "Unable to retrieve the POD name from environment. " + "Using pod name 'unknown-pod'") return "unknown-pod" def get_build_tag(): """ Get build tag of jenkins jobs """ try: build_tag = os.environ['BUILD_TAG'] except KeyError: logger.error("Impossible to retrieve the build tag") build_tag = "unknown_build_tag" return build_tag def get_db_url(): """ Returns DB URL """ return get_functest_config('results.test_db_url') def logger_test_results(project, case_name, status, details): pod_name = get_pod_name() scenario = get_scenario() version = get_version() build_tag = get_build_tag() logger.info( "\n" "****************************************\n" "\t %(p)s/%(n)s results \n\n" "****************************************\n" "DB:\t%(db)s\n" "pod:\t%(pod)s\n" "version:\t%(v)s\n" "scenario:\t%(s)s\n" "status:\t%(c)s\n" "build tag:\t%(b)s\n" "details:\t%(d)s\n" % {'p': project, 'n': case_name, 'db': get_db_url(), 'pod': pod_name, 'v': version, 's': scenario, 'c': status, 'b': build_tag, 'd': details}) def push_results_to_db(project, case_name, start_date, stop_date, criteria, details): """ POST results to the Result target DB """ # Retrieve params from CI and conf url = get_db_url() + "/results" try: installer = os.environ['INSTALLER_TYPE'] scenario = os.environ['DEPLOY_SCENARIO'] pod_name = os.environ['NODE_NAME'] build_tag = os.environ['BUILD_TAG'] except KeyError as e: logger.error("Please set env var: " + str(e)) return False rule = "daily-(.+?)-[0-9]*" m = re.search(rule, build_tag) if m: version = m.group(1) else: logger.error("Please fix BUILD_TAG env var: " + build_tag) return False test_start = dt.fromtimestamp(start_date).strftime('%Y-%m-%d %H:%M:%S') test_stop = dt.fromtimestamp(stop_date).strftime('%Y-%m-%d %H:%M:%S') params = {"project_name": project, "case_name": case_name, "pod_name": pod_name, "installer": installer, "version": version, "scenario": scenario, "criteria": criteria, "build_tag": build_tag, "start_date": test_start, "stop_date": test_stop, "details": details} error = None headers = {'Content-Type': 'application/json'} try: r = requests.post(url, data=json.dumps(params), headers=headers) logger.debug(r) r.raise_for_status() except requests.RequestException as exc: if 'r' in locals(): error = ("Pushing Result to DB(%s) failed: %s" % (r.url, r.content)) else: error = ("Pushing Result to DB(%s) failed: %s" % (url, exc)) except Exception as e: error = ("Error [push_results_to_db(" "DB: '%(db)s', " "project: '%(project)s', " "case: '%(case)s', " "pod: '%(pod)s', " "version: '%(v)s', " "scenario: '%(s)s', " "criteria: '%(c)s', " "build_tag: '%(t)s', " "details: '%(d)s')]: " "%(error)s" % { 'db': url, 'project': project, 'case': case_name, 'pod': pod_name, 'v': version, 's': scenario, 'c': criteria, 't': build_tag, 'd': details, 'error': e }) finally: if error: logger.error(error) return False return True def get_resolvconf_ns(): """ Get nameservers from current resolv.conf """ nameservers = [] rconf = open("/etc/resolv.conf", "r") line = rconf.readline() resolver = dns.resolver.Resolver() while line: ip = re.search(r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b", line) if ip: resolver.nameservers = [str(ip)] try: result = resolver.query('opnfv.org')[0] if result != "": nameservers.append(ip.group()) except dns.exception.Timeout: pass line = rconf.readline() return nameservers def get_ci_envvars(): """ Get the CI env variables """ ci_env_var = { "installer": os.environ.get('INSTALLER_TYPE'), "scenario": os.environ.get('DEPLOY_SCENARIO')} return ci_env_var def execute_command(cmd, exit_on_error=True, info=False, error_msg="", verbose=True, output_file=None): if not error_msg: error_msg = ("The command '%s' failed." % cmd) msg_exec = ("Executing command: '%s'" % cmd) if verbose: if info: logger.info(msg_exec) else: logger.debug(msg_exec) p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) if output_file: f = open(output_file, "w") for line in iter(p.stdout.readline, b''): if output_file: f.write(line) else: line = line.replace('\n', '') print line sys.stdout.flush() if output_file: f.close() p.stdout.close() returncode = p.wait() if returncode != 0: if verbose: logger.error(error_msg) if exit_on_error: sys.exit(1) return returncode def get_deployment_dir(): """ Returns current Rally deployment directory """ deployment_name = get_functest_config('rally.deployment_name') rally_dir = get_functest_config('general.directories.dir_rally_inst') cmd = ("rally deployment list | awk '/" + deployment_name + "/ {print $2}'") p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) deployment_uuid = p.stdout.readline().rstrip() if deployment_uuid == "": logger.error("Rally deployment not found.") exit(-1) deployment_dir = (rally_dir + "/tempest/for-deployment-" + deployment_uuid) return deployment_dir def get_criteria_by_test(testname): criteria = "" file = get_testcases_file() tiers = tb.TierBuilder("", "", file) for tier in tiers.get_tiers(): for test in tier.get_tests(): if test.get_name() == testname: criteria = test.get_criteria() return criteria # ---------------------------------------------------------- # # YAML UTILS # # ----------------------------------------------------------- def get_parameter_from_yaml(parameter, file): """ Returns the value of a given parameter in file.yaml parameter must be given in string format with dots Example: general.openstack.image_name """ with open(file) as f: file_yaml = yaml.safe_load(f) f.close() value = file_yaml for element in parameter.split("."): value = value.get(element) if value is None: raise ValueError("The parameter %s is not defined in" " config_functest.yaml" % parameter) return value def get_functest_config(parameter): yaml_ = os.environ["CONFIG_FUNCTEST_YAML"] return get_parameter_from_yaml(parameter, yaml_) def check_success_rate(case_name, success_rate): success_rate = float(success_rate) criteria = get_criteria_by_test(case_name) def get_criteria_value(op): return float(criteria.split(op)[1].rstrip('%')) status = 'FAIL' ops = ['==', '>='] for op in ops: if op in criteria: c_value = get_criteria_value(op) if eval("%s %s %s" % (success_rate, op, c_value)): status = 'PASS' break return status def merge_dicts(dict1, dict2): for k in set(dict1.keys()).union(dict2.keys()): if k in dict1 and k in dict2: if isinstance(dict1[k], dict) and isinstance(dict2[k], dict): yield (k, dict(merge_dicts(dict1[k], dict2[k]))) else: yield (k, dict2[k]) elif k in dict1: yield (k, dict1[k]) else: yield (k, dict2[k]) def check_test_result(test_name, ret, start_time, stop_time): def get_criteria_value(): return get_criteria_by_test(test_name).split('==')[1].strip() status = 'FAIL' if str(ret) == get_criteria_value(): status = 'PASS' details = { 'timestart': start_time, 'duration': round(stop_time - start_time, 1), 'status': status, } return status, details def get_testcases_file(): return FUNCTEST_REPO + "/ci/testcases.yaml" def get_functest_yaml(): with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f: functest_yaml = yaml.safe_load(f) f.close() return functest_yaml