aboutsummaryrefslogtreecommitdiffstats
path: root/hdv/redfish/hdv_redfish.py
diff options
context:
space:
mode:
Diffstat (limited to 'hdv/redfish/hdv_redfish.py')
-rw-r--r--hdv/redfish/hdv_redfish.py676
1 files changed, 676 insertions, 0 deletions
diff --git a/hdv/redfish/hdv_redfish.py b/hdv/redfish/hdv_redfish.py
new file mode 100644
index 0000000..5fc44ca
--- /dev/null
+++ b/hdv/redfish/hdv_redfish.py
@@ -0,0 +1,676 @@
+##############################################################################
+# Copyright (c) 2020 China Mobile Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+'''
+an implementation of hardware delivery validation based on redfish interface.
+'''
+import time
+import os
+import re
+from re import DOTALL as DT
+import json
+import copy
+from ast import literal_eval
+import yaml
+from openpyxl.reader.excel import load_workbook
+from http_handler import UrllibHttpHandler, HEADERS
+# pylint: disable=E0611
+from log_utils import BASE_DIR, LOG_FILE, LOGGER
+from errors import ERROR_CODE, WARN_CODE
+
+LOGGER.info(BASE_DIR)
+
+ACCOUNT_INFO = {}
+WAIT_INTERVAL = 5
+
+
+def parse_config(config_yaml):
+ """
+ parse setting from config.yaml
+ :return:
+ """
+ try:
+ if not os.path.exists(config_yaml):
+ LOGGER.error(" %s, %s", ERROR_CODE['E400001'], config_yaml)
+ with open(config_yaml, 'r') as conf_file:
+ config = yaml.load(conf_file.read(), Loader=yaml.FullLoader)
+ except FileNotFoundError as fnfe:
+ LOGGER.error(fnfe)
+ LOGGER.error(u"%s", ERROR_CODE['E400002'])
+ return None
+ else:
+ return config
+
+
+def get_token(http_handler, url):
+ """
+ :return: x_auth_token
+ """
+ retry_num = 3
+ x_auth_token = None
+ while retry_num:
+ retry_num -= 1
+ res = http_handler.post(url, ACCOUNT_INFO)
+ if res is None:
+ LOGGER.error("%s, %s", WARN_CODE['W100001'], url)
+ LOGGER.info("wait %s seconds to try again", WAIT_INTERVAL)
+ time.sleep(WAIT_INTERVAL)
+ continue
+ data = res.info()
+ if "X-Auth-Token" in data:
+ x_auth_token = data.get("X-Auth-Token")
+ return x_auth_token
+ else:
+ time.sleep(WAIT_INTERVAL)
+ return None
+
+
+def get_etag(http_handler, url):
+ """
+ :return: ETag
+ """
+ etag = None
+ res = http_handler.get(url)
+ data = None
+ if res is not None:
+ data = res.info()
+ if data is not None and "ETag" in data:
+ etag = data.get("ETag")
+ return etag
+
+
+def parse_data(exp_value, act_value):
+ '''
+ parse the expected value and actual value:
+ @return: case 1: exp_value and actual value is str or int,
+ then return tuple (exp_value,act_value)
+ case 2: list,dict type, then return updated exp_value
+ ERROR_CODE for unexpected case.
+ '''
+ if isinstance(exp_value, (str, int)) and isinstance(act_value, (str, int)):
+ return (exp_value, act_value)
+ if isinstance(exp_value, list):
+ if not isinstance(act_value, list):
+ return (exp_value, act_value)
+ else:
+ for exp in enumerate(exp_value, start=0):
+ index = exp[0]
+ exp_value[index] = parse_data(
+ exp_value[index], act_value[index])
+
+ elif isinstance(exp_value, dict):
+ if isinstance(act_value, dict):
+ for key, val in exp_value.items():
+ if key in act_value:
+ exp_value[key] = parse_data(val, act_value[key])
+ else:
+ LOGGER.error("%s,%s", ERROR_CODE['E500001'], key)
+ else:
+ LOGGER.error("%s,expected: %s , actual: %s",
+ ERROR_CODE['E400005'], exp_value, act_value)
+ else:
+ LOGGER.error("%s, expected type:%s, actual type %s",
+ ERROR_CODE['E400006'], type(exp_value), type(act_value))
+ return exp_value
+
+
+def compare_data(value, flag):
+ '''
+ compare value content
+ '''
+ if isinstance(value, tuple):
+ if value[1] is not None or value[1]:
+ if value[0] == 'N/A':
+ return "Success", flag
+ elif isinstance(value[0], (bool, int, str)):
+ if value[0] == value[1]:
+ return "Success", flag
+ else:
+ flag += 1
+ return "Failure, expect value: " + str(value[0]) + \
+ ", return value: " + str(value[1]), flag
+ elif value[1] in value[0] or value[0] == ['N/A']:
+ return "Success", flag
+ else:
+ flag += 1
+ return "Failure, expect value: " + str(value[0]) + \
+ ", return value: " + str(value[1]), flag
+ else:
+ flag += 1
+ return "Failure, expect value: " + str(value[0]) + \
+ ", return value: " + str(value[1]), flag
+
+ elif isinstance(value, list):
+ for elem in enumerate(value, start=0):
+ index = elem[0]
+ value[index], flag = compare_data(value[index], flag)
+ elif isinstance(value, dict):
+ for key, val in value.items():
+ value[key], flag = compare_data(val, flag)
+ else:
+ LOGGER.error("%s", ERROR_CODE['E400007'])
+ flag += 1
+ return value, flag
+
+
+def get_component_ids_yaml(file):
+ '''
+ get component ids from yaml file
+ '''
+ if not os.path.exists(file):
+ LOGGER.info("%s, %s", ERROR_CODE['E400001'], file)
+ return None
+ return yaml.load(open(file, "r"))
+
+
+def get_component_ids_excel(excel_file):
+ '''
+ get the component_id settings from the excel sheet2
+ the componnet_id is the parent id of the hardware resource of sheet1
+ '''
+ input_file = load_workbook(excel_file)
+ input_ws = input_file[input_file.sheetnames[1]]
+ cell_key = []
+ id_info_list = []
+ for i in range(1, 5):
+ cell_key.append(input_ws.cell(row=1, column=i).value)
+ row = 2
+ while input_ws.cell(row=row, column=1).value:
+ cell_value = []
+ for i in range(1, 5):
+
+ cell_value.append(input_ws.cell(row=row, column=i).value.
+ encode("utf8").decode("utf8").replace('\n', ''))
+ cell_dict = dict(zip(cell_key, cell_value))
+ row += 1
+ id_info_list.append(cell_dict)
+ return id_info_list
+
+
+def create_real_url(url_value, id_dict, config_file):
+ '''
+ create the real url
+ either a static url, or a replaced url by depended_id
+ '''
+ url_list = []
+ replaced = 0
+ regexp = r'[^{]*{(?P<var>[a-zA-Z_]*)}'
+ # pattern = re.compile(regexp, re.S)
+ pattern = re.compile(regexp, DT)
+ LOGGER.info("url_value %s", url_value)
+ matches = list(pattern.finditer(url_value))
+ for match in matches:
+ value = match.groupdict()
+ if value['var'] in config_file:
+ url_value = url_value.replace('{' + str(value['var']) + '}',
+ str(config_file[value['var']]))
+
+ elif value['var'] in id_dict:
+ replaced = 1
+ instance_list = id_dict[value['var']]
+ for instance in instance_list:
+ sgl_url = url_value.replace('{' + str(value['var']) + '}',
+ str(instance))
+ LOGGER.debug("replaced url value %s", sgl_url)
+ url_list.append(sgl_url)
+ else:
+ replaced = 2
+ LOGGER.error("%s for parameter %s",
+ ERROR_CODE['E300002'], value['var'])
+ # combine single case with list case together.
+ if replaced == 0:
+ LOGGER.info("adding static url %s into list", url_value)
+ url_list.append(url_value)
+ return url_list
+
+
+def execute_get_url(url, http_handler):
+ """
+ execute the url
+ """
+ LOGGER.debug("execute url %s", url)
+ rsp = http_handler.get(url)
+ if rsp is None:
+ LOGGER.error("return None for url %s", url)
+ return None
+ ret_dict = {}
+ ret_dict.update({"return_code": rsp.code})
+ return_value = json.loads(rsp.read())
+ ret_dict.update({"return_value": return_value})
+ LOGGER.info("ret_dict is %s", ret_dict)
+ LOGGER.debug("ret_dict type is %s", type(ret_dict))
+ return ret_dict
+
+
+def handle_depend_url(method, url_list, http_handler):
+ '''
+ run request url in url_list and collect the response as list
+ '''
+ response_list = []
+ if method == 'GET':
+ for url_case in url_list:
+ response = execute_get_url(url_case, http_handler)
+ response_list.append(response)
+ elif method == 'POST':
+ pass
+ elif method == 'PATCH':
+ pass
+ elif method == 'DELETE':
+ pass
+ return response_list
+
+
+def create_obj_id_list(key_flags, response_list):
+ '''
+ create object id list
+ '''
+ if response_list is None or response_list.__len__() == 0:
+ LOGGER.debug("response list is None")
+ return None
+ if key_flags is not None:
+ key_list = key_flags.split(':')
+ end_id_list = []
+ for response in response_list:
+ if response is None:
+ LOGGER.warning("response is None")
+ continue
+ return_value = response['return_value']
+ if len(key_list) == 1 and key_list[0] in return_value:
+ for i in return_value[key_list[0]]:
+ end_id_list.append(i['@odata.id'])
+ elif len(key_list) > 1:
+ for elem in enumerate(key_list, start=0):
+ index = elem[0]
+ if index == len(key_list) - 1:
+ for case in return_value[key_list[index]]:
+ end_id_list.append(case['@odata.id'])
+ else:
+ if isinstance(return_value, list):
+ return_value = return_value[0]
+ elif isinstance(return_value, dict):
+ return_value = return_value[key_list[index]]
+ else:
+ LOGGER.warning("%s, %s", WARN_CODE['W100002'],
+ type(return_value))
+
+ else:
+ LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags)
+ return end_id_list
+
+
+def get_depend_id(config_file, http_handler, depend_ids):
+ '''
+ @param mode: yaml or excel,default value "excel"
+ parse the component id list
+ build up the id resource for each component_id
+ return: id_dict like {component_id:[obj_list]}
+ '''
+ id_dict = {}
+ for case in depend_ids:
+ component_name = case.get('component_id')
+ LOGGER.info("parsing component %s", component_name)
+ pro_value = case.get('pro_value')
+ url_value = case.get('url_value')
+ key_flags = case.get('key_flags')
+ # url_list = []
+ url_list = create_real_url(url_value, id_dict, config_file)
+ # response_list = []
+ response_list = handle_depend_url(pro_value, url_list, http_handler)
+ # end_id_list = []
+ end_id_list = create_obj_id_list(key_flags, response_list)
+ if end_id_list is None or end_id_list.__len__() == 0:
+ LOGGER.error("%s,%s", ERROR_CODE['E300003'], component_name)
+ continue
+ id_dict.update({component_name: end_id_list})
+ LOGGER.debug("id_dict content is %s", id_dict)
+ return id_dict
+
+
+def read_row(input_ws, row, config_file):
+ '''
+ read a row value
+ '''
+ pro_value = input_ws.cell(row=row, column=config_file["pro_seq"]).value
+ url_value = input_ws.cell(row=row, column=config_file["url_seq"]).value
+ req_body_value = input_ws.cell(
+ row=row, column=config_file["req_body_seq"]).value
+ expect_return_code = \
+ input_ws.cell(
+ row=row, column=config_file["expect_return_code_seq"]).value
+ expect_return_value = \
+ input_ws.cell(
+ row=row, column=config_file["expect_return_value_seq"]).value
+ attr_name = input_ws.cell(row=row, column=config_file["attr_name"]).value
+
+ if req_body_value is not None:
+ req_body_value = literal_eval(req_body_value)
+ if expect_return_code is not None:
+ expect_return_code = int(expect_return_code)
+ if expect_return_value is not None:
+ expect_return_value = literal_eval(expect_return_value)
+ return pro_value, url_value, req_body_value, expect_return_code,\
+ expect_return_value, attr_name
+
+
+def execute_post_url(body, handler, url):
+ '''
+ execute post url
+ '''
+ LOGGER.debug("execute url %s", url)
+ rsp = handler.post(url, body)
+ LOGGER.debug("post response %s", rsp)
+ if not isinstance(rsp, dict):
+ LOGGER.error("%s,%s, expected type %s",
+ ERROR_CODE["E400004"], type(rsp), dict)
+ return None
+ return rsp
+
+
+def execute_patch_url(body, http_handler, url):
+ '''
+ execute patch url
+ '''
+ etag = get_etag(http_handler, url)
+ LOGGER.info("etag %s", etag)
+ rsp = http_handler.patch(url, body, etag)
+ LOGGER.debug("patch response %s", rsp)
+ LOGGER.debug("type response is %s", type(rsp))
+ ret_dict = {}
+ if rsp is None:
+ LOGGER.error("%s %s", ERROR_CODE['E100001'], url)
+ ret_dict.update({"return_code": "N/A"})
+ ret_dict.update({"return_value": "Failure"})
+ return ret_dict
+ ret_dict.update({"return_code": rsp.code})
+ return_value = json.loads(rsp.read())
+ ret_dict.update({"return_value": return_value})
+ return ret_dict
+
+
+def handle_final_url(method, url_list, req_body=None, http_handler=None):
+ '''execute the requested url to get the response
+ '''
+ response_list = []
+ if method == 'GET':
+ for url_case in url_list:
+ rsp = execute_get_url(url_case, http_handler)
+ response_list.append(rsp)
+ elif method == 'POST':
+ if len(url_list) > 1:
+ LOGGER.error(ERROR_CODE['E100002'])
+ return None
+ url_value = url_list[0]
+ rsp = execute_post_url(req_body, http_handler, url_value)
+ response_list.append(rsp)
+ elif method == 'PATCH':
+ for url_case in url_list:
+ LOGGER.info(url_case)
+ temp = execute_patch_url(req_body, http_handler, url_case)
+ if temp is not None:
+ response_list.append(temp)
+ elif method == 'DELETE':
+ pass
+ LOGGER.info("response_list %s", response_list)
+ return response_list
+
+
+def check_component_cnt(expect_return_value, res_list, result):
+ '''
+ #check if the component count meet the required.
+ '''
+ if expect_return_value.__contains__('count'):
+ if expect_return_value['count'] == len(res_list):
+ result.update({"count": "Success"})
+ else:
+ result.update({"count":
+ "Failure, the actual num is " + str(len(res_list))})
+ else:
+ result.update({"count": "N/A for this case"})
+ return result
+
+
+def parse_test_result(expect_return_value, expect_return_code,
+ actual_result_list, final_result):
+ '''
+ @param expected_return_value expected value set in input excel
+ @param expected_return_code expected return code
+ @param actual_result_list: actual result run by each url list checking
+ @param final_result: returned final result
+ parsing the test final_result by comparing expected_value with
+ real test final_result value.
+ '''
+ return_code_list = []
+ return_value_list = []
+ flag = 0
+ final_result = check_component_cnt(expect_return_value,
+ actual_result_list, final_result)
+
+ for each_result in actual_result_list:
+ temp_result = {}
+ if each_result is not None:
+ LOGGER.debug("current result is %s,result_list is %s",
+ each_result, actual_result_list)
+ return_code = each_result["return_code"]
+ return_code_list.append(return_code)
+ return_value = each_result["return_value"]
+ if return_code == expect_return_code:
+ code_result = 'Success'
+ else:
+ code_result = 'Failure'
+ temp_result.update({'return_code': code_result})
+ else:
+ LOGGER.warning("%s ,set failure", WARN_CODE['W100003'])
+ temp_result.update({'return_code': 'Failure'})
+ return_value_list.append(temp_result)
+ flag += 1
+ continue
+
+ # parse the actual result according to the expected value hierachy.
+ ex_value = copy.deepcopy(expect_return_value)
+ exp_act_pairs = {}
+ for key, value in ex_value.items():
+ if key in return_value:
+ exp_act_pairs[key] = parse_data(value, return_value[key])
+ elif key == 'count':
+ pass
+ else:
+ LOGGER.error("%s, %s", ERROR_CODE['E500001'], key)
+ exp_act_pairs[key] = \
+ (value, "Can't find key {} in return value".format(key))
+ LOGGER.debug("real_result:%s", exp_act_pairs)
+
+ # comparing expected result with real result.
+ if exp_act_pairs:
+ for key, value in exp_act_pairs.items():
+ temp_result[key], flag = compare_data(value, flag)
+ return_value_list.append(temp_result)
+ return return_value_list, return_code_list, final_result, flag
+
+
+def write_result_2_excel(config_file, input_ws, row, flag, result):
+ '''
+ write the result back to excel
+ '''
+ if not result:
+ input_ws.cell(row=row, column=config_file["detail_result"],
+ value=str('N/A'))
+ else:
+ input_ws.cell(row=row, column=config_file["detail_result"],
+ value=str(result))
+ if flag == 0:
+ input_ws.cell(row=row, column=config_file["final_result"],
+ value=str("Success"))
+ else:
+ input_ws.cell(row=row, column=config_file["final_result"],
+ value=str("Failure"))
+ return row
+
+
+def execute_final_url(config_file, depends_id, http_handler,
+ method, url, req_body):
+ '''
+ execute final url to get the request result
+ '''
+ url_list = create_real_url(url, depends_id, config_file)
+ rsp_list = handle_final_url(method, url_list, req_body, http_handler)
+ return rsp_list
+
+
+def run_test_case_yaml(config_file, case_file, depends_id, http_handler):
+ '''run test case from cases.yaml
+ '''
+ LOGGER.info("############### start perform test case #################")
+ cases_result = []
+ cases = read_yaml(case_file)
+ for case in cases:
+ method, url, req_body, expected_code, expected_value, tc_name \
+ = case['method'], case['url'], case['request_body'], \
+ case['expected_code'], case['expected_result'], case['case_name']
+
+ expected_value = literal_eval(expected_value)
+ flag = 0
+ final_rst = {}
+ rsp_list = execute_final_url(config_file, depends_id,
+ http_handler, method, url, req_body)
+ if rsp_list is not None and len(rsp_list) > 0:
+ return_value_list, return_code_list, final_rst, flag = \
+ parse_test_result(
+ expected_value, expected_code, rsp_list, final_rst)
+ final_rst.update({'info': return_value_list})
+ LOGGER.debug("return_code_list:%s", return_code_list)
+ case['return_code_seq'] = str(return_code_list)
+ else:
+ LOGGER.error("%s", ERROR_CODE['E600001'])
+ flag += 1
+ case['final_rst'] = "Success" if flag == 0 else "Failure"
+ case['details_result'] = \
+ str(final_rst) if len(final_rst) > 0 else "N/A"
+ cases_result.append(case)
+ LOGGER.info("writing test final_rst for case %s", tc_name)
+
+ write_result_2_yaml(cases_result)
+
+ LOGGER.info("############### end perform test case ###################")
+
+
+def read_yaml(file):
+ '''read a yaml file
+ '''
+ if not os.path.exists(file):
+ LOGGER.info("%s %s", ERROR_CODE['E400001'], file)
+ return None
+ return yaml.load(open(file, "r"))
+
+
+def write_result_2_yaml(result):
+ '''
+ write test result to new report.yaml
+ '''
+ LOGGER.info("writing to yaml file")
+ yaml.safe_dump(result, open("./conf/report.yaml", "w"),
+ explicit_start=True)
+
+
+def run_test_case_excel(config_file, case_file, depends_id, http_handler):
+ '''
+ perform the test case one by one,
+ and write test final_result back to the excel.
+ '''
+ LOGGER.info("############### start perform test case #################")
+ input_file = load_workbook(case_file)
+ input_ws = input_file[input_file.sheetnames[0]]
+
+ row = 2
+ while input_ws.cell(row=row, column=1).value:
+ method, url, req_body, expected_code, expected_value, tc_name \
+ = read_row(input_ws, row, config_file)
+
+ LOGGER.info("run test case ##%s##", tc_name)
+ if tc_name == "configure BMC ip in static, ipv4":
+ LOGGER.debug("debug")
+ flag = 0
+ final_result = {}
+ rsp_list = []
+ rsp_list = execute_final_url(config_file, depends_id, http_handler,
+ method, url, req_body)
+ if rsp_list is not None and len(rsp_list) > 0:
+ return_value_list, return_code_list, final_result, flag = \
+ parse_test_result(expected_value, expected_code,
+ rsp_list, final_result)
+ final_result.update({'info': return_value_list})
+ LOGGER.debug("return_code_list:%s", return_code_list)
+ input_ws.cell(row=row, column=config_file["return_code_seq"],
+ value=str(return_code_list))
+ else:
+ LOGGER.error("%s", ERROR_CODE['E600001'])
+ flag += 1
+
+ LOGGER.info("writing test final_result for row %s", row)
+ row = write_result_2_excel(
+ config_file, input_ws, row, flag, final_result)
+ row += 1
+ input_file.save(case_file)
+ LOGGER.info("############### end perform test case ###################")
+
+
+def run(conf_file, case_excel_file=None, depend_yaml_file=None,
+ case_yaml_file=None, file_mode=None):
+ '''
+ @param conf_file: config.yaml
+ @param case_excel_file: excel case file
+ @param depend_yaml_file: depends yaml file used if file_mode=yaml
+ @param case_yaml_file: case yaml file, used if file_mode=yaml
+ @param file_mode: "excel" or "yaml"
+ access function
+ '''
+ # parse config.yaml
+ LOGGER.info("start engine ...")
+ config_file = parse_config(conf_file)
+ http_handler = UrllibHttpHandler()
+
+ # get bmc info
+ bmc_ip, bmc_user, bmc_pwd = \
+ config_file["bmc_ip"], config_file["bmc_user"], config_file["bmc_pwd"]
+ ACCOUNT_INFO.update({"UserName": bmc_user})
+ ACCOUNT_INFO.update({"Password": bmc_pwd})
+
+ url = "https://{0}/redfish/v1/SessionService/Sessions".format(bmc_ip)
+ x_auth_token = get_token(http_handler, url)
+ LOGGER.info("x_auth_token: %s", x_auth_token)
+
+ if x_auth_token is None:
+ LOGGER.error("%s token is None", ERROR_CODE['E300001'])
+ return None
+
+ HEADERS.update({"X-Auth-Token": x_auth_token})
+ id_info_list = None
+ if file_mode == "excel":
+ id_info_list = get_component_ids_excel(case_excel_file)
+ elif file_mode == "yaml":
+ id_info_list = get_component_ids_yaml(depend_yaml_file)
+ else:
+ LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode)
+ return None
+
+ # get dependent id
+ depends_id = get_depend_id(config_file, http_handler, id_info_list)
+
+ # read the test case sheet and perform test
+ if file_mode == "excel":
+ run_test_case_excel(config_file,
+ case_excel_file, depends_id, http_handler)
+ elif file_mode == "yaml":
+ run_test_case_yaml(config_file,
+ case_yaml_file, depends_id, http_handler)
+ else:
+ LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode)
+ return None
+
+ LOGGER.info("done,checking the log %s", LOG_FILE)
+
+ return True