From b036354841f2d52eeeec27c8ee13869b6481ec90 Mon Sep 17 00:00:00 2001 From: SerenaFeng Date: Fri, 7 Apr 2017 18:36:20 +0800 Subject: unify data existence check Change-Id: I2ee4c3be6f34ce12530450cd22f2561c458685f9 Signed-off-by: SerenaFeng --- testapi/opnfv_testapi/common/check.py | 111 ++++++++++++++++++++++++++++++++++ 1 file changed, 111 insertions(+) create mode 100644 testapi/opnfv_testapi/common/check.py (limited to 'testapi/opnfv_testapi/common') diff --git a/testapi/opnfv_testapi/common/check.py b/testapi/opnfv_testapi/common/check.py new file mode 100644 index 0000000..be4b1df --- /dev/null +++ b/testapi/opnfv_testapi/common/check.py @@ -0,0 +1,111 @@ +############################################################################## +# Copyright (c) 2017 ZTE Corp +# feng.xiaowei@zte.com.cn +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +import functools + +from tornado import web, gen + +from opnfv_testapi.common import raises, message + + +def authenticate(method): + @web.asynchronous + @gen.coroutine + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + if self.auth: + try: + token = self.request.headers['X-Auth-Token'] + except KeyError: + raises.Unauthorized(message.unauthorized()) + query = {'access_token': token} + check = yield self._eval_db_find_one(query, 'tokens') + if not check: + raises.Forbidden(message.invalid_token()) + ret = yield gen.coroutine(method)(self, *args, **kwargs) + raise gen.Return(ret) + return wrapper + + +def not_exist(xstep): + @functools.wraps(xstep) + def wrap(self, *args, **kwargs): + query = kwargs.get('query') + data = yield self._eval_db_find_one(query) + if not data: + raises.NotFound(message.not_found(self.table, query)) + ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs) + raise gen.Return(ret) + + return wrap + + +def no_body(xstep): + @functools.wraps(xstep) + def wrap(self, *args, **kwargs): + if self.json_args is None: + raises.BadRequest(message.no_body()) + ret = yield gen.coroutine(xstep)(self, *args, **kwargs) + raise gen.Return(ret) + + return wrap + + +def miss_fields(xstep): + @functools.wraps(xstep) + def wrap(self, *args, **kwargs): + fields = kwargs.get('miss_fields') + if fields: + for miss in fields: + miss_data = self.json_args.get(miss) + if miss_data is None or miss_data == '': + raises.BadRequest(message.missing(miss)) + ret = yield gen.coroutine(xstep)(self, *args, **kwargs) + raise gen.Return(ret) + return wrap + + +def carriers_exist(xstep): + @functools.wraps(xstep) + def wrap(self, *args, **kwargs): + carriers = kwargs.get('carriers') + if carriers: + for table, query in carriers: + exist = yield self._eval_db_find_one(query(), table) + if not exist: + raises.Forbidden(message.not_found(table, query())) + ret = yield gen.coroutine(xstep)(self, *args, **kwargs) + raise gen.Return(ret) + return wrap + + +def new_not_exists(xstep): + @functools.wraps(xstep) + def wrap(self, *args, **kwargs): + query = kwargs.get('query') + if query: + to_data = yield self._eval_db_find_one(query()) + if to_data: + raises.Forbidden(message.exist(self.table, query())) + ret = yield gen.coroutine(xstep)(self, *args, **kwargs) + raise gen.Return(ret) + return wrap + + +def updated_one_not_exist(xstep): + @functools.wraps(xstep) + def wrap(self, data, *args, **kwargs): + db_keys = kwargs.get('db_keys') + query = self._update_query(db_keys, data) + if query: + to_data = yield self._eval_db_find_one(query) + if to_data: + raises.Forbidden(message.exist(self.table, query)) + ret = yield gen.coroutine(xstep)(self, data, *args, **kwargs) + raise gen.Return(ret) + return wrap -- cgit 1.2.3-korg