summaryrefslogtreecommitdiffstats
path: root/utils/test/testapi/opnfv_testapi/common/check.py
diff options
context:
space:
mode:
Diffstat (limited to 'utils/test/testapi/opnfv_testapi/common/check.py')
-rw-r--r--utils/test/testapi/opnfv_testapi/common/check.py148
1 files changed, 0 insertions, 148 deletions
diff --git a/utils/test/testapi/opnfv_testapi/common/check.py b/utils/test/testapi/opnfv_testapi/common/check.py
deleted file mode 100644
index 667578fed..000000000
--- a/utils/test/testapi/opnfv_testapi/common/check.py
+++ /dev/null
@@ -1,148 +0,0 @@
-##############################################################################
-# Copyright (c) 2017 ZTE Corp
-# feng.xiaowei@zte.com.cn
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
-import functools
-import re
-
-from tornado import gen
-
-from opnfv_testapi.common import constants
-from opnfv_testapi.common import message
-from opnfv_testapi.common import raises
-from opnfv_testapi.common.config import CONF
-from opnfv_testapi.db import api as dbapi
-
-
-def is_authorized(method):
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- if CONF.api_authenticate and self.table in ['pods']:
- testapi_id = self.get_secure_cookie(constants.TESTAPI_ID)
- if not testapi_id:
- raises.Unauthorized(message.not_login())
- user_info = yield dbapi.db_find_one('users', {'user': testapi_id})
- if not user_info:
- raises.Unauthorized(message.not_lfid())
- kwargs['owner'] = testapi_id
- ret = yield gen.coroutine(method)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrapper
-
-
-def valid_token(method):
- @functools.wraps(method)
- def wrapper(self, *args, **kwargs):
- if self.auth and self.table == 'results':
- try:
- token = self.request.headers['X-Auth-Token']
- except KeyError:
- raises.Unauthorized(message.unauthorized())
- query = {'access_token': token}
- check = yield dbapi.db_find_one('tokens', query)
- if not check:
- raises.Forbidden(message.invalid_token())
- ret = yield gen.coroutine(method)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrapper
-
-
-def not_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- query = kwargs.get('query')
- data = yield dbapi.db_find_one(self.table, query)
- if not data:
- raises.NotFound(message.not_found(self.table, query))
- ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
- raise gen.Return(ret)
-
- return wrap
-
-
-def no_body(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- if self.json_args is None:
- raises.BadRequest(message.no_body())
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
-
- return wrap
-
-
-def miss_fields(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- fields = kwargs.pop('miss_fields', [])
- if fields:
- for miss in fields:
- miss_data = self.json_args.get(miss)
- if miss_data is None or miss_data == '':
- raises.BadRequest(message.missing(miss))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def carriers_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- carriers = kwargs.pop('carriers', {})
- if carriers:
- for table, query in carriers:
- exist = yield dbapi.db_find_one(table, query())
- if not exist:
- raises.Forbidden(message.not_found(table, query()))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def values_check(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- checks = kwargs.pop('values_check', {})
- if checks:
- for field, check, options in checks:
- if not check(field, options):
- raises.BadRequest(message.invalid_value(field, options))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def new_not_exists(xstep):
- @functools.wraps(xstep)
- def wrap(self, *args, **kwargs):
- query = kwargs.get('query')
- if query:
- query_data = query()
- if self.table == 'pods':
- if query_data.get('name') is not None:
- query_data['name'] = re.compile(query_data.get('name'),
- re.IGNORECASE)
- to_data = yield dbapi.db_find_one(self.table, query_data)
- if to_data:
- raises.Forbidden(message.exist(self.table, query()))
- ret = yield gen.coroutine(xstep)(self, *args, **kwargs)
- raise gen.Return(ret)
- return wrap
-
-
-def updated_one_not_exist(xstep):
- @functools.wraps(xstep)
- def wrap(self, data, *args, **kwargs):
- db_keys = kwargs.pop('db_keys', [])
- query = self._update_query(db_keys, data)
- if query:
- to_data = yield dbapi.db_find_one(self.table, query)
- if to_data:
- raises.Forbidden(message.exist(self.table, query))
- ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs)
- raise gen.Return(ret)
- return wrap