diff options
Diffstat (limited to 'tools/hdv/redfish/hdv_redfish.py')
-rw-r--r-- | tools/hdv/redfish/hdv_redfish.py | 676 |
1 files changed, 0 insertions, 676 deletions
diff --git a/tools/hdv/redfish/hdv_redfish.py b/tools/hdv/redfish/hdv_redfish.py deleted file mode 100644 index 5fc44ca..0000000 --- a/tools/hdv/redfish/hdv_redfish.py +++ /dev/null @@ -1,676 +0,0 @@ -############################################################################## -# Copyright (c) 2020 China Mobile Co.,Ltd and others. -# -# All rights reserved. This program and the accompanying materials -# are made available under the terms of the Apache License, Version 2.0 -# which accompanies this distribution, and is available at -# http://www.apache.org/licenses/LICENSE-2.0 -############################################################################## -''' -an implementation of hardware delivery validation based on redfish interface. -''' -import time -import os -import re -from re import DOTALL as DT -import json -import copy -from ast import literal_eval -import yaml -from openpyxl.reader.excel import load_workbook -from http_handler import UrllibHttpHandler, HEADERS -# pylint: disable=E0611 -from log_utils import BASE_DIR, LOG_FILE, LOGGER -from errors import ERROR_CODE, WARN_CODE - -LOGGER.info(BASE_DIR) - -ACCOUNT_INFO = {} -WAIT_INTERVAL = 5 - - -def parse_config(config_yaml): - """ - parse setting from config.yaml - :return: - """ - try: - if not os.path.exists(config_yaml): - LOGGER.error(" %s, %s", ERROR_CODE['E400001'], config_yaml) - with open(config_yaml, 'r') as conf_file: - config = yaml.load(conf_file.read(), Loader=yaml.FullLoader) - except FileNotFoundError as fnfe: - LOGGER.error(fnfe) - LOGGER.error(u"%s", ERROR_CODE['E400002']) - return None - else: - return config - - -def get_token(http_handler, url): - """ - :return: x_auth_token - """ - retry_num = 3 - x_auth_token = None - while retry_num: - retry_num -= 1 - res = http_handler.post(url, ACCOUNT_INFO) - if res is None: - LOGGER.error("%s, %s", WARN_CODE['W100001'], url) - LOGGER.info("wait %s seconds to try again", WAIT_INTERVAL) - time.sleep(WAIT_INTERVAL) - continue - data = res.info() - if "X-Auth-Token" in data: - x_auth_token = data.get("X-Auth-Token") - return x_auth_token - else: - time.sleep(WAIT_INTERVAL) - return None - - -def get_etag(http_handler, url): - """ - :return: ETag - """ - etag = None - res = http_handler.get(url) - data = None - if res is not None: - data = res.info() - if data is not None and "ETag" in data: - etag = data.get("ETag") - return etag - - -def parse_data(exp_value, act_value): - ''' - parse the expected value and actual value: - @return: case 1: exp_value and actual value is str or int, - then return tuple (exp_value,act_value) - case 2: list,dict type, then return updated exp_value - ERROR_CODE for unexpected case. - ''' - if isinstance(exp_value, (str, int)) and isinstance(act_value, (str, int)): - return (exp_value, act_value) - if isinstance(exp_value, list): - if not isinstance(act_value, list): - return (exp_value, act_value) - else: - for exp in enumerate(exp_value, start=0): - index = exp[0] - exp_value[index] = parse_data( - exp_value[index], act_value[index]) - - elif isinstance(exp_value, dict): - if isinstance(act_value, dict): - for key, val in exp_value.items(): - if key in act_value: - exp_value[key] = parse_data(val, act_value[key]) - else: - LOGGER.error("%s,%s", ERROR_CODE['E500001'], key) - else: - LOGGER.error("%s,expected: %s , actual: %s", - ERROR_CODE['E400005'], exp_value, act_value) - else: - LOGGER.error("%s, expected type:%s, actual type %s", - ERROR_CODE['E400006'], type(exp_value), type(act_value)) - return exp_value - - -def compare_data(value, flag): - ''' - compare value content - ''' - if isinstance(value, tuple): - if value[1] is not None or value[1]: - if value[0] == 'N/A': - return "Success", flag - elif isinstance(value[0], (bool, int, str)): - if value[0] == value[1]: - return "Success", flag - else: - flag += 1 - return "Failure, expect value: " + str(value[0]) + \ - ", return value: " + str(value[1]), flag - elif value[1] in value[0] or value[0] == ['N/A']: - return "Success", flag - else: - flag += 1 - return "Failure, expect value: " + str(value[0]) + \ - ", return value: " + str(value[1]), flag - else: - flag += 1 - return "Failure, expect value: " + str(value[0]) + \ - ", return value: " + str(value[1]), flag - - elif isinstance(value, list): - for elem in enumerate(value, start=0): - index = elem[0] - value[index], flag = compare_data(value[index], flag) - elif isinstance(value, dict): - for key, val in value.items(): - value[key], flag = compare_data(val, flag) - else: - LOGGER.error("%s", ERROR_CODE['E400007']) - flag += 1 - return value, flag - - -def get_component_ids_yaml(file): - ''' - get component ids from yaml file - ''' - if not os.path.exists(file): - LOGGER.info("%s, %s", ERROR_CODE['E400001'], file) - return None - return yaml.load(open(file, "r")) - - -def get_component_ids_excel(excel_file): - ''' - get the component_id settings from the excel sheet2 - the componnet_id is the parent id of the hardware resource of sheet1 - ''' - input_file = load_workbook(excel_file) - input_ws = input_file[input_file.sheetnames[1]] - cell_key = [] - id_info_list = [] - for i in range(1, 5): - cell_key.append(input_ws.cell(row=1, column=i).value) - row = 2 - while input_ws.cell(row=row, column=1).value: - cell_value = [] - for i in range(1, 5): - - cell_value.append(input_ws.cell(row=row, column=i).value. - encode("utf8").decode("utf8").replace('\n', '')) - cell_dict = dict(zip(cell_key, cell_value)) - row += 1 - id_info_list.append(cell_dict) - return id_info_list - - -def create_real_url(url_value, id_dict, config_file): - ''' - create the real url - either a static url, or a replaced url by depended_id - ''' - url_list = [] - replaced = 0 - regexp = r'[^{]*{(?P<var>[a-zA-Z_]*)}' - # pattern = re.compile(regexp, re.S) - pattern = re.compile(regexp, DT) - LOGGER.info("url_value %s", url_value) - matches = list(pattern.finditer(url_value)) - for match in matches: - value = match.groupdict() - if value['var'] in config_file: - url_value = url_value.replace('{' + str(value['var']) + '}', - str(config_file[value['var']])) - - elif value['var'] in id_dict: - replaced = 1 - instance_list = id_dict[value['var']] - for instance in instance_list: - sgl_url = url_value.replace('{' + str(value['var']) + '}', - str(instance)) - LOGGER.debug("replaced url value %s", sgl_url) - url_list.append(sgl_url) - else: - replaced = 2 - LOGGER.error("%s for parameter %s", - ERROR_CODE['E300002'], value['var']) - # combine single case with list case together. - if replaced == 0: - LOGGER.info("adding static url %s into list", url_value) - url_list.append(url_value) - return url_list - - -def execute_get_url(url, http_handler): - """ - execute the url - """ - LOGGER.debug("execute url %s", url) - rsp = http_handler.get(url) - if rsp is None: - LOGGER.error("return None for url %s", url) - return None - ret_dict = {} - ret_dict.update({"return_code": rsp.code}) - return_value = json.loads(rsp.read()) - ret_dict.update({"return_value": return_value}) - LOGGER.info("ret_dict is %s", ret_dict) - LOGGER.debug("ret_dict type is %s", type(ret_dict)) - return ret_dict - - -def handle_depend_url(method, url_list, http_handler): - ''' - run request url in url_list and collect the response as list - ''' - response_list = [] - if method == 'GET': - for url_case in url_list: - response = execute_get_url(url_case, http_handler) - response_list.append(response) - elif method == 'POST': - pass - elif method == 'PATCH': - pass - elif method == 'DELETE': - pass - return response_list - - -def create_obj_id_list(key_flags, response_list): - ''' - create object id list - ''' - if response_list is None or response_list.__len__() == 0: - LOGGER.debug("response list is None") - return None - if key_flags is not None: - key_list = key_flags.split(':') - end_id_list = [] - for response in response_list: - if response is None: - LOGGER.warning("response is None") - continue - return_value = response['return_value'] - if len(key_list) == 1 and key_list[0] in return_value: - for i in return_value[key_list[0]]: - end_id_list.append(i['@odata.id']) - elif len(key_list) > 1: - for elem in enumerate(key_list, start=0): - index = elem[0] - if index == len(key_list) - 1: - for case in return_value[key_list[index]]: - end_id_list.append(case['@odata.id']) - else: - if isinstance(return_value, list): - return_value = return_value[0] - elif isinstance(return_value, dict): - return_value = return_value[key_list[index]] - else: - LOGGER.warning("%s, %s", WARN_CODE['W100002'], - type(return_value)) - - else: - LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags) - return end_id_list - - -def get_depend_id(config_file, http_handler, depend_ids): - ''' - @param mode: yaml or excel,default value "excel" - parse the component id list - build up the id resource for each component_id - return: id_dict like {component_id:[obj_list]} - ''' - id_dict = {} - for case in depend_ids: - component_name = case.get('component_id') - LOGGER.info("parsing component %s", component_name) - pro_value = case.get('pro_value') - url_value = case.get('url_value') - key_flags = case.get('key_flags') - # url_list = [] - url_list = create_real_url(url_value, id_dict, config_file) - # response_list = [] - response_list = handle_depend_url(pro_value, url_list, http_handler) - # end_id_list = [] - end_id_list = create_obj_id_list(key_flags, response_list) - if end_id_list is None or end_id_list.__len__() == 0: - LOGGER.error("%s,%s", ERROR_CODE['E300003'], component_name) - continue - id_dict.update({component_name: end_id_list}) - LOGGER.debug("id_dict content is %s", id_dict) - return id_dict - - -def read_row(input_ws, row, config_file): - ''' - read a row value - ''' - pro_value = input_ws.cell(row=row, column=config_file["pro_seq"]).value - url_value = input_ws.cell(row=row, column=config_file["url_seq"]).value - req_body_value = input_ws.cell( - row=row, column=config_file["req_body_seq"]).value - expect_return_code = \ - input_ws.cell( - row=row, column=config_file["expect_return_code_seq"]).value - expect_return_value = \ - input_ws.cell( - row=row, column=config_file["expect_return_value_seq"]).value - attr_name = input_ws.cell(row=row, column=config_file["attr_name"]).value - - if req_body_value is not None: - req_body_value = literal_eval(req_body_value) - if expect_return_code is not None: - expect_return_code = int(expect_return_code) - if expect_return_value is not None: - expect_return_value = literal_eval(expect_return_value) - return pro_value, url_value, req_body_value, expect_return_code,\ - expect_return_value, attr_name - - -def execute_post_url(body, handler, url): - ''' - execute post url - ''' - LOGGER.debug("execute url %s", url) - rsp = handler.post(url, body) - LOGGER.debug("post response %s", rsp) - if not isinstance(rsp, dict): - LOGGER.error("%s,%s, expected type %s", - ERROR_CODE["E400004"], type(rsp), dict) - return None - return rsp - - -def execute_patch_url(body, http_handler, url): - ''' - execute patch url - ''' - etag = get_etag(http_handler, url) - LOGGER.info("etag %s", etag) - rsp = http_handler.patch(url, body, etag) - LOGGER.debug("patch response %s", rsp) - LOGGER.debug("type response is %s", type(rsp)) - ret_dict = {} - if rsp is None: - LOGGER.error("%s %s", ERROR_CODE['E100001'], url) - ret_dict.update({"return_code": "N/A"}) - ret_dict.update({"return_value": "Failure"}) - return ret_dict - ret_dict.update({"return_code": rsp.code}) - return_value = json.loads(rsp.read()) - ret_dict.update({"return_value": return_value}) - return ret_dict - - -def handle_final_url(method, url_list, req_body=None, http_handler=None): - '''execute the requested url to get the response - ''' - response_list = [] - if method == 'GET': - for url_case in url_list: - rsp = execute_get_url(url_case, http_handler) - response_list.append(rsp) - elif method == 'POST': - if len(url_list) > 1: - LOGGER.error(ERROR_CODE['E100002']) - return None - url_value = url_list[0] - rsp = execute_post_url(req_body, http_handler, url_value) - response_list.append(rsp) - elif method == 'PATCH': - for url_case in url_list: - LOGGER.info(url_case) - temp = execute_patch_url(req_body, http_handler, url_case) - if temp is not None: - response_list.append(temp) - elif method == 'DELETE': - pass - LOGGER.info("response_list %s", response_list) - return response_list - - -def check_component_cnt(expect_return_value, res_list, result): - ''' - #check if the component count meet the required. - ''' - if expect_return_value.__contains__('count'): - if expect_return_value['count'] == len(res_list): - result.update({"count": "Success"}) - else: - result.update({"count": - "Failure, the actual num is " + str(len(res_list))}) - else: - result.update({"count": "N/A for this case"}) - return result - - -def parse_test_result(expect_return_value, expect_return_code, - actual_result_list, final_result): - ''' - @param expected_return_value expected value set in input excel - @param expected_return_code expected return code - @param actual_result_list: actual result run by each url list checking - @param final_result: returned final result - parsing the test final_result by comparing expected_value with - real test final_result value. - ''' - return_code_list = [] - return_value_list = [] - flag = 0 - final_result = check_component_cnt(expect_return_value, - actual_result_list, final_result) - - for each_result in actual_result_list: - temp_result = {} - if each_result is not None: - LOGGER.debug("current result is %s,result_list is %s", - each_result, actual_result_list) - return_code = each_result["return_code"] - return_code_list.append(return_code) - return_value = each_result["return_value"] - if return_code == expect_return_code: - code_result = 'Success' - else: - code_result = 'Failure' - temp_result.update({'return_code': code_result}) - else: - LOGGER.warning("%s ,set failure", WARN_CODE['W100003']) - temp_result.update({'return_code': 'Failure'}) - return_value_list.append(temp_result) - flag += 1 - continue - - # parse the actual result according to the expected value hierachy. - ex_value = copy.deepcopy(expect_return_value) - exp_act_pairs = {} - for key, value in ex_value.items(): - if key in return_value: - exp_act_pairs[key] = parse_data(value, return_value[key]) - elif key == 'count': - pass - else: - LOGGER.error("%s, %s", ERROR_CODE['E500001'], key) - exp_act_pairs[key] = \ - (value, "Can't find key {} in return value".format(key)) - LOGGER.debug("real_result:%s", exp_act_pairs) - - # comparing expected result with real result. - if exp_act_pairs: - for key, value in exp_act_pairs.items(): - temp_result[key], flag = compare_data(value, flag) - return_value_list.append(temp_result) - return return_value_list, return_code_list, final_result, flag - - -def write_result_2_excel(config_file, input_ws, row, flag, result): - ''' - write the result back to excel - ''' - if not result: - input_ws.cell(row=row, column=config_file["detail_result"], - value=str('N/A')) - else: - input_ws.cell(row=row, column=config_file["detail_result"], - value=str(result)) - if flag == 0: - input_ws.cell(row=row, column=config_file["final_result"], - value=str("Success")) - else: - input_ws.cell(row=row, column=config_file["final_result"], - value=str("Failure")) - return row - - -def execute_final_url(config_file, depends_id, http_handler, - method, url, req_body): - ''' - execute final url to get the request result - ''' - url_list = create_real_url(url, depends_id, config_file) - rsp_list = handle_final_url(method, url_list, req_body, http_handler) - return rsp_list - - -def run_test_case_yaml(config_file, case_file, depends_id, http_handler): - '''run test case from cases.yaml - ''' - LOGGER.info("############### start perform test case #################") - cases_result = [] - cases = read_yaml(case_file) - for case in cases: - method, url, req_body, expected_code, expected_value, tc_name \ - = case['method'], case['url'], case['request_body'], \ - case['expected_code'], case['expected_result'], case['case_name'] - - expected_value = literal_eval(expected_value) - flag = 0 - final_rst = {} - rsp_list = execute_final_url(config_file, depends_id, - http_handler, method, url, req_body) - if rsp_list is not None and len(rsp_list) > 0: - return_value_list, return_code_list, final_rst, flag = \ - parse_test_result( - expected_value, expected_code, rsp_list, final_rst) - final_rst.update({'info': return_value_list}) - LOGGER.debug("return_code_list:%s", return_code_list) - case['return_code_seq'] = str(return_code_list) - else: - LOGGER.error("%s", ERROR_CODE['E600001']) - flag += 1 - case['final_rst'] = "Success" if flag == 0 else "Failure" - case['details_result'] = \ - str(final_rst) if len(final_rst) > 0 else "N/A" - cases_result.append(case) - LOGGER.info("writing test final_rst for case %s", tc_name) - - write_result_2_yaml(cases_result) - - LOGGER.info("############### end perform test case ###################") - - -def read_yaml(file): - '''read a yaml file - ''' - if not os.path.exists(file): - LOGGER.info("%s %s", ERROR_CODE['E400001'], file) - return None - return yaml.load(open(file, "r")) - - -def write_result_2_yaml(result): - ''' - write test result to new report.yaml - ''' - LOGGER.info("writing to yaml file") - yaml.safe_dump(result, open("./conf/report.yaml", "w"), - explicit_start=True) - - -def run_test_case_excel(config_file, case_file, depends_id, http_handler): - ''' - perform the test case one by one, - and write test final_result back to the excel. - ''' - LOGGER.info("############### start perform test case #################") - input_file = load_workbook(case_file) - input_ws = input_file[input_file.sheetnames[0]] - - row = 2 - while input_ws.cell(row=row, column=1).value: - method, url, req_body, expected_code, expected_value, tc_name \ - = read_row(input_ws, row, config_file) - - LOGGER.info("run test case ##%s##", tc_name) - if tc_name == "configure BMC ip in static, ipv4": - LOGGER.debug("debug") - flag = 0 - final_result = {} - rsp_list = [] - rsp_list = execute_final_url(config_file, depends_id, http_handler, - method, url, req_body) - if rsp_list is not None and len(rsp_list) > 0: - return_value_list, return_code_list, final_result, flag = \ - parse_test_result(expected_value, expected_code, - rsp_list, final_result) - final_result.update({'info': return_value_list}) - LOGGER.debug("return_code_list:%s", return_code_list) - input_ws.cell(row=row, column=config_file["return_code_seq"], - value=str(return_code_list)) - else: - LOGGER.error("%s", ERROR_CODE['E600001']) - flag += 1 - - LOGGER.info("writing test final_result for row %s", row) - row = write_result_2_excel( - config_file, input_ws, row, flag, final_result) - row += 1 - input_file.save(case_file) - LOGGER.info("############### end perform test case ###################") - - -def run(conf_file, case_excel_file=None, depend_yaml_file=None, - case_yaml_file=None, file_mode=None): - ''' - @param conf_file: config.yaml - @param case_excel_file: excel case file - @param depend_yaml_file: depends yaml file used if file_mode=yaml - @param case_yaml_file: case yaml file, used if file_mode=yaml - @param file_mode: "excel" or "yaml" - access function - ''' - # parse config.yaml - LOGGER.info("start engine ...") - config_file = parse_config(conf_file) - http_handler = UrllibHttpHandler() - - # get bmc info - bmc_ip, bmc_user, bmc_pwd = \ - config_file["bmc_ip"], config_file["bmc_user"], config_file["bmc_pwd"] - ACCOUNT_INFO.update({"UserName": bmc_user}) - ACCOUNT_INFO.update({"Password": bmc_pwd}) - - url = "https://{0}/redfish/v1/SessionService/Sessions".format(bmc_ip) - x_auth_token = get_token(http_handler, url) - LOGGER.info("x_auth_token: %s", x_auth_token) - - if x_auth_token is None: - LOGGER.error("%s token is None", ERROR_CODE['E300001']) - return None - - HEADERS.update({"X-Auth-Token": x_auth_token}) - id_info_list = None - if file_mode == "excel": - id_info_list = get_component_ids_excel(case_excel_file) - elif file_mode == "yaml": - id_info_list = get_component_ids_yaml(depend_yaml_file) - else: - LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode) - return None - - # get dependent id - depends_id = get_depend_id(config_file, http_handler, id_info_list) - - # read the test case sheet and perform test - if file_mode == "excel": - run_test_case_excel(config_file, - case_excel_file, depends_id, http_handler) - elif file_mode == "yaml": - run_test_case_yaml(config_file, - case_yaml_file, depends_id, http_handler) - else: - LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode) - return None - - LOGGER.info("done,checking the log %s", LOG_FILE) - - return True |