aboutsummaryrefslogtreecommitdiffstats
path: root/opnfv_testapi/common/check.py
diff options
context:
space:
mode:
authorxudan <xudan16@huawei.com>2018-07-06 05:16:40 -0400
committerxudan <xudan16@huawei.com>2018-07-06 05:21:42 -0400
commitb3e40f026d655501bfa581452c447784604ecb05 (patch)
tree406f8bfc1abc1b33f98153d03abd34ef7b0e2fe9 /opnfv_testapi/common/check.py
parentb1b0ea32d1a296c7d055c5391261dcad6be48c63 (diff)
Move all web portal code to the new repo dovetail-webportal
This is only the first step to simply copy the file here. There still need some more work to make sure all work well. All the changes will be submitted with other patches to make it easily to review. JIRA: DOVETAIL-671 Change-Id: I64d32a9df562184166b6199e2719f298687d1a0a Signed-off-by: xudan <xudan16@huawei.com>
Diffstat (limited to 'opnfv_testapi/common/check.py')
-rw-r--r--opnfv_testapi/common/check.py114
1 files changed, 114 insertions, 0 deletions
diff --git a/opnfv_testapi/common/check.py b/opnfv_testapi/common/check.py
new file mode 100644
index 0000000..24ba876
--- /dev/null
+++ b/opnfv_testapi/common/check.py
@@ -0,0 +1,114 @@
+##############################################################################
+# Copyright (c) 2017 ZTE Corp
+# feng.xiaowei@zte.com.cn
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import functools
+
+from tornado import gen
+from tornado import web
+
+from opnfv_testapi.common import message
+from opnfv_testapi.common import raises
+from opnfv_testapi.db import api as dbapi
+
+
+def authenticate(method):
+ @web.asynchronous
+ @gen.coroutine
+ @functools.wraps(method)
+ def wrapper(self, *args, **kwargs):
+ if self.auth:
+ try:
+ token = self.request.headers['X-Auth-Token']
+ except KeyError:
+ raises.Unauthorized(message.unauthorized())
+ query = {'access_token': token}
+ check = yield dbapi.db_find_one('tokens', query)
+ if not check:
+ raises.Forbidden(message.invalid_token())
+ ret = yield gen.coroutine(method)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrapper
+
+
+def not_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ query = kwargs.get('query')
+ data = yield dbapi.db_find_one(self.table, query)
+ if not data:
+ raises.NotFound(message.not_found(self.table, query))
+ ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
+ raise gen.Return(ret)
+
+ return wrap
+
+
+def no_body(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ if self.json_args is None:
+ raises.BadRequest(message.no_body())
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+
+ return wrap
+
+
+def miss_fields(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ fields = kwargs.pop('miss_fields', [])
+ if fields:
+ for miss in fields:
+ miss_data = self.json_args.get(miss)
+ if miss_data is None or miss_data == '':
+ raises.BadRequest(message.missing(miss))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def carriers_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ carriers = kwargs.pop('carriers', {})
+ if carriers:
+ for table, query in carriers:
+ exist = yield dbapi.db_find_one(table, query())
+ if not exist:
+ raises.Forbidden(message.not_found(table, query()))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def new_not_exists(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, *args, **kwargs):
+ query = kwargs.get('query')
+ if query:
+ to_data = yield dbapi.db_find_one(self.table, query())
+ if to_data:
+ raises.Forbidden(message.exist(self.table, query()))
+ ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap
+
+
+def updated_one_not_exist(xstep):
+ @functools.wraps(xstep)
+ def wrap(self, data, *args, **kwargs):
+ db_keys = kwargs.pop('db_keys', [])
+ query = self._update_query(db_keys, data)
+ if query:
+ to_data = yield dbapi.db_find_one(self.table, query)
+ if to_data:
+ raises.Forbidden(message.exist(self.table, query))
+ ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
+ raise gen.Return(ret)
+ return wrap