aboutsummaryrefslogtreecommitdiffstats
path: root/python_moonutilities/python_moonutilities/security_functions.py
blob: 84e9ab7d893095f6d051b5ce2f3654aaf75a7d5e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
# Copyright 2015 Open Platform for NFV Project, Inc. and its contributors
# This software is distributed under the terms and conditions of the 'Apache-2.0'
# license which can be found in the file 'LICENSE' in this package distribution
# or at 'http://www.apache.org/licenses/LICENSE-2.0'.


import re
import os
import types
import requests
import time
from functools import wraps
from flask import request
import logging
from python_moonutilities import exceptions, configuration

logger = logging.getLogger("moon.utilities." + __name__)

keystone_config = configuration.get_configuration("openstack/keystone")["openstack/keystone"]
TOKENS = {}
__targets = {}


def filter_input(func_or_str):
    def __filter(string):
        if string and type(string) is str:
            return "".join(re.findall("[\w\- +]*", string))
        return string

    def __filter_dict(arg):
        result = dict()
        for key in arg.keys():
            if key == "email":
                result["email"] = __filter_email(arg[key])
            elif key == "password":
                result["password"] = arg['password']
            else:
                result[key] = __filter(arg[key])
        return result

    def __filter_email(string):
        if string and type(string) is str:
            return "".join(re.findall("[\w@\._\- +]*", string))
        return string

    def wrapped(*args, **kwargs):
        _args = []
        for arg in args:
            if isinstance(arg, str):
                arg = __filter(arg)
            elif isinstance(arg, list):
                arg = [__filter(item) for item in arg]
            elif isinstance(arg, tuple):
                arg = (__filter(item) for item in arg)
            elif isinstance(arg, dict):
                arg = __filter_dict(arg)
            _args.append(arg)
        for arg in kwargs:
            if type(kwargs[arg]) is str:
                kwargs[arg] = __filter(kwargs[arg])
            if isinstance(kwargs[arg], str):
                kwargs[arg] = __filter(kwargs[arg])
            elif isinstance(kwargs[arg], list):
                kwargs[arg] = [__filter(item) for item in kwargs[arg]]
            elif isinstance(kwargs[arg], tuple):
                kwargs[arg] = (__filter(item) for item in kwargs[arg])
            elif isinstance(kwargs[arg], dict):
                kwargs[arg] = __filter_dict(kwargs[arg])
        return func_or_str(*_args, **kwargs)

    if isinstance(func_or_str, str):
        return __filter(func_or_str)
    if isinstance(func_or_str, list):
        return [__filter(item) for item in func_or_str]
    if isinstance(func_or_str, tuple):
        return (__filter(item) for item in func_or_str)
    if isinstance(func_or_str, dict):
        return __filter_dict(func_or_str)
    if isinstance(func_or_str, types.FunctionType):
        return wrapped
    return None


""" 
To do should check value of Dictionary but it's dependent on from where it's coming
"""


def validate_data(data):
    def __validate_string(string):
        if not string:
            raise ValueError('Empty String')
        '''
                is it valid to contains space inbetween 

        '''

        if " " in string:
                raise ValueError('String contains space')

    def __validate_list_or_tuple(container):
        if not container:
            raise ValueError('Empty Container')
        for i in container:
            validate_data(i)

    def __validate_dict(dictionary):
        if not dictionary:
            raise ValueError('Empty Dictionary')
        for key in dictionary:
            validate_data(dictionary[key])

    if isinstance(data, str):
        __validate_string(data)
    elif isinstance(data, list) or isinstance(data, tuple):
        __validate_list_or_tuple(data)
    elif isinstance(data, dict):
        __validate_dict(data)
    else:
        raise ValueError('Value is Not String or Container or Dictionary')


def validate_input(type, args_state=[], kwargs_state=[], body_state=[]):
    """
    this fucntion works only on List or tuple or dictionary of Strings ,and String direct
    Check if input of function is Valid or not, Valid if not has spaces and values is not None or empty.

    :param type: type of request if function is used as decorator
    :param args_state: list of Booleans for args,
                        values must be order as target values of arguments,
                        True if None is not Allowed and False if is allowed
    :param kwargs_state: list of Booleans for kwargs as order of input kwargs,
                          values must be order as target values of arguments,
                          True if None is not Allowed and False if is allowed
    :param body_state: list of Booleans for arguments in body of request if request is post,
                        values must be order as target values of arguments,
                        True if None is not Allowed and False if is allowed
    :return:
    """

    def validate_input_decorator(func):
        def wrapped(*args, **kwargs):

            temp_args = []
            """
            this loop made to filter args from object class, 
            when put this function as decorator in function control
            then there is copy of this class add to front of args  
            """
            for arg in args:
                if isinstance(arg, str) == True or \
                        isinstance(arg, list) == True or \
                        isinstance(arg, dict) == True:
                    temp_args.append(arg)

            while len(args_state) < len(temp_args):
                args_state.append(True)

            for i in range(0, len(temp_args)):
                if args_state[i]:
                    validate_data(temp_args[i])

            while len(kwargs_state) < len(kwargs):
                kwargs_state.append(True)
            counter = 0
            for i in kwargs:
                if kwargs_state[counter]:
                    validate_data({i: kwargs[i]})

                counter = counter + 1

            if type == "post" or type == "patch":
                body = request.json
                while len(body_state) < len(body):
                    body_state.append(True)
                counter = 0
                for i in body:
                    if body_state[counter]:
                        validate_data({i: body[i]})

                    counter = counter + 1

            return func(*args, **kwargs)

        return wrapped

    return validate_input_decorator


def enforce(action_names, object_name, **extra):
    """Fake version of the enforce decorator"""

    def wrapper_func(func):
        def wrapper_args(*args, **kwargs):
            # LOG.info("kwargs={}".format(kwargs))
            # kwargs['user_id'] = kwargs.pop('user_id', "admin")
            # LOG.info("Calling enforce on {} with args={} kwargs={}".format(func.__name__, args, kwargs))
            return func(*args, **kwargs)

        return wrapper_args

    return wrapper_func


def login(user=None, password=None, domain=None, project=None, url=None):
    start_time = time.time()
    if not user:
        user = keystone_config['user']
    if not password:
        password = keystone_config['password']
    if not domain:
        domain = keystone_config['domain']
    if not project:
        project = keystone_config['project']
    if not url:
        url = keystone_config['url']
    headers = {
        "Content-Type": "application/json"
    }
    data_auth = {
        "auth": {
            "identity": {
                "methods": [
                    "password"
                ],
                "password": {
                    "user": {
                        "domain": {
                            "id": domain
                        },
                        "name": user,
                        "password": password
                    }
                }
            },
            "scope": {
                "project": {
                    "domain": {
                        "id": domain
                    },
                    "name": project
                }
            }
        }
    }

    while True:
        req = requests.post("{}/auth/tokens".format(url),
                            json=data_auth, headers=headers,
                            verify=keystone_config['certificate'])

        if req.status_code in (200, 201, 204):
            headers['X-Auth-Token'] = req.headers['X-Subject-Token']
            return headers
        logger.warning("Waiting for Keystone...")
        if time.time() - start_time == 100:
            logger.error(req.text)
            raise exceptions.KeystoneError
        time.sleep(5)


def logout(headers, url=None):
    if not url:
        url = keystone_config['url']
    headers['X-Subject-Token'] = headers['X-Auth-Token']
    req = requests.delete("{}/auth/tokens".format(url), headers=headers, verify=keystone_config['certificate'])
    if req.status_code in (200, 201, 204):
        return
    logger.error(req.text)
    raise exceptions.KeystoneError


def check_token(token, url=None):
    _verify = False
    if keystone_config['certificate']:
        _verify = keystone_config['certificate']
    try:
        os.environ.pop("http_proxy")
        os.environ.pop("https_proxy")
    except KeyError:
        pass
    if not url:
        url = keystone_config['url']
    headers = {
        "Content-Type": "application/json",
        'X-Subject-Token': token,
        'X-Auth-Token': token,
    }
    if not keystone_config['check_token']:
        # TODO (asteroide): must send the admin id
        return "admin" if not token else token
    elif keystone_config['check_token'].lower() in ("false", "no", "n"):
        # TODO (asteroide): must send the admin id
        return "admin" if not token else token
    if keystone_config['check_token'].lower() in ("yes", "y", "true"):
        if token in TOKENS:
            delta = time.mktime(TOKENS[token]["expires_at"]) - time.mktime(time.gmtime())
            if delta > 0:
                return TOKENS[token]["user"]
            raise exceptions.KeystoneError
        else:
            req = requests.get("{}/auth/tokens".format(url), headers=headers, verify=_verify)
            if req.status_code in (200, 201):
                # Note (asteroide): the time stamps is not in ISO 8601, so it is necessary to delete
                # characters after the dot
                token_time = req.json().get("token").get("expires_at").split(".")
                TOKENS[token] = dict()
                TOKENS[token]["expires_at"] = time.strptime(token_time[0], "%Y-%m-%dT%H:%M:%S")
                TOKENS[token]["user"] = req.json().get("token").get("user").get("id")
                return TOKENS[token]["user"]
            logger.error("{} - {}".format(req.status_code, req.text))
            raise exceptions.KeystoneError
    elif keystone_config['check_token'].lower() == "strict":
        req = requests.head("{}/auth/tokens".format(url), headers=headers, verify=_verify)
        if req.status_code in (200, 201):
            return token
        logger.error("{} - {}".format(req.status_code, req.text))
        raise exceptions.KeystoneError
    raise exceptions.KeystoneError


def check_auth(function):
    @wraps(function)
    def wrapper(*args, **kwargs):
        token = request.headers.get('X-Auth-Token')
        token = check_token(token)
        if not token:
            raise exceptions.AuthException
        user_id = kwargs.pop("user_id", token)
        result = function(*args, **kwargs, user_id=user_id)
        return result

    return wrapper