##############################################################################
# Copyright (c) 2020 China Mobile Co.,Ltd and others.
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
'''
an implementation of hardware delivery validation based on redfish interface.
'''
import time
import os
import re
from re import DOTALL as DT
import json
import copy
from ast import literal_eval

import pytest
import yaml
from openpyxl.reader.excel import load_workbook

from http_handler import UrllibHttpHandler, HEADERS
# pylint: disable=E0611
from log_utils import BASE_DIR, LOG_FILE, LOGGER
from errors import ERROR_CODE, WARN_CODE
from test_api import PushResults

LOGGER.info(BASE_DIR)

# Credentials posted to the session service; filled in by the ``run`` fixture.
ACCOUNT_INFO = {}
# Seconds to wait between two attempts to obtain a session token.
WAIT_INTERVAL = 5
# Case dictionaries collected during the session, dumped at teardown.
cases_result = []


def parse_config(config_yaml):
    """
    Parse the settings stored in config.yaml.

    :param config_yaml: path of the configuration file
    :return: the parsed configuration, or None when the file is missing
    """
    try:
        if not os.path.exists(config_yaml):
            LOGGER.error(" %s, %s", ERROR_CODE['E400001'], config_yaml)
        with open(config_yaml, 'r') as conf_file:
            # NOTE(review): FullLoader is not meant for untrusted input;
            # consider yaml.safe_load if the config can come from outside.
            config = yaml.load(conf_file.read(), Loader=yaml.FullLoader)
    except FileNotFoundError as fnfe:
        LOGGER.error(fnfe)
        LOGGER.error(u"%s", ERROR_CODE['E400002'])
        return None
    else:
        return config


def get_token(http_handler, url):
    """
    Request a session token, trying up to three times.

    :param http_handler: object whose ``post(url, body)`` returns None or a
        response exposing ``info()`` (the response headers)
    :param url: session service url
    :return: value of the X-Auth-Token header, or None if never obtained
    """
    retry_num = 3
    while retry_num:
        retry_num -= 1
        res = http_handler.post(url, ACCOUNT_INFO)
        if res is None:
            LOGGER.error("%s, %s", WARN_CODE['W100001'], url)
            LOGGER.info("wait %s seconds to try again", WAIT_INTERVAL)
            time.sleep(WAIT_INTERVAL)
            continue
        data = res.info()
        if "X-Auth-Token" in data:
            return data.get("X-Auth-Token")
        time.sleep(WAIT_INTERVAL)
    return None


def get_etag(http_handler, url):
    """
    Fetch the ETag header of a resource.

    :param http_handler: object whose ``get(url)`` returns None or a response
        exposing ``info()``
    :param url: resource url
    :return: the ETag header value, or None when it is not available
    """
    res = http_handler.get(url)
    if res is None:
        return None
    data = res.info()
    if data is not None and "ETag" in data:
        return data.get("ETag")
    return None


def parse_data(exp_value, act_value):
    '''
    Pair the expected value with the actual value, following the structure
    of the expected value.

    @return:
        case 1: both values are str or int -> tuple (exp_value, act_value)
        case 2: expected is a list but actual is not -> tuple as well
        case 3: list / dict -> exp_value updated in place, every leaf being
                replaced by the result of a recursive call
        otherwise an error is logged and exp_value is returned untouched.
    '''
    if isinstance(exp_value, (str, int)) and isinstance(act_value, (str, int)):
        return (exp_value, act_value)
    if isinstance(exp_value, list):
        if not isinstance(act_value, list):
            return (exp_value, act_value)
        for index, expected_item in enumerate(exp_value):
            if index < len(act_value):
                exp_value[index] = parse_data(expected_item, act_value[index])
            else:
                # The actual list is shorter than expected: pair the item
                # with None so that compare_data reports it as a failure
                # instead of crashing here with an IndexError.
                LOGGER.error("no actual value for expected list item %s",
                             expected_item)
                exp_value[index] = (expected_item, None)
    elif isinstance(exp_value, dict):
        if isinstance(act_value, dict):
            for key, val in exp_value.items():
                if key in act_value:
                    exp_value[key] = parse_data(val, act_value[key])
                else:
                    LOGGER.error("%s,%s", ERROR_CODE['E500001'], key)
        else:
            LOGGER.error("%s,expected: %s , actual: %s",
                         ERROR_CODE['E400005'], exp_value, act_value)
    else:
        LOGGER.error("%s, expected type:%s, actual type %s",
                     ERROR_CODE['E400006'], type(exp_value), type(act_value))
    return exp_value


def compare_data(value, flag):
    '''
    Compare the (expected, actual) pairs produced by parse_data.

    @param value: a tuple (expected, actual), or a list / dict containing
                  such tuples (updated in place with the verdict strings)
    @param flag: number of failures found so far
    @return: (verdict, flag) where verdict is a "Success" / "Failure, ..."
             string for a tuple, or the updated container otherwise, and
             flag is incremented once per failure.
    '''
    if isinstance(value, tuple):
        expected, actual = value[0], value[1]
        failure = "Failure, expect value: " + str(expected) + \
            ", return value: " + str(actual)
        if actual is None:
            return failure, flag + 1
        if expected == 'N/A':
            return "Success", flag
        if isinstance(expected, (int, str)):  # bool is a subclass of int
            matched = expected == actual
        elif expected == ['N/A']:
            matched = True
        else:
            # expected is a collection of accepted values; an expected value
            # that does not support ``in`` counts as "not matched".
            try:
                matched = actual in expected
            except TypeError:
                matched = False
        if matched:
            return "Success", flag
        return failure, flag + 1
    if isinstance(value, list):
        for index, item in enumerate(value):
            value[index], flag = compare_data(item, flag)
    elif isinstance(value, dict):
        for key, val in value.items():
            value[key], flag = compare_data(val, flag)
    else:
        LOGGER.error("%s", ERROR_CODE['E400007'])
        flag += 1
    return value, flag


def _load_yaml_file(path):
    ''' load a yaml document and close the file afterwards '''
    with open(path, "r") as stream:
        # Explicit Loader: yaml.load() without one is deprecated in
        # PyYAML 5.1 and a TypeError from PyYAML 6.
        return yaml.load(stream, Loader=yaml.FullLoader)


def get_component_ids_yaml(file):
    '''
    get component ids from yaml file

    @param file: path of the yaml file
    @return: parsed content, or None when the file does not exist
    '''
    if not os.path.exists(file):
        LOGGER.info("%s, %s", ERROR_CODE['E400001'], file)
        return None
    return _load_yaml_file(file)


def create_real_url(url_value, id_dict, key_flag_dict, http_handler, bmc_ip):
    '''
    create the real url list: either a single static url, or the urls
    obtained by replacing every ``{name}`` placeholder.

    @param url_value: url template, may contain ``{name}`` placeholders
    @param id_dict: cache placeholder name -> list of resolved urls; it is
                    updated with every placeholder resolved here
    @param key_flag_dict: placeholder name -> key flags used to extract the
                          member ids from the parent url response
    @param http_handler: handler used to query the parent urls
    @param bmc_ip: prefix put in front of every parent url that is queried
    @return: list of urls (without the bmc_ip prefix)
    '''
    url_list = []
    replaced = 0
    regexp = r'[^{]*{(?P<var>[a-zA-Z_]*)}'
    pattern = re.compile(regexp, DT)
    LOGGER.info("url_value %s", url_value)
    for match in pattern.finditer(url_value):
        var_name = match.group('var')
        placeholder = '{' + var_name + '}'
        # The match always ends with the placeholder: cut exactly that
        # suffix (str.rstrip would strip a *set* of characters instead).
        parent_url = match.group()[:-len(placeholder)]
        if var_name in id_dict:
            replaced = 1
            url_list = id_dict[var_name].copy()
        elif var_name in key_flag_dict:
            replaced = 2
            if url_list:
                url_list = [item + parent_url for item in url_list]
            else:
                url_list = [parent_url]
            response_list = handle_depend_url(
                "GET", url_list, http_handler, bmc_ip)
            # create_obj_id_list may return None; keep url_list a list so
            # the code below never has to deal with None.
            url_list = create_obj_id_list(
                key_flag_dict[var_name], response_list) or []
            if not url_list:
                LOGGER.error("%s,%s", ERROR_CODE['E300003'], var_name)
                continue
            id_dict.update({var_name: url_list.copy()})
            LOGGER.debug("id_dict content is %s", id_dict)
        else:
            replaced = 3
            LOGGER.error("%s for parameter %s",
                         ERROR_CODE['E300002'], var_name)
    LOGGER.debug('url_list content is %s', url_list)
    # combine single case with list case together.
    if replaced == 0:
        # No placeholder at all: the url is used as it is. There is no
        # trailing part to append (split('}')[-1] would be the whole url).
        LOGGER.info("adding static url %s into list", url_value)
        url_list.append(url_value)
    else:
        # Append whatever follows the last placeholder.
        suffix = url_value.split('}')[-1]
        url_list = [item + suffix for item in url_list]
    LOGGER.debug("created real url list is %s", url_list)
    return url_list


def execute_get_url(url, http_handler):
    """
    Execute a GET request and decode its JSON body.

    :param url: full url to query
    :param http_handler: object whose ``get(url)`` returns None or a response
        exposing ``code`` and ``read()``
    :return: dict with keys "return_code" and "return_value", or None when
        the handler returned None
    """
    LOGGER.debug("execute url %s", url)
    rsp = http_handler.get(url)
    if rsp is None:
        LOGGER.error("return None for url %s", url)
        return None
    ret_dict = {
        "return_code": rsp.code,
        "return_value": json.loads(rsp.read()),
    }
    LOGGER.info("ret_dict is %s", ret_dict)
    LOGGER.debug("ret_dict type is %s", type(ret_dict))
    return ret_dict


def handle_depend_url(method, url_list, http_handler, bmc_ip):
    '''
    run request url in url_list and collect the response as list

    Only GET is implemented; any other method yields an empty list.
    '''
    response_list = []
    if method == 'GET':
        for url_case in url_list:
            response_list.append(
                execute_get_url(bmc_ip + url_case, http_handler))
    return response_list


def create_obj_id_list(key_flags, response_list):
    '''
    create object id list

    @param key_flags: ':' separated keys leading to the member list inside
                      each response "return_value"
    @param response_list: responses as built by execute_get_url
                          (None entries are skipped)
    @return: list of the '@odata.id' values found, None when response_list
             is None or empty, [] when key_flags is None
    '''
    if response_list is None or len(response_list) == 0:
        LOGGER.debug("response list is None")
        return None
    if key_flags is None:
        # Previously fell through to an unbound end_id_list.
        LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags)
        return []
    key_list = key_flags.split(':')
    last_index = len(key_list) - 1
    end_id_list = []
    for response in response_list:
        if response is None:
            LOGGER.warning("response is None")
            continue
        return_value = response['return_value']
        if len(key_list) == 1 and key_list[0] in return_value:
            for member in return_value[key_list[0]]:
                end_id_list.append(member['@odata.id'])
        elif len(key_list) > 1:
            for index, key in enumerate(key_list):
                if index == last_index:
                    for member in return_value[key]:
                        end_id_list.append(member['@odata.id'])
                elif isinstance(return_value, list):
                    # a list level is not keyed: descend into its first item
                    return_value = return_value[0]
                elif isinstance(return_value, dict):
                    return_value = return_value[key]
                else:
                    LOGGER.warning("%s, %s", WARN_CODE['W100002'],
                                   type(return_value))
        else:
            LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags)
    return end_id_list


def execute_post_url(body, handler, url):
    '''
    execute post url

    @return: the handler response when it is a dict, otherwise None
    '''
    LOGGER.debug("execute url %s", url)
    rsp = handler.post(url, body)
    LOGGER.debug("post response %s", rsp)
    # NOTE(review): get_token treats the result of post() as a response
    # object, so this dict check may reject every response - confirm
    # against http_handler.
    if not isinstance(rsp, dict):
        LOGGER.error("%s,%s, expected type %s",
                     ERROR_CODE["E400004"], type(rsp), dict)
        return None
    return rsp


def execute_patch_url(body, http_handler, url):
    '''
    execute patch url, sending the current ETag of the resource

    @return: dict with keys "return_code" and "return_value"; when the
             handler returns None these are "N/A" and "Failure".
    '''
    etag = get_etag(http_handler, url)
    LOGGER.info("etag %s", etag)
    rsp = http_handler.patch(url, body, etag)
    LOGGER.debug("patch response %s", rsp)
    LOGGER.debug("type response is %s", type(rsp))
    if rsp is None:
        LOGGER.error("%s %s", ERROR_CODE['E100001'], url)
        return {"return_code": "N/A", "return_value": "Failure"}
    return {
        "return_code": rsp.code,
        "return_value": json.loads(rsp.read()),
    }


def handle_final_url(bmc_ip, method, url_list, req_body=None,
                     http_handler=None):
    '''
    execute the requested url to get the response

    @param bmc_ip: prefix put in front of every url
    @param method: 'GET', 'POST' or 'PATCH' ('DELETE' is not implemented)
    @param url_list: urls to request; POST accepts a single url only
    @param req_body: request body for POST / PATCH
    @param http_handler: handler performing the requests
    @return: list of responses, or None when POST gets several urls
    '''
    response_list = []
    if method == 'GET':
        for url_case in url_list:
            response_list.append(
                execute_get_url(bmc_ip + url_case, http_handler))
    elif method == 'POST':
        if len(url_list) > 1:
            LOGGER.error(ERROR_CODE['E100002'])
            return None
        response_list.append(
            execute_post_url(req_body, http_handler, bmc_ip + url_list[0]))
    elif method == 'PATCH':
        for url_case in url_list:
            LOGGER.info(url_case)
            temp = execute_patch_url(req_body, http_handler,
                                     bmc_ip + url_case)
            if temp is not None:
                response_list.append(temp)
    LOGGER.info("response_list %s", response_list)
    return response_list


def check_component_cnt(expect_return_value, res_list, result):
    '''
    check if the component count meets the requirement.

    @param expect_return_value: expected values; the optional 'count' key
                                holds the expected number of responses
    @param res_list: list of responses
    @param result: dict updated with the "count" verdict
    @return: the updated result dict
    '''
    if 'count' not in expect_return_value:
        result.update({"count": "N/A for this case"})
    elif expect_return_value['count'] == len(res_list):
        result.update({"count": "Success"})
    else:
        result.update({"count": "Failure, the actual num is " +
                                str(len(res_list))})
    return result


def parse_test_result(expect_return_value, expect_return_code,
                      actual_result_list, final_result):
    '''
    parse the test result by comparing the expected values with the values
    actually returned.

    @param expect_return_value: expected value of the case
    @param expect_return_code: expected return code
    @param actual_result_list: responses of every url of the case
    @param final_result: dict receiving the "count" verdict
    @return: (return_value_list, return_code_list, final_result, flag),
             flag being the number of failures found.
    '''
    return_code_list = []
    return_value_list = []
    flag = 0
    final_result = check_component_cnt(expect_return_value,
                                       actual_result_list, final_result)
    for each_result in actual_result_list:
        temp_result = {}
        if each_result is None:
            LOGGER.warning("%s ,set failure", WARN_CODE['W100003'])
            temp_result.update({'return_code': 'Failure'})
            return_value_list.append(temp_result)
            flag += 1
            continue
        LOGGER.debug("current result is %s,result_list is %s",
                     each_result, actual_result_list)
        return_code = each_result["return_code"]
        return_code_list.append(return_code)
        return_value = each_result["return_value"]
        # NOTE(review): a wrong return code (like a wrong count) is reported
        # but does not increase flag, so it does not fail the case - confirm
        # this is intended.
        code_result = \
            'Success' if return_code == expect_return_code else 'Failure'
        temp_result.update({'return_code': code_result})
        # parse the actual result according to the expected value hierarchy.
        # Work on a copy: parse_data rewrites its argument in place.
        ex_value = copy.deepcopy(expect_return_value)
        exp_act_pairs = {}
        for key, value in ex_value.items():
            # return_value may not be a dict (execute_patch_url returns the
            # string "Failure" on error): treat every key as missing then.
            if isinstance(return_value, dict) and key in return_value:
                exp_act_pairs[key] = parse_data(value, return_value[key])
            elif key == 'count':
                pass
            else:
                LOGGER.error("%s, %s", ERROR_CODE['E500001'], key)
                exp_act_pairs[key] = \
                    (value, "Can't find key {} in return value".format(key))
        LOGGER.debug("real_result:%s", exp_act_pairs)
        # comparing expected result with real result.
        for key, value in exp_act_pairs.items():
            temp_result[key], flag = compare_data(value, flag)
        return_value_list.append(temp_result)
    return return_value_list, return_code_list, final_result, flag


def execute_final_url(depends_id, http_handler, method, url, req_body,
                      key_flag_dict, bmc_ip):
    '''
    execute final url to get the request result

    Resolves the url template with create_real_url and runs the resulting
    urls with handle_final_url.
    '''
    url_list = create_real_url(
        url, depends_id, key_flag_dict, http_handler, bmc_ip)
    return handle_final_url(
        bmc_ip, method, url_list, req_body, http_handler)


def read_yaml(file):
    '''
    read a yaml file

    @param file: path of the yaml file
    @return: parsed content, or None when the file does not exist
    '''
    if not os.path.exists(file):
        LOGGER.info("%s %s", ERROR_CODE['E400001'], file)
        return None
    return _load_yaml_file(file)


def test_case_yaml_run(run, case):
    '''
    run test case from cases.yaml

    @param run: (depends_id, http_handler, bmc_ip) from the run fixture
    @param case: case dictionary; it receives the 'return_code_seq',
                 'final_rst' and 'details_result' entries
    '''
    depends_id, http_handler, bmc_ip = run
    if case['enabled'] is False:
        cases_result.append(case)
        LOGGER.debug("skipping case: %s", case["case_name"])
        pytest.skip()
    LOGGER.debug("running case: %s", case["case_name"])
    method, url, req_body = \
        case['method'], case['url'], case['request_body']
    expected_code, expected_value = \
        case['expected_code'], case['expected_result']
    tc_name, key_flag_dict = case['case_name'], case['key_flag_dict']
    flag = 0
    final_rst = {}
    rsp_list = execute_final_url(
        depends_id, http_handler, method, url, req_body, key_flag_dict,
        bmc_ip)
    if rsp_list is not None and len(rsp_list) > 0:
        return_value_list, return_code_list, final_rst, flag = \
            parse_test_result(
                expected_value, expected_code, rsp_list, final_rst)
        final_rst.update({'info': return_value_list})
        LOGGER.debug("final_rst:%s", final_rst)
        LOGGER.debug("return_code_list:%s", return_code_list)
        case['return_code_seq'] = str(return_code_list)
    else:
        LOGGER.error("%s", ERROR_CODE['E600001'])
        flag += 1
    case['final_rst'] = "Success" if flag == 0 else "Failure"
    case['details_result'] = \
        str(final_rst) if len(final_rst) > 0 else "N/A"
    cases_result.append(case)
    LOGGER.info("writing test final_rst for case %s", tc_name)
    assert flag == 0, final_rst


def write_result_2_yaml(result):
    '''
    write test result to new report.yaml
    '''
    LOGGER.info("writing to yaml file")
    with open("./conf/report.yaml", "w") as report_file:
        yaml.safe_dump(result, report_file, explicit_start=True)


def generate_testapi_result(cases_result):
    '''
    convert cases_result from list to dictionary for pushing to testapi

    @param cases_result: list of case dictionaries, each with a 'case_sn'
    @return: dict mapping every 'case_sn' to its case dictionary
    '''
    testapi_result = {}
    for case_result in cases_result:
        testapi_result[case_result['case_sn']] = case_result
        LOGGER.debug("%s", case_result)
    LOGGER.info("generated result for testapi %s", testapi_result)
    return testapi_result


@pytest.fixture(scope='session')
def run(conf_file):
    '''
    session fixture: log in to the BMC, hand the shared objects to the test
    cases, then write and push the results once the session is over.

    @param conf_file: config.yaml
    @return: yields (depends_id, http_handler, bmc_ip)
    '''
    # parse config.yaml
    LOGGER.info("start engine ...")
    config_file = parse_config(conf_file)
    if config_file is None:
        # Fail with a clear message instead of a TypeError on subscripting.
        pytest.fail("unable to load config file {}".format(conf_file),
                    pytrace=False)
    http_handler = UrllibHttpHandler()
    # get bmc info
    bmc_ip, bmc_user, bmc_pwd = \
        config_file["bmc_ip"], config_file["bmc_user"], config_file["bmc_pwd"]
    ACCOUNT_INFO.update({"UserName": bmc_user})
    ACCOUNT_INFO.update({"Password": bmc_pwd})
    url = "{0}/redfish/v1/SessionService/Sessions".format(bmc_ip)
    x_auth_token = get_token(http_handler, url)
    LOGGER.info("x_auth_token: %s", x_auth_token)
    if x_auth_token is None:
        LOGGER.error("%s token is None", ERROR_CODE['E300001'])
        # A generator fixture must not return before its yield: pytest
        # would only report that the fixture "did not yield a value".
        pytest.fail("unable to get X-Auth-Token from {}".format(url),
                    pytrace=False)
    HEADERS.update({"X-Auth-Token": x_auth_token})
    depends_id = {}
    LOGGER.info("############### start perform test case #################")
    yield depends_id, http_handler, bmc_ip
    write_result_2_yaml(cases_result)
    LOGGER.info("############### end perform test case ###################")
    LOGGER.info("done,checking the log %s", LOG_FILE)
    # parse cases_result for testapi and push it
    testapi_result = generate_testapi_result(cases_result)
    PushResults(testapi_result, LOGGER)