diff options
Diffstat (limited to 'hdv/redfish/hdv_redfish.py')
-rw-r--r-- | hdv/redfish/hdv_redfish.py | 676 |
1 files changed, 676 insertions, 0 deletions
diff --git a/hdv/redfish/hdv_redfish.py b/hdv/redfish/hdv_redfish.py new file mode 100644 index 0000000..5fc44ca --- /dev/null +++ b/hdv/redfish/hdv_redfish.py @@ -0,0 +1,676 @@ +############################################################################## +# Copyright (c) 2020 China Mobile Co.,Ltd and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +''' +an implementation of hardware delivery validation based on redfish interface. +''' +import time +import os +import re +from re import DOTALL as DT +import json +import copy +from ast import literal_eval +import yaml +from openpyxl.reader.excel import load_workbook +from http_handler import UrllibHttpHandler, HEADERS +# pylint: disable=E0611 +from log_utils import BASE_DIR, LOG_FILE, LOGGER +from errors import ERROR_CODE, WARN_CODE + +LOGGER.info(BASE_DIR) + +ACCOUNT_INFO = {} +WAIT_INTERVAL = 5 + + +def parse_config(config_yaml): + """ + parse setting from config.yaml + :return: + """ + try: + if not os.path.exists(config_yaml): + LOGGER.error(" %s, %s", ERROR_CODE['E400001'], config_yaml) + with open(config_yaml, 'r') as conf_file: + config = yaml.load(conf_file.read(), Loader=yaml.FullLoader) + except FileNotFoundError as fnfe: + LOGGER.error(fnfe) + LOGGER.error(u"%s", ERROR_CODE['E400002']) + return None + else: + return config + + +def get_token(http_handler, url): + """ + :return: x_auth_token + """ + retry_num = 3 + x_auth_token = None + while retry_num: + retry_num -= 1 + res = http_handler.post(url, ACCOUNT_INFO) + if res is None: + LOGGER.error("%s, %s", WARN_CODE['W100001'], url) + LOGGER.info("wait %s seconds to try again", WAIT_INTERVAL) + time.sleep(WAIT_INTERVAL) + continue + data = res.info() + if "X-Auth-Token" in data: + x_auth_token = data.get("X-Auth-Token") + return x_auth_token + else: + time.sleep(WAIT_INTERVAL) + return None + + +def get_etag(http_handler, url): + """ + :return: ETag + """ + etag = None + res = http_handler.get(url) + data = None + if res is not None: + data = res.info() + if data is not None and "ETag" in data: + etag = data.get("ETag") + return etag + + +def parse_data(exp_value, act_value): + ''' + parse the expected value and actual value: + @return: case 1: exp_value and actual value is str or int, + then return tuple (exp_value,act_value) + case 2: list,dict type, then return updated exp_value + ERROR_CODE for unexpected case. + ''' + if isinstance(exp_value, (str, int)) and isinstance(act_value, (str, int)): + return (exp_value, act_value) + if isinstance(exp_value, list): + if not isinstance(act_value, list): + return (exp_value, act_value) + else: + for exp in enumerate(exp_value, start=0): + index = exp[0] + exp_value[index] = parse_data( + exp_value[index], act_value[index]) + + elif isinstance(exp_value, dict): + if isinstance(act_value, dict): + for key, val in exp_value.items(): + if key in act_value: + exp_value[key] = parse_data(val, act_value[key]) + else: + LOGGER.error("%s,%s", ERROR_CODE['E500001'], key) + else: + LOGGER.error("%s,expected: %s , actual: %s", + ERROR_CODE['E400005'], exp_value, act_value) + else: + LOGGER.error("%s, expected type:%s, actual type %s", + ERROR_CODE['E400006'], type(exp_value), type(act_value)) + return exp_value + + +def compare_data(value, flag): + ''' + compare value content + ''' + if isinstance(value, tuple): + if value[1] is not None or value[1]: + if value[0] == 'N/A': + return "Success", flag + elif isinstance(value[0], (bool, int, str)): + if value[0] == value[1]: + return "Success", flag + else: + flag += 1 + return "Failure, expect value: " + str(value[0]) + \ + ", return value: " + str(value[1]), flag + elif value[1] in value[0] or value[0] == ['N/A']: + return "Success", flag + else: + flag += 1 + return "Failure, expect value: " + str(value[0]) + \ + ", return value: " + str(value[1]), flag + else: + flag += 1 + return "Failure, expect value: " + str(value[0]) + \ + ", return value: " + str(value[1]), flag + + elif isinstance(value, list): + for elem in enumerate(value, start=0): + index = elem[0] + value[index], flag = compare_data(value[index], flag) + elif isinstance(value, dict): + for key, val in value.items(): + value[key], flag = compare_data(val, flag) + else: + LOGGER.error("%s", ERROR_CODE['E400007']) + flag += 1 + return value, flag + + +def get_component_ids_yaml(file): + ''' + get component ids from yaml file + ''' + if not os.path.exists(file): + LOGGER.info("%s, %s", ERROR_CODE['E400001'], file) + return None + return yaml.load(open(file, "r")) + + +def get_component_ids_excel(excel_file): + ''' + get the component_id settings from the excel sheet2 + the componnet_id is the parent id of the hardware resource of sheet1 + ''' + input_file = load_workbook(excel_file) + input_ws = input_file[input_file.sheetnames[1]] + cell_key = [] + id_info_list = [] + for i in range(1, 5): + cell_key.append(input_ws.cell(row=1, column=i).value) + row = 2 + while input_ws.cell(row=row, column=1).value: + cell_value = [] + for i in range(1, 5): + + cell_value.append(input_ws.cell(row=row, column=i).value. + encode("utf8").decode("utf8").replace('\n', '')) + cell_dict = dict(zip(cell_key, cell_value)) + row += 1 + id_info_list.append(cell_dict) + return id_info_list + + +def create_real_url(url_value, id_dict, config_file): + ''' + create the real url + either a static url, or a replaced url by depended_id + ''' + url_list = [] + replaced = 0 + regexp = r'[^{]*{(?P<var>[a-zA-Z_]*)}' + # pattern = re.compile(regexp, re.S) + pattern = re.compile(regexp, DT) + LOGGER.info("url_value %s", url_value) + matches = list(pattern.finditer(url_value)) + for match in matches: + value = match.groupdict() + if value['var'] in config_file: + url_value = url_value.replace('{' + str(value['var']) + '}', + str(config_file[value['var']])) + + elif value['var'] in id_dict: + replaced = 1 + instance_list = id_dict[value['var']] + for instance in instance_list: + sgl_url = url_value.replace('{' + str(value['var']) + '}', + str(instance)) + LOGGER.debug("replaced url value %s", sgl_url) + url_list.append(sgl_url) + else: + replaced = 2 + LOGGER.error("%s for parameter %s", + ERROR_CODE['E300002'], value['var']) + # combine single case with list case together. + if replaced == 0: + LOGGER.info("adding static url %s into list", url_value) + url_list.append(url_value) + return url_list + + +def execute_get_url(url, http_handler): + """ + execute the url + """ + LOGGER.debug("execute url %s", url) + rsp = http_handler.get(url) + if rsp is None: + LOGGER.error("return None for url %s", url) + return None + ret_dict = {} + ret_dict.update({"return_code": rsp.code}) + return_value = json.loads(rsp.read()) + ret_dict.update({"return_value": return_value}) + LOGGER.info("ret_dict is %s", ret_dict) + LOGGER.debug("ret_dict type is %s", type(ret_dict)) + return ret_dict + + +def handle_depend_url(method, url_list, http_handler): + ''' + run request url in url_list and collect the response as list + ''' + response_list = [] + if method == 'GET': + for url_case in url_list: + response = execute_get_url(url_case, http_handler) + response_list.append(response) + elif method == 'POST': + pass + elif method == 'PATCH': + pass + elif method == 'DELETE': + pass + return response_list + + +def create_obj_id_list(key_flags, response_list): + ''' + create object id list + ''' + if response_list is None or response_list.__len__() == 0: + LOGGER.debug("response list is None") + return None + if key_flags is not None: + key_list = key_flags.split(':') + end_id_list = [] + for response in response_list: + if response is None: + LOGGER.warning("response is None") + continue + return_value = response['return_value'] + if len(key_list) == 1 and key_list[0] in return_value: + for i in return_value[key_list[0]]: + end_id_list.append(i['@odata.id']) + elif len(key_list) > 1: + for elem in enumerate(key_list, start=0): + index = elem[0] + if index == len(key_list) - 1: + for case in return_value[key_list[index]]: + end_id_list.append(case['@odata.id']) + else: + if isinstance(return_value, list): + return_value = return_value[0] + elif isinstance(return_value, dict): + return_value = return_value[key_list[index]] + else: + LOGGER.warning("%s, %s", WARN_CODE['W100002'], + type(return_value)) + + else: + LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags) + return end_id_list + + +def get_depend_id(config_file, http_handler, depend_ids): + ''' + @param mode: yaml or excel,default value "excel" + parse the component id list + build up the id resource for each component_id + return: id_dict like {component_id:[obj_list]} + ''' + id_dict = {} + for case in depend_ids: + component_name = case.get('component_id') + LOGGER.info("parsing component %s", component_name) + pro_value = case.get('pro_value') + url_value = case.get('url_value') + key_flags = case.get('key_flags') + # url_list = [] + url_list = create_real_url(url_value, id_dict, config_file) + # response_list = [] + response_list = handle_depend_url(pro_value, url_list, http_handler) + # end_id_list = [] + end_id_list = create_obj_id_list(key_flags, response_list) + if end_id_list is None or end_id_list.__len__() == 0: + LOGGER.error("%s,%s", ERROR_CODE['E300003'], component_name) + continue + id_dict.update({component_name: end_id_list}) + LOGGER.debug("id_dict content is %s", id_dict) + return id_dict + + +def read_row(input_ws, row, config_file): + ''' + read a row value + ''' + pro_value = input_ws.cell(row=row, column=config_file["pro_seq"]).value + url_value = input_ws.cell(row=row, column=config_file["url_seq"]).value + req_body_value = input_ws.cell( + row=row, column=config_file["req_body_seq"]).value + expect_return_code = \ + input_ws.cell( + row=row, column=config_file["expect_return_code_seq"]).value + expect_return_value = \ + input_ws.cell( + row=row, column=config_file["expect_return_value_seq"]).value + attr_name = input_ws.cell(row=row, column=config_file["attr_name"]).value + + if req_body_value is not None: + req_body_value = literal_eval(req_body_value) + if expect_return_code is not None: + expect_return_code = int(expect_return_code) + if expect_return_value is not None: + expect_return_value = literal_eval(expect_return_value) + return pro_value, url_value, req_body_value, expect_return_code,\ + expect_return_value, attr_name + + +def execute_post_url(body, handler, url): + ''' + execute post url + ''' + LOGGER.debug("execute url %s", url) + rsp = handler.post(url, body) + LOGGER.debug("post response %s", rsp) + if not isinstance(rsp, dict): + LOGGER.error("%s,%s, expected type %s", + ERROR_CODE["E400004"], type(rsp), dict) + return None + return rsp + + +def execute_patch_url(body, http_handler, url): + ''' + execute patch url + ''' + etag = get_etag(http_handler, url) + LOGGER.info("etag %s", etag) + rsp = http_handler.patch(url, body, etag) + LOGGER.debug("patch response %s", rsp) + LOGGER.debug("type response is %s", type(rsp)) + ret_dict = {} + if rsp is None: + LOGGER.error("%s %s", ERROR_CODE['E100001'], url) + ret_dict.update({"return_code": "N/A"}) + ret_dict.update({"return_value": "Failure"}) + return ret_dict + ret_dict.update({"return_code": rsp.code}) + return_value = json.loads(rsp.read()) + ret_dict.update({"return_value": return_value}) + return ret_dict + + +def handle_final_url(method, url_list, req_body=None, http_handler=None): + '''execute the requested url to get the response + ''' + response_list = [] + if method == 'GET': + for url_case in url_list: + rsp = execute_get_url(url_case, http_handler) + response_list.append(rsp) + elif method == 'POST': + if len(url_list) > 1: + LOGGER.error(ERROR_CODE['E100002']) + return None + url_value = url_list[0] + rsp = execute_post_url(req_body, http_handler, url_value) + response_list.append(rsp) + elif method == 'PATCH': + for url_case in url_list: + LOGGER.info(url_case) + temp = execute_patch_url(req_body, http_handler, url_case) + if temp is not None: + response_list.append(temp) + elif method == 'DELETE': + pass + LOGGER.info("response_list %s", response_list) + return response_list + + +def check_component_cnt(expect_return_value, res_list, result): + ''' + #check if the component count meet the required. + ''' + if expect_return_value.__contains__('count'): + if expect_return_value['count'] == len(res_list): + result.update({"count": "Success"}) + else: + result.update({"count": + "Failure, the actual num is " + str(len(res_list))}) + else: + result.update({"count": "N/A for this case"}) + return result + + +def parse_test_result(expect_return_value, expect_return_code, + actual_result_list, final_result): + ''' + @param expected_return_value expected value set in input excel + @param expected_return_code expected return code + @param actual_result_list: actual result run by each url list checking + @param final_result: returned final result + parsing the test final_result by comparing expected_value with + real test final_result value. + ''' + return_code_list = [] + return_value_list = [] + flag = 0 + final_result = check_component_cnt(expect_return_value, + actual_result_list, final_result) + + for each_result in actual_result_list: + temp_result = {} + if each_result is not None: + LOGGER.debug("current result is %s,result_list is %s", + each_result, actual_result_list) + return_code = each_result["return_code"] + return_code_list.append(return_code) + return_value = each_result["return_value"] + if return_code == expect_return_code: + code_result = 'Success' + else: + code_result = 'Failure' + temp_result.update({'return_code': code_result}) + else: + LOGGER.warning("%s ,set failure", WARN_CODE['W100003']) + temp_result.update({'return_code': 'Failure'}) + return_value_list.append(temp_result) + flag += 1 + continue + + # parse the actual result according to the expected value hierachy. + ex_value = copy.deepcopy(expect_return_value) + exp_act_pairs = {} + for key, value in ex_value.items(): + if key in return_value: + exp_act_pairs[key] = parse_data(value, return_value[key]) + elif key == 'count': + pass + else: + LOGGER.error("%s, %s", ERROR_CODE['E500001'], key) + exp_act_pairs[key] = \ + (value, "Can't find key {} in return value".format(key)) + LOGGER.debug("real_result:%s", exp_act_pairs) + + # comparing expected result with real result. + if exp_act_pairs: + for key, value in exp_act_pairs.items(): + temp_result[key], flag = compare_data(value, flag) + return_value_list.append(temp_result) + return return_value_list, return_code_list, final_result, flag + + +def write_result_2_excel(config_file, input_ws, row, flag, result): + ''' + write the result back to excel + ''' + if not result: + input_ws.cell(row=row, column=config_file["detail_result"], + value=str('N/A')) + else: + input_ws.cell(row=row, column=config_file["detail_result"], + value=str(result)) + if flag == 0: + input_ws.cell(row=row, column=config_file["final_result"], + value=str("Success")) + else: + input_ws.cell(row=row, column=config_file["final_result"], + value=str("Failure")) + return row + + +def execute_final_url(config_file, depends_id, http_handler, + method, url, req_body): + ''' + execute final url to get the request result + ''' + url_list = create_real_url(url, depends_id, config_file) + rsp_list = handle_final_url(method, url_list, req_body, http_handler) + return rsp_list + + +def run_test_case_yaml(config_file, case_file, depends_id, http_handler): + '''run test case from cases.yaml + ''' + LOGGER.info("############### start perform test case #################") + cases_result = [] + cases = read_yaml(case_file) + for case in cases: + method, url, req_body, expected_code, expected_value, tc_name \ + = case['method'], case['url'], case['request_body'], \ + case['expected_code'], case['expected_result'], case['case_name'] + + expected_value = literal_eval(expected_value) + flag = 0 + final_rst = {} + rsp_list = execute_final_url(config_file, depends_id, + http_handler, method, url, req_body) + if rsp_list is not None and len(rsp_list) > 0: + return_value_list, return_code_list, final_rst, flag = \ + parse_test_result( + expected_value, expected_code, rsp_list, final_rst) + final_rst.update({'info': return_value_list}) + LOGGER.debug("return_code_list:%s", return_code_list) + case['return_code_seq'] = str(return_code_list) + else: + LOGGER.error("%s", ERROR_CODE['E600001']) + flag += 1 + case['final_rst'] = "Success" if flag == 0 else "Failure" + case['details_result'] = \ + str(final_rst) if len(final_rst) > 0 else "N/A" + cases_result.append(case) + LOGGER.info("writing test final_rst for case %s", tc_name) + + write_result_2_yaml(cases_result) + + LOGGER.info("############### end perform test case ###################") + + +def read_yaml(file): + '''read a yaml file + ''' + if not os.path.exists(file): + LOGGER.info("%s %s", ERROR_CODE['E400001'], file) + return None + return yaml.load(open(file, "r")) + + +def write_result_2_yaml(result): + ''' + write test result to new report.yaml + ''' + LOGGER.info("writing to yaml file") + yaml.safe_dump(result, open("./conf/report.yaml", "w"), + explicit_start=True) + + +def run_test_case_excel(config_file, case_file, depends_id, http_handler): + ''' + perform the test case one by one, + and write test final_result back to the excel. + ''' + LOGGER.info("############### start perform test case #################") + input_file = load_workbook(case_file) + input_ws = input_file[input_file.sheetnames[0]] + + row = 2 + while input_ws.cell(row=row, column=1).value: + method, url, req_body, expected_code, expected_value, tc_name \ + = read_row(input_ws, row, config_file) + + LOGGER.info("run test case ##%s##", tc_name) + if tc_name == "configure BMC ip in static, ipv4": + LOGGER.debug("debug") + flag = 0 + final_result = {} + rsp_list = [] + rsp_list = execute_final_url(config_file, depends_id, http_handler, + method, url, req_body) + if rsp_list is not None and len(rsp_list) > 0: + return_value_list, return_code_list, final_result, flag = \ + parse_test_result(expected_value, expected_code, + rsp_list, final_result) + final_result.update({'info': return_value_list}) + LOGGER.debug("return_code_list:%s", return_code_list) + input_ws.cell(row=row, column=config_file["return_code_seq"], + value=str(return_code_list)) + else: + LOGGER.error("%s", ERROR_CODE['E600001']) + flag += 1 + + LOGGER.info("writing test final_result for row %s", row) + row = write_result_2_excel( + config_file, input_ws, row, flag, final_result) + row += 1 + input_file.save(case_file) + LOGGER.info("############### end perform test case ###################") + + +def run(conf_file, case_excel_file=None, depend_yaml_file=None, + case_yaml_file=None, file_mode=None): + ''' + @param conf_file: config.yaml + @param case_excel_file: excel case file + @param depend_yaml_file: depends yaml file used if file_mode=yaml + @param case_yaml_file: case yaml file, used if file_mode=yaml + @param file_mode: "excel" or "yaml" + access function + ''' + # parse config.yaml + LOGGER.info("start engine ...") + config_file = parse_config(conf_file) + http_handler = UrllibHttpHandler() + + # get bmc info + bmc_ip, bmc_user, bmc_pwd = \ + config_file["bmc_ip"], config_file["bmc_user"], config_file["bmc_pwd"] + ACCOUNT_INFO.update({"UserName": bmc_user}) + ACCOUNT_INFO.update({"Password": bmc_pwd}) + + url = "https://{0}/redfish/v1/SessionService/Sessions".format(bmc_ip) + x_auth_token = get_token(http_handler, url) + LOGGER.info("x_auth_token: %s", x_auth_token) + + if x_auth_token is None: + LOGGER.error("%s token is None", ERROR_CODE['E300001']) + return None + + HEADERS.update({"X-Auth-Token": x_auth_token}) + id_info_list = None + if file_mode == "excel": + id_info_list = get_component_ids_excel(case_excel_file) + elif file_mode == "yaml": + id_info_list = get_component_ids_yaml(depend_yaml_file) + else: + LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode) + return None + + # get dependent id + depends_id = get_depend_id(config_file, http_handler, id_info_list) + + # read the test case sheet and perform test + if file_mode == "excel": + run_test_case_excel(config_file, + case_excel_file, depends_id, http_handler) + elif file_mode == "yaml": + run_test_case_yaml(config_file, + case_yaml_file, depends_id, http_handler) + else: + LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode) + return None + + LOGGER.info("done,checking the log %s", LOG_FILE) + + return True |