#!/usr/bin/env python3

# Copyright 2020 University Of Delhi.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""(Post)Deployment State Validation tool
"""

import argparse
import logging
import os
import re
import ast
import sys
from datetime import datetime

import requests

from tools.conf import settings
from tools.result_api import result_api, Local
from core import load_pdf
from core import display_report
from validator import AirshipValidator


# Maps the names accepted by --log-verbosity to logging module levels.
VERBOSITY_LEVELS = {
    'debug': logging.DEBUG,
    'info': logging.INFO,
    'warning': logging.WARNING,
    'error': logging.ERROR,
    'critical': logging.CRITICAL
}

_LOGGER = logging.getLogger()
_CURR_DIR = os.path.dirname(os.path.realpath(__file__))


def parse_param_string(values):
    """
    Parse and split a single '--conf-params' argument.

    This expects either 'x=y', 'x=y,z' or 'x' (implicit true)
    values. For multiple overrides use a ; separated list for
    e.g. --conf-params 'x=z; y=(a,b)'

    :param values: String holding the ; separated overrides.
    :return: Dict mapping each parameter name to its value. Values that
        are valid Python literals are converted with ast.literal_eval;
        anything else is kept as a plain string; a parameter given
        without a value maps to True.
    """
    results = {}

    if values == '':
        return results

    # Three groups per match: the name, the optional '=value' part and the
    # bare value; only the first and the last are of interest.
    for param, _, value in re.findall(r'([^;=]+)(=([^;]+))?', values):
        param = param.strip()
        value = value.strip()
        if not param:
            continue
        if not value:
            results[param] = True
            continue
        # values are passed inside string from CLI, so we must retype them accordingly
        try:
            results[param] = ast.literal_eval(value)
        except (ValueError, SyntaxError):
            # for backward compatibility, we have to accept strings without quotes;
            # literal_eval raises ValueError for bare names and SyntaxError for
            # text that is not a Python expression at all (e.g. file paths)
            _LOGGER.warning('Adding missing quotes around string value: %s = %s',
                            param, value)
            results[param] = str(value)
    return results


def parse_arguments():
    """
    Parse command line arguments.

    :return: Dict of parsed options keyed by argparse destination name.
    """
    class _SplitTestParamsAction(argparse.Action):
        """
        Parse and split '--conf-params' arguments.

        This expects either a single list of ; separated overrides
        as 'x=y', 'x=y,z' or 'x' (implicit true) values.
        e.g. --conf-params 'x=z; y=(a,b)'
        Or a list of these ; separated lists with overrides for
        multiple tests.
        e.g. --conf-params "['x=z; y=(a,b)','x=z']"
        """
        def __call__(self, parser, namespace, values, option_string=None):
            # startswith() instead of values[0] so an empty argument does not
            # raise IndexError
            if values.startswith('['):
                results = [parse_param_string(test_params)
                           for test_params in ast.literal_eval(values)]
            else:
                results = parse_param_string(values)
            setattr(namespace, self.dest, results)

    class _ValidateFileAction(argparse.Action):
        """Validate a file can be read from before using it.
        """
        def __call__(self, parser, namespace, values, option_string=None):
            # ArgumentError (unlike ArgumentTypeError) is intercepted by
            # parse_args() and reported as a normal usage error.
            if not os.path.isfile(values):
                raise argparse.ArgumentError(
                    self, 'the path \'%s\' is not a valid path' % values)
            if not os.access(values, os.R_OK):
                raise argparse.ArgumentError(
                    self, 'the path \'%s\' is not accessible' % values)
            setattr(namespace, self.dest, values)

    def list_logging_levels():
        """Give a summary of all available logging levels.

        :return: List of verbosity level names in decreasing order of
            verbosity
        """
        return sorted(VERBOSITY_LEVELS.keys(),
                      key=lambda x: VERBOSITY_LEVELS[x])

    parser = argparse.ArgumentParser(description='(Post)Deployment State Validation tool')
    parser.add_argument('--version', action='version', version='%(prog)s 0.1')
    parser.add_argument('--log-verbosity', choices=list_logging_levels(),
                        help='logging level')
    parser.add_argument('-s', '--save-results-locally', action="store_true", default=None,
                        help='turn on local storage of results')

    group = parser.add_argument_group('test configuration options')
    group.add_argument('--conf-file', action=_ValidateFileAction,
                       help='settings file')
    group.add_argument('--conf-params', action=_SplitTestParamsAction,
                       help='csv list of conf-file parameters: key=val; e.g. '
                       'KUBE_CONFIG=/path/to/kubeconfig/file;PDF_FILE=path/to/pdf/file'
                       ' or a list of csv lists of conf-file parameters: key=val; e.g. '
                       '[\'KUBE_CONFIG=/path/to/kubeconfig/file\','
                       '\'PDF_FILE=path/to/pdf/file\']')

    group = parser.add_argument_group('override conf-file options')
    group.add_argument('--pdf-file', help='Path to PDF file')

    args = vars(parser.parse_args())

    return args


def main():
    """Main Function

    Loads the settings, prepares the results directory and logging, runs
    the validator matching the installer named in the PDF, displays the
    report and, when enabled in the settings, posts it to TestAPI.
    """
    args = parse_arguments()

    ##################################
    # Load settings:
    # precedence: cli > env > conf-file > settings-dir
    ##################################
    settings.load_from_dir(os.path.join(_CURR_DIR, 'settings'))

    if args["conf_file"]:
        # NOTE(review): --conf-file is validated relative to the current
        # working directory but loaded relative to this script's directory;
        # the two only agree for absolute paths - confirm which is intended.
        settings.load_from_file(os.path.join(_CURR_DIR, args["conf_file"]))

    settings.load_from_env()

    if args['conf_params']:
        conf_params = args.pop('conf_params')
        # The action stores a single dict, or a list of dicts when the
        # argument was given in its list form; accept both.
        if isinstance(conf_params, dict):
            conf_params = [conf_params]
        merged = {}
        for overrides in conf_params:
            merged.update(overrides)
        # dedicated cli options win over --conf-params entries
        args = {**merged, **args}

    # NOTE(review): options not given on the command line are None here and
    # are still written into settings - confirm setValue() tolerates that.
    for key, value in args.items():
        settings.setValue(key, value)

    ##################################
    # Results settings:
    ##################################
    # The trailing slash of the timestamp is kept so results_path still ends
    # with a separator after the join.
    now = datetime.now().strftime("%d-%m-%Y_%H-%M-%S/")
    results_path = os.path.join(settings.getValue('results_dir'), now)
    settings.setValue('results_path', results_path)
    os.makedirs(results_path, exist_ok=True)

    ##################################
    # Logs settings:
    ##################################
    log_file = os.path.join(results_path, settings.getValue('log_filename'))
    stream_handler = logging.StreamHandler(sys.stdout)
    file_handler = logging.FileHandler(filename=log_file)

    level = settings.getValue('LOG_VERBOSITY')
    # Root logger passes everything; the handlers do the filtering.
    _LOGGER.setLevel(logging.DEBUG)
    stream_handler.setLevel(VERBOSITY_LEVELS[level])
    file_handler.setLevel(VERBOSITY_LEVELS[level])

    # %(message)s is the message with its arguments merged in; %(msg)s would
    # print the raw format string.
    form = '[%(levelname)-8s] : %(message)s'
    if level == 'debug':
        form = ('["%(asctime)s"][%(levelname)-8s]'
                '[%(pathname)s:%(funcName)s:%(lineno)-4s] : %(message)s')

    stream_handler.setFormatter(logging.Formatter(form))
    file_handler.setFormatter(logging.Formatter(form))

    _LOGGER.addHandler(stream_handler)
    _LOGGER.addHandler(file_handler)

    ##################################
    # ResultAPI settings:
    ##################################
    if settings.getValue('save_results_locally'):
        result_api.register_storage(Local())

    ####
    # Still Developing
    ####
    load_pdf()
    installer = settings.getValue('pdf_file')["deployment_info"]["installer_used"].lower()

    # Only airship has a validator; stop here instead of failing later on an
    # unbound report.
    if installer != 'airship':
        _LOGGER.error('Unsupported installer: %s', installer)
        sys.exit(1)

    airship = AirshipValidator()
    airship.validate()
    report = airship.get_report()

    # Displaying Report
    display_report(report)

    if settings.getValue('enable_testapi'):
        logger = logging.getLogger(__name__)
        logger.info('Publishing results to TestAPI')
        url = settings.getValue('testapi_url')
        url += "/results/"
        # NOTE(review): no timeout is passed, so this call can block
        # indefinitely if the server never answers.
        response = requests.post(url, json=report)
        logger.info(response)


if __name__ == "__main__":
    main()