aboutsummaryrefslogtreecommitdiffstats
path: root/old/python_moonutilities/python_moonutilities/security_functions.py
diff options
context:
space:
mode:
Diffstat (limited to 'old/python_moonutilities/python_moonutilities/security_functions.py')
-rw-r--r--old/python_moonutilities/python_moonutilities/security_functions.py331
1 files changed, 331 insertions, 0 deletions
diff --git a/old/python_moonutilities/python_moonutilities/security_functions.py b/old/python_moonutilities/python_moonutilities/security_functions.py
new file mode 100644
index 00000000..1069eb2f
--- /dev/null
+++ b/old/python_moonutilities/python_moonutilities/security_functions.py
@@ -0,0 +1,331 @@
+# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
+# This software is distributed under the terms and conditions of the 'Apache-2.0'
+# license which can be found in the file 'LICENSE' in this package distribution
+# or at 'http://www.apache.org/licenses/LICENSE-2.0'.
+
+
+import html
+import re
+import os
+import types
+import requests
+import time
+from functools import wraps
+from flask import request
+import logging
+from python_moonutilities import exceptions, configuration
+
+logger = logging.getLogger("moon.utilities." + __name__)
+
+keystone_config = configuration.get_configuration("openstack/keystone")["openstack/keystone"]
+TOKENS = {}
+__targets = {}
+
+
+def filter_input(func_or_str):
+
+ def __filter(string):
+ if string and type(string) is str:
+ return "".join(re.findall("[\w\- +]*", string))
+ return string
+
+ def __filter_dict(arg):
+ result = dict()
+ for key in arg.keys():
+ if key == "email":
+ result["email"] = __filter_email(arg[key])
+ elif key == "password":
+ result["password"] = arg['password']
+ else:
+ result[key] = __filter(arg[key])
+ return result
+
+ def __filter_email(string):
+ if string and type(string) is str:
+ return "".join(re.findall("[\w@\._\- +]*", string))
+ return string
+
+ def wrapped(*args, **kwargs):
+ _args = []
+ for arg in args:
+ if isinstance(arg, str):
+ arg = __filter(arg)
+ elif isinstance(arg, list):
+ arg = [__filter(item) for item in arg]
+ elif isinstance(arg, tuple):
+ arg = (__filter(item) for item in arg)
+ elif isinstance(arg, dict):
+ arg = __filter_dict(arg)
+ _args.append(arg)
+ for arg in kwargs:
+ if type(kwargs[arg]) is str:
+ kwargs[arg] = __filter(kwargs[arg])
+ if isinstance(kwargs[arg], str):
+ kwargs[arg] = __filter(kwargs[arg])
+ elif isinstance(kwargs[arg], list):
+ kwargs[arg] = [__filter(item) for item in kwargs[arg]]
+ elif isinstance(kwargs[arg], tuple):
+ kwargs[arg] = (__filter(item) for item in kwargs[arg])
+ elif isinstance(kwargs[arg], dict):
+ kwargs[arg] = __filter_dict(kwargs[arg])
+ return func_or_str(*_args, **kwargs)
+
+ if isinstance(func_or_str, str):
+ return __filter(func_or_str)
+ if isinstance(func_or_str, list):
+ return [__filter(item) for item in func_or_str]
+ if isinstance(func_or_str, tuple):
+ return (__filter(item) for item in func_or_str)
+ if isinstance(func_or_str, dict):
+ return __filter_dict(func_or_str)
+ if isinstance(func_or_str, types.FunctionType):
+ return wrapped
+ return None
+
+
+"""
+To do should check value of Dictionary but it's dependent on from where it's coming
+"""
+
+
+def validate_data(data):
+ def __validate_string(string):
+ temp_str = html.escape(string)
+ if string != temp_str:
+ raise exceptions.ValidationContentError('Forbidden characters in string')
+
+ def __validate_list_or_tuple(container):
+ for i in container:
+ validate_data(i)
+
+ def __validate_dict(dictionary):
+ for key in dictionary:
+ validate_data(dictionary[key])
+
+ if isinstance(data, bool):
+ return True
+ if data is None:
+ data = ""
+ if isinstance(data, str):
+ __validate_string(data)
+ elif isinstance(data, list) or isinstance(data, tuple):
+ __validate_list_or_tuple(data)
+ elif isinstance(data, dict):
+ __validate_dict(data)
+ else:
+ raise exceptions.ValidationContentError('Value is Not String or Container or Dictionary: {}'.format(data))
+
+
+def validate_input(type='get', args_state=[], kwargs_state=[], body_state=[]):
+ """
+ this fucntion works only on List or tuple or dictionary of Strings ,and String direct
+ Check if input of function is Valid or not, Valid if not has spaces and values is not None or empty.
+
+ :param type: type of request if function is used as decorator
+ :param args_state: list of Booleans for args,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :param kwargs_state: list of Booleans for kwargs as order of input kwargs,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :param body_state: list of Booleans for arguments in body of request if request is post,
+ values must be order as target values of arguments,
+ True if None is not Allowed and False if is allowed
+ :return:
+ """
+
+ def validate_input_decorator(func):
+ def wrapped(*args, **kwargs):
+
+ temp_args = []
+ """
+ this loop made to filter args from object class,
+ when put this function as decorator in function control
+ then there is copy of this class add to front of args
+ """
+ for arg in args:
+ if isinstance(arg, str) == True or \
+ isinstance(arg, list) == True or \
+ isinstance(arg, dict) == True:
+ temp_args.append(arg)
+
+ while len(args_state) < len(temp_args):
+ args_state.append(True)
+
+ for i in range(0, len(temp_args)):
+ if args_state[i]:
+ validate_data(temp_args[i])
+
+ while len(kwargs_state) < len(kwargs):
+ kwargs_state.append(False)
+ counter = 0
+ for i in kwargs:
+ if kwargs_state[counter]:
+ validate_data(kwargs[i])
+
+ counter = counter + 1
+
+ if type == "post" or type == "patch":
+ body = request.json
+ # while len(body_state) < len(body):
+ # body_state.append(True)
+ # counter = 0
+ for key in body_state:
+ if key in body:
+ if body_state[key]:
+ try:
+ validate_data(body.get(key))
+ except exceptions.ValidationContentError as e:
+ raise exceptions.ValidationContentError("Key: '{}', [{}]".format(key, str(e)))
+ else:
+ raise exceptions.ValidationKeyError('Invalid Key :{} not found'.format(key))
+
+ # counter = counter + 1
+
+ return func(*args, **kwargs)
+
+ return wrapped
+
+ return validate_input_decorator
+
+
+def enforce(action_names, object_name, **extra):
+ """Fake version of the enforce decorator"""
+ def wrapper_func(func):
+ def wrapper_args(*args, **kwargs):
+ # LOG.info("kwargs={}".format(kwargs))
+ # kwargs['user_id'] = kwargs.pop('user_id', "admin")
+ # LOG.info("Calling enforce on {} with args={} kwargs={}".format(func.__name__, args, kwargs))
+ return func(*args, **kwargs)
+ return wrapper_args
+ return wrapper_func
+
+
+def login(user=None, password=None, domain=None, project=None, url=None):
+ start_time = time.time()
+ if not user:
+ user = keystone_config['user']
+ if not password:
+ password = keystone_config['password']
+ if not domain:
+ domain = keystone_config['domain']
+ if not project:
+ project = keystone_config['project']
+ if not url:
+ url = keystone_config['url']
+ headers = {
+ "Content-Type": "application/json"
+ }
+ data_auth = {
+ "auth": {
+ "identity": {
+ "methods": [
+ "password"
+ ],
+ "password": {
+ "user": {
+ "domain": {
+ "id": domain
+ },
+ "name": user,
+ "password": password
+ }
+ }
+ },
+ "scope": {
+ "project": {
+ "domain": {
+ "id": domain
+ },
+ "name": project
+ }
+ }
+ }
+ }
+
+ while True:
+ req = requests.post("{}/auth/tokens".format(url),
+ json=data_auth, headers=headers,
+ verify=keystone_config['certificate'])
+
+ if req.status_code in (200, 201, 204):
+ headers['X-Auth-Token'] = req.headers['X-Subject-Token']
+ return headers
+ logger.warning("Waiting for Keystone...")
+ if time.time() - start_time == 100:
+ logger.error(req.text)
+ raise exceptions.KeystoneError
+ time.sleep(5)
+
+
+def logout(headers, url=None):
+ if not url:
+ url = keystone_config['url']
+ headers['X-Subject-Token'] = headers['X-Auth-Token']
+ req = requests.delete("{}/auth/tokens".format(url), headers=headers, verify=keystone_config['certificate'])
+ if req.status_code in (200, 201, 204):
+ return
+ logger.error(req.text)
+ raise exceptions.KeystoneError
+
+
+def check_token(token, url=None):
+ _verify = False
+ if keystone_config['certificate']:
+ _verify = keystone_config['certificate']
+ try:
+ os.environ.pop("http_proxy")
+ os.environ.pop("https_proxy")
+ except KeyError:
+ pass
+ if not url:
+ url = keystone_config['url']
+ headers = {
+ "Content-Type": "application/json",
+ 'X-Subject-Token': token,
+ 'X-Auth-Token': token,
+ }
+ if not keystone_config['check_token']:
+ # TODO (asteroide): must send the admin id
+ return "admin" if not token else token
+ elif keystone_config['check_token'].lower() in ("false", "no", "n"):
+ # TODO (asteroide): must send the admin id
+ return "admin" if not token else token
+ if keystone_config['check_token'].lower() in ("yes", "y", "true"):
+ if token in TOKENS:
+ delta = time.mktime(TOKENS[token]["expires_at"]) - time.mktime(time.gmtime())
+ if delta > 0:
+ return TOKENS[token]["user"]
+ raise exceptions.KeystoneError
+ else:
+ req = requests.get("{}/auth/tokens".format(url), headers=headers, verify=_verify)
+ if req.status_code in (200, 201):
+ # Note (asteroide): the time stamps is not in ISO 8601, so it is necessary to delete
+ # characters after the dot
+ token_time = req.json().get("token").get("expires_at").split(".")
+ TOKENS[token] = dict()
+ TOKENS[token]["expires_at"] = time.strptime(token_time[0], "%Y-%m-%dT%H:%M:%S")
+ TOKENS[token]["user"] = req.json().get("token").get("user").get("id")
+ return TOKENS[token]["user"]
+ logger.error("{} - {}".format(req.status_code, req.text))
+ raise exceptions.KeystoneError
+ elif keystone_config['check_token'].lower() == "strict":
+ req = requests.head("{}/auth/tokens".format(url), headers=headers, verify=_verify)
+ if req.status_code in (200, 201):
+ return token
+ logger.error("{} - {}".format(req.status_code, req.text))
+ raise exceptions.KeystoneError
+ raise exceptions.KeystoneError
+
+
+def check_auth(function):
+ @wraps(function)
+ def wrapper(*args, **kwargs):
+ token = request.headers.get('X-Auth-Token')
+ token = check_token(token)
+ if not token:
+ raise exceptions.AuthException
+ user_id = kwargs.pop("user_id", token)
+ result = function(*args, **kwargs, user_id=user_id)
+ return result
+ return wrapper