1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
##############################################################################
# Copyright (c) 2017 ZTE Corp
# feng.xiaowei@zte.com.cn
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
import functools
from tornado import gen
from tornado import web
from opnfv_testapi.common import message
from opnfv_testapi.common import raises
def authenticate(method):
    """Wrap a handler method so a token check runs before it.

    The check only happens when ``self.auth`` is truthy. In that case the
    ``X-Auth-Token`` request header is read and looked up with
    ``self._eval_db_find_one({'access_token': token}, 'tokens')``:

    * a missing header is reported through ``raises.Unauthorized``;
    * a lookup that returns a falsy value is reported through
      ``raises.Forbidden``.

    Afterwards ``method`` is run as a coroutine and its result is handed
    back through ``gen.Return``.

    NOTE(review): the code following each ``raises.*`` call assumes those
    helpers raise an exception -- confirm in ``opnfv_testapi.common.raises``.
    """
    @web.asynchronous
    @gen.coroutine
    @functools.wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.auth:
            try:
                token = self.request.headers['X-Auth-Token']
            except KeyError:
                raises.Unauthorized(message.unauthorized())
            token_query = {'access_token': token}
            known_token = yield self._eval_db_find_one(token_query, 'tokens')
            if not known_token:
                raises.Forbidden(message.invalid_token())

        # ``method`` may itself be a plain generator function, so it is
        # turned into a coroutine before being invoked.
        handler = gen.coroutine(method)
        result = yield handler(self, *args, **kwargs)
        raise gen.Return(result)

    return wrapper
def not_exist(xstep):
    """Fetch the record matching ``query`` and hand it to ``xstep``.

    The ``query`` keyword argument is read (not removed) from ``kwargs`` and
    passed to ``self._eval_db_find_one``. A falsy lookup result is reported
    through ``raises.NotFound``. Otherwise ``xstep`` is run as a coroutine
    with the fetched record inserted as its first argument after ``self``.
    """
    @functools.wraps(xstep)
    def wrap(self, *args, **kwargs):
        # ``get`` rather than ``pop``: ``query`` stays in kwargs and is
        # therefore still forwarded to ``xstep`` below.
        lookup = kwargs.get('query')
        record = yield self._eval_db_find_one(lookup)
        if not record:
            raises.NotFound(message.not_found(self.table, lookup))

        step = gen.coroutine(xstep)
        outcome = yield step(self, record, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
def no_body(xstep):
    """Report a bad request when the handler has no JSON body.

    If ``self.json_args`` is ``None`` the condition is reported through
    ``raises.BadRequest``. Otherwise ``xstep`` is run as a coroutine with
    the unchanged arguments and its result is returned via ``gen.Return``.
    """
    @functools.wraps(xstep)
    def wrap(self, *args, **kwargs):
        body_missing = self.json_args is None
        if body_missing:
            raises.BadRequest(message.no_body())

        step = gen.coroutine(xstep)
        outcome = yield step(self, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
def miss_fields(xstep):
    """Check that every required field is present in ``self.json_args``.

    The ``miss_fields`` keyword argument (default: empty list) is removed
    from ``kwargs`` and treated as the names of required fields. A field
    whose value in ``self.json_args`` is ``None`` or the empty string is
    reported through ``raises.BadRequest``. ``xstep`` is then run as a
    coroutine with the remaining arguments.
    """
    @functools.wraps(xstep)
    def wrap(self, *args, **kwargs):
        # Popped so that ``miss_fields`` is not forwarded to ``xstep``.
        required = kwargs.pop('miss_fields', [])
        for name in (required or ()):
            value = self.json_args.get(name)
            is_blank = value is None or value == ''
            if is_blank:
                raises.BadRequest(message.missing(name))

        step = gen.coroutine(xstep)
        outcome = yield step(self, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
def carriers_exist(xstep):
    """Check that every referenced carrier record can be found.

    The ``carriers`` keyword argument is removed from ``kwargs`` and
    iterated as ``(table, query)`` pairs, where ``query`` is a callable
    producing the lookup. Each lookup goes through
    ``self._eval_db_find_one(query(), table)``; a falsy result is reported
    through ``raises.Forbidden``. ``xstep`` is then run as a coroutine with
    the remaining arguments.
    """
    @functools.wraps(xstep)
    def wrap(self, *args, **kwargs):
        # Popped so that ``carriers`` is not forwarded to ``xstep``.
        carrier_specs = kwargs.pop('carriers', {})
        for table_name, build_query in (carrier_specs or ()):
            found = yield self._eval_db_find_one(build_query(), table_name)
            if not found:
                # The query is built again for the error message.
                raises.Forbidden(
                    message.not_found(table_name, build_query()))

        step = gen.coroutine(xstep)
        outcome = yield step(self, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
def new_not_exists(xstep):
    """Refuse to continue when a matching record is already stored.

    The ``query`` keyword argument is read (not removed) from ``kwargs``.
    When it is truthy it is called to build the lookup passed to
    ``self._eval_db_find_one``; a truthy lookup result is reported through
    ``raises.Forbidden``. ``xstep`` is then run as a coroutine with the
    unchanged arguments.
    """
    @functools.wraps(xstep)
    def wrap(self, *args, **kwargs):
        # ``get`` rather than ``pop``: ``query`` is still forwarded to
        # ``xstep`` below.
        build_query = kwargs.get('query')
        if build_query:
            existing = yield self._eval_db_find_one(build_query())
            if existing:
                # The query is built again for the error message.
                raises.Forbidden(message.exist(self.table, build_query()))

        step = gen.coroutine(xstep)
        outcome = yield step(self, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
def updated_one_not_exist(xstep):
    """Refuse an update whose result would match an already stored record.

    The ``db_keys`` keyword argument (default: empty list) is removed from
    ``kwargs`` and passed with ``data`` to ``self._update_query``. When that
    returns a truthy query, it is looked up with
    ``self._eval_db_find_one``; a truthy lookup result is reported through
    ``raises.Forbidden``. ``xstep`` is then run as a coroutine with ``data``
    and the remaining arguments.
    """
    @functools.wraps(xstep)
    def wrap(self, data, *args, **kwargs):
        # Popped so that ``db_keys`` is not forwarded to ``xstep``.
        key_names = kwargs.pop('db_keys', [])
        clash_query = self._update_query(key_names, data)
        if clash_query:
            clashing = yield self._eval_db_find_one(clash_query)
            if clashing:
                raises.Forbidden(message.exist(self.table, clash_query))

        step = gen.coroutine(xstep)
        outcome = yield step(self, data, *args, **kwargs)
        raise gen.Return(outcome)

    return wrap
|