diff options
Diffstat (limited to 'python_moonutilities/python_moonutilities/security_functions.py')
-rw-r--r-- | python_moonutilities/python_moonutilities/security_functions.py | 331 |
1 files changed, 0 insertions, 331 deletions
diff --git a/python_moonutilities/python_moonutilities/security_functions.py b/python_moonutilities/python_moonutilities/security_functions.py deleted file mode 100644 index 1069eb2f..00000000 --- a/python_moonutilities/python_moonutilities/security_functions.py +++ /dev/null @@ -1,331 +0,0 @@ -# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors -# This software is distributed under the terms and conditions of the 'Apache-2.0' -# license which can be found in the file 'LICENSE' in this package distribution -# or at 'http://www.apache.org/licenses/LICENSE-2.0'. - - -import html -import re -import os -import types -import requests -import time -from functools import wraps -from flask import request -import logging -from python_moonutilities import exceptions, configuration - -logger = logging.getLogger("moon.utilities." + __name__) - -keystone_config = configuration.get_configuration("openstack/keystone")["openstack/keystone"] -TOKENS = {} -__targets = {} - - -def filter_input(func_or_str): - - def __filter(string): - if string and type(string) is str: - return "".join(re.findall("[\w\- +]*", string)) - return string - - def __filter_dict(arg): - result = dict() - for key in arg.keys(): - if key == "email": - result["email"] = __filter_email(arg[key]) - elif key == "password": - result["password"] = arg['password'] - else: - result[key] = __filter(arg[key]) - return result - - def __filter_email(string): - if string and type(string) is str: - return "".join(re.findall("[\w@\._\- +]*", string)) - return string - - def wrapped(*args, **kwargs): - _args = [] - for arg in args: - if isinstance(arg, str): - arg = __filter(arg) - elif isinstance(arg, list): - arg = [__filter(item) for item in arg] - elif isinstance(arg, tuple): - arg = (__filter(item) for item in arg) - elif isinstance(arg, dict): - arg = __filter_dict(arg) - _args.append(arg) - for arg in kwargs: - if type(kwargs[arg]) is str: - kwargs[arg] = __filter(kwargs[arg]) - if isinstance(kwargs[arg], str): - kwargs[arg] = __filter(kwargs[arg]) - elif isinstance(kwargs[arg], list): - kwargs[arg] = [__filter(item) for item in kwargs[arg]] - elif isinstance(kwargs[arg], tuple): - kwargs[arg] = (__filter(item) for item in kwargs[arg]) - elif isinstance(kwargs[arg], dict): - kwargs[arg] = __filter_dict(kwargs[arg]) - return func_or_str(*_args, **kwargs) - - if isinstance(func_or_str, str): - return __filter(func_or_str) - if isinstance(func_or_str, list): - return [__filter(item) for item in func_or_str] - if isinstance(func_or_str, tuple): - return (__filter(item) for item in func_or_str) - if isinstance(func_or_str, dict): - return __filter_dict(func_or_str) - if isinstance(func_or_str, types.FunctionType): - return wrapped - return None - - -""" -To do should check value of Dictionary but it's dependent on from where it's coming -""" - - -def validate_data(data): - def __validate_string(string): - temp_str = html.escape(string) - if string != temp_str: - raise exceptions.ValidationContentError('Forbidden characters in string') - - def __validate_list_or_tuple(container): - for i in container: - validate_data(i) - - def __validate_dict(dictionary): - for key in dictionary: - validate_data(dictionary[key]) - - if isinstance(data, bool): - return True - if data is None: - data = "" - if isinstance(data, str): - __validate_string(data) - elif isinstance(data, list) or isinstance(data, tuple): - __validate_list_or_tuple(data) - elif isinstance(data, dict): - __validate_dict(data) - else: - raise exceptions.ValidationContentError('Value is Not String or Container or Dictionary: {}'.format(data)) - - -def validate_input(type='get', args_state=[], kwargs_state=[], body_state=[]): - """ - this fucntion works only on List or tuple or dictionary of Strings ,and String direct - Check if input of function is Valid or not, Valid if not has spaces and values is not None or empty. - - :param type: type of request if function is used as decorator - :param args_state: list of Booleans for args, - values must be order as target values of arguments, - True if None is not Allowed and False if is allowed - :param kwargs_state: list of Booleans for kwargs as order of input kwargs, - values must be order as target values of arguments, - True if None is not Allowed and False if is allowed - :param body_state: list of Booleans for arguments in body of request if request is post, - values must be order as target values of arguments, - True if None is not Allowed and False if is allowed - :return: - """ - - def validate_input_decorator(func): - def wrapped(*args, **kwargs): - - temp_args = [] - """ - this loop made to filter args from object class, - when put this function as decorator in function control - then there is copy of this class add to front of args - """ - for arg in args: - if isinstance(arg, str) == True or \ - isinstance(arg, list) == True or \ - isinstance(arg, dict) == True: - temp_args.append(arg) - - while len(args_state) < len(temp_args): - args_state.append(True) - - for i in range(0, len(temp_args)): - if args_state[i]: - validate_data(temp_args[i]) - - while len(kwargs_state) < len(kwargs): - kwargs_state.append(False) - counter = 0 - for i in kwargs: - if kwargs_state[counter]: - validate_data(kwargs[i]) - - counter = counter + 1 - - if type == "post" or type == "patch": - body = request.json - # while len(body_state) < len(body): - # body_state.append(True) - # counter = 0 - for key in body_state: - if key in body: - if body_state[key]: - try: - validate_data(body.get(key)) - except exceptions.ValidationContentError as e: - raise exceptions.ValidationContentError("Key: '{}', [{}]".format(key, str(e))) - else: - raise exceptions.ValidationKeyError('Invalid Key :{} not found'.format(key)) - - # counter = counter + 1 - - return func(*args, **kwargs) - - return wrapped - - return validate_input_decorator - - -def enforce(action_names, object_name, **extra): - """Fake version of the enforce decorator""" - def wrapper_func(func): - def wrapper_args(*args, **kwargs): - # LOG.info("kwargs={}".format(kwargs)) - # kwargs['user_id'] = kwargs.pop('user_id', "admin") - # LOG.info("Calling enforce on {} with args={} kwargs={}".format(func.__name__, args, kwargs)) - return func(*args, **kwargs) - return wrapper_args - return wrapper_func - - -def login(user=None, password=None, domain=None, project=None, url=None): - start_time = time.time() - if not user: - user = keystone_config['user'] - if not password: - password = keystone_config['password'] - if not domain: - domain = keystone_config['domain'] - if not project: - project = keystone_config['project'] - if not url: - url = keystone_config['url'] - headers = { - "Content-Type": "application/json" - } - data_auth = { - "auth": { - "identity": { - "methods": [ - "password" - ], - "password": { - "user": { - "domain": { - "id": domain - }, - "name": user, - "password": password - } - } - }, - "scope": { - "project": { - "domain": { - "id": domain - }, - "name": project - } - } - } - } - - while True: - req = requests.post("{}/auth/tokens".format(url), - json=data_auth, headers=headers, - verify=keystone_config['certificate']) - - if req.status_code in (200, 201, 204): - headers['X-Auth-Token'] = req.headers['X-Subject-Token'] - return headers - logger.warning("Waiting for Keystone...") - if time.time() - start_time == 100: - logger.error(req.text) - raise exceptions.KeystoneError - time.sleep(5) - - -def logout(headers, url=None): - if not url: - url = keystone_config['url'] - headers['X-Subject-Token'] = headers['X-Auth-Token'] - req = requests.delete("{}/auth/tokens".format(url), headers=headers, verify=keystone_config['certificate']) - if req.status_code in (200, 201, 204): - return - logger.error(req.text) - raise exceptions.KeystoneError - - -def check_token(token, url=None): - _verify = False - if keystone_config['certificate']: - _verify = keystone_config['certificate'] - try: - os.environ.pop("http_proxy") - os.environ.pop("https_proxy") - except KeyError: - pass - if not url: - url = keystone_config['url'] - headers = { - "Content-Type": "application/json", - 'X-Subject-Token': token, - 'X-Auth-Token': token, - } - if not keystone_config['check_token']: - # TODO (asteroide): must send the admin id - return "admin" if not token else token - elif keystone_config['check_token'].lower() in ("false", "no", "n"): - # TODO (asteroide): must send the admin id - return "admin" if not token else token - if keystone_config['check_token'].lower() in ("yes", "y", "true"): - if token in TOKENS: - delta = time.mktime(TOKENS[token]["expires_at"]) - time.mktime(time.gmtime()) - if delta > 0: - return TOKENS[token]["user"] - raise exceptions.KeystoneError - else: - req = requests.get("{}/auth/tokens".format(url), headers=headers, verify=_verify) - if req.status_code in (200, 201): - # Note (asteroide): the time stamps is not in ISO 8601, so it is necessary to delete - # characters after the dot - token_time = req.json().get("token").get("expires_at").split(".") - TOKENS[token] = dict() - TOKENS[token]["expires_at"] = time.strptime(token_time[0], "%Y-%m-%dT%H:%M:%S") - TOKENS[token]["user"] = req.json().get("token").get("user").get("id") - return TOKENS[token]["user"] - logger.error("{} - {}".format(req.status_code, req.text)) - raise exceptions.KeystoneError - elif keystone_config['check_token'].lower() == "strict": - req = requests.head("{}/auth/tokens".format(url), headers=headers, verify=_verify) - if req.status_code in (200, 201): - return token - logger.error("{} - {}".format(req.status_code, req.text)) - raise exceptions.KeystoneError - raise exceptions.KeystoneError - - -def check_auth(function): - @wraps(function) - def wrapper(*args, **kwargs): - token = request.headers.get('X-Auth-Token') - token = check_token(token) - if not token: - raise exceptions.AuthException - user_id = kwargs.pop("user_id", token) - result = function(*args, **kwargs, user_id=user_id) - return result - return wrapper |