aboutsummaryrefslogtreecommitdiffstats
path: root/tools/hdv/redfish/hdv_redfish.py
diff options
context:
space:
mode:
Diffstat (limited to 'tools/hdv/redfish/hdv_redfish.py')
-rw-r--r--tools/hdv/redfish/hdv_redfish.py676
1 files changed, 0 insertions, 676 deletions
diff --git a/tools/hdv/redfish/hdv_redfish.py b/tools/hdv/redfish/hdv_redfish.py
deleted file mode 100644
index 5fc44ca..0000000
--- a/tools/hdv/redfish/hdv_redfish.py
+++ /dev/null
@@ -1,676 +0,0 @@
-##############################################################################
-# Copyright (c) 2020 China Mobile Co.,Ltd and others.
-#
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-'''
-an implementation of hardware delivery validation based on redfish interface.
-'''
-import time
-import os
-import re
-from re import DOTALL as DT
-import json
-import copy
-from ast import literal_eval
-import yaml
-from openpyxl.reader.excel import load_workbook
-from http_handler import UrllibHttpHandler, HEADERS
-# pylint: disable=E0611
-from log_utils import BASE_DIR, LOG_FILE, LOGGER
-from errors import ERROR_CODE, WARN_CODE
-
-LOGGER.info(BASE_DIR)
-
-ACCOUNT_INFO = {}
-WAIT_INTERVAL = 5
-
-
-def parse_config(config_yaml):
- """
- parse setting from config.yaml
- :return:
- """
- try:
- if not os.path.exists(config_yaml):
- LOGGER.error(" %s, %s", ERROR_CODE['E400001'], config_yaml)
- with open(config_yaml, 'r') as conf_file:
- config = yaml.load(conf_file.read(), Loader=yaml.FullLoader)
- except FileNotFoundError as fnfe:
- LOGGER.error(fnfe)
- LOGGER.error(u"%s", ERROR_CODE['E400002'])
- return None
- else:
- return config
-
-
-def get_token(http_handler, url):
- """
- :return: x_auth_token
- """
- retry_num = 3
- x_auth_token = None
- while retry_num:
- retry_num -= 1
- res = http_handler.post(url, ACCOUNT_INFO)
- if res is None:
- LOGGER.error("%s, %s", WARN_CODE['W100001'], url)
- LOGGER.info("wait %s seconds to try again", WAIT_INTERVAL)
- time.sleep(WAIT_INTERVAL)
- continue
- data = res.info()
- if "X-Auth-Token" in data:
- x_auth_token = data.get("X-Auth-Token")
- return x_auth_token
- else:
- time.sleep(WAIT_INTERVAL)
- return None
-
-
-def get_etag(http_handler, url):
- """
- :return: ETag
- """
- etag = None
- res = http_handler.get(url)
- data = None
- if res is not None:
- data = res.info()
- if data is not None and "ETag" in data:
- etag = data.get("ETag")
- return etag
-
-
-def parse_data(exp_value, act_value):
- '''
- parse the expected value and actual value:
- @return: case 1: exp_value and actual value is str or int,
- then return tuple (exp_value,act_value)
- case 2: list,dict type, then return updated exp_value
- ERROR_CODE for unexpected case.
- '''
- if isinstance(exp_value, (str, int)) and isinstance(act_value, (str, int)):
- return (exp_value, act_value)
- if isinstance(exp_value, list):
- if not isinstance(act_value, list):
- return (exp_value, act_value)
- else:
- for exp in enumerate(exp_value, start=0):
- index = exp[0]
- exp_value[index] = parse_data(
- exp_value[index], act_value[index])
-
- elif isinstance(exp_value, dict):
- if isinstance(act_value, dict):
- for key, val in exp_value.items():
- if key in act_value:
- exp_value[key] = parse_data(val, act_value[key])
- else:
- LOGGER.error("%s,%s", ERROR_CODE['E500001'], key)
- else:
- LOGGER.error("%s,expected: %s , actual: %s",
- ERROR_CODE['E400005'], exp_value, act_value)
- else:
- LOGGER.error("%s, expected type:%s, actual type %s",
- ERROR_CODE['E400006'], type(exp_value), type(act_value))
- return exp_value
-
-
-def compare_data(value, flag):
- '''
- compare value content
- '''
- if isinstance(value, tuple):
- if value[1] is not None or value[1]:
- if value[0] == 'N/A':
- return "Success", flag
- elif isinstance(value[0], (bool, int, str)):
- if value[0] == value[1]:
- return "Success", flag
- else:
- flag += 1
- return "Failure, expect value: " + str(value[0]) + \
- ", return value: " + str(value[1]), flag
- elif value[1] in value[0] or value[0] == ['N/A']:
- return "Success", flag
- else:
- flag += 1
- return "Failure, expect value: " + str(value[0]) + \
- ", return value: " + str(value[1]), flag
- else:
- flag += 1
- return "Failure, expect value: " + str(value[0]) + \
- ", return value: " + str(value[1]), flag
-
- elif isinstance(value, list):
- for elem in enumerate(value, start=0):
- index = elem[0]
- value[index], flag = compare_data(value[index], flag)
- elif isinstance(value, dict):
- for key, val in value.items():
- value[key], flag = compare_data(val, flag)
- else:
- LOGGER.error("%s", ERROR_CODE['E400007'])
- flag += 1
- return value, flag
-
-
-def get_component_ids_yaml(file):
- '''
- get component ids from yaml file
- '''
- if not os.path.exists(file):
- LOGGER.info("%s, %s", ERROR_CODE['E400001'], file)
- return None
- return yaml.load(open(file, "r"))
-
-
-def get_component_ids_excel(excel_file):
- '''
- get the component_id settings from the excel sheet2
- the componnet_id is the parent id of the hardware resource of sheet1
- '''
- input_file = load_workbook(excel_file)
- input_ws = input_file[input_file.sheetnames[1]]
- cell_key = []
- id_info_list = []
- for i in range(1, 5):
- cell_key.append(input_ws.cell(row=1, column=i).value)
- row = 2
- while input_ws.cell(row=row, column=1).value:
- cell_value = []
- for i in range(1, 5):
-
- cell_value.append(input_ws.cell(row=row, column=i).value.
- encode("utf8").decode("utf8").replace('\n', ''))
- cell_dict = dict(zip(cell_key, cell_value))
- row += 1
- id_info_list.append(cell_dict)
- return id_info_list
-
-
-def create_real_url(url_value, id_dict, config_file):
- '''
- create the real url
- either a static url, or a replaced url by depended_id
- '''
- url_list = []
- replaced = 0
- regexp = r'[^{]*{(?P<var>[a-zA-Z_]*)}'
- # pattern = re.compile(regexp, re.S)
- pattern = re.compile(regexp, DT)
- LOGGER.info("url_value %s", url_value)
- matches = list(pattern.finditer(url_value))
- for match in matches:
- value = match.groupdict()
- if value['var'] in config_file:
- url_value = url_value.replace('{' + str(value['var']) + '}',
- str(config_file[value['var']]))
-
- elif value['var'] in id_dict:
- replaced = 1
- instance_list = id_dict[value['var']]
- for instance in instance_list:
- sgl_url = url_value.replace('{' + str(value['var']) + '}',
- str(instance))
- LOGGER.debug("replaced url value %s", sgl_url)
- url_list.append(sgl_url)
- else:
- replaced = 2
- LOGGER.error("%s for parameter %s",
- ERROR_CODE['E300002'], value['var'])
- # combine single case with list case together.
- if replaced == 0:
- LOGGER.info("adding static url %s into list", url_value)
- url_list.append(url_value)
- return url_list
-
-
-def execute_get_url(url, http_handler):
- """
- execute the url
- """
- LOGGER.debug("execute url %s", url)
- rsp = http_handler.get(url)
- if rsp is None:
- LOGGER.error("return None for url %s", url)
- return None
- ret_dict = {}
- ret_dict.update({"return_code": rsp.code})
- return_value = json.loads(rsp.read())
- ret_dict.update({"return_value": return_value})
- LOGGER.info("ret_dict is %s", ret_dict)
- LOGGER.debug("ret_dict type is %s", type(ret_dict))
- return ret_dict
-
-
-def handle_depend_url(method, url_list, http_handler):
- '''
- run request url in url_list and collect the response as list
- '''
- response_list = []
- if method == 'GET':
- for url_case in url_list:
- response = execute_get_url(url_case, http_handler)
- response_list.append(response)
- elif method == 'POST':
- pass
- elif method == 'PATCH':
- pass
- elif method == 'DELETE':
- pass
- return response_list
-
-
-def create_obj_id_list(key_flags, response_list):
- '''
- create object id list
- '''
- if response_list is None or response_list.__len__() == 0:
- LOGGER.debug("response list is None")
- return None
- if key_flags is not None:
- key_list = key_flags.split(':')
- end_id_list = []
- for response in response_list:
- if response is None:
- LOGGER.warning("response is None")
- continue
- return_value = response['return_value']
- if len(key_list) == 1 and key_list[0] in return_value:
- for i in return_value[key_list[0]]:
- end_id_list.append(i['@odata.id'])
- elif len(key_list) > 1:
- for elem in enumerate(key_list, start=0):
- index = elem[0]
- if index == len(key_list) - 1:
- for case in return_value[key_list[index]]:
- end_id_list.append(case['@odata.id'])
- else:
- if isinstance(return_value, list):
- return_value = return_value[0]
- elif isinstance(return_value, dict):
- return_value = return_value[key_list[index]]
- else:
- LOGGER.warning("%s, %s", WARN_CODE['W100002'],
- type(return_value))
-
- else:
- LOGGER.error("%s %s", ERROR_CODE['E400003'], key_flags)
- return end_id_list
-
-
-def get_depend_id(config_file, http_handler, depend_ids):
- '''
- @param mode: yaml or excel,default value "excel"
- parse the component id list
- build up the id resource for each component_id
- return: id_dict like {component_id:[obj_list]}
- '''
- id_dict = {}
- for case in depend_ids:
- component_name = case.get('component_id')
- LOGGER.info("parsing component %s", component_name)
- pro_value = case.get('pro_value')
- url_value = case.get('url_value')
- key_flags = case.get('key_flags')
- # url_list = []
- url_list = create_real_url(url_value, id_dict, config_file)
- # response_list = []
- response_list = handle_depend_url(pro_value, url_list, http_handler)
- # end_id_list = []
- end_id_list = create_obj_id_list(key_flags, response_list)
- if end_id_list is None or end_id_list.__len__() == 0:
- LOGGER.error("%s,%s", ERROR_CODE['E300003'], component_name)
- continue
- id_dict.update({component_name: end_id_list})
- LOGGER.debug("id_dict content is %s", id_dict)
- return id_dict
-
-
-def read_row(input_ws, row, config_file):
- '''
- read a row value
- '''
- pro_value = input_ws.cell(row=row, column=config_file["pro_seq"]).value
- url_value = input_ws.cell(row=row, column=config_file["url_seq"]).value
- req_body_value = input_ws.cell(
- row=row, column=config_file["req_body_seq"]).value
- expect_return_code = \
- input_ws.cell(
- row=row, column=config_file["expect_return_code_seq"]).value
- expect_return_value = \
- input_ws.cell(
- row=row, column=config_file["expect_return_value_seq"]).value
- attr_name = input_ws.cell(row=row, column=config_file["attr_name"]).value
-
- if req_body_value is not None:
- req_body_value = literal_eval(req_body_value)
- if expect_return_code is not None:
- expect_return_code = int(expect_return_code)
- if expect_return_value is not None:
- expect_return_value = literal_eval(expect_return_value)
- return pro_value, url_value, req_body_value, expect_return_code,\
- expect_return_value, attr_name
-
-
-def execute_post_url(body, handler, url):
- '''
- execute post url
- '''
- LOGGER.debug("execute url %s", url)
- rsp = handler.post(url, body)
- LOGGER.debug("post response %s", rsp)
- if not isinstance(rsp, dict):
- LOGGER.error("%s,%s, expected type %s",
- ERROR_CODE["E400004"], type(rsp), dict)
- return None
- return rsp
-
-
-def execute_patch_url(body, http_handler, url):
- '''
- execute patch url
- '''
- etag = get_etag(http_handler, url)
- LOGGER.info("etag %s", etag)
- rsp = http_handler.patch(url, body, etag)
- LOGGER.debug("patch response %s", rsp)
- LOGGER.debug("type response is %s", type(rsp))
- ret_dict = {}
- if rsp is None:
- LOGGER.error("%s %s", ERROR_CODE['E100001'], url)
- ret_dict.update({"return_code": "N/A"})
- ret_dict.update({"return_value": "Failure"})
- return ret_dict
- ret_dict.update({"return_code": rsp.code})
- return_value = json.loads(rsp.read())
- ret_dict.update({"return_value": return_value})
- return ret_dict
-
-
-def handle_final_url(method, url_list, req_body=None, http_handler=None):
- '''execute the requested url to get the response
- '''
- response_list = []
- if method == 'GET':
- for url_case in url_list:
- rsp = execute_get_url(url_case, http_handler)
- response_list.append(rsp)
- elif method == 'POST':
- if len(url_list) > 1:
- LOGGER.error(ERROR_CODE['E100002'])
- return None
- url_value = url_list[0]
- rsp = execute_post_url(req_body, http_handler, url_value)
- response_list.append(rsp)
- elif method == 'PATCH':
- for url_case in url_list:
- LOGGER.info(url_case)
- temp = execute_patch_url(req_body, http_handler, url_case)
- if temp is not None:
- response_list.append(temp)
- elif method == 'DELETE':
- pass
- LOGGER.info("response_list %s", response_list)
- return response_list
-
-
-def check_component_cnt(expect_return_value, res_list, result):
- '''
- #check if the component count meet the required.
- '''
- if expect_return_value.__contains__('count'):
- if expect_return_value['count'] == len(res_list):
- result.update({"count": "Success"})
- else:
- result.update({"count":
- "Failure, the actual num is " + str(len(res_list))})
- else:
- result.update({"count": "N/A for this case"})
- return result
-
-
-def parse_test_result(expect_return_value, expect_return_code,
- actual_result_list, final_result):
- '''
- @param expected_return_value expected value set in input excel
- @param expected_return_code expected return code
- @param actual_result_list: actual result run by each url list checking
- @param final_result: returned final result
- parsing the test final_result by comparing expected_value with
- real test final_result value.
- '''
- return_code_list = []
- return_value_list = []
- flag = 0
- final_result = check_component_cnt(expect_return_value,
- actual_result_list, final_result)
-
- for each_result in actual_result_list:
- temp_result = {}
- if each_result is not None:
- LOGGER.debug("current result is %s,result_list is %s",
- each_result, actual_result_list)
- return_code = each_result["return_code"]
- return_code_list.append(return_code)
- return_value = each_result["return_value"]
- if return_code == expect_return_code:
- code_result = 'Success'
- else:
- code_result = 'Failure'
- temp_result.update({'return_code': code_result})
- else:
- LOGGER.warning("%s ,set failure", WARN_CODE['W100003'])
- temp_result.update({'return_code': 'Failure'})
- return_value_list.append(temp_result)
- flag += 1
- continue
-
- # parse the actual result according to the expected value hierachy.
- ex_value = copy.deepcopy(expect_return_value)
- exp_act_pairs = {}
- for key, value in ex_value.items():
- if key in return_value:
- exp_act_pairs[key] = parse_data(value, return_value[key])
- elif key == 'count':
- pass
- else:
- LOGGER.error("%s, %s", ERROR_CODE['E500001'], key)
- exp_act_pairs[key] = \
- (value, "Can't find key {} in return value".format(key))
- LOGGER.debug("real_result:%s", exp_act_pairs)
-
- # comparing expected result with real result.
- if exp_act_pairs:
- for key, value in exp_act_pairs.items():
- temp_result[key], flag = compare_data(value, flag)
- return_value_list.append(temp_result)
- return return_value_list, return_code_list, final_result, flag
-
-
-def write_result_2_excel(config_file, input_ws, row, flag, result):
- '''
- write the result back to excel
- '''
- if not result:
- input_ws.cell(row=row, column=config_file["detail_result"],
- value=str('N/A'))
- else:
- input_ws.cell(row=row, column=config_file["detail_result"],
- value=str(result))
- if flag == 0:
- input_ws.cell(row=row, column=config_file["final_result"],
- value=str("Success"))
- else:
- input_ws.cell(row=row, column=config_file["final_result"],
- value=str("Failure"))
- return row
-
-
-def execute_final_url(config_file, depends_id, http_handler,
- method, url, req_body):
- '''
- execute final url to get the request result
- '''
- url_list = create_real_url(url, depends_id, config_file)
- rsp_list = handle_final_url(method, url_list, req_body, http_handler)
- return rsp_list
-
-
-def run_test_case_yaml(config_file, case_file, depends_id, http_handler):
- '''run test case from cases.yaml
- '''
- LOGGER.info("############### start perform test case #################")
- cases_result = []
- cases = read_yaml(case_file)
- for case in cases:
- method, url, req_body, expected_code, expected_value, tc_name \
- = case['method'], case['url'], case['request_body'], \
- case['expected_code'], case['expected_result'], case['case_name']
-
- expected_value = literal_eval(expected_value)
- flag = 0
- final_rst = {}
- rsp_list = execute_final_url(config_file, depends_id,
- http_handler, method, url, req_body)
- if rsp_list is not None and len(rsp_list) > 0:
- return_value_list, return_code_list, final_rst, flag = \
- parse_test_result(
- expected_value, expected_code, rsp_list, final_rst)
- final_rst.update({'info': return_value_list})
- LOGGER.debug("return_code_list:%s", return_code_list)
- case['return_code_seq'] = str(return_code_list)
- else:
- LOGGER.error("%s", ERROR_CODE['E600001'])
- flag += 1
- case['final_rst'] = "Success" if flag == 0 else "Failure"
- case['details_result'] = \
- str(final_rst) if len(final_rst) > 0 else "N/A"
- cases_result.append(case)
- LOGGER.info("writing test final_rst for case %s", tc_name)
-
- write_result_2_yaml(cases_result)
-
- LOGGER.info("############### end perform test case ###################")
-
-
-def read_yaml(file):
- '''read a yaml file
- '''
- if not os.path.exists(file):
- LOGGER.info("%s %s", ERROR_CODE['E400001'], file)
- return None
- return yaml.load(open(file, "r"))
-
-
-def write_result_2_yaml(result):
- '''
- write test result to new report.yaml
- '''
- LOGGER.info("writing to yaml file")
- yaml.safe_dump(result, open("./conf/report.yaml", "w"),
- explicit_start=True)
-
-
-def run_test_case_excel(config_file, case_file, depends_id, http_handler):
- '''
- perform the test case one by one,
- and write test final_result back to the excel.
- '''
- LOGGER.info("############### start perform test case #################")
- input_file = load_workbook(case_file)
- input_ws = input_file[input_file.sheetnames[0]]
-
- row = 2
- while input_ws.cell(row=row, column=1).value:
- method, url, req_body, expected_code, expected_value, tc_name \
- = read_row(input_ws, row, config_file)
-
- LOGGER.info("run test case ##%s##", tc_name)
- if tc_name == "configure BMC ip in static, ipv4":
- LOGGER.debug("debug")
- flag = 0
- final_result = {}
- rsp_list = []
- rsp_list = execute_final_url(config_file, depends_id, http_handler,
- method, url, req_body)
- if rsp_list is not None and len(rsp_list) > 0:
- return_value_list, return_code_list, final_result, flag = \
- parse_test_result(expected_value, expected_code,
- rsp_list, final_result)
- final_result.update({'info': return_value_list})
- LOGGER.debug("return_code_list:%s", return_code_list)
- input_ws.cell(row=row, column=config_file["return_code_seq"],
- value=str(return_code_list))
- else:
- LOGGER.error("%s", ERROR_CODE['E600001'])
- flag += 1
-
- LOGGER.info("writing test final_result for row %s", row)
- row = write_result_2_excel(
- config_file, input_ws, row, flag, final_result)
- row += 1
- input_file.save(case_file)
- LOGGER.info("############### end perform test case ###################")
-
-
-def run(conf_file, case_excel_file=None, depend_yaml_file=None,
- case_yaml_file=None, file_mode=None):
- '''
- @param conf_file: config.yaml
- @param case_excel_file: excel case file
- @param depend_yaml_file: depends yaml file used if file_mode=yaml
- @param case_yaml_file: case yaml file, used if file_mode=yaml
- @param file_mode: "excel" or "yaml"
- access function
- '''
- # parse config.yaml
- LOGGER.info("start engine ...")
- config_file = parse_config(conf_file)
- http_handler = UrllibHttpHandler()
-
- # get bmc info
- bmc_ip, bmc_user, bmc_pwd = \
- config_file["bmc_ip"], config_file["bmc_user"], config_file["bmc_pwd"]
- ACCOUNT_INFO.update({"UserName": bmc_user})
- ACCOUNT_INFO.update({"Password": bmc_pwd})
-
- url = "https://{0}/redfish/v1/SessionService/Sessions".format(bmc_ip)
- x_auth_token = get_token(http_handler, url)
- LOGGER.info("x_auth_token: %s", x_auth_token)
-
- if x_auth_token is None:
- LOGGER.error("%s token is None", ERROR_CODE['E300001'])
- return None
-
- HEADERS.update({"X-Auth-Token": x_auth_token})
- id_info_list = None
- if file_mode == "excel":
- id_info_list = get_component_ids_excel(case_excel_file)
- elif file_mode == "yaml":
- id_info_list = get_component_ids_yaml(depend_yaml_file)
- else:
- LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode)
- return None
-
- # get dependent id
- depends_id = get_depend_id(config_file, http_handler, id_info_list)
-
- # read the test case sheet and perform test
- if file_mode == "excel":
- run_test_case_excel(config_file,
- case_excel_file, depends_id, http_handler)
- elif file_mode == "yaml":
- run_test_case_yaml(config_file,
- case_yaml_file, depends_id, http_handler)
- else:
- LOGGER.error("%s,%s", ERROR_CODE['E200001'], file_mode)
- return None
-
- LOGGER.info("done,checking the log %s", LOG_FILE)
-
- return True