From 7e83d0876ddb84a45e130eeba28bc40ef53c074b Mon Sep 17 00:00:00 2001 From: Yaron Yogev Date: Thu, 27 Jul 2017 09:02:54 +0300 Subject: Calipso initial release for OPNFV Change-Id: I7210c244b0c10fa80bfa8c77cb86c9d6ddf8bc88 Signed-off-by: Yaron Yogev --- app/api/validation/__init__.py | 10 ++ app/api/validation/data_validate.py | 185 ++++++++++++++++++++++++++++++++++++ app/api/validation/regex.py | 57 +++++++++++ 3 files changed, 252 insertions(+) create mode 100644 app/api/validation/__init__.py create mode 100644 app/api/validation/data_validate.py create mode 100644 app/api/validation/regex.py (limited to 'app/api/validation') diff --git a/app/api/validation/__init__.py b/app/api/validation/__init__.py new file mode 100644 index 0000000..1e85a2a --- /dev/null +++ b/app/api/validation/__init__.py @@ -0,0 +1,10 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### + diff --git a/app/api/validation/data_validate.py b/app/api/validation/data_validate.py new file mode 100644 index 0000000..6928c4b --- /dev/null +++ b/app/api/validation/data_validate.py @@ -0,0 +1,185 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + + +from api.validation import regex + + +class DataValidate: + LIST = "list" + REGEX = "regex" + + def __init__(self): + super().__init__() + self.BOOL_CONVERSION = { + "true": True, + "1": True, + 1: True, + "false": False, + "0": False, + 0: False + } + self.TYPES_CUSTOMIZED_NAMES = { + 'str': 'string', + 'bool': 'boolean', + 'int': 'integer', + 'ObjectId': 'MongoDB ObjectId' + } + self.VALIDATE_SWITCHER = { + self.LIST: self.validate_value_in_list, + self.REGEX: regex.validate + } + + def validate_type(self, obj, t, convert_to_type): + if convert_to_type: + # user may input different values for the + # boolean True or False, convert the number or + # the string to corresponding python bool values + if t == bool: + if isinstance(obj, str): + obj = obj.lower() + if obj in self.BOOL_CONVERSION: + return self.BOOL_CONVERSION[obj] + return None + try: + obj = t(obj) + except Exception: + return None + return obj + else: + return obj if isinstance(obj, t) else None + + # get the requirement for validation + # this requirement object will be used in validate_data method + @staticmethod + def require(types, convert_to_type=False, validate=None, + requirement=None, mandatory=False, error_messages=None): + if error_messages is None: + error_messages = {} + return { + "types": types, + "convert_to_type": convert_to_type, + "validate": validate, + "requirement": requirement, + "mandatory": mandatory, + "error_messages": error_messages + } + + def validate_data(self, data, requirements, + additional_key_re=None, + can_be_empty_keys=[]): + + illegal_keys = [key for key in data.keys() + if key not in requirements.keys()] + + if additional_key_re: + illegal_keys = [key for key in illegal_keys + if not re.match(additional_key_re, key)] + + if illegal_keys: + return 'Invalid key(s): {0}'.format(' and '.join(illegal_keys)) + + for key, requirement in requirements.items(): + value = data.get(key) + error_messages = requirement['error_messages'] + + if not value and value is not False and value is not 0: + if key in data and key not in can_be_empty_keys: + return "Invalid data: value of {0} key doesn't exist ".format(key) + # check if the key is mandatory + mandatory_error = error_messages.get('mandatory') + error_message = self.mandatory_check(key, + requirement['mandatory'], + mandatory_error) + if error_message: + return error_message + continue + + # check the required types + error_message = self.types_check(requirement["types"], + requirement["convert_to_type"], + key, + value, data, + error_messages.get('types')) + if error_message: + return error_message + + # after the types check, the value of the key may be changed + # get the value again + value = data[key] + validate = requirement.get('validate') + if not validate: + continue + requirement_value = requirement.get('requirement') + # validate the data against the requirement + req_error = error_messages.get("requirement") + error_message = self.requirement_check(key, value, validate, + requirement_value, + req_error) + if error_message: + return error_message + return None + + @staticmethod + def mandatory_check(key, mandatory, error_message): + if mandatory: + return error_message if error_message \ + else "{} must be specified".format(key) + return None + + def types_check(self, requirement_types, convert_to_type, key, + value, data, error_message): + if not isinstance(requirement_types, list): + requirement_types = [requirement_types] + for requirement_type in requirement_types: + converted_val = self.validate_type( + value, requirement_type, convert_to_type + ) + if converted_val is not None: + if convert_to_type: + # value has been converted, update the data + data[key] = converted_val + return None + required_types = self.get_type_names(requirement_types) + return error_message if error_message else \ + "{0} must be {1}".format(key, " or ".join(required_types)) + + def requirement_check(self, key, value, validate, + requirement, error_message): + return self.VALIDATE_SWITCHER[validate](key, value, requirement, + error_message) + + @staticmethod + def validate_value_in_list(key, value, + required_list, error_message): + if not isinstance(value, list): + value = [value] + + if [v for v in value if v not in required_list]: + return error_message if error_message else\ + "The possible value of {0} is {1}".\ + format(key, " or ".join(required_list)) + return None + + # get customized type names from type names array + def get_type_names(self, types): + return [self.get_type_name(t) for t in types] + + # get customized type name from string + def get_type_name(self, t): + t = str(t) + a = t.split(" ")[1] + type_name = a.rstrip(">").strip("'") + # strip the former module names + type_name = type_name.split('.')[-1] + if type_name in self.TYPES_CUSTOMIZED_NAMES.keys(): + type_name = self.TYPES_CUSTOMIZED_NAMES[type_name] + return type_name diff --git a/app/api/validation/regex.py b/app/api/validation/regex.py new file mode 100644 index 0000000..2684636 --- /dev/null +++ b/app/api/validation/regex.py @@ -0,0 +1,57 @@ +############################################################################### +# Copyright (c) 2017 Koren Lev (Cisco Systems), Yaron Yogev (Cisco Systems) # +# and others # +# # +# All rights reserved. This program and the accompanying materials # +# are made available under the terms of the Apache License, Version 2.0 # +# which accompanies this distribution, and is available at # +# http://www.apache.org/licenses/LICENSE-2.0 # +############################################################################### +import re + +PORT = "port number" +IP = "ipv4/ipv6 address" +HOSTNAME = "host name" +PATH = "path" + +_PORT_REGEX = re.compile('^0*(?:6553[0-5]|655[0-2][0-9]|65[0-4][0-9]{2}|' + '6[0-4][0-9]{3}|[1-5][0-9]{4}|[1-9][0-9]{1,3}|[0-9])$') + +_HOSTNAME_REGEX = re.compile('^([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])' + '(\.([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]{0,61}[a-zA-Z0-9]))*$') + +_PATH_REGEX = re.compile('^(\/){1}([^\/\0]+(\/)?)+$') + +_IPV4_REGEX = re.compile('^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}$') +_IPV6_REGEX = re.compile('^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}' + '(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})$') + +_REGEX_MAP = { + PORT: _PORT_REGEX, + HOSTNAME: _HOSTNAME_REGEX, + PATH: _PATH_REGEX, + IP: [_IPV4_REGEX, _IPV6_REGEX] +} + + +def validate(key, value, regex_names, error_message=None): + if not isinstance(regex_names, list): + regex_names = [regex_names] + + for regex_name in regex_names: + regexes = _REGEX_MAP[regex_name] + + if not isinstance(regexes, list): + regexes = [regexes] + + try: + value = str(value) + match_regexes = [regex for regex in regexes + if regex.match(value)] + if match_regexes: + return None + except: + pass + + return error_message if error_message else \ + '{0} must be a valid {1}'.format(key, " or ".join(regex_names)) -- cgit 1.2.3-korg