diff options
Diffstat (limited to 'keystone-moon/keystone/assignment/core.py')
-rw-r--r-- | keystone-moon/keystone/assignment/core.py | 1790 |
1 files changed, 0 insertions, 1790 deletions
diff --git a/keystone-moon/keystone/assignment/core.py b/keystone-moon/keystone/assignment/core.py deleted file mode 100644 index 05368fbf..00000000 --- a/keystone-moon/keystone/assignment/core.py +++ /dev/null @@ -1,1790 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Main entry point into the Assignment service.""" - -import abc -import copy - -from oslo_cache import core as oslo_cache -from oslo_config import cfg -from oslo_log import log -from oslo_log import versionutils -import six - -from keystone.common import cache -from keystone.common import dependency -from keystone.common import driver_hints -from keystone.common import manager -from keystone import exception -from keystone.i18n import _ -from keystone.i18n import _LI, _LE, _LW -from keystone import notifications - - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - -# This is a general cache region for assignment administration (CRUD -# operations). -MEMOIZE = cache.get_memoization_decorator(group='role') - -# This builds a discrete cache region dedicated to role assignments computed -# for a given user + project/domain pair. Any write operation to add or remove -# any role assignment should invalidate this entire cache region. -COMPUTED_ASSIGNMENTS_REGION = oslo_cache.create_region() -MEMOIZE_COMPUTED_ASSIGNMENTS = cache.get_memoization_decorator( - group='role', - region=COMPUTED_ASSIGNMENTS_REGION) - - -@notifications.listener -@dependency.provider('assignment_api') -@dependency.requires('credential_api', 'identity_api', 'resource_api', - 'revoke_api', 'role_api') -class Manager(manager.Manager): - """Default pivot point for the Assignment backend. - - See :class:`keystone.common.manager.Manager` for more details on how this - dynamically calls the backend. - - """ - - driver_namespace = 'keystone.assignment' - - _PROJECT = 'project' - _ROLE_REMOVED_FROM_USER = 'role_removed_from_user' - _INVALIDATION_USER_PROJECT_TOKENS = 'invalidate_user_project_tokens' - - def __init__(self): - assignment_driver = CONF.assignment.driver - # If there is no explicit assignment driver specified, we let the - # identity driver tell us what to use. This is for backward - # compatibility reasons from the time when identity, resource and - # assignment were all part of identity. - if assignment_driver is None: - msg = _('Use of the identity driver config to automatically ' - 'configure the same assignment driver has been ' - 'deprecated, in the "O" release, the assignment driver ' - 'will need to be expicitly configured if different ' - 'than the default (SQL).') - versionutils.report_deprecated_feature(LOG, msg) - try: - identity_driver = dependency.get_provider( - 'identity_api').driver - assignment_driver = identity_driver.default_assignment_driver() - except ValueError: - msg = _('Attempted automatic driver selection for assignment ' - 'based upon [identity]\driver option failed since ' - 'driver %s is not found. Set [assignment]/driver to ' - 'a valid driver in keystone config.') - LOG.critical(msg) - raise exception.KeystoneConfigurationError(msg) - super(Manager, self).__init__(assignment_driver) - - # Make sure it is a driver version we support, and if it is a legacy - # driver, then wrap it. - if isinstance(self.driver, AssignmentDriverV8): - self.driver = V9AssignmentWrapperForV8Driver(self.driver) - elif not isinstance(self.driver, AssignmentDriverV9): - raise exception.UnsupportedDriverVersion(driver=assignment_driver) - - self.event_callbacks = { - notifications.ACTIONS.deleted: { - 'domain': [self._delete_domain_assignments], - }, - } - - def _delete_domain_assignments(self, service, resource_type, operations, - payload): - domain_id = payload['resource_info'] - self.driver.delete_domain_assignments(domain_id) - - def _get_group_ids_for_user_id(self, user_id): - # TODO(morganfainberg): Implement a way to get only group_ids - # instead of the more expensive to_dict() call for each record. - return [x['id'] for - x in self.identity_api.list_groups_for_user(user_id)] - - def list_user_ids_for_project(self, tenant_id): - self.resource_api.get_project(tenant_id) - assignment_list = self.list_role_assignments( - project_id=tenant_id, effective=True) - # Use set() to process the list to remove any duplicates - return list(set([x['user_id'] for x in assignment_list])) - - def _list_parent_ids_of_project(self, project_id): - if CONF.os_inherit.enabled: - return [x['id'] for x in ( - self.resource_api.list_project_parents(project_id))] - else: - return [] - - @MEMOIZE_COMPUTED_ASSIGNMENTS - def get_roles_for_user_and_project(self, user_id, tenant_id): - """Get the roles associated with a user within given project. - - This includes roles directly assigned to the user on the - project, as well as those by virtue of group membership or - inheritance. - - :returns: a list of role ids. - :raises keystone.exception.ProjectNotFound: If the project doesn't - exist. - - """ - self.resource_api.get_project(tenant_id) - assignment_list = self.list_role_assignments( - user_id=user_id, project_id=tenant_id, effective=True) - # Use set() to process the list to remove any duplicates - return list(set([x['role_id'] for x in assignment_list])) - - @MEMOIZE_COMPUTED_ASSIGNMENTS - def get_roles_for_user_and_domain(self, user_id, domain_id): - """Get the roles associated with a user within given domain. - - :returns: a list of role ids. - :raises keystone.exception.DomainNotFound: If the domain doesn't exist. - - """ - self.resource_api.get_domain(domain_id) - assignment_list = self.list_role_assignments( - user_id=user_id, domain_id=domain_id, effective=True) - # Use set() to process the list to remove any duplicates - return list(set([x['role_id'] for x in assignment_list])) - - def get_roles_for_groups(self, group_ids, project_id=None, domain_id=None): - """Get a list of roles for this group on domain and/or project.""" - if project_id is not None: - self.resource_api.get_project(project_id) - assignment_list = self.list_role_assignments( - source_from_group_ids=group_ids, project_id=project_id, - effective=True) - elif domain_id is not None: - assignment_list = self.list_role_assignments( - source_from_group_ids=group_ids, domain_id=domain_id, - effective=True) - else: - raise AttributeError(_("Must specify either domain or project")) - - role_ids = list(set([x['role_id'] for x in assignment_list])) - return self.role_api.list_roles_from_ids(role_ids) - - def add_user_to_project(self, tenant_id, user_id): - """Add user to a tenant by creating a default role relationship. - - :raises keystone.exception.ProjectNotFound: If the project doesn't - exist. - :raises keystone.exception.UserNotFound: If the user doesn't exist. - - """ - self.resource_api.get_project(tenant_id) - try: - self.role_api.get_role(CONF.member_role_id) - self.driver.add_role_to_user_and_project( - user_id, - tenant_id, - CONF.member_role_id) - except exception.RoleNotFound: - LOG.info(_LI("Creating the default role %s " - "because it does not exist."), - CONF.member_role_id) - role = {'id': CONF.member_role_id, - 'name': CONF.member_role_name} - try: - self.role_api.create_role(CONF.member_role_id, role) - except exception.Conflict: - LOG.info(_LI("Creating the default role %s failed because it " - "was already created"), - CONF.member_role_id) - # now that default role exists, the add should succeed - self.driver.add_role_to_user_and_project( - user_id, - tenant_id, - CONF.member_role_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - @notifications.role_assignment('created') - def _add_role_to_user_and_project_adapter(self, role_id, user_id=None, - group_id=None, domain_id=None, - project_id=None, - inherited_to_projects=False, - context=None): - - # The parameters for this method must match the parameters for - # create_grant so that the notifications.role_assignment decorator - # will work. - - self.resource_api.get_project(project_id) - self.role_api.get_role(role_id) - self.driver.add_role_to_user_and_project(user_id, project_id, role_id) - - def add_role_to_user_and_project(self, user_id, tenant_id, role_id): - self._add_role_to_user_and_project_adapter( - role_id, user_id=user_id, project_id=tenant_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - def remove_user_from_project(self, tenant_id, user_id): - """Remove user from a tenant - - :raises keystone.exception.ProjectNotFound: If the project doesn't - exist. - :raises keystone.exception.UserNotFound: If the user doesn't exist. - - """ - roles = self.get_roles_for_user_and_project(user_id, tenant_id) - if not roles: - raise exception.NotFound(tenant_id) - for role_id in roles: - try: - self.driver.remove_role_from_user_and_project(user_id, - tenant_id, - role_id) - self.revoke_api.revoke_by_grant(role_id, user_id=user_id, - project_id=tenant_id) - - except exception.RoleNotFound: - LOG.debug("Removing role %s failed because it does not exist.", - role_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - # TODO(henry-nash): We might want to consider list limiting this at some - # point in the future. - def list_projects_for_user(self, user_id, hints=None): - assignment_list = self.list_role_assignments( - user_id=user_id, effective=True) - # Use set() to process the list to remove any duplicates - project_ids = list(set([x['project_id'] for x in assignment_list - if x.get('project_id')])) - return self.resource_api.list_projects_from_ids(list(project_ids)) - - # TODO(henry-nash): We might want to consider list limiting this at some - # point in the future. - def list_domains_for_user(self, user_id, hints=None): - assignment_list = self.list_role_assignments( - user_id=user_id, effective=True) - # Use set() to process the list to remove any duplicates - domain_ids = list(set([x['domain_id'] for x in assignment_list - if x.get('domain_id')])) - return self.resource_api.list_domains_from_ids(domain_ids) - - def list_domains_for_groups(self, group_ids): - assignment_list = self.list_role_assignments( - source_from_group_ids=group_ids, effective=True) - domain_ids = list(set([x['domain_id'] for x in assignment_list - if x.get('domain_id')])) - return self.resource_api.list_domains_from_ids(domain_ids) - - def list_projects_for_groups(self, group_ids): - assignment_list = self.list_role_assignments( - source_from_group_ids=group_ids, effective=True) - project_ids = list(set([x['project_id'] for x in assignment_list - if x.get('project_id')])) - return self.resource_api.list_projects_from_ids(project_ids) - - @notifications.role_assignment('deleted') - def _remove_role_from_user_and_project_adapter(self, role_id, user_id=None, - group_id=None, - domain_id=None, - project_id=None, - inherited_to_projects=False, - context=None): - - # The parameters for this method must match the parameters for - # delete_grant so that the notifications.role_assignment decorator - # will work. - - self.driver.remove_role_from_user_and_project(user_id, project_id, - role_id) - if project_id: - self._emit_invalidate_grant_token_persistence(user_id, project_id) - else: - self.identity_api.emit_invalidate_user_token_persistence(user_id) - self.revoke_api.revoke_by_grant(role_id, user_id=user_id, - project_id=project_id) - - def remove_role_from_user_and_project(self, user_id, tenant_id, role_id): - self._remove_role_from_user_and_project_adapter( - role_id, user_id=user_id, project_id=tenant_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - def _emit_invalidate_user_token_persistence(self, user_id): - self.identity_api.emit_invalidate_user_token_persistence(user_id) - - # NOTE(lbragstad): The previous notification decorator behavior didn't - # send the notification unless the operation was successful. We - # maintain that behavior here by calling to the notification module - # after the call to emit invalid user tokens. - notifications.Audit.internal( - notifications.INVALIDATE_USER_TOKEN_PERSISTENCE, user_id - ) - - def _emit_invalidate_grant_token_persistence(self, user_id, project_id): - self.identity_api.emit_invalidate_grant_token_persistence( - {'user_id': user_id, 'project_id': project_id} - ) - - @notifications.role_assignment('created') - def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False, context=None): - self.role_api.get_role(role_id) - if domain_id: - self.resource_api.get_domain(domain_id) - if project_id: - self.resource_api.get_project(project_id) - self.driver.create_grant(role_id, user_id, group_id, domain_id, - project_id, inherited_to_projects) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - def get_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - role_ref = self.role_api.get_role(role_id) - if domain_id: - self.resource_api.get_domain(domain_id) - if project_id: - self.resource_api.get_project(project_id) - self.check_grant_role_id( - role_id, user_id, group_id, domain_id, project_id, - inherited_to_projects) - return role_ref - - def list_grants(self, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - if domain_id: - self.resource_api.get_domain(domain_id) - if project_id: - self.resource_api.get_project(project_id) - grant_ids = self.list_grant_role_ids( - user_id, group_id, domain_id, project_id, inherited_to_projects) - return self.role_api.list_roles_from_ids(grant_ids) - - @notifications.role_assignment('deleted') - def _emit_revoke_user_grant(self, role_id, user_id, domain_id, project_id, - inherited_to_projects, context): - self._emit_invalidate_grant_token_persistence(user_id, project_id) - - def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False, context=None): - if group_id is None: - self.revoke_api.revoke_by_grant(user_id=user_id, - role_id=role_id, - domain_id=domain_id, - project_id=project_id) - self._emit_revoke_user_grant( - role_id, user_id, domain_id, project_id, - inherited_to_projects, context) - else: - try: - # Group may contain a lot of users so revocation will be - # by role & domain/project - if domain_id is None: - self.revoke_api.revoke_by_project_role_assignment( - project_id, role_id - ) - else: - self.revoke_api.revoke_by_domain_role_assignment( - domain_id, role_id - ) - if CONF.token.revoke_by_id: - # NOTE(morganfainberg): The user ids are the important part - # for invalidating tokens below, so extract them here. - for user in self.identity_api.list_users_in_group( - group_id): - self._emit_revoke_user_grant( - role_id, user['id'], domain_id, project_id, - inherited_to_projects, context) - except exception.GroupNotFound: - LOG.debug('Group %s not found, no tokens to invalidate.', - group_id) - - # TODO(henry-nash): While having the call to get_role here mimics the - # previous behavior (when it was buried inside the driver delete call), - # this seems an odd place to have this check, given what we have - # already done so far in this method. See Bug #1406776. - self.role_api.get_role(role_id) - - if domain_id: - self.resource_api.get_domain(domain_id) - if project_id: - self.resource_api.get_project(project_id) - self.driver.delete_grant(role_id, user_id, group_id, domain_id, - project_id, inherited_to_projects) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - # The methods _expand_indirect_assignment, _list_direct_role_assignments - # and _list_effective_role_assignments below are only used on - # list_role_assignments, but they are not in its scope as nested functions - # since it would significantly increase McCabe complexity, that should be - # kept as it is in order to detect unnecessarily complex code, which is not - # this case. - - def _expand_indirect_assignment(self, ref, user_id=None, project_id=None, - subtree_ids=None, expand_groups=True): - """Returns a list of expanded role assignments. - - This methods is called for each discovered assignment that either needs - a group assignment expanded into individual user assignments, or needs - an inherited assignment to be applied to its children. - - In all cases, if either user_id and/or project_id is specified, then we - filter the result on those values. - - If project_id is specified and subtree_ids is None, then this - indicates that we are only interested in that one project. If - subtree_ids is not None, then this is an indicator that any - inherited assignments need to be expanded down the tree. The - actual subtree_ids don't need to be used as a filter here, since we - already ensured only those assignments that could affect them - were passed to this method. - - If expand_groups is True then we expand groups out to a list of - assignments, one for each member of that group. - - """ - def create_group_assignment(base_ref, user_id): - """Creates a group assignment from the provided ref.""" - ref = copy.deepcopy(base_ref) - - ref['user_id'] = user_id - - indirect = ref.setdefault('indirect', {}) - indirect['group_id'] = ref.pop('group_id') - - return ref - - def expand_group_assignment(ref, user_id): - """Expands group role assignment. - - For any group role assignment on a target, it is replaced by a list - of role assignments containing one for each user of that group on - that target. - - An example of accepted ref is:: - - { - 'group_id': group_id, - 'project_id': project_id, - 'role_id': role_id - } - - Once expanded, it should be returned as a list of entities like the - one below, one for each each user_id in the provided group_id. - - :: - - { - 'user_id': user_id, - 'project_id': project_id, - 'role_id': role_id, - 'indirect' : { - 'group_id': group_id - } - } - - Returned list will be formatted by the Controller, which will - deduce a role assignment came from group membership if it has both - 'user_id' in the main body of the dict and 'group_id' in indirect - subdict. - - """ - if user_id: - return [create_group_assignment(ref, user_id=user_id)] - - return [create_group_assignment(ref, user_id=m['id']) - for m in self.identity_api.list_users_in_group( - ref['group_id'])] - - def expand_inherited_assignment(ref, user_id, project_id, subtree_ids, - expand_groups): - """Expands inherited role assignments. - - If expand_groups is True and this is a group role assignment on a - target, replace it by a list of role assignments containing one for - each user of that group, on every project under that target. If - expand_groups is False, then return a group assignment on an - inherited target. - - If this is a user role assignment on a specific target (i.e. - project_id is specified, but subtree_ids is None) then simply - format this as a single assignment (since we are effectively - filtering on project_id). If however, project_id is None or - subtree_ids is not None, then replace this one assignment with a - list of role assignments for that user on every project under - that target. - - An example of accepted ref is:: - - { - 'group_id': group_id, - 'project_id': parent_id, - 'role_id': role_id, - 'inherited_to_projects': 'projects' - } - - Once expanded, it should be returned as a list of entities like the - one below, one for each each user_id in the provided group_id and - for each subproject_id in the project_id subtree. - - :: - - { - 'user_id': user_id, - 'project_id': subproject_id, - 'role_id': role_id, - 'indirect' : { - 'group_id': group_id, - 'project_id': parent_id - } - } - - Returned list will be formatted by the Controller, which will - deduce a role assignment came from group membership if it has both - 'user_id' in the main body of the dict and 'group_id' in the - 'indirect' subdict, as well as it is possible to deduce if it has - come from inheritance if it contains both a 'project_id' in the - main body of the dict and 'parent_id' in the 'indirect' subdict. - - """ - def create_inherited_assignment(base_ref, project_id): - """Creates a project assignment from the provided ref. - - base_ref can either be a project or domain inherited - assignment ref. - - """ - ref = copy.deepcopy(base_ref) - - indirect = ref.setdefault('indirect', {}) - if ref.get('project_id'): - indirect['project_id'] = ref.pop('project_id') - else: - indirect['domain_id'] = ref.pop('domain_id') - - ref['project_id'] = project_id - ref.pop('inherited_to_projects') - - return ref - - # Define expanded project list to which to apply this assignment - if project_id: - # Since ref is an inherited assignment and we are filtering by - # project(s), we are only going to apply the assignment to the - # relevant project(s) - project_ids = [project_id] - if subtree_ids: - project_ids += subtree_ids - # If this is a domain inherited assignment, then we know - # that all the project_ids will get this assignment. If - # it's a project inherited assignment, and the assignment - # point is an ancestor of project_id, then we know that - # again all the project_ids will get the assignment. If, - # however, the assignment point is within the subtree, - # then only a partial tree will get the assignment. - if ref.get('project_id'): - if ref['project_id'] in project_ids: - project_ids = ( - [x['id'] for x in - self.resource_api.list_projects_in_subtree( - ref['project_id'])]) - elif ref.get('domain_id'): - # A domain inherited assignment, so apply it to all projects - # in this domain - project_ids = ( - [x['id'] for x in - self.resource_api.list_projects_in_domain( - ref['domain_id'])]) - else: - # It must be a project assignment, so apply it to its subtree - project_ids = ( - [x['id'] for x in - self.resource_api.list_projects_in_subtree( - ref['project_id'])]) - - new_refs = [] - if 'group_id' in ref: - if expand_groups: - # Expand role assignment to all group members on any - # inherited target of any of the projects - for ref in expand_group_assignment(ref, user_id): - new_refs += [create_inherited_assignment(ref, proj_id) - for proj_id in project_ids] - else: - # Just place the group assignment on any inherited target - # of any of the projects - new_refs += [create_inherited_assignment(ref, proj_id) - for proj_id in project_ids] - else: - # Expand role assignment for all projects - new_refs += [create_inherited_assignment(ref, proj_id) - for proj_id in project_ids] - - return new_refs - - if ref.get('inherited_to_projects') == 'projects': - return expand_inherited_assignment( - ref, user_id, project_id, subtree_ids, expand_groups) - elif 'group_id' in ref and expand_groups: - return expand_group_assignment(ref, user_id) - return [ref] - - def add_implied_roles(self, role_refs): - """Expand out implied roles. - - The role_refs passed in have had all inheritance and group assignments - expanded out. We now need to look at the role_id in each ref and see - if it is a prior role for some implied roles. If it is, then we need to - duplicate that ref, one for each implied role. We store the prior role - in the indirect dict that is part of such a duplicated ref, so that a - caller can determine where the assignment came from. - - """ - def _make_implied_ref_copy(prior_ref, implied_role_id): - # Create a ref for an implied role from the ref of a prior role, - # setting the new role_id to be the implied role and the indirect - # role_id to be the prior role - implied_ref = copy.deepcopy(prior_ref) - implied_ref['role_id'] = implied_role_id - indirect = implied_ref.setdefault('indirect', {}) - indirect['role_id'] = prior_ref['role_id'] - return implied_ref - - if not CONF.token.infer_roles: - return role_refs - try: - implied_roles_cache = {} - role_refs_to_check = list(role_refs) - ref_results = list(role_refs) - checked_role_refs = list() - while(role_refs_to_check): - next_ref = role_refs_to_check.pop() - checked_role_refs.append(next_ref) - next_role_id = next_ref['role_id'] - if next_role_id in implied_roles_cache: - implied_roles = implied_roles_cache[next_role_id] - else: - implied_roles = ( - self.role_api.list_implied_roles(next_role_id)) - implied_roles_cache[next_role_id] = implied_roles - for implied_role in implied_roles: - implied_ref = ( - _make_implied_ref_copy( - next_ref, implied_role['implied_role_id'])) - if implied_ref in checked_role_refs: - msg = _LE('Circular reference found ' - 'role inference rules - %(prior_role_id)s.') - LOG.error(msg, {'prior_role_id': next_ref['role_id']}) - else: - ref_results.append(implied_ref) - role_refs_to_check.append(implied_ref) - except exception.NotImplemented: - LOG.error('Role driver does not support implied roles.') - - return ref_results - - def _filter_by_role_id(self, role_id, ref_results): - # if we arrive here, we need to filer by role_id. - filter_results = [] - for ref in ref_results: - if ref['role_id'] == role_id: - filter_results.append(ref) - return filter_results - - def _strip_domain_roles(self, role_refs): - """Post process assignment list for domain roles. - - Domain roles are only designed to do the job of inferring other roles - and since that has been done before this method is called, we need to - remove any assignments that include a domain role. - - """ - def _role_is_global(role_id): - ref = self.role_api.get_role(role_id) - return (ref['domain_id'] is None) - - filter_results = [] - for ref in role_refs: - if _role_is_global(ref['role_id']): - filter_results.append(ref) - return filter_results - - def _list_effective_role_assignments(self, role_id, user_id, group_id, - domain_id, project_id, subtree_ids, - inherited, source_from_group_ids, - strip_domain_roles): - """List role assignments in effective mode. - - When using effective mode, besides the direct assignments, the indirect - ones that come from grouping or inheritance are retrieved and will then - be expanded. - - The resulting list of assignments will be filtered by the provided - parameters. If subtree_ids is not None, then we also want to include - all subtree_ids in the filter as well. Since we are in effective mode, - group can never act as a filter (since group assignments are expanded - into user roles) and domain can only be filter if we want non-inherited - assignments, since domains can't inherit assignments. - - The goal of this method is to only ask the driver for those - assignments as could effect the result based on the parameter filters - specified, hence avoiding retrieving a huge list. - - """ - def list_role_assignments_for_actor( - role_id, inherited, user_id=None, group_ids=None, - project_id=None, subtree_ids=None, domain_id=None): - """List role assignments for actor on target. - - List direct and indirect assignments for an actor, optionally - for a given target (i.e. projects or domain). - - :param role_id: List for a specific role, can be None meaning all - roles - :param inherited: Indicates whether inherited assignments or only - direct assignments are required. If None, then - both are required. - :param user_id: If not None, list only assignments that affect this - user. - :param group_ids: A list of groups required. Only one of user_id - and group_ids can be specified - :param project_id: If specified, only include those assignments - that affect at least this project, with - additionally any projects specified in - subtree_ids - :param subtree_ids: The list of projects in the subtree. If - specified, also include those assignments that - affect these projects. These projects are - guaranteed to be in the same domain as the - project specified in project_id. subtree_ids - can only be specified if project_id has also - been specified. - :param domain_id: If specified, only include those assignments - that affect this domain - by definition this will - not include any inherited assignments - - :returns: List of assignments matching the criteria. Any inherited - or group assignments that could affect the resulting - response are included. - - """ - project_ids_of_interest = None - if project_id: - if subtree_ids: - project_ids_of_interest = subtree_ids + [project_id] - else: - project_ids_of_interest = [project_id] - - # List direct project role assignments - non_inherited_refs = [] - if inherited is False or inherited is None: - # Get non inherited assignments - non_inherited_refs = self.driver.list_role_assignments( - role_id=role_id, domain_id=domain_id, - project_ids=project_ids_of_interest, user_id=user_id, - group_ids=group_ids, inherited_to_projects=False) - - inherited_refs = [] - if inherited is True or inherited is None: - # Get inherited assignments - if project_id: - # The project and any subtree are guaranteed to be owned by - # the same domain, so since we are filtering by these - # specific projects, then we can only get inherited - # assignments from their common domain or from any of - # their parents projects. - - # List inherited assignments from the project's domain - proj_domain_id = self.resource_api.get_project( - project_id)['domain_id'] - inherited_refs += self.driver.list_role_assignments( - role_id=role_id, domain_id=proj_domain_id, - user_id=user_id, group_ids=group_ids, - inherited_to_projects=True) - - # For inherited assignments from projects, since we know - # they are from the same tree the only places these can - # come from are from parents of the main project or - # inherited assignments on the project or subtree itself. - source_ids = [project['id'] for project in - self.resource_api.list_project_parents( - project_id)] - if subtree_ids: - source_ids += project_ids_of_interest - if source_ids: - inherited_refs += self.driver.list_role_assignments( - role_id=role_id, project_ids=source_ids, - user_id=user_id, group_ids=group_ids, - inherited_to_projects=True) - else: - # List inherited assignments without filtering by target - inherited_refs = self.driver.list_role_assignments( - role_id=role_id, user_id=user_id, group_ids=group_ids, - inherited_to_projects=True) - - return non_inherited_refs + inherited_refs - - # If filtering by group or inherited domain assignment the list is - # guaranteed to be empty - if group_id or (domain_id and inherited): - return [] - - if user_id and source_from_group_ids: - # You can't do both - and since source_from_group_ids is only used - # internally, this must be a coding error by the caller. - msg = _('Cannot list assignments sourced from groups and filtered ' - 'by user ID.') - raise exception.UnexpectedError(msg) - - # If filtering by domain, then only non-inherited assignments are - # relevant, since domains don't inherit assignments - inherited = False if domain_id else inherited - - # List user or explicit group assignments. - # Due to the need to expand implied roles, this call will skip - # filtering by role_id and instead return the whole set of roles. - # Matching on the specified role is performed at the end. - direct_refs = list_role_assignments_for_actor( - role_id=None, user_id=user_id, group_ids=source_from_group_ids, - project_id=project_id, subtree_ids=subtree_ids, - domain_id=domain_id, inherited=inherited) - - # And those from the user's groups, so long as we are not restricting - # to a set of source groups (in which case we already got those - # assignments in the direct listing above). - group_refs = [] - if not source_from_group_ids and user_id: - group_ids = self._get_group_ids_for_user_id(user_id) - if group_ids: - group_refs = list_role_assignments_for_actor( - role_id=None, project_id=project_id, - subtree_ids=subtree_ids, group_ids=group_ids, - domain_id=domain_id, inherited=inherited) - - # Expand grouping and inheritance on retrieved role assignments - refs = [] - expand_groups = (source_from_group_ids is None) - for ref in (direct_refs + group_refs): - refs += self._expand_indirect_assignment( - ref, user_id, project_id, subtree_ids, expand_groups) - - refs = self.add_implied_roles(refs) - if strip_domain_roles: - refs = self._strip_domain_roles(refs) - if role_id: - refs = self._filter_by_role_id(role_id, refs) - - return refs - - def _list_direct_role_assignments(self, role_id, user_id, group_id, - domain_id, project_id, subtree_ids, - inherited): - """List role assignments without applying expansion. - - Returns a list of direct role assignments, where their attributes match - the provided filters. If subtree_ids is not None, then we also want to - include all subtree_ids in the filter as well. - - """ - group_ids = [group_id] if group_id else None - project_ids_of_interest = None - if project_id: - if subtree_ids: - project_ids_of_interest = subtree_ids + [project_id] - else: - project_ids_of_interest = [project_id] - - return self.driver.list_role_assignments( - role_id=role_id, user_id=user_id, group_ids=group_ids, - domain_id=domain_id, project_ids=project_ids_of_interest, - inherited_to_projects=inherited) - - def list_role_assignments(self, role_id=None, user_id=None, group_id=None, - domain_id=None, project_id=None, - include_subtree=False, inherited=None, - effective=None, include_names=False, - source_from_group_ids=None, - strip_domain_roles=True): - """List role assignments, honoring effective mode and provided filters. - - Returns a list of role assignments, where their attributes match the - provided filters (role_id, user_id, group_id, domain_id, project_id and - inherited). If include_subtree is True, then assignments on all - descendants of the project specified by project_id are also included. - The inherited filter defaults to None, meaning to get both - non-inherited and inherited role assignments. - - If effective mode is specified, this means that rather than simply - return the assignments that match the filters, any group or - inheritance assignments will be expanded. Group assignments will - become assignments for all the users in that group, and inherited - assignments will be shown on the projects below the assignment point. - Think of effective mode as being the list of assignments that actually - affect a user, for example the roles that would be placed in a token. - - If include_names is set to true the entities' names are returned - in addition to their id's. - - source_from_group_ids is a list of group IDs and, if specified, then - only those assignments that are derived from membership of these groups - are considered, and any such assignments will not be expanded into - their user membership assignments. This is different to a group filter - of the resulting list, instead being a restriction on which assignments - should be considered before expansion of inheritance. This option is - only used internally (i.e. it is not exposed at the API level) and is - only supported in effective mode (since in regular mode there is no - difference between this and a group filter, other than it is a list of - groups). - - In effective mode, any domain specific roles are usually stripped from - the returned assignments (since such roles are not placed in tokens). - This stripping can be disabled by specifying strip_domain_roles=False, - which is useful for internal calls like trusts which need to examine - the full set of roles. - - If OS-INHERIT extension is disabled or the used driver does not support - inherited roles retrieval, inherited role assignments will be ignored. - - """ - if not CONF.os_inherit.enabled: - if inherited: - return [] - inherited = False - - subtree_ids = None - if project_id and include_subtree: - subtree_ids = ( - [x['id'] for x in - self.resource_api.list_projects_in_subtree(project_id)]) - - if effective: - role_assignments = self._list_effective_role_assignments( - role_id, user_id, group_id, domain_id, project_id, - subtree_ids, inherited, source_from_group_ids, - strip_domain_roles) - else: - role_assignments = self._list_direct_role_assignments( - role_id, user_id, group_id, domain_id, project_id, - subtree_ids, inherited) - - if include_names: - return self._get_names_from_role_assignments(role_assignments) - return role_assignments - - def _get_names_from_role_assignments(self, role_assignments): - role_assign_list = [] - - for role_asgmt in role_assignments: - new_assign = {} - for id_type, id_ in role_asgmt.items(): - if id_type == 'domain_id': - _domain = self.resource_api.get_domain(id_) - new_assign['domain_id'] = _domain['id'] - new_assign['domain_name'] = _domain['name'] - elif id_type == 'user_id': - _user = self.identity_api.get_user(id_) - new_assign['user_id'] = _user['id'] - new_assign['user_name'] = _user['name'] - new_assign['user_domain_id'] = _user['domain_id'] - new_assign['user_domain_name'] = ( - self.resource_api.get_domain(_user['domain_id']) - ['name']) - elif id_type == 'group_id': - _group = self.identity_api.get_group(id_) - new_assign['group_id'] = _group['id'] - new_assign['group_name'] = _group['name'] - new_assign['group_domain_id'] = _group['domain_id'] - new_assign['group_domain_name'] = ( - self.resource_api.get_domain(_group['domain_id']) - ['name']) - elif id_type == 'project_id': - _project = self.resource_api.get_project(id_) - new_assign['project_id'] = _project['id'] - new_assign['project_name'] = _project['name'] - new_assign['project_domain_id'] = _project['domain_id'] - new_assign['project_domain_name'] = ( - self.resource_api.get_domain(_project['domain_id']) - ['name']) - elif id_type == 'role_id': - _role = self.role_api.get_role(id_) - new_assign['role_id'] = _role['id'] - new_assign['role_name'] = _role['name'] - role_assign_list.append(new_assign) - return role_assign_list - - def delete_tokens_for_role_assignments(self, role_id): - assignments = self.list_role_assignments(role_id=role_id) - - # Iterate over the assignments for this role and build the list of - # user or user+project IDs for the tokens we need to delete - user_ids = set() - user_and_project_ids = list() - for assignment in assignments: - # If we have a project assignment, then record both the user and - # project IDs so we can target the right token to delete. If it is - # a domain assignment, we might as well kill all the tokens for - # the user, since in the vast majority of cases all the tokens - # for a user will be within one domain anyway, so not worth - # trying to delete tokens for each project in the domain. - if 'user_id' in assignment: - if 'project_id' in assignment: - user_and_project_ids.append( - (assignment['user_id'], assignment['project_id'])) - elif 'domain_id' in assignment: - self._emit_invalidate_user_token_persistence( - assignment['user_id']) - elif 'group_id' in assignment: - # Add in any users for this group, being tolerant of any - # cross-driver database integrity errors. - try: - users = self.identity_api.list_users_in_group( - assignment['group_id']) - except exception.GroupNotFound: - # Ignore it, but log a debug message - if 'project_id' in assignment: - target = _('Project (%s)') % assignment['project_id'] - elif 'domain_id' in assignment: - target = _('Domain (%s)') % assignment['domain_id'] - else: - target = _('Unknown Target') - msg = ('Group (%(group)s), referenced in assignment ' - 'for %(target)s, not found - ignoring.') - LOG.debug(msg, {'group': assignment['group_id'], - 'target': target}) - continue - - if 'project_id' in assignment: - for user in users: - user_and_project_ids.append( - (user['id'], assignment['project_id'])) - elif 'domain_id' in assignment: - for user in users: - self._emit_invalidate_user_token_persistence( - user['id']) - - # Now process the built up lists. Before issuing calls to delete any - # tokens, let's try and minimize the number of calls by pruning out - # any user+project deletions where a general token deletion for that - # same user is also planned. - user_and_project_ids_to_action = [] - for user_and_project_id in user_and_project_ids: - if user_and_project_id[0] not in user_ids: - user_and_project_ids_to_action.append(user_and_project_id) - - for user_id, project_id in user_and_project_ids_to_action: - payload = {'user_id': user_id, 'project_id': project_id} - notifications.Audit.internal( - notifications.INVALIDATE_USER_PROJECT_TOKEN_PERSISTENCE, - payload - ) - - -# The AssignmentDriverBase class is the set of driver methods from earlier -# drivers that we still support, that have not been removed or modified. This -# class is then used to created the augmented V8 and V9 version abstract driver -# classes, without having to duplicate a lot of abstract method signatures. -# If you remove a method from V9, then move the abstract methods from this Base -# class to the V8 class. Do not modify any of the method signatures in the Base -# class - changes should only be made in the V8 and subsequent classes. -@six.add_metaclass(abc.ABCMeta) -class AssignmentDriverBase(object): - - def _get_list_limit(self): - return CONF.assignment.list_limit or CONF.list_limit - - @abc.abstractmethod - def add_role_to_user_and_project(self, user_id, tenant_id, role_id): - """Add a role to a user within given tenant. - - :raises keystone.exception.Conflict: If a duplicate role assignment - exists. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def remove_role_from_user_and_project(self, user_id, tenant_id, role_id): - """Remove a role from a user within given tenant. - - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - # assignment/grant crud - - @abc.abstractmethod - def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - """Creates a new assignment/grant. - - If the assignment is to a domain, then optionally it may be - specified as inherited to owned projects (this requires - the OS-INHERIT extension to be enabled). - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_grant_role_ids(self, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - """Lists role ids for assignments/grants.""" - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def check_grant_role_id(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - """Checks an assignment/grant role id. - - :raises keystone.exception.RoleAssignmentNotFound: If the role - assignment doesn't exist. - :returns: None or raises an exception if grant not found - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - """Deletes assignments/grants. - - :raises keystone.exception.RoleAssignmentNotFound: If the role - assignment doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_role_assignments(self, role_id=None, - user_id=None, group_ids=None, - domain_id=None, project_ids=None, - inherited_to_projects=None): - """Returns a list of role assignments for actors on targets. - - Available parameters represent values in which the returned role - assignments attributes need to be filtered on. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_project_assignments(self, project_id): - """Deletes all assignments for a project. - - :raises keystone.exception.ProjectNotFound: If the project doesn't - exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_role_assignments(self, role_id): - """Deletes all assignments for a role.""" - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_user_assignments(self, user_id): - """Deletes all assignments for a user. - - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_group_assignments(self, group_id): - """Deletes all assignments for a group. - - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - -class AssignmentDriverV8(AssignmentDriverBase): - """Removed or redefined methods from V8. - - Move the abstract methods of any methods removed or modified in later - versions of the driver from AssignmentDriverBase to here. We maintain this - so that legacy drivers, which will be a subclass of AssignmentDriverV8, can - still reference them. - - """ - - @abc.abstractmethod - def list_user_ids_for_project(self, tenant_id): - """Lists all user IDs with a role assignment in the specified project. - - :returns: a list of user_ids or an empty set. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_project_ids_for_user(self, user_id, group_ids, hints, - inherited=False): - """List all project ids associated with a given user. - - :param user_id: the user in question - :param group_ids: the groups this user is a member of. This list is - built in the Manager, so that the driver itself - does not have to call across to identity. - :param hints: filter hints which the driver should - implement if at all possible. - :param inherited: whether assignments marked as inherited should - be included. - - :returns: a list of project ids or an empty list. - - This method should not try and expand any inherited assignments, - just report the projects that have the role for this user. The manager - method is responsible for expanding out inherited assignments. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_domain_ids_for_user(self, user_id, group_ids, hints, - inherited=False): - """List all domain ids associated with a given user. - - :param user_id: the user in question - :param group_ids: the groups this user is a member of. This list is - built in the Manager, so that the driver itself - does not have to call across to identity. - :param hints: filter hints which the driver should - implement if at all possible. - :param inherited: whether to return domain_ids that have inherited - assignments or not. - - :returns: a list of domain ids or an empty list. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_project_ids_for_groups(self, group_ids, hints, - inherited=False): - """List project ids accessible to specified groups. - - :param group_ids: List of group ids. - :param hints: filter hints which the driver should - implement if at all possible. - :param inherited: whether assignments marked as inherited should - be included. - :returns: List of project ids accessible to specified groups. - - This method should not try and expand any inherited assignments, - just report the projects that have the role for this group. The manager - method is responsible for expanding out inherited assignments. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_domain_ids_for_groups(self, group_ids, inherited=False): - """List domain ids accessible to specified groups. - - :param group_ids: List of group ids. - :param inherited: whether to return domain_ids that have inherited - assignments or not. - :returns: List of domain ids accessible to specified groups. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_role_ids_for_groups_on_project( - self, group_ids, project_id, project_domain_id, project_parents): - """List the group role ids for a specific project. - - Supports the ``OS-INHERIT`` role inheritance from the project's domain - if supported by the assignment driver. - - :param group_ids: list of group ids - :type group_ids: list - :param project_id: project identifier - :type project_id: str - :param project_domain_id: project's domain identifier - :type project_domain_id: str - :param project_parents: list of parent ids of this project - :type project_parents: list - :returns: list of role ids for the project - :rtype: list - """ - raise exception.NotImplemented() - - @abc.abstractmethod - def list_role_ids_for_groups_on_domain(self, group_ids, domain_id): - """List the group role ids for a specific domain. - - :param group_ids: list of group ids - :type group_ids: list - :param domain_id: domain identifier - :type domain_id: str - :returns: list of role ids for the project - :rtype: list - """ - raise exception.NotImplemented() - - -class AssignmentDriverV9(AssignmentDriverBase): - """New or redefined methods from V8. - - Add any new V9 abstract methods (or those with modified signatures) to - this class. - - """ - - @abc.abstractmethod - def delete_domain_assignments(self, domain_id): - """Deletes all assignments for a domain.""" - raise exception.NotImplemented() - - -class V9AssignmentWrapperForV8Driver(AssignmentDriverV9): - """Wrapper class to supported a V8 legacy driver. - - In order to support legacy drivers without having to make the manager code - driver-version aware, we wrap legacy drivers so that they look like the - latest version. For the various changes made in a new driver, here are the - actions needed in this wrapper: - - Method removed from new driver - remove the call-through method from this - class, since the manager will no longer be - calling it. - Method signature (or meaning) changed - wrap the old method in a new - signature here, and munge the input - and output parameters accordingly. - New method added to new driver - add a method to implement the new - functionality here if possible. If that is - not possible, then return NotImplemented, - since we do not guarantee to support new - functionality with legacy drivers. - - """ - - @versionutils.deprecated( - as_of=versionutils.deprecated.MITAKA, - what='keystone.assignment.AssignmentDriverV8', - in_favor_of='keystone.assignment.AssignmentDriverV9', - remove_in=+2) - def __init__(self, wrapped_driver): - self.driver = wrapped_driver - - def delete_domain_assignments(self, domain_id): - """Deletes all assignments for a domain.""" - msg = _LW('delete_domain_assignments method not found in custom ' - 'assignment driver. Domain assignments for domain (%s) to ' - 'users from other domains will not be removed. This was ' - 'added in V9 of the assignment driver.') - LOG.warning(msg, domain_id) - - def default_role_driver(self): - return self.driver.default_role_driver() - - def default_resource_driver(self): - return self.driver.default_resource_driver() - - def add_role_to_user_and_project(self, user_id, tenant_id, role_id): - self.driver.add_role_to_user_and_project(user_id, tenant_id, role_id) - - def remove_role_from_user_and_project(self, user_id, tenant_id, role_id): - self.driver.remove_role_from_user_and_project( - user_id, tenant_id, role_id) - - def create_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - self.driver.create_grant( - role_id, user_id=user_id, group_id=group_id, - domain_id=domain_id, project_id=project_id, - inherited_to_projects=inherited_to_projects) - - def list_grant_role_ids(self, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - return self.driver.list_grant_role_ids( - user_id=user_id, group_id=group_id, - domain_id=domain_id, project_id=project_id, - inherited_to_projects=inherited_to_projects) - - def check_grant_role_id(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - self.driver.check_grant_role_id( - role_id, user_id=user_id, group_id=group_id, - domain_id=domain_id, project_id=project_id, - inherited_to_projects=inherited_to_projects) - - def delete_grant(self, role_id, user_id=None, group_id=None, - domain_id=None, project_id=None, - inherited_to_projects=False): - self.driver.delete_grant( - role_id, user_id=user_id, group_id=group_id, - domain_id=domain_id, project_id=project_id, - inherited_to_projects=inherited_to_projects) - - def list_role_assignments(self, role_id=None, - user_id=None, group_ids=None, - domain_id=None, project_ids=None, - inherited_to_projects=None): - return self.driver.list_role_assignments( - role_id=role_id, - user_id=user_id, group_ids=group_ids, - domain_id=domain_id, project_ids=project_ids, - inherited_to_projects=inherited_to_projects) - - def delete_project_assignments(self, project_id): - self.driver.delete_project_assignments(project_id) - - def delete_role_assignments(self, role_id): - self.driver.delete_role_assignments(role_id) - - def delete_user_assignments(self, user_id): - self.driver.delete_user_assignments(user_id) - - def delete_group_assignments(self, group_id): - self.driver.delete_group_assignments(group_id) - - -Driver = manager.create_legacy_driver(AssignmentDriverV8) - - -@dependency.provider('role_api') -@dependency.requires('assignment_api') -class RoleManager(manager.Manager): - """Default pivot point for the Role backend.""" - - driver_namespace = 'keystone.role' - - _ROLE = 'role' - - def __init__(self): - # If there is a specific driver specified for role, then use it. - # Otherwise retrieve the driver type from the assignment driver. - role_driver = CONF.role.driver - - if role_driver is None: - assignment_manager = dependency.get_provider('assignment_api') - role_driver = assignment_manager.default_role_driver() - - super(RoleManager, self).__init__(role_driver) - - # Make sure it is a driver version we support, and if it is a legacy - # driver, then wrap it. - if isinstance(self.driver, RoleDriverV8): - self.driver = V9RoleWrapperForV8Driver(self.driver) - elif not isinstance(self.driver, RoleDriverV9): - raise exception.UnsupportedDriverVersion(driver=role_driver) - - @MEMOIZE - def get_role(self, role_id): - return self.driver.get_role(role_id) - - def create_role(self, role_id, role, initiator=None): - ret = self.driver.create_role(role_id, role) - notifications.Audit.created(self._ROLE, role_id, initiator) - if MEMOIZE.should_cache(ret): - self.get_role.set(ret, self, role_id) - return ret - - @manager.response_truncated - def list_roles(self, hints=None): - return self.driver.list_roles(hints or driver_hints.Hints()) - - def update_role(self, role_id, role, initiator=None): - original_role = self.driver.get_role(role_id) - if ('domain_id' in role and - role['domain_id'] != original_role['domain_id']): - raise exception.ValidationError( - message=_('Update of `domain_id` is not allowed.')) - - ret = self.driver.update_role(role_id, role) - notifications.Audit.updated(self._ROLE, role_id, initiator) - self.get_role.invalidate(self, role_id) - return ret - - def delete_role(self, role_id, initiator=None): - self.assignment_api.delete_tokens_for_role_assignments(role_id) - self.assignment_api.delete_role_assignments(role_id) - self.driver.delete_role(role_id) - notifications.Audit.deleted(self._ROLE, role_id, initiator) - self.get_role.invalidate(self, role_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - # TODO(ayoung): Add notification - def create_implied_role(self, prior_role_id, implied_role_id): - implied_role = self.driver.get_role(implied_role_id) - self.driver.get_role(prior_role_id) - if implied_role['name'] in CONF.assignment.prohibited_implied_role: - raise exception.InvalidImpliedRole(role_id=implied_role_id) - response = self.driver.create_implied_role( - prior_role_id, implied_role_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - return response - - def delete_implied_role(self, prior_role_id, implied_role_id): - self.driver.delete_implied_role(prior_role_id, implied_role_id) - COMPUTED_ASSIGNMENTS_REGION.invalidate() - - -# The RoleDriverBase class is the set of driver methods from earlier -# drivers that we still support, that have not been removed or modified. This -# class is then used to created the augmented V8 and V9 version abstract driver -# classes, without having to duplicate a lot of abstract method signatures. -# If you remove a method from V9, then move the abstract methods from this Base -# class to the V8 class. Do not modify any of the method signatures in the Base -# class - changes should only be made in the V8 and subsequent classes. -@six.add_metaclass(abc.ABCMeta) -class RoleDriverBase(object): - - def _get_list_limit(self): - return CONF.role.list_limit or CONF.list_limit - - @abc.abstractmethod - def create_role(self, role_id, role): - """Creates a new role. - - :raises keystone.exception.Conflict: If a duplicate role exists. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_roles(self, hints): - """List roles in the system. - - :param hints: filter hints which the driver should - implement if at all possible. - - :returns: a list of role_refs or an empty list. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_roles_from_ids(self, role_ids): - """List roles for the provided list of ids. - - :param role_ids: list of ids - - :returns: a list of role_refs. - - This method is used internally by the assignment manager to bulk read - a set of roles given their ids. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def get_role(self, role_id): - """Get a role by ID. - - :returns: role_ref - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def update_role(self, role_id, role): - """Updates an existing role. - - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - :raises keystone.exception.Conflict: If a duplicate role exists. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_role(self, role_id): - """Deletes an existing role. - - :raises keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - -class RoleDriverV8(RoleDriverBase): - """Removed or redefined methods from V8. - - Move the abstract methods of any methods removed or modified in later - versions of the driver from RoleDriverBase to here. We maintain this - so that legacy drivers, which will be a subclass of RoleDriverV8, can - still reference them. - - """ - - pass - - -class RoleDriverV9(RoleDriverBase): - """New or redefined methods from V8. - - Add any new V9 abstract methods (or those with modified signatures) to - this class. - - """ - - @abc.abstractmethod - def get_implied_role(self, prior_role_id, implied_role_id): - """Fetches a role inference rule - - :raises keystone.exception.ImpliedRoleNotFound: If the implied role - doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def create_implied_role(self, prior_role_id, implied_role_id): - """Creates a role inference rule - - :raises: keystone.exception.RoleNotFound: If the role doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def delete_implied_role(self, prior_role_id, implied_role_id): - """Deletes a role inference rule - - :raises keystone.exception.ImpliedRoleNotFound: If the implied role - doesn't exist. - - """ - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_role_inference_rules(self): - """Lists all the rules used to imply one role from another""" - raise exception.NotImplemented() # pragma: no cover - - @abc.abstractmethod - def list_implied_roles(self, prior_role_id): - """Lists roles implied from the prior role ID""" - raise exception.NotImplemented() # pragma: no cover - - -class V9RoleWrapperForV8Driver(RoleDriverV9): - """Wrapper class to supported a V8 legacy driver. - - In order to support legacy drivers without having to make the manager code - driver-version aware, we wrap legacy drivers so that they look like the - latest version. For the various changes made in a new driver, here are the - actions needed in this wrapper: - - Method removed from new driver - remove the call-through method from this - class, since the manager will no longer be - calling it. - Method signature (or meaning) changed - wrap the old method in a new - signature here, and munge the input - and output parameters accordingly. - New method added to new driver - add a method to implement the new - functionality here if possible. If that is - not possible, then return NotImplemented, - since we do not guarantee to support new - functionality with legacy drivers. - - This V8 wrapper contains the following support for newer manager code: - - - The current manager code expects a role entity to have a domain_id - attribute, with a non-None value indicating a domain specific role. V8 - drivers will only understand global roles, hence if a non-None domain_id - is passed to this wrapper, it will raise a NotImplemented exception. - If a None-valued domain_id is passed in, it will be trimmed off before - the underlying driver is called (and a None-valued domain_id attribute - is added in for any entities returned to the manager. - - """ - - @versionutils.deprecated( - as_of=versionutils.deprecated.MITAKA, - what='keystone.assignment.RoleDriverV8', - in_favor_of='keystone.assignment.RoleDriverV9', - remove_in=+2) - def __init__(self, wrapped_driver): - self.driver = wrapped_driver - - def _append_null_domain_id(self, role_or_list): - def _append_null_domain_id_to_dict(role): - if 'domain_id' not in role: - role['domain_id'] = None - return role - - if isinstance(role_or_list, list): - return [_append_null_domain_id_to_dict(x) for x in role_or_list] - else: - return _append_null_domain_id_to_dict(role_or_list) - - def _trim_and_assert_null_domain_id(self, role): - if 'domain_id' in role: - if role['domain_id'] is not None: - raise exception.NotImplemented( - _('Domain specific roles are not supported in the V8 ' - 'role driver')) - else: - new_role = role.copy() - new_role.pop('domain_id') - return new_role - else: - return role - - def create_role(self, role_id, role): - new_role = self._trim_and_assert_null_domain_id(role) - return self._append_null_domain_id( - self.driver.create_role(role_id, new_role)) - - def list_roles(self, hints): - return self._append_null_domain_id(self.driver.list_roles(hints)) - - def list_roles_from_ids(self, role_ids): - return self._append_null_domain_id( - self.driver.list_roles_from_ids(role_ids)) - - def get_role(self, role_id): - return self._append_null_domain_id(self.driver.get_role(role_id)) - - def update_role(self, role_id, role): - update_role = self._trim_and_assert_null_domain_id(role) - return self._append_null_domain_id( - self.driver.update_role(role_id, update_role)) - - def delete_role(self, role_id): - self.driver.delete_role(role_id) - - def get_implied_role(self, prior_role_id, implied_role_id): - raise exception.NotImplemented() # pragma: no cover - - def create_implied_role(self, prior_role_id, implied_role_id): - raise exception.NotImplemented() # pragma: no cover - - def delete_implied_role(self, prior_role_id, implied_role_id): - raise exception.NotImplemented() # pragma: no cover - - def list_implied_roles(self, prior_role_id): - raise exception.NotImplemented() # pragma: no cover - - def list_role_inference_rules(self): - raise exception.NotImplemented() # pragma: no cover - -RoleDriver = manager.create_legacy_driver(RoleDriverV8) |