#!/usr/bin/env python # # jose.lausuch@ericsson.com # valentin.boucher@orange.com # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 # from datetime import datetime as dt import json import os import re import requests import shutil import subprocess import sys import urllib2 import functest.ci.tier_builder as tb import functest.utils.functest_logger as ft_logger import dns.resolver from git import Repo import yaml logger = ft_logger.Logger("functest_utils").getLogger() REPOS_DIR = os.getenv('repos_dir') FUNCTEST_REPO = ("%s/functest" % REPOS_DIR) # ---------------------------------------------------------- # # INTERNET UTILS # # ----------------------------------------------------------- def check_internet_connectivity(url='http://www.opnfv.org/'): """ Check if there is access to the internet """ try: urllib2.urlopen(url, timeout=5) return True except urllib2.URLError: return False def download_url(url, dest_path): """ Download a file to a destination path given a URL """ name = url.rsplit('/')[-1] dest = dest_path + "/" + name try: response = urllib2.urlopen(url) except (urllib2.HTTPError, urllib2.URLError): return False with open(dest, 'wb') as f: shutil.copyfileobj(response, f) return True # ---------------------------------------------------------- # # CI UTILS # # ----------------------------------------------------------- def get_git_branch(repo_path): """ Get git branch name """ repo = Repo(repo_path) branch = repo.active_branch return branch.name def get_installer_type(): """ Get installer type (fuel, apex, joid, compass) """ try: installer = os.environ['INSTALLER_TYPE'] except KeyError: logger.error("Impossible to retrieve the installer type") installer = "Unknown_installer" return installer def get_scenario(): """ Get scenario """ try: scenario = os.environ['DEPLOY_SCENARIO'] except KeyError: logger.error("Impossible to retrieve the scenario") scenario = "Unknown_scenario" return scenario def get_version(): """ Get version """ # Use the build tag to retrieve the version # By default version is unknown # if launched through CI the build tag has the following format # jenkins------ # e.g. jenkins-functest-fuel-opnfv-jump-2-daily-master-190 # use regex to match branch info rule = "daily-(.+?)-[0-9]*" build_tag = get_build_tag() m = re.search(rule, build_tag) if m: return m.group(1) else: return "unknown" def get_pod_name(): """ Get PoD Name from env variable NODE_NAME """ try: return os.environ['NODE_NAME'] except KeyError: logger.error( "Unable to retrieve the POD name from environment. " + "Using pod name 'unknown-pod'") return "unknown-pod" def get_build_tag(): """ Get build tag of jenkins jobs """ try: build_tag = os.environ['BUILD_TAG'] except KeyError: logger.error("Impossible to retrieve the build tag") build_tag = "unknown_build_tag" return build_tag def get_db_url(): """ Returns DB URL """ functest_yaml = get_functest_yaml() db_url = functest_yaml.get("results").get("test_db_url") return db_url def logger_test_results(project, case_name, status, details): pod_name = get_pod_name() scenario = get_scenario() version = get_version() build_tag = get_build_tag() logger.info( "\n" "********************************
---
# Sample test suite file
# Test cases listed in the suite file should be in the tests/opnfv/test_cases directory
# or specified in test_cases_dir optional variable as done below

schema: "yardstick:suite:0.1"

name: "Sample test suite"
test_cases_dir: "samples/"
test_cases:
-
  file_name: ping.yaml
-
  file_name: ping-template.yaml
  task_args: '{"packetsize": "200"}'
-
  file_name: ping-template.yaml
  task_args_file: "/tmp/test-args-file.json"
def get_criteria_value(): return get_criteria_by_test(test_name).split('==')[1].strip() status = 'FAIL' if str(ret) == get_criteria_value(): status = 'PASS' details = { 'timestart': start_time, 'duration': round(stop_time - start_time, 1), 'status': status, } return status, details def get_testcases_file(): return FUNCTEST_REPO + "/ci/testcases.yaml" def get_functest_yaml(): with open(os.environ["CONFIG_FUNCTEST_YAML"]) as f: functest_yaml = yaml.safe_load(f) f.close() return functest_yaml