diff options
Diffstat (limited to 'keystone-moon/keystone/assignment/controllers.py')
-rw-r--r-- | keystone-moon/keystone/assignment/controllers.py | 972 |
1 files changed, 0 insertions, 972 deletions
diff --git a/keystone-moon/keystone/assignment/controllers.py b/keystone-moon/keystone/assignment/controllers.py deleted file mode 100644 index 1b163013..00000000 --- a/keystone-moon/keystone/assignment/controllers.py +++ /dev/null @@ -1,972 +0,0 @@ -# Copyright 2013 Metacloud, Inc. -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Workflow Logic the Assignment service.""" - -import functools -import uuid - -from oslo_config import cfg -from oslo_log import log -from six.moves import urllib - -from keystone.assignment import schema -from keystone.common import controller -from keystone.common import dependency -from keystone.common import utils -from keystone.common import validation -from keystone.common import wsgi -from keystone import exception -from keystone.i18n import _ -from keystone import notifications - - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - - -@dependency.requires('assignment_api', 'identity_api', 'token_provider_api') -class TenantAssignment(controller.V2Controller): - """The V2 Project APIs that are processing assignments.""" - - @controller.v2_auth_deprecated - def get_projects_for_token(self, context, **kw): - """Get valid tenants for token based on token used to authenticate. - - Pulls the token from the context, validates it and gets the valid - tenants for the user in the token. - - Doesn't care about token scopedness. - - """ - token_ref = utils.get_token_ref(context) - - tenant_refs = ( - self.assignment_api.list_projects_for_user(token_ref.user_id)) - tenant_refs = [self.v3_to_v2_project(ref) for ref in tenant_refs - if ref['domain_id'] == CONF.identity.default_domain_id] - params = { - 'limit': context['query_string'].get('limit'), - 'marker': context['query_string'].get('marker'), - } - return self.format_project_list(tenant_refs, **params) - - @controller.v2_deprecated - def get_project_users(self, context, tenant_id, **kw): - self.assert_admin(context) - user_refs = [] - user_ids = self.assignment_api.list_user_ids_for_project(tenant_id) - for user_id in user_ids: - try: - user_ref = self.identity_api.get_user(user_id) - except exception.UserNotFound: - # Log that user is missing and continue on. - message = ("User %(user_id)s in project %(project_id)s " - "doesn't exist.") - LOG.debug(message, - {'user_id': user_id, 'project_id': tenant_id}) - else: - user_refs.append(self.v3_to_v2_user(user_ref)) - return {'users': user_refs} - - -@dependency.requires('assignment_api', 'role_api') -class Role(controller.V2Controller): - """The Role management APIs.""" - - @controller.v2_deprecated - def get_role(self, context, role_id): - self.assert_admin(context) - return {'role': self.role_api.get_role(role_id)} - - @controller.v2_deprecated - def create_role(self, context, role): - role = self._normalize_dict(role) - self.assert_admin(context) - - if 'name' not in role or not role['name']: - msg = _('Name field is required and cannot be empty') - raise exception.ValidationError(message=msg) - - if role['name'] == CONF.member_role_name: - # Use the configured member role ID when creating the configured - # member role name. This avoids the potential of creating a - # "member" role with an unexpected ID. - role_id = CONF.member_role_id - else: - role_id = uuid.uuid4().hex - - role['id'] = role_id - initiator = notifications._get_request_audit_info(context) - role_ref = self.role_api.create_role(role_id, role, initiator) - return {'role': role_ref} - - @controller.v2_deprecated - def delete_role(self, context, role_id): - self.assert_admin(context) - initiator = notifications._get_request_audit_info(context) - self.role_api.delete_role(role_id, initiator) - - @controller.v2_deprecated - def get_roles(self, context): - self.assert_admin(context) - return {'roles': self.role_api.list_roles()} - - -@dependency.requires('assignment_api', 'resource_api', 'role_api') -class RoleAssignmentV2(controller.V2Controller): - """The V2 Role APIs that are processing assignments.""" - - # COMPAT(essex-3) - @controller.v2_deprecated - def get_user_roles(self, context, user_id, tenant_id=None): - """Get the roles for a user and tenant pair. - - Since we're trying to ignore the idea of user-only roles we're - not implementing them in hopes that the idea will die off. - - """ - self.assert_admin(context) - # NOTE(davechen): Router without project id is defined, - # but we don't plan on implementing this. - if tenant_id is None: - raise exception.NotImplemented( - message=_('User roles not supported: tenant_id required')) - roles = self.assignment_api.get_roles_for_user_and_project( - user_id, tenant_id) - return {'roles': [self.role_api.get_role(x) - for x in roles]} - - @controller.v2_deprecated - def add_role_to_user(self, context, user_id, role_id, tenant_id=None): - """Add a role to a user and tenant pair. - - Since we're trying to ignore the idea of user-only roles we're - not implementing them in hopes that the idea will die off. - - """ - self.assert_admin(context) - if tenant_id is None: - raise exception.NotImplemented( - message=_('User roles not supported: tenant_id required')) - - self.assignment_api.add_role_to_user_and_project( - user_id, tenant_id, role_id) - - role_ref = self.role_api.get_role(role_id) - return {'role': role_ref} - - @controller.v2_deprecated - def remove_role_from_user(self, context, user_id, role_id, tenant_id=None): - """Remove a role from a user and tenant pair. - - Since we're trying to ignore the idea of user-only roles we're - not implementing them in hopes that the idea will die off. - - """ - self.assert_admin(context) - if tenant_id is None: - raise exception.NotImplemented( - message=_('User roles not supported: tenant_id required')) - - # This still has the weird legacy semantics that adding a role to - # a user also adds them to a tenant, so we must follow up on that - self.assignment_api.remove_role_from_user_and_project( - user_id, tenant_id, role_id) - - # COMPAT(diablo): CRUD extension - @controller.v2_deprecated - def get_role_refs(self, context, user_id): - """Ultimate hack to get around having to make role_refs first-class. - - This will basically iterate over the various roles the user has in - all tenants the user is a member of and create fake role_refs where - the id encodes the user-tenant-role information so we can look - up the appropriate data when we need to delete them. - - """ - self.assert_admin(context) - tenants = self.assignment_api.list_projects_for_user(user_id) - o = [] - for tenant in tenants: - # As a v2 call, we should limit the response to those projects in - # the default domain. - if tenant['domain_id'] != CONF.identity.default_domain_id: - continue - role_ids = self.assignment_api.get_roles_for_user_and_project( - user_id, tenant['id']) - for role_id in role_ids: - ref = {'roleId': role_id, - 'tenantId': tenant['id'], - 'userId': user_id} - ref['id'] = urllib.parse.urlencode(ref) - o.append(ref) - return {'roles': o} - - # COMPAT(diablo): CRUD extension - @controller.v2_deprecated - def create_role_ref(self, context, user_id, role): - """This is actually used for adding a user to a tenant. - - In the legacy data model adding a user to a tenant required setting - a role. - - """ - self.assert_admin(context) - # TODO(termie): for now we're ignoring the actual role - tenant_id = role.get('tenantId') - role_id = role.get('roleId') - self.assignment_api.add_role_to_user_and_project( - user_id, tenant_id, role_id) - - role_ref = self.role_api.get_role(role_id) - return {'role': role_ref} - - # COMPAT(diablo): CRUD extension - @controller.v2_deprecated - def delete_role_ref(self, context, user_id, role_ref_id): - """This is actually used for deleting a user from a tenant. - - In the legacy data model removing a user from a tenant required - deleting a role. - - To emulate this, we encode the tenant and role in the role_ref_id, - and if this happens to be the last role for the user-tenant pair, - we remove the user from the tenant. - - """ - self.assert_admin(context) - # TODO(termie): for now we're ignoring the actual role - role_ref_ref = urllib.parse.parse_qs(role_ref_id) - tenant_id = role_ref_ref.get('tenantId')[0] - role_id = role_ref_ref.get('roleId')[0] - self.assignment_api.remove_role_from_user_and_project( - user_id, tenant_id, role_id) - - -@dependency.requires('assignment_api', 'resource_api') -class ProjectAssignmentV3(controller.V3Controller): - """The V3 Project APIs that are processing assignments.""" - - collection_name = 'projects' - member_name = 'project' - - def __init__(self): - super(ProjectAssignmentV3, self).__init__() - self.get_member_from_driver = self.resource_api.get_project - - @controller.filterprotected('domain_id', 'enabled', 'name') - def list_user_projects(self, context, filters, user_id): - hints = ProjectAssignmentV3.build_driver_hints(context, filters) - refs = self.assignment_api.list_projects_for_user(user_id, - hints=hints) - return ProjectAssignmentV3.wrap_collection(context, refs, hints=hints) - - -@dependency.requires('role_api') -class RoleV3(controller.V3Controller): - """The V3 Role CRUD APIs. - - To ease complexity (and hence risk) in writing the policy rules for the - role APIs, we create separate policy actions for roles that are domain - specific, as opposed to those that are global. In order to achieve this - each of the role API methods has a wrapper method that checks to see if the - role is global or domain specific. - - NOTE (henry-nash): If this separate global vs scoped policy action pattern - becomes repeated for other entities, we should consider encapsulating this - into a specialized router class. - - """ - - collection_name = 'roles' - member_name = 'role' - - def __init__(self): - super(RoleV3, self).__init__() - self.get_member_from_driver = self.role_api.get_role - - def _is_domain_role(self, role): - return role.get('domain_id') is not None - - def _is_domain_role_target(self, role_id): - try: - role = self.role_api.get_role(role_id) - except exception.RoleNotFound: - # We hide this error since we have not yet carried out a policy - # check - and it maybe that the caller isn't authorized to make - # this call. If so, we want that error to be raised instead. - return False - return self._is_domain_role(role) - - def create_role_wrapper(self, context, role): - if self._is_domain_role(role): - return self.create_domain_role(context, role=role) - else: - return self.create_role(context, role=role) - - @controller.protected() - @validation.validated(schema.role_create, 'role') - def create_role(self, context, role): - return self._create_role(context, role) - - @controller.protected() - @validation.validated(schema.role_create, 'role') - def create_domain_role(self, context, role): - return self._create_role(context, role) - - def list_roles_wrapper(self, context): - # If there is no domain_id filter defined, then we only want to return - # global roles, so we set the domain_id filter to None. - params = context['query_string'] - if 'domain_id' not in params: - context['query_string']['domain_id'] = None - - if context['query_string']['domain_id'] is not None: - return self.list_domain_roles(context) - else: - return self.list_roles(context) - - @controller.filterprotected('name', 'domain_id') - def list_roles(self, context, filters): - return self._list_roles(context, filters) - - @controller.filterprotected('name', 'domain_id') - def list_domain_roles(self, context, filters): - return self._list_roles(context, filters) - - def get_role_wrapper(self, context, role_id): - if self._is_domain_role_target(role_id): - return self.get_domain_role(context, role_id=role_id) - else: - return self.get_role(context, role_id=role_id) - - @controller.protected() - def get_role(self, context, role_id): - return self._get_role(context, role_id) - - @controller.protected() - def get_domain_role(self, context, role_id): - return self._get_role(context, role_id) - - def update_role_wrapper(self, context, role_id, role): - # Since we don't allow you change whether a role is global or domain - # specific, we can ignore the new update attributes and just look at - # the existing role. - if self._is_domain_role_target(role_id): - return self.update_domain_role( - context, role_id=role_id, role=role) - else: - return self.update_role(context, role_id=role_id, role=role) - - @controller.protected() - @validation.validated(schema.role_update, 'role') - def update_role(self, context, role_id, role): - return self._update_role(context, role_id, role) - - @controller.protected() - @validation.validated(schema.role_update, 'role') - def update_domain_role(self, context, role_id, role): - return self._update_role(context, role_id, role) - - def delete_role_wrapper(self, context, role_id): - if self._is_domain_role_target(role_id): - return self.delete_domain_role(context, role_id=role_id) - else: - return self.delete_role(context, role_id=role_id) - - @controller.protected() - def delete_role(self, context, role_id): - return self._delete_role(context, role_id) - - @controller.protected() - def delete_domain_role(self, context, role_id): - return self._delete_role(context, role_id) - - def _create_role(self, context, role): - if role['name'] == CONF.member_role_name: - # Use the configured member role ID when creating the configured - # member role name. This avoids the potential of creating a - # "member" role with an unexpected ID. - role['id'] = CONF.member_role_id - else: - role = self._assign_unique_id(role) - - ref = self._normalize_dict(role) - - initiator = notifications._get_request_audit_info(context) - ref = self.role_api.create_role(ref['id'], ref, initiator) - return RoleV3.wrap_member(context, ref) - - def _list_roles(self, context, filters): - hints = RoleV3.build_driver_hints(context, filters) - refs = self.role_api.list_roles( - hints=hints) - return RoleV3.wrap_collection(context, refs, hints=hints) - - def _get_role(self, context, role_id): - ref = self.role_api.get_role(role_id) - return RoleV3.wrap_member(context, ref) - - def _update_role(self, context, role_id, role): - self._require_matching_id(role_id, role) - initiator = notifications._get_request_audit_info(context) - ref = self.role_api.update_role(role_id, role, initiator) - return RoleV3.wrap_member(context, ref) - - def _delete_role(self, context, role_id): - initiator = notifications._get_request_audit_info(context) - self.role_api.delete_role(role_id, initiator) - - -@dependency.requires('role_api') -class ImpliedRolesV3(controller.V3Controller): - """The V3 ImpliedRoles CRD APIs. There is no Update.""" - - def _prior_role_stanza(self, endpoint, prior_role_id, prior_role_name): - return { - "id": prior_role_id, - "links": { - "self": endpoint + "/v3/roles/" + prior_role_id - }, - "name": prior_role_name - } - - def _implied_role_stanza(self, endpoint, implied_role): - implied_id = implied_role['id'] - implied_response = { - "id": implied_id, - "links": { - "self": endpoint + "/v3/roles/" + implied_id - }, - "name": implied_role['name'] - } - return implied_response - - def _populate_prior_role_response(self, endpoint, prior_id): - prior_role = self.role_api.get_role(prior_id) - response = { - "role_inference": { - "prior_role": self._prior_role_stanza( - endpoint, prior_id, prior_role['name']) - } - } - return response - - def _populate_implied_roles_response(self, endpoint, - prior_id, implied_ids): - response = self._populate_prior_role_response(endpoint, prior_id) - response["role_inference"]['implies'] = [] - for implied_id in implied_ids: - implied_role = self.role_api.get_role(implied_id) - implied_response = self._implied_role_stanza( - endpoint, implied_role) - response["role_inference"]['implies'].append(implied_response) - return response - - def _populate_implied_role_response(self, endpoint, prior_id, implied_id): - response = self._populate_prior_role_response(endpoint, prior_id) - implied_role = self.role_api.get_role(implied_id) - stanza = self._implied_role_stanza(endpoint, implied_role) - response["role_inference"]['implies'] = stanza - return response - - @controller.protected() - def get_implied_role(self, context, prior_role_id, implied_role_id): - ref = self.role_api.get_implied_role(prior_role_id, implied_role_id) - - prior_id = ref['prior_role_id'] - implied_id = ref['implied_role_id'] - endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url( - context, 'public') - response = self._populate_implied_role_response( - endpoint, prior_id, implied_id) - return response - - @controller.protected() - def check_implied_role(self, context, prior_role_id, implied_role_id): - self.role_api.get_implied_role(prior_role_id, implied_role_id) - - @controller.protected() - def create_implied_role(self, context, prior_role_id, implied_role_id): - self.role_api.create_implied_role(prior_role_id, implied_role_id) - return wsgi.render_response( - self.get_implied_role(context, prior_role_id, implied_role_id), - status=(201, 'Created')) - - @controller.protected() - def delete_implied_role(self, context, prior_role_id, implied_role_id): - self.role_api.delete_implied_role(prior_role_id, implied_role_id) - - @controller.protected() - def list_implied_roles(self, context, prior_role_id): - ref = self.role_api.list_implied_roles(prior_role_id) - implied_ids = [r['implied_role_id'] for r in ref] - endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url( - context, 'public') - - results = self._populate_implied_roles_response( - endpoint, prior_role_id, implied_ids) - - return results - - @controller.protected() - def list_role_inference_rules(self, context): - refs = self.role_api.list_role_inference_rules() - role_dict = {role_ref['id']: role_ref - for role_ref in self.role_api.list_roles()} - - rules = dict() - endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url( - context, 'public') - - for ref in refs: - implied_role_id = ref['implied_role_id'] - prior_role_id = ref['prior_role_id'] - implied = rules.get(prior_role_id, []) - implied.append(self._implied_role_stanza( - endpoint, role_dict[implied_role_id])) - rules[prior_role_id] = implied - - inferences = [] - for prior_id, implied in rules.items(): - prior_response = self._prior_role_stanza( - endpoint, prior_id, role_dict[prior_id]['name']) - inferences.append({'prior_role': prior_response, - 'implies': implied}) - results = {'role_inferences': inferences} - return results - - -@dependency.requires('assignment_api', 'identity_api', 'resource_api', - 'role_api') -class GrantAssignmentV3(controller.V3Controller): - """The V3 Grant Assignment APIs.""" - - collection_name = 'roles' - member_name = 'role' - - def __init__(self): - super(GrantAssignmentV3, self).__init__() - self.get_member_from_driver = self.role_api.get_role - - def _require_domain_xor_project(self, domain_id, project_id): - if domain_id and project_id: - msg = _('Specify a domain or project, not both') - raise exception.ValidationError(msg) - if not domain_id and not project_id: - msg = _('Specify one of domain or project') - raise exception.ValidationError(msg) - - def _require_user_xor_group(self, user_id, group_id): - if user_id and group_id: - msg = _('Specify a user or group, not both') - raise exception.ValidationError(msg) - if not user_id and not group_id: - msg = _('Specify one of user or group') - raise exception.ValidationError(msg) - - def _check_if_inherited(self, context): - return (CONF.os_inherit.enabled and - context['path'].startswith('/OS-INHERIT') and - context['path'].endswith('/inherited_to_projects')) - - def _check_grant_protection(self, context, protection, role_id=None, - user_id=None, group_id=None, - domain_id=None, project_id=None, - allow_no_user=False): - """Check protection for role grant APIs. - - The policy rule might want to inspect attributes of any of the entities - involved in the grant. So we get these and pass them to the - check_protection() handler in the controller. - - """ - ref = {} - if role_id: - ref['role'] = self.role_api.get_role(role_id) - if user_id: - try: - ref['user'] = self.identity_api.get_user(user_id) - except exception.UserNotFound: - if not allow_no_user: - raise - else: - ref['group'] = self.identity_api.get_group(group_id) - - if domain_id: - ref['domain'] = self.resource_api.get_domain(domain_id) - else: - ref['project'] = self.resource_api.get_project(project_id) - - self.check_protection(context, protection, ref) - - @controller.protected(callback=_check_grant_protection) - def create_grant(self, context, role_id, user_id=None, - group_id=None, domain_id=None, project_id=None): - """Grants a role to a user or group on either a domain or project.""" - self._require_domain_xor_project(domain_id, project_id) - self._require_user_xor_group(user_id, group_id) - - self.assignment_api.create_grant( - role_id, user_id, group_id, domain_id, project_id, - self._check_if_inherited(context), context) - - @controller.protected(callback=_check_grant_protection) - def list_grants(self, context, user_id=None, - group_id=None, domain_id=None, project_id=None): - """Lists roles granted to user/group on either a domain or project.""" - self._require_domain_xor_project(domain_id, project_id) - self._require_user_xor_group(user_id, group_id) - - refs = self.assignment_api.list_grants( - user_id, group_id, domain_id, project_id, - self._check_if_inherited(context)) - return GrantAssignmentV3.wrap_collection(context, refs) - - @controller.protected(callback=_check_grant_protection) - def check_grant(self, context, role_id, user_id=None, - group_id=None, domain_id=None, project_id=None): - """Checks if a role has been granted on either a domain or project.""" - self._require_domain_xor_project(domain_id, project_id) - self._require_user_xor_group(user_id, group_id) - - self.assignment_api.get_grant( - role_id, user_id, group_id, domain_id, project_id, - self._check_if_inherited(context)) - - # NOTE(lbragstad): This will allow users to clean up role assignments - # from the backend in the event the user was removed prior to the role - # assignment being removed. - @controller.protected(callback=functools.partial( - _check_grant_protection, allow_no_user=True)) - def revoke_grant(self, context, role_id, user_id=None, - group_id=None, domain_id=None, project_id=None): - """Revokes a role from user/group on either a domain or project.""" - self._require_domain_xor_project(domain_id, project_id) - self._require_user_xor_group(user_id, group_id) - - self.assignment_api.delete_grant( - role_id, user_id, group_id, domain_id, project_id, - self._check_if_inherited(context), context) - - -@dependency.requires('assignment_api', 'identity_api', 'resource_api') -class RoleAssignmentV3(controller.V3Controller): - """The V3 Role Assignment APIs, really just list_role_assignment().""" - - # TODO(henry-nash): The current implementation does not provide a full - # first class entity for role-assignment. There is no role_assignment_id - # and only the list_role_assignment call is supported. Further, since it - # is not a first class entity, the links for the individual entities - # reference the individual role grant APIs. - - collection_name = 'role_assignments' - member_name = 'role_assignment' - - @classmethod - def wrap_member(cls, context, ref): - # NOTE(henry-nash): Since we are not yet a true collection, we override - # the wrapper as have already included the links in the entities - pass - - def _format_entity(self, context, entity): - """Format an assignment entity for API response. - - The driver layer returns entities as dicts containing the ids of the - actor (e.g. user or group), target (e.g. domain or project) and role. - If it is an inherited role, then this is also indicated. Examples: - - For a non-inherited expanded assignment from group membership: - {'user_id': user_id, - 'project_id': project_id, - 'role_id': role_id, - 'indirect': {'group_id': group_id}} - - or, for a project inherited role: - - {'user_id': user_id, - 'project_id': project_id, - 'role_id': role_id, - 'indirect': {'project_id': parent_id}} - - or, for a role that was implied by a prior role: - - {'user_id': user_id, - 'project_id': project_id, - 'role_id': role_id, - 'indirect': {'role_id': prior role_id}} - - It is possible to deduce if a role assignment came from group - membership if it has both 'user_id' in the main body of the dict and - 'group_id' in the 'indirect' subdict, as well as it is possible to - deduce if it has come from inheritance if it contains both a - 'project_id' in the main body of the dict and 'parent_id' in the - 'indirect' subdict. - - This function maps this into the format to be returned via the API, - e.g. for the second example above: - - { - 'user': { - {'id': user_id} - }, - 'scope': { - 'project': { - {'id': project_id} - }, - 'OS-INHERIT:inherited_to': 'projects' - }, - 'role': { - {'id': role_id} - }, - 'links': { - 'assignment': '/OS-INHERIT/projects/parent_id/users/user_id/' - 'roles/role_id/inherited_to_projects' - } - } - - """ - formatted_entity = {'links': {}} - inherited_assignment = entity.get('inherited_to_projects') - - if 'project_id' in entity: - if 'project_name' in entity: - formatted_entity['scope'] = {'project': { - 'id': entity['project_id'], - 'name': entity['project_name'], - 'domain': {'id': entity['project_domain_id'], - 'name': entity['project_domain_name']}}} - else: - formatted_entity['scope'] = { - 'project': {'id': entity['project_id']}} - - if 'domain_id' in entity.get('indirect', {}): - inherited_assignment = True - formatted_link = ('/domains/%s' % - entity['indirect']['domain_id']) - elif 'project_id' in entity.get('indirect', {}): - inherited_assignment = True - formatted_link = ('/projects/%s' % - entity['indirect']['project_id']) - else: - formatted_link = '/projects/%s' % entity['project_id'] - elif 'domain_id' in entity: - if 'domain_name' in entity: - formatted_entity['scope'] = { - 'domain': {'id': entity['domain_id'], - 'name': entity['domain_name']}} - else: - formatted_entity['scope'] = { - 'domain': {'id': entity['domain_id']}} - formatted_link = '/domains/%s' % entity['domain_id'] - - if 'user_id' in entity: - if 'user_name' in entity: - formatted_entity['user'] = { - 'id': entity['user_id'], - 'name': entity['user_name'], - 'domain': {'id': entity['user_domain_id'], - 'name': entity['user_domain_name']}} - else: - formatted_entity['user'] = {'id': entity['user_id']} - if 'group_id' in entity.get('indirect', {}): - membership_url = ( - self.base_url(context, '/groups/%s/users/%s' % ( - entity['indirect']['group_id'], entity['user_id']))) - formatted_entity['links']['membership'] = membership_url - formatted_link += '/groups/%s' % entity['indirect']['group_id'] - else: - formatted_link += '/users/%s' % entity['user_id'] - elif 'group_id' in entity: - if 'group_name' in entity: - formatted_entity['group'] = { - 'id': entity['group_id'], - 'name': entity['group_name'], - 'domain': {'id': entity['group_domain_id'], - 'name': entity['group_domain_name']}} - else: - formatted_entity['group'] = {'id': entity['group_id']} - formatted_link += '/groups/%s' % entity['group_id'] - - if 'role_name' in entity: - formatted_entity['role'] = {'id': entity['role_id'], - 'name': entity['role_name']} - else: - formatted_entity['role'] = {'id': entity['role_id']} - prior_role_link = '' - if 'role_id' in entity.get('indirect', {}): - formatted_link += '/roles/%s' % entity['indirect']['role_id'] - prior_role_link = ( - '/prior_role/%(prior)s/implies/%(implied)s' % { - 'prior': entity['role_id'], - 'implied': entity['indirect']['role_id'] - }) - else: - formatted_link += '/roles/%s' % entity['role_id'] - - if inherited_assignment: - formatted_entity['scope']['OS-INHERIT:inherited_to'] = ( - 'projects') - formatted_link = ('/OS-INHERIT%s/inherited_to_projects' % - formatted_link) - - formatted_entity['links']['assignment'] = self.base_url(context, - formatted_link) - if prior_role_link: - formatted_entity['links']['prior_role'] = ( - self.base_url(context, prior_role_link)) - - return formatted_entity - - def _assert_effective_filters(self, inherited, group, domain): - """Assert that useless filter combinations are avoided. - - In effective mode, the following filter combinations are useless, since - they would always return an empty list of role assignments: - - group id, since no group assignment is returned in effective mode; - - domain id and inherited, since no domain inherited assignment is - returned in effective mode. - - """ - if group: - msg = _('Combining effective and group filter will always ' - 'result in an empty list.') - raise exception.ValidationError(msg) - - if inherited and domain: - msg = _('Combining effective, domain and inherited filters will ' - 'always result in an empty list.') - raise exception.ValidationError(msg) - - def _assert_domain_nand_project(self, domain_id, project_id): - if domain_id and project_id: - msg = _('Specify a domain or project, not both') - raise exception.ValidationError(msg) - - def _assert_user_nand_group(self, user_id, group_id): - if user_id and group_id: - msg = _('Specify a user or group, not both') - raise exception.ValidationError(msg) - - def _list_role_assignments(self, context, filters, include_subtree=False): - """List role assignments to user and groups on domains and projects. - - Return a list of all existing role assignments in the system, filtered - by assignments attributes, if provided. - - If effective option is used and OS-INHERIT extension is enabled, the - following functions will be applied: - 1) For any group role assignment on a target, replace it by a set of - role assignments containing one for each user of that group on that - target; - 2) For any inherited role assignment for an actor on a target, replace - it by a set of role assignments for that actor on every project under - that target. - - It means that, if effective mode is used, no group or domain inherited - assignments will be present in the resultant list. Thus, combining - effective with them is invalid. - - As a role assignment contains only one actor and one target, providing - both user and group ids or domain and project ids is invalid as well. - - """ - params = context['query_string'] - effective = 'effective' in params and ( - self.query_filter_is_true(params['effective'])) - include_names = ('include_names' in params and - self.query_filter_is_true(params['include_names'])) - - if 'scope.OS-INHERIT:inherited_to' in params: - inherited = ( - params['scope.OS-INHERIT:inherited_to'] == 'projects') - else: - # None means querying both inherited and direct assignments - inherited = None - - self._assert_domain_nand_project(params.get('scope.domain.id'), - params.get('scope.project.id')) - self._assert_user_nand_group(params.get('user.id'), - params.get('group.id')) - - if effective: - self._assert_effective_filters(inherited=inherited, - group=params.get('group.id'), - domain=params.get( - 'scope.domain.id')) - - refs = self.assignment_api.list_role_assignments( - role_id=params.get('role.id'), - user_id=params.get('user.id'), - group_id=params.get('group.id'), - domain_id=params.get('scope.domain.id'), - project_id=params.get('scope.project.id'), - include_subtree=include_subtree, - inherited=inherited, effective=effective, - include_names=include_names) - - formatted_refs = [self._format_entity(context, ref) for ref in refs] - - return self.wrap_collection(context, formatted_refs) - - @controller.filterprotected('group.id', 'role.id', - 'scope.domain.id', 'scope.project.id', - 'scope.OS-INHERIT:inherited_to', 'user.id') - def list_role_assignments(self, context, filters): - return self._list_role_assignments(context, filters) - - def _check_list_tree_protection(self, context, protection_info): - """Check protection for list assignment for tree API. - - The policy rule might want to inspect the domain of any project filter - so if one is defined, then load the project ref and pass it to the - check protection method. - - """ - ref = {} - for filter, value in protection_info['filter_attr'].items(): - if filter == 'scope.project.id' and value: - ref['project'] = self.resource_api.get_project(value) - - self.check_protection(context, protection_info, ref) - - @controller.filterprotected('group.id', 'role.id', - 'scope.domain.id', 'scope.project.id', - 'scope.OS-INHERIT:inherited_to', 'user.id', - callback=_check_list_tree_protection) - def list_role_assignments_for_tree(self, context, filters): - if not context['query_string'].get('scope.project.id'): - msg = _('scope.project.id must be specified if include_subtree ' - 'is also specified') - raise exception.ValidationError(message=msg) - return self._list_role_assignments(context, filters, - include_subtree=True) - - def list_role_assignments_wrapper(self, context): - """Main entry point from router for list role assignments. - - Since we want different policy file rules to be applicable based on - whether there the include_subtree query parameter is part of the API - call, this method checks for this and then calls the appropriate - protected entry point. - - """ - params = context['query_string'] - if 'include_subtree' in params and ( - self.query_filter_is_true(params['include_subtree'])): - return self.list_role_assignments_for_tree(context) - else: - return self.list_role_assignments(context) |