From 7e83d0876ddb84a45e130eeba28bc40ef53c074b Mon Sep 17 00:00:00 2001 From: Yaron Yogev Date: Thu, 27 Jul 2017 09:02:54 +0300 Subject: Calipso initial release for OPNFV Change-Id: I7210c244b0c10fa80bfa8c77cb86c9d6ddf8bc88 Signed-off-by: Yaron Yogev --- app/api/validation/data_validate.py | 185 ++++++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) create mode 100644 app/api/validation/data_validate.py (limited to 'app/api/validation/data_validate.py') diff --git a/app/api/validation/data_validate.py b/app/api/validation/data_validate.py new file mode 100644 index 0000000..6928c4b --- /dev/null +++ b/app/api/validation/data_validate.py @@ -0,0 +1,185 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + + +from api.validation import regex + + +class DataValidate: + LIST = "list" + REGEX = "regex" + + def __init__(self): + super().__init__() + self.BOOL_CONVERSION = { + "true": True, + "1": True, + 1: True, + "false": False, + "0": False, + 0: False + } + self.TYPES_CUSTOMIZED_NAMES = { + 'str': 'string', + 'bool': 'boolean', + 'int': 'integer', + 'ObjectId': 'MongoDB ObjectId' + } + self.VALIDATE_SWITCHER = { + self.LIST: self.validate_value_in_list, + self.REGEX: regex.validate + } + + def validate_type(self, obj, t, convert_to_type): + if convert_to_type: + # user may input different values for the + # boolean True or False, convert the number or + # the string to corresponding python bool values + if t == bool: + if isinstance(obj, str): + obj = obj.lower() + if obj in self.BOOL_CONVERSION: + return self.BOOL_CONVERSION[obj] + return None + try: + obj = t(obj) + except Exception: + return None + return obj + else: + return obj if isinstance(obj, t) else None + + # get the requirement for validation + # this requirement object will be used in validate_data method + @staticmethod + def require(types, convert_to_type=False, validate=None, + requirement=None, mandatory=False, error_messages=None): + if error_messages is None: + error_messages = {} + return { + "types": types, + "convert_to_type": convert_to_type, + "validate": validate, + "requirement": requirement, + "mandatory": mandatory, + "error_messages": error_messages + } + + def validate_data(self, data, requirements, + additional_key_re=None, + can_be_empty_keys=[]): + + illegal_keys = [key for key in data.keys() + if key not in requirements.keys()] + + if additional_key_re: + illegal_keys = [key for key in illegal_keys + if not re.match(additional_key_re, key)] + + if illegal_keys: + return 'Invalid key(s): {0}'.format(' and '.join(illegal_keys)) + + for key, requirement in requirements.items(): + value = data.get(key) + error_messages = requirement['error_messages'] + + if not value and value is not False and value is not 0: + if key in data and key not in can_be_empty_keys: + return "Invalid data: value of {0} key doesn't exist ".format(key) + # check if the key is mandatory + mandatory_error = error_messages.get('mandatory') + error_message = self.mandatory_check(key, + requirement['mandatory'], + mandatory_error) + if error_message: + return error_message + continue + + # check the required types + error_message = self.types_check(requirement["types"], + requirement["convert_to_type"], + key, + value, data, + error_messages.get('types')) + if error_message: + return error_message + + # after the types check, the value of the key may be changed + # get the value again + value = data[key] + validate = requirement.get('validate') + if not validate: + continue + requirement_value = requirement.get('requirement') + # validate the data against the requirement + req_error = error_messages.get("requirement") + error_message = self.requirement_check(key, value, validate, + requirement_value, + req_error) + if error_message: + return error_message + return None + + @staticmethod + def mandatory_check(key, mandatory, error_message): + if mandatory: + return error_message if error_message \ + else "{} must be specified".format(key) + return None + + def types_check(self, requirement_types, convert_to_type, key, + value, data, error_message): + if not isinstance(requirement_types, list): + requirement_types = [requirement_types] + for requirement_type in requirement_types: + converted_val = self.validate_type( + value, requirement_type, convert_to_type + ) + if converted_val is not None: + if convert_to_type: + # value has been converted, update the data + data[key] = converted_val + return None + required_types = self.get_type_names(requirement_types) + return error_message if error_message else \ + "{0} must be {1}".format(key, " or ".join(required_types)) + + def requirement_check(self, key, value, validate, + requirement, error_message): + return self.VALIDATE_SWITCHER[validate](key, value, requirement, + error_message) + + @staticmethod + def validate_value_in_list(key, value, + required_list, error_message): + if not isinstance(value, list): + value = [value] + + if [v for v in value if v not in required_list]: + return error_message if error_message else\ + "The possible value of {0} is {1}".\ + format(key, " or ".join(required_list)) + return None + + # get customized type names from type names array + def get_type_names(self, types): + return [self.get_type_name(t) for t in types] + + # get customized type name from string + def get_type_name(self, t): + t = str(t) + a = t.split(" ")[1] + type_name = a.rstrip(">").strip("'") + # strip the former module names + type_name = type_name.split('.')[-1] + if type_name in self.TYPES_CUSTOMIZED_NAMES.keys(): + type_name = self.TYPES_CUSTOMIZED_NAMES[type_name] + return type_name -- cgit 1.2.3-korg