diff options
Diffstat (limited to 'compass-tasks/db/api/user.py')
-rw-r--r-- | compass-tasks/db/api/user.py | 553 |
1 files changed, 0 insertions, 553 deletions
diff --git a/compass-tasks/db/api/user.py b/compass-tasks/db/api/user.py deleted file mode 100644 index db039eb..0000000 --- a/compass-tasks/db/api/user.py +++ /dev/null @@ -1,553 +0,0 @@ -# Copyright 2014 Huawei Technologies Co. Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""User database operations.""" -import datetime -import functools -import logging -import re - -from flask.ext.login import UserMixin - -from compass.db.api import database -from compass.db.api import utils -from compass.db import exception -from compass.db import models - -from compass.utils import setting_wrapper as setting -from compass.utils import util - - -SUPPORTED_FIELDS = ['email', 'is_admin', 'active'] -PERMISSION_SUPPORTED_FIELDS = ['name'] -SELF_UPDATED_FIELDS = ['email', 'firstname', 'lastname', 'password'] -ADMIN_UPDATED_FIELDS = ['is_admin', 'active'] -IGNORE_FIELDS = ['id', 'created_at', 'updated_at'] -UPDATED_FIELDS = [ - 'email', 'firstname', 'lastname', 'password', 'is_admin', 'active' -] -ADDED_FIELDS = ['email', 'password'] -OPTIONAL_ADDED_FIELDS = ['is_admin', 'active'] -PERMISSION_ADDED_FIELDS = ['permission_id'] -RESP_FIELDS = [ - 'id', 'email', 'is_admin', 'active', 'firstname', - 'lastname', 'created_at', 'updated_at' -] -RESP_TOKEN_FIELDS = [ - 'id', 'user_id', 'token', 'expire_timestamp' -] -PERMISSION_RESP_FIELDS = [ - 'id', 'user_id', 'permission_id', 'name', 'alias', 'description', - 'created_at', 'updated_at' -] - - -def _check_email(email): - """Check email is email format.""" - if '@' not in email: - raise exception.InvalidParameter( - 'there is no @ in email address %s.' % email - ) - - -def _check_user_permission(user, permission, session=None): - """Check user has permission.""" - if not user: - logging.info('empty user means the call is from internal') - return - if user.is_admin: - return - - user_permission = utils.get_db_object( - session, models.UserPermission, - False, user_id=user.id, name=permission.name - ) - if not user_permission: - raise exception.Forbidden( - 'user %s does not have permission %s' % ( - user.email, permission.name - ) - ) - - -def check_user_permission(permission): - """Decorator to check user having permission.""" - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - user = kwargs.get('user') - if user is not None: - session = kwargs.get('session') - if session is None: - raise exception.DatabaseException( - 'wrapper check_user_permission does not run in session' - ) - _check_user_permission(user, permission, session=session) - return func(*args, **kwargs) - else: - return func(*args, **kwargs) - return wrapper - return decorator - - -def check_user_admin(): - """Decorator to check user is admin.""" - def decorator(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - user = kwargs.get('user') - if user is not None: - if not user.is_admin: - raise exception.Forbidden( - 'User %s is not admin.' % ( - user.email - ) - ) - return func(*args, **kwargs) - else: - return func(*args, **kwargs) - return wrapper - return decorator - - -def check_user_admin_or_owner(): - """Decorator to check user is admin or the owner of the resource.""" - def decorator(func): - @functools.wraps(func) - def wrapper(user_id, *args, **kwargs): - user = kwargs.get('user') - if user is not None: - session = kwargs.get('session') - if session is None: - raise exception.DatabaseException( - 'wrapper check_user_admin_or_owner is ' - 'not called in session' - ) - check_user = _get_user(user_id, session=session) - if not user.is_admin and user.id != check_user.id: - raise exception.Forbidden( - 'User %s is not admin or the owner of user %s.' % ( - user.email, check_user.email - ) - ) - - return func( - user_id, *args, **kwargs - ) - else: - return func( - user_id, *args, **kwargs - ) - return wrapper - return decorator - - -def _add_user_permissions(user, session=None, **permission_filters): - """add permissions to a user.""" - from compass.db.api import permission as permission_api - for api_permission in permission_api.list_permissions( - session=session, **permission_filters - ): - utils.add_db_object( - session, models.UserPermission, False, - user.id, api_permission['id'] - ) - - -def _remove_user_permissions(user, session=None, **permission_filters): - """remove permissions from a user.""" - from compass.db.api import permission as permission_api - permission_ids = [ - api_permission['id'] - for api_permission in permission_api.list_permissions( - session=session, **permission_filters - ) - ] - utils.del_db_objects( - session, models.UserPermission, - user_id=user.id, permission_id=permission_ids - ) - - -def _set_user_permissions(user, session=None, **permission_filters): - """set permissions to a user.""" - utils.del_db_objects( - session, models.UserPermission, - user_id=user.id - ) - _add_user_permissions(session, user, **permission_filters) - - -class UserWrapper(UserMixin): - """Wrapper class provided to flask.""" - - def __init__( - self, id, email, crypted_password, - active=True, is_admin=False, - expire_timestamp=None, token='', **kwargs - ): - self.id = id - self.email = email - self.password = crypted_password - self.active = active - self.is_admin = is_admin - self.expire_timestamp = expire_timestamp - if not token: - self.token = self.get_auth_token() - else: - self.token = token - super(UserWrapper, self).__init__() - - def authenticate(self, password): - if not util.encrypt(password, self.password) == self.password: - raise exception.Unauthorized('%s password mismatch' % self.email) - - def get_auth_token(self): - return util.encrypt(self.email) - - def is_active(self): - return self.active - - def get_id(self): - return self.token - - def is_authenticated(self): - current_time = datetime.datetime.now() - return ( - not self.expire_timestamp or - current_time < self.expire_timestamp - ) - - def __str__(self): - return '%s[email:%s,password:%s]' % ( - self.__class__.__name__, self.email, self.password) - - -@database.run_in_session() -def get_user_object(email, session=None, **kwargs): - """get user and convert to UserWrapper object.""" - user = utils.get_db_object( - session, models.User, False, email=email - ) - if not user: - raise exception.Unauthorized( - '%s unauthorized' % email - ) - user_dict = user.to_dict() - user_dict.update(kwargs) - return UserWrapper(**user_dict) - - -@database.run_in_session(exception_when_in_session=False) -def get_user_object_from_token(token, session=None): - """Get user from token and convert to UserWrapper object. - - ::note: - get_user_object_from_token may be called in session. - """ - expire_timestamp = { - 'ge': datetime.datetime.now() - } - user_token = utils.get_db_object( - session, models.UserToken, False, - token=token, expire_timestamp=expire_timestamp - ) - if not user_token: - raise exception.Unauthorized( - 'invalid user token: %s' % token - ) - user_dict = _get_user( - user_token.user_id, session=session - ).to_dict() - user_dict['token'] = token - expire_timestamp = user_token.expire_timestamp - user_dict['expire_timestamp'] = expire_timestamp - return UserWrapper(**user_dict) - - -@utils.supported_filters() -@database.run_in_session() -@utils.wrap_to_dict(RESP_TOKEN_FIELDS) -def record_user_token( - token, expire_timestamp, user=None, session=None -): - """record user token in database.""" - user_token = utils.get_db_object( - session, models.UserToken, False, - user_id=user.id, token=token - ) - if not user_token: - return utils.add_db_object( - session, models.UserToken, True, - token, user_id=user.id, - expire_timestamp=expire_timestamp - ) - elif expire_timestamp > user_token.expire_timestamp: - return utils.update_db_object( - session, user_token, expire_timestamp=expire_timestamp - ) - return user_token - - -@utils.supported_filters() -@database.run_in_session() -@utils.wrap_to_dict(RESP_TOKEN_FIELDS) -def clean_user_token(token, user=None, session=None): - """clean user token in database.""" - return utils.del_db_objects( - session, models.UserToken, - token=token, user_id=user.id - ) - - -def _get_user(user_id, session=None, **kwargs): - """Get user object by user id.""" - if isinstance(user_id, (int, long)): - return utils.get_db_object( - session, models.User, id=user_id, **kwargs - ) - raise exception.InvalidParameter( - 'user id %s type is not int compatible' % user_id - ) - - -@utils.supported_filters() -@database.run_in_session() -@check_user_admin_or_owner() -@utils.wrap_to_dict(RESP_FIELDS) -def get_user( - user_id, exception_when_missing=True, - user=None, session=None, **kwargs -): - """get a user.""" - return _get_user( - user_id, session=session, - exception_when_missing=exception_when_missing - ) - - -@utils.supported_filters() -@database.run_in_session() -@utils.wrap_to_dict(RESP_FIELDS) -def get_current_user( - exception_when_missing=True, user=None, - session=None, **kwargs -): - """get current user.""" - return _get_user( - user.id, session=session, - exception_when_missing=exception_when_missing - ) - - -@utils.supported_filters( - optional_support_keys=SUPPORTED_FIELDS -) -@database.run_in_session() -@check_user_admin() -@utils.wrap_to_dict(RESP_FIELDS) -def list_users(user=None, session=None, **filters): - """List all users.""" - return utils.list_db_objects( - session, models.User, **filters - ) - - -@utils.input_validates(email=_check_email) -@utils.supported_filters( - ADDED_FIELDS, - optional_support_keys=OPTIONAL_ADDED_FIELDS, - ignore_support_keys=IGNORE_FIELDS -) -@database.run_in_session() -@check_user_admin() -@utils.wrap_to_dict(RESP_FIELDS) -def add_user( - exception_when_existing=True, user=None, - session=None, email=None, **kwargs -): - """Create a user and return created user object.""" - add_user = utils.add_db_object( - session, models.User, - exception_when_existing, email, - **kwargs) - _add_user_permissions( - add_user, - session=session, - name=setting.COMPASS_DEFAULT_PERMISSIONS - ) - return add_user - - -@utils.supported_filters() -@database.run_in_session() -@check_user_admin() -@utils.wrap_to_dict(RESP_FIELDS) -def del_user(user_id, user=None, session=None, **kwargs): - """delete a user and return the deleted user object.""" - del_user = _get_user(user_id, session=session) - return utils.del_db_object(session, del_user) - - -@utils.supported_filters( - optional_support_keys=UPDATED_FIELDS, - ignore_support_keys=IGNORE_FIELDS -) -@utils.input_validates(email=_check_email) -@database.run_in_session() -@utils.wrap_to_dict(RESP_FIELDS) -def update_user(user_id, user=None, session=None, **kwargs): - """Update a user and return the updated user object.""" - update_user = _get_user( - user_id, session=session, - ) - allowed_fields = set() - if user.is_admin: - allowed_fields |= set(ADMIN_UPDATED_FIELDS) - if user.id == update_user.id: - allowed_fields |= set(SELF_UPDATED_FIELDS) - unsupported_fields = set(kwargs) - allowed_fields - if unsupported_fields: - # The user is not allowed to update a user. - raise exception.Forbidden( - 'User %s has no permission to update user %s fields %s.' % ( - user.email, user.email, unsupported_fields - ) - ) - return utils.update_db_object(session, update_user, **kwargs) - - -@utils.supported_filters(optional_support_keys=PERMISSION_SUPPORTED_FIELDS) -@database.run_in_session() -@check_user_admin_or_owner() -@utils.wrap_to_dict(PERMISSION_RESP_FIELDS) -def get_permissions( - user_id, user=None, exception_when_missing=True, - session=None, **kwargs -): - """List permissions of a user.""" - get_user = _get_user( - user_id, session=session, - exception_when_missing=exception_when_missing - ) - return utils.list_db_objects( - session, models.UserPermission, user_id=get_user.id, **kwargs - ) - - -def _get_permission(user_id, permission_id, session=None, **kwargs): - """Get user permission by user id and permission id.""" - user = _get_user(user_id, session=session) - from compass.db.api import permission as permission_api - permission = permission_api.get_permission_internal( - permission_id, session=session - ) - return utils.get_db_object( - session, models.UserPermission, - user_id=user.id, permission_id=permission.id, - **kwargs - ) - - -@utils.supported_filters() -@database.run_in_session() -@check_user_admin_or_owner() -@utils.wrap_to_dict(PERMISSION_RESP_FIELDS) -def get_permission( - user_id, permission_id, exception_when_missing=True, - user=None, session=None, **kwargs -): - """Get a permission of a user.""" - return _get_permission( - user_id, permission_id, - exception_when_missing=exception_when_missing, - session=session, - **kwargs - ) - - -@utils.supported_filters() -@database.run_in_session() -@check_user_admin_or_owner() -@utils.wrap_to_dict(PERMISSION_RESP_FIELDS) -def del_permission(user_id, permission_id, user=None, session=None, **kwargs): - """Delete a permission from a user.""" - user_permission = _get_permission( - user_id, permission_id, - session=session, **kwargs - ) - return utils.del_db_object(session, user_permission) - - -@utils.supported_filters( - PERMISSION_ADDED_FIELDS, - ignore_support_keys=IGNORE_FIELDS -) -@database.run_in_session() -@check_user_admin() -@utils.wrap_to_dict(PERMISSION_RESP_FIELDS) -def add_permission( - user_id, permission_id=None, exception_when_existing=True, - user=None, session=None -): - """Add a permission to a user.""" - get_user = _get_user(user_id, session=session) - from compass.db.api import permission as permission_api - get_permission = permission_api.get_permission_internal( - permission_id, session=session - ) - return utils.add_db_object( - session, models.UserPermission, exception_when_existing, - get_user.id, get_permission.id - ) - - -def _get_permission_filters(permission_ids): - """Helper function to filter permissions.""" - if permission_ids == 'all': - return {} - else: - return {'id': permission_ids} - - -@utils.supported_filters( - optional_support_keys=[ - 'add_permissions', 'remove_permissions', 'set_permissions' - ] -) -@database.run_in_session() -@check_user_admin() -@utils.wrap_to_dict(PERMISSION_RESP_FIELDS) -def update_permissions( - user_id, add_permissions=[], remove_permissions=[], - set_permissions=None, user=None, session=None, **kwargs -): - """update user permissions.""" - update_user = _get_user(user_id, session=session) - if remove_permissions: - _remove_user_permissions( - update_user, session=session, - **_get_permission_filters(remove_permissions) - ) - if add_permissions: - _add_user_permissions( - update_user, session=session, - **_get_permission_filters(add_permissions) - ) - if set_permissions is not None: - _set_user_permissions( - update_user, session=session, - **_get_permission_filters(set_permissions) - ) - return update_user.user_permissions |