aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/assignment/controllers.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/assignment/controllers.py')
-rw-r--r--keystone-moon/keystone/assignment/controllers.py972
1 files changed, 0 insertions, 972 deletions
diff --git a/keystone-moon/keystone/assignment/controllers.py b/keystone-moon/keystone/assignment/controllers.py
deleted file mode 100644
index 1b163013..00000000
--- a/keystone-moon/keystone/assignment/controllers.py
+++ /dev/null
@@ -1,972 +0,0 @@
-# Copyright 2013 Metacloud, Inc.
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Workflow Logic the Assignment service."""
-
-import functools
-import uuid
-
-from oslo_config import cfg
-from oslo_log import log
-from six.moves import urllib
-
-from keystone.assignment import schema
-from keystone.common import controller
-from keystone.common import dependency
-from keystone.common import utils
-from keystone.common import validation
-from keystone.common import wsgi
-from keystone import exception
-from keystone.i18n import _
-from keystone import notifications
-
-
-CONF = cfg.CONF
-LOG = log.getLogger(__name__)
-
-
-@dependency.requires('assignment_api', 'identity_api', 'token_provider_api')
-class TenantAssignment(controller.V2Controller):
- """The V2 Project APIs that are processing assignments."""
-
- @controller.v2_auth_deprecated
- def get_projects_for_token(self, context, **kw):
- """Get valid tenants for token based on token used to authenticate.
-
- Pulls the token from the context, validates it and gets the valid
- tenants for the user in the token.
-
- Doesn't care about token scopedness.
-
- """
- token_ref = utils.get_token_ref(context)
-
- tenant_refs = (
- self.assignment_api.list_projects_for_user(token_ref.user_id))
- tenant_refs = [self.v3_to_v2_project(ref) for ref in tenant_refs
- if ref['domain_id'] == CONF.identity.default_domain_id]
- params = {
- 'limit': context['query_string'].get('limit'),
- 'marker': context['query_string'].get('marker'),
- }
- return self.format_project_list(tenant_refs, **params)
-
- @controller.v2_deprecated
- def get_project_users(self, context, tenant_id, **kw):
- self.assert_admin(context)
- user_refs = []
- user_ids = self.assignment_api.list_user_ids_for_project(tenant_id)
- for user_id in user_ids:
- try:
- user_ref = self.identity_api.get_user(user_id)
- except exception.UserNotFound:
- # Log that user is missing and continue on.
- message = ("User %(user_id)s in project %(project_id)s "
- "doesn't exist.")
- LOG.debug(message,
- {'user_id': user_id, 'project_id': tenant_id})
- else:
- user_refs.append(self.v3_to_v2_user(user_ref))
- return {'users': user_refs}
-
-
-@dependency.requires('assignment_api', 'role_api')
-class Role(controller.V2Controller):
- """The Role management APIs."""
-
- @controller.v2_deprecated
- def get_role(self, context, role_id):
- self.assert_admin(context)
- return {'role': self.role_api.get_role(role_id)}
-
- @controller.v2_deprecated
- def create_role(self, context, role):
- role = self._normalize_dict(role)
- self.assert_admin(context)
-
- if 'name' not in role or not role['name']:
- msg = _('Name field is required and cannot be empty')
- raise exception.ValidationError(message=msg)
-
- if role['name'] == CONF.member_role_name:
- # Use the configured member role ID when creating the configured
- # member role name. This avoids the potential of creating a
- # "member" role with an unexpected ID.
- role_id = CONF.member_role_id
- else:
- role_id = uuid.uuid4().hex
-
- role['id'] = role_id
- initiator = notifications._get_request_audit_info(context)
- role_ref = self.role_api.create_role(role_id, role, initiator)
- return {'role': role_ref}
-
- @controller.v2_deprecated
- def delete_role(self, context, role_id):
- self.assert_admin(context)
- initiator = notifications._get_request_audit_info(context)
- self.role_api.delete_role(role_id, initiator)
-
- @controller.v2_deprecated
- def get_roles(self, context):
- self.assert_admin(context)
- return {'roles': self.role_api.list_roles()}
-
-
-@dependency.requires('assignment_api', 'resource_api', 'role_api')
-class RoleAssignmentV2(controller.V2Controller):
- """The V2 Role APIs that are processing assignments."""
-
- # COMPAT(essex-3)
- @controller.v2_deprecated
- def get_user_roles(self, context, user_id, tenant_id=None):
- """Get the roles for a user and tenant pair.
-
- Since we're trying to ignore the idea of user-only roles we're
- not implementing them in hopes that the idea will die off.
-
- """
- self.assert_admin(context)
- # NOTE(davechen): Router without project id is defined,
- # but we don't plan on implementing this.
- if tenant_id is None:
- raise exception.NotImplemented(
- message=_('User roles not supported: tenant_id required'))
- roles = self.assignment_api.get_roles_for_user_and_project(
- user_id, tenant_id)
- return {'roles': [self.role_api.get_role(x)
- for x in roles]}
-
- @controller.v2_deprecated
- def add_role_to_user(self, context, user_id, role_id, tenant_id=None):
- """Add a role to a user and tenant pair.
-
- Since we're trying to ignore the idea of user-only roles we're
- not implementing them in hopes that the idea will die off.
-
- """
- self.assert_admin(context)
- if tenant_id is None:
- raise exception.NotImplemented(
- message=_('User roles not supported: tenant_id required'))
-
- self.assignment_api.add_role_to_user_and_project(
- user_id, tenant_id, role_id)
-
- role_ref = self.role_api.get_role(role_id)
- return {'role': role_ref}
-
- @controller.v2_deprecated
- def remove_role_from_user(self, context, user_id, role_id, tenant_id=None):
- """Remove a role from a user and tenant pair.
-
- Since we're trying to ignore the idea of user-only roles we're
- not implementing them in hopes that the idea will die off.
-
- """
- self.assert_admin(context)
- if tenant_id is None:
- raise exception.NotImplemented(
- message=_('User roles not supported: tenant_id required'))
-
- # This still has the weird legacy semantics that adding a role to
- # a user also adds them to a tenant, so we must follow up on that
- self.assignment_api.remove_role_from_user_and_project(
- user_id, tenant_id, role_id)
-
- # COMPAT(diablo): CRUD extension
- @controller.v2_deprecated
- def get_role_refs(self, context, user_id):
- """Ultimate hack to get around having to make role_refs first-class.
-
- This will basically iterate over the various roles the user has in
- all tenants the user is a member of and create fake role_refs where
- the id encodes the user-tenant-role information so we can look
- up the appropriate data when we need to delete them.
-
- """
- self.assert_admin(context)
- tenants = self.assignment_api.list_projects_for_user(user_id)
- o = []
- for tenant in tenants:
- # As a v2 call, we should limit the response to those projects in
- # the default domain.
- if tenant['domain_id'] != CONF.identity.default_domain_id:
- continue
- role_ids = self.assignment_api.get_roles_for_user_and_project(
- user_id, tenant['id'])
- for role_id in role_ids:
- ref = {'roleId': role_id,
- 'tenantId': tenant['id'],
- 'userId': user_id}
- ref['id'] = urllib.parse.urlencode(ref)
- o.append(ref)
- return {'roles': o}
-
- # COMPAT(diablo): CRUD extension
- @controller.v2_deprecated
- def create_role_ref(self, context, user_id, role):
- """This is actually used for adding a user to a tenant.
-
- In the legacy data model adding a user to a tenant required setting
- a role.
-
- """
- self.assert_admin(context)
- # TODO(termie): for now we're ignoring the actual role
- tenant_id = role.get('tenantId')
- role_id = role.get('roleId')
- self.assignment_api.add_role_to_user_and_project(
- user_id, tenant_id, role_id)
-
- role_ref = self.role_api.get_role(role_id)
- return {'role': role_ref}
-
- # COMPAT(diablo): CRUD extension
- @controller.v2_deprecated
- def delete_role_ref(self, context, user_id, role_ref_id):
- """This is actually used for deleting a user from a tenant.
-
- In the legacy data model removing a user from a tenant required
- deleting a role.
-
- To emulate this, we encode the tenant and role in the role_ref_id,
- and if this happens to be the last role for the user-tenant pair,
- we remove the user from the tenant.
-
- """
- self.assert_admin(context)
- # TODO(termie): for now we're ignoring the actual role
- role_ref_ref = urllib.parse.parse_qs(role_ref_id)
- tenant_id = role_ref_ref.get('tenantId')[0]
- role_id = role_ref_ref.get('roleId')[0]
- self.assignment_api.remove_role_from_user_and_project(
- user_id, tenant_id, role_id)
-
-
-@dependency.requires('assignment_api', 'resource_api')
-class ProjectAssignmentV3(controller.V3Controller):
- """The V3 Project APIs that are processing assignments."""
-
- collection_name = 'projects'
- member_name = 'project'
-
- def __init__(self):
- super(ProjectAssignmentV3, self).__init__()
- self.get_member_from_driver = self.resource_api.get_project
-
- @controller.filterprotected('domain_id', 'enabled', 'name')
- def list_user_projects(self, context, filters, user_id):
- hints = ProjectAssignmentV3.build_driver_hints(context, filters)
- refs = self.assignment_api.list_projects_for_user(user_id,
- hints=hints)
- return ProjectAssignmentV3.wrap_collection(context, refs, hints=hints)
-
-
-@dependency.requires('role_api')
-class RoleV3(controller.V3Controller):
- """The V3 Role CRUD APIs.
-
- To ease complexity (and hence risk) in writing the policy rules for the
- role APIs, we create separate policy actions for roles that are domain
- specific, as opposed to those that are global. In order to achieve this
- each of the role API methods has a wrapper method that checks to see if the
- role is global or domain specific.
-
- NOTE (henry-nash): If this separate global vs scoped policy action pattern
- becomes repeated for other entities, we should consider encapsulating this
- into a specialized router class.
-
- """
-
- collection_name = 'roles'
- member_name = 'role'
-
- def __init__(self):
- super(RoleV3, self).__init__()
- self.get_member_from_driver = self.role_api.get_role
-
- def _is_domain_role(self, role):
- return role.get('domain_id') is not None
-
- def _is_domain_role_target(self, role_id):
- try:
- role = self.role_api.get_role(role_id)
- except exception.RoleNotFound:
- # We hide this error since we have not yet carried out a policy
- # check - and it maybe that the caller isn't authorized to make
- # this call. If so, we want that error to be raised instead.
- return False
- return self._is_domain_role(role)
-
- def create_role_wrapper(self, context, role):
- if self._is_domain_role(role):
- return self.create_domain_role(context, role=role)
- else:
- return self.create_role(context, role=role)
-
- @controller.protected()
- @validation.validated(schema.role_create, 'role')
- def create_role(self, context, role):
- return self._create_role(context, role)
-
- @controller.protected()
- @validation.validated(schema.role_create, 'role')
- def create_domain_role(self, context, role):
- return self._create_role(context, role)
-
- def list_roles_wrapper(self, context):
- # If there is no domain_id filter defined, then we only want to return
- # global roles, so we set the domain_id filter to None.
- params = context['query_string']
- if 'domain_id' not in params:
- context['query_string']['domain_id'] = None
-
- if context['query_string']['domain_id'] is not None:
- return self.list_domain_roles(context)
- else:
- return self.list_roles(context)
-
- @controller.filterprotected('name', 'domain_id')
- def list_roles(self, context, filters):
- return self._list_roles(context, filters)
-
- @controller.filterprotected('name', 'domain_id')
- def list_domain_roles(self, context, filters):
- return self._list_roles(context, filters)
-
- def get_role_wrapper(self, context, role_id):
- if self._is_domain_role_target(role_id):
- return self.get_domain_role(context, role_id=role_id)
- else:
- return self.get_role(context, role_id=role_id)
-
- @controller.protected()
- def get_role(self, context, role_id):
- return self._get_role(context, role_id)
-
- @controller.protected()
- def get_domain_role(self, context, role_id):
- return self._get_role(context, role_id)
-
- def update_role_wrapper(self, context, role_id, role):
- # Since we don't allow you change whether a role is global or domain
- # specific, we can ignore the new update attributes and just look at
- # the existing role.
- if self._is_domain_role_target(role_id):
- return self.update_domain_role(
- context, role_id=role_id, role=role)
- else:
- return self.update_role(context, role_id=role_id, role=role)
-
- @controller.protected()
- @validation.validated(schema.role_update, 'role')
- def update_role(self, context, role_id, role):
- return self._update_role(context, role_id, role)
-
- @controller.protected()
- @validation.validated(schema.role_update, 'role')
- def update_domain_role(self, context, role_id, role):
- return self._update_role(context, role_id, role)
-
- def delete_role_wrapper(self, context, role_id):
- if self._is_domain_role_target(role_id):
- return self.delete_domain_role(context, role_id=role_id)
- else:
- return self.delete_role(context, role_id=role_id)
-
- @controller.protected()
- def delete_role(self, context, role_id):
- return self._delete_role(context, role_id)
-
- @controller.protected()
- def delete_domain_role(self, context, role_id):
- return self._delete_role(context, role_id)
-
- def _create_role(self, context, role):
- if role['name'] == CONF.member_role_name:
- # Use the configured member role ID when creating the configured
- # member role name. This avoids the potential of creating a
- # "member" role with an unexpected ID.
- role['id'] = CONF.member_role_id
- else:
- role = self._assign_unique_id(role)
-
- ref = self._normalize_dict(role)
-
- initiator = notifications._get_request_audit_info(context)
- ref = self.role_api.create_role(ref['id'], ref, initiator)
- return RoleV3.wrap_member(context, ref)
-
- def _list_roles(self, context, filters):
- hints = RoleV3.build_driver_hints(context, filters)
- refs = self.role_api.list_roles(
- hints=hints)
- return RoleV3.wrap_collection(context, refs, hints=hints)
-
- def _get_role(self, context, role_id):
- ref = self.role_api.get_role(role_id)
- return RoleV3.wrap_member(context, ref)
-
- def _update_role(self, context, role_id, role):
- self._require_matching_id(role_id, role)
- initiator = notifications._get_request_audit_info(context)
- ref = self.role_api.update_role(role_id, role, initiator)
- return RoleV3.wrap_member(context, ref)
-
- def _delete_role(self, context, role_id):
- initiator = notifications._get_request_audit_info(context)
- self.role_api.delete_role(role_id, initiator)
-
-
-@dependency.requires('role_api')
-class ImpliedRolesV3(controller.V3Controller):
- """The V3 ImpliedRoles CRD APIs. There is no Update."""
-
- def _prior_role_stanza(self, endpoint, prior_role_id, prior_role_name):
- return {
- "id": prior_role_id,
- "links": {
- "self": endpoint + "/v3/roles/" + prior_role_id
- },
- "name": prior_role_name
- }
-
- def _implied_role_stanza(self, endpoint, implied_role):
- implied_id = implied_role['id']
- implied_response = {
- "id": implied_id,
- "links": {
- "self": endpoint + "/v3/roles/" + implied_id
- },
- "name": implied_role['name']
- }
- return implied_response
-
- def _populate_prior_role_response(self, endpoint, prior_id):
- prior_role = self.role_api.get_role(prior_id)
- response = {
- "role_inference": {
- "prior_role": self._prior_role_stanza(
- endpoint, prior_id, prior_role['name'])
- }
- }
- return response
-
- def _populate_implied_roles_response(self, endpoint,
- prior_id, implied_ids):
- response = self._populate_prior_role_response(endpoint, prior_id)
- response["role_inference"]['implies'] = []
- for implied_id in implied_ids:
- implied_role = self.role_api.get_role(implied_id)
- implied_response = self._implied_role_stanza(
- endpoint, implied_role)
- response["role_inference"]['implies'].append(implied_response)
- return response
-
- def _populate_implied_role_response(self, endpoint, prior_id, implied_id):
- response = self._populate_prior_role_response(endpoint, prior_id)
- implied_role = self.role_api.get_role(implied_id)
- stanza = self._implied_role_stanza(endpoint, implied_role)
- response["role_inference"]['implies'] = stanza
- return response
-
- @controller.protected()
- def get_implied_role(self, context, prior_role_id, implied_role_id):
- ref = self.role_api.get_implied_role(prior_role_id, implied_role_id)
-
- prior_id = ref['prior_role_id']
- implied_id = ref['implied_role_id']
- endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url(
- context, 'public')
- response = self._populate_implied_role_response(
- endpoint, prior_id, implied_id)
- return response
-
- @controller.protected()
- def check_implied_role(self, context, prior_role_id, implied_role_id):
- self.role_api.get_implied_role(prior_role_id, implied_role_id)
-
- @controller.protected()
- def create_implied_role(self, context, prior_role_id, implied_role_id):
- self.role_api.create_implied_role(prior_role_id, implied_role_id)
- return wsgi.render_response(
- self.get_implied_role(context, prior_role_id, implied_role_id),
- status=(201, 'Created'))
-
- @controller.protected()
- def delete_implied_role(self, context, prior_role_id, implied_role_id):
- self.role_api.delete_implied_role(prior_role_id, implied_role_id)
-
- @controller.protected()
- def list_implied_roles(self, context, prior_role_id):
- ref = self.role_api.list_implied_roles(prior_role_id)
- implied_ids = [r['implied_role_id'] for r in ref]
- endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url(
- context, 'public')
-
- results = self._populate_implied_roles_response(
- endpoint, prior_role_id, implied_ids)
-
- return results
-
- @controller.protected()
- def list_role_inference_rules(self, context):
- refs = self.role_api.list_role_inference_rules()
- role_dict = {role_ref['id']: role_ref
- for role_ref in self.role_api.list_roles()}
-
- rules = dict()
- endpoint = super(controller.V3Controller, ImpliedRolesV3).base_url(
- context, 'public')
-
- for ref in refs:
- implied_role_id = ref['implied_role_id']
- prior_role_id = ref['prior_role_id']
- implied = rules.get(prior_role_id, [])
- implied.append(self._implied_role_stanza(
- endpoint, role_dict[implied_role_id]))
- rules[prior_role_id] = implied
-
- inferences = []
- for prior_id, implied in rules.items():
- prior_response = self._prior_role_stanza(
- endpoint, prior_id, role_dict[prior_id]['name'])
- inferences.append({'prior_role': prior_response,
- 'implies': implied})
- results = {'role_inferences': inferences}
- return results
-
-
-@dependency.requires('assignment_api', 'identity_api', 'resource_api',
- 'role_api')
-class GrantAssignmentV3(controller.V3Controller):
- """The V3 Grant Assignment APIs."""
-
- collection_name = 'roles'
- member_name = 'role'
-
- def __init__(self):
- super(GrantAssignmentV3, self).__init__()
- self.get_member_from_driver = self.role_api.get_role
-
- def _require_domain_xor_project(self, domain_id, project_id):
- if domain_id and project_id:
- msg = _('Specify a domain or project, not both')
- raise exception.ValidationError(msg)
- if not domain_id and not project_id:
- msg = _('Specify one of domain or project')
- raise exception.ValidationError(msg)
-
- def _require_user_xor_group(self, user_id, group_id):
- if user_id and group_id:
- msg = _('Specify a user or group, not both')
- raise exception.ValidationError(msg)
- if not user_id and not group_id:
- msg = _('Specify one of user or group')
- raise exception.ValidationError(msg)
-
- def _check_if_inherited(self, context):
- return (CONF.os_inherit.enabled and
- context['path'].startswith('/OS-INHERIT') and
- context['path'].endswith('/inherited_to_projects'))
-
- def _check_grant_protection(self, context, protection, role_id=None,
- user_id=None, group_id=None,
- domain_id=None, project_id=None,
- allow_no_user=False):
- """Check protection for role grant APIs.
-
- The policy rule might want to inspect attributes of any of the entities
- involved in the grant. So we get these and pass them to the
- check_protection() handler in the controller.
-
- """
- ref = {}
- if role_id:
- ref['role'] = self.role_api.get_role(role_id)
- if user_id:
- try:
- ref['user'] = self.identity_api.get_user(user_id)
- except exception.UserNotFound:
- if not allow_no_user:
- raise
- else:
- ref['group'] = self.identity_api.get_group(group_id)
-
- if domain_id:
- ref['domain'] = self.resource_api.get_domain(domain_id)
- else:
- ref['project'] = self.resource_api.get_project(project_id)
-
- self.check_protection(context, protection, ref)
-
- @controller.protected(callback=_check_grant_protection)
- def create_grant(self, context, role_id, user_id=None,
- group_id=None, domain_id=None, project_id=None):
- """Grants a role to a user or group on either a domain or project."""
- self._require_domain_xor_project(domain_id, project_id)
- self._require_user_xor_group(user_id, group_id)
-
- self.assignment_api.create_grant(
- role_id, user_id, group_id, domain_id, project_id,
- self._check_if_inherited(context), context)
-
- @controller.protected(callback=_check_grant_protection)
- def list_grants(self, context, user_id=None,
- group_id=None, domain_id=None, project_id=None):
- """Lists roles granted to user/group on either a domain or project."""
- self._require_domain_xor_project(domain_id, project_id)
- self._require_user_xor_group(user_id, group_id)
-
- refs = self.assignment_api.list_grants(
- user_id, group_id, domain_id, project_id,
- self._check_if_inherited(context))
- return GrantAssignmentV3.wrap_collection(context, refs)
-
- @controller.protected(callback=_check_grant_protection)
- def check_grant(self, context, role_id, user_id=None,
- group_id=None, domain_id=None, project_id=None):
- """Checks if a role has been granted on either a domain or project."""
- self._require_domain_xor_project(domain_id, project_id)
- self._require_user_xor_group(user_id, group_id)
-
- self.assignment_api.get_grant(
- role_id, user_id, group_id, domain_id, project_id,
- self._check_if_inherited(context))
-
- # NOTE(lbragstad): This will allow users to clean up role assignments
- # from the backend in the event the user was removed prior to the role
- # assignment being removed.
- @controller.protected(callback=functools.partial(
- _check_grant_protection, allow_no_user=True))
- def revoke_grant(self, context, role_id, user_id=None,
- group_id=None, domain_id=None, project_id=None):
- """Revokes a role from user/group on either a domain or project."""
- self._require_domain_xor_project(domain_id, project_id)
- self._require_user_xor_group(user_id, group_id)
-
- self.assignment_api.delete_grant(
- role_id, user_id, group_id, domain_id, project_id,
- self._check_if_inherited(context), context)
-
-
-@dependency.requires('assignment_api', 'identity_api', 'resource_api')
-class RoleAssignmentV3(controller.V3Controller):
- """The V3 Role Assignment APIs, really just list_role_assignment()."""
-
- # TODO(henry-nash): The current implementation does not provide a full
- # first class entity for role-assignment. There is no role_assignment_id
- # and only the list_role_assignment call is supported. Further, since it
- # is not a first class entity, the links for the individual entities
- # reference the individual role grant APIs.
-
- collection_name = 'role_assignments'
- member_name = 'role_assignment'
-
- @classmethod
- def wrap_member(cls, context, ref):
- # NOTE(henry-nash): Since we are not yet a true collection, we override
- # the wrapper as have already included the links in the entities
- pass
-
- def _format_entity(self, context, entity):
- """Format an assignment entity for API response.
-
- The driver layer returns entities as dicts containing the ids of the
- actor (e.g. user or group), target (e.g. domain or project) and role.
- If it is an inherited role, then this is also indicated. Examples:
-
- For a non-inherited expanded assignment from group membership:
- {'user_id': user_id,
- 'project_id': project_id,
- 'role_id': role_id,
- 'indirect': {'group_id': group_id}}
-
- or, for a project inherited role:
-
- {'user_id': user_id,
- 'project_id': project_id,
- 'role_id': role_id,
- 'indirect': {'project_id': parent_id}}
-
- or, for a role that was implied by a prior role:
-
- {'user_id': user_id,
- 'project_id': project_id,
- 'role_id': role_id,
- 'indirect': {'role_id': prior role_id}}
-
- It is possible to deduce if a role assignment came from group
- membership if it has both 'user_id' in the main body of the dict and
- 'group_id' in the 'indirect' subdict, as well as it is possible to
- deduce if it has come from inheritance if it contains both a
- 'project_id' in the main body of the dict and 'parent_id' in the
- 'indirect' subdict.
-
- This function maps this into the format to be returned via the API,
- e.g. for the second example above:
-
- {
- 'user': {
- {'id': user_id}
- },
- 'scope': {
- 'project': {
- {'id': project_id}
- },
- 'OS-INHERIT:inherited_to': 'projects'
- },
- 'role': {
- {'id': role_id}
- },
- 'links': {
- 'assignment': '/OS-INHERIT/projects/parent_id/users/user_id/'
- 'roles/role_id/inherited_to_projects'
- }
- }
-
- """
- formatted_entity = {'links': {}}
- inherited_assignment = entity.get('inherited_to_projects')
-
- if 'project_id' in entity:
- if 'project_name' in entity:
- formatted_entity['scope'] = {'project': {
- 'id': entity['project_id'],
- 'name': entity['project_name'],
- 'domain': {'id': entity['project_domain_id'],
- 'name': entity['project_domain_name']}}}
- else:
- formatted_entity['scope'] = {
- 'project': {'id': entity['project_id']}}
-
- if 'domain_id' in entity.get('indirect', {}):
- inherited_assignment = True
- formatted_link = ('/domains/%s' %
- entity['indirect']['domain_id'])
- elif 'project_id' in entity.get('indirect', {}):
- inherited_assignment = True
- formatted_link = ('/projects/%s' %
- entity['indirect']['project_id'])
- else:
- formatted_link = '/projects/%s' % entity['project_id']
- elif 'domain_id' in entity:
- if 'domain_name' in entity:
- formatted_entity['scope'] = {
- 'domain': {'id': entity['domain_id'],
- 'name': entity['domain_name']}}
- else:
- formatted_entity['scope'] = {
- 'domain': {'id': entity['domain_id']}}
- formatted_link = '/domains/%s' % entity['domain_id']
-
- if 'user_id' in entity:
- if 'user_name' in entity:
- formatted_entity['user'] = {
- 'id': entity['user_id'],
- 'name': entity['user_name'],
- 'domain': {'id': entity['user_domain_id'],
- 'name': entity['user_domain_name']}}
- else:
- formatted_entity['user'] = {'id': entity['user_id']}
- if 'group_id' in entity.get('indirect', {}):
- membership_url = (
- self.base_url(context, '/groups/%s/users/%s' % (
- entity['indirect']['group_id'], entity['user_id'])))
- formatted_entity['links']['membership'] = membership_url
- formatted_link += '/groups/%s' % entity['indirect']['group_id']
- else:
- formatted_link += '/users/%s' % entity['user_id']
- elif 'group_id' in entity:
- if 'group_name' in entity:
- formatted_entity['group'] = {
- 'id': entity['group_id'],
- 'name': entity['group_name'],
- 'domain': {'id': entity['group_domain_id'],
- 'name': entity['group_domain_name']}}
- else:
- formatted_entity['group'] = {'id': entity['group_id']}
- formatted_link += '/groups/%s' % entity['group_id']
-
- if 'role_name' in entity:
- formatted_entity['role'] = {'id': entity['role_id'],
- 'name': entity['role_name']}
- else:
- formatted_entity['role'] = {'id': entity['role_id']}
- prior_role_link = ''
- if 'role_id' in entity.get('indirect', {}):
- formatted_link += '/roles/%s' % entity['indirect']['role_id']
- prior_role_link = (
- '/prior_role/%(prior)s/implies/%(implied)s' % {
- 'prior': entity['role_id'],
- 'implied': entity['indirect']['role_id']
- })
- else:
- formatted_link += '/roles/%s' % entity['role_id']
-
- if inherited_assignment:
- formatted_entity['scope']['OS-INHERIT:inherited_to'] = (
- 'projects')
- formatted_link = ('/OS-INHERIT%s/inherited_to_projects' %
- formatted_link)
-
- formatted_entity['links']['assignment'] = self.base_url(context,
- formatted_link)
- if prior_role_link:
- formatted_entity['links']['prior_role'] = (
- self.base_url(context, prior_role_link))
-
- return formatted_entity
-
- def _assert_effective_filters(self, inherited, group, domain):
- """Assert that useless filter combinations are avoided.
-
- In effective mode, the following filter combinations are useless, since
- they would always return an empty list of role assignments:
- - group id, since no group assignment is returned in effective mode;
- - domain id and inherited, since no domain inherited assignment is
- returned in effective mode.
-
- """
- if group:
- msg = _('Combining effective and group filter will always '
- 'result in an empty list.')
- raise exception.ValidationError(msg)
-
- if inherited and domain:
- msg = _('Combining effective, domain and inherited filters will '
- 'always result in an empty list.')
- raise exception.ValidationError(msg)
-
- def _assert_domain_nand_project(self, domain_id, project_id):
- if domain_id and project_id:
- msg = _('Specify a domain or project, not both')
- raise exception.ValidationError(msg)
-
- def _assert_user_nand_group(self, user_id, group_id):
- if user_id and group_id:
- msg = _('Specify a user or group, not both')
- raise exception.ValidationError(msg)
-
- def _list_role_assignments(self, context, filters, include_subtree=False):
- """List role assignments to user and groups on domains and projects.
-
- Return a list of all existing role assignments in the system, filtered
- by assignments attributes, if provided.
-
- If effective option is used and OS-INHERIT extension is enabled, the
- following functions will be applied:
- 1) For any group role assignment on a target, replace it by a set of
- role assignments containing one for each user of that group on that
- target;
- 2) For any inherited role assignment for an actor on a target, replace
- it by a set of role assignments for that actor on every project under
- that target.
-
- It means that, if effective mode is used, no group or domain inherited
- assignments will be present in the resultant list. Thus, combining
- effective with them is invalid.
-
- As a role assignment contains only one actor and one target, providing
- both user and group ids or domain and project ids is invalid as well.
-
- """
- params = context['query_string']
- effective = 'effective' in params and (
- self.query_filter_is_true(params['effective']))
- include_names = ('include_names' in params and
- self.query_filter_is_true(params['include_names']))
-
- if 'scope.OS-INHERIT:inherited_to' in params:
- inherited = (
- params['scope.OS-INHERIT:inherited_to'] == 'projects')
- else:
- # None means querying both inherited and direct assignments
- inherited = None
-
- self._assert_domain_nand_project(params.get('scope.domain.id'),
- params.get('scope.project.id'))
- self._assert_user_nand_group(params.get('user.id'),
- params.get('group.id'))
-
- if effective:
- self._assert_effective_filters(inherited=inherited,
- group=params.get('group.id'),
- domain=params.get(
- 'scope.domain.id'))
-
- refs = self.assignment_api.list_role_assignments(
- role_id=params.get('role.id'),
- user_id=params.get('user.id'),
- group_id=params.get('group.id'),
- domain_id=params.get('scope.domain.id'),
- project_id=params.get('scope.project.id'),
- include_subtree=include_subtree,
- inherited=inherited, effective=effective,
- include_names=include_names)
-
- formatted_refs = [self._format_entity(context, ref) for ref in refs]
-
- return self.wrap_collection(context, formatted_refs)
-
- @controller.filterprotected('group.id', 'role.id',
- 'scope.domain.id', 'scope.project.id',
- 'scope.OS-INHERIT:inherited_to', 'user.id')
- def list_role_assignments(self, context, filters):
- return self._list_role_assignments(context, filters)
-
- def _check_list_tree_protection(self, context, protection_info):
- """Check protection for list assignment for tree API.
-
- The policy rule might want to inspect the domain of any project filter
- so if one is defined, then load the project ref and pass it to the
- check protection method.
-
- """
- ref = {}
- for filter, value in protection_info['filter_attr'].items():
- if filter == 'scope.project.id' and value:
- ref['project'] = self.resource_api.get_project(value)
-
- self.check_protection(context, protection_info, ref)
-
- @controller.filterprotected('group.id', 'role.id',
- 'scope.domain.id', 'scope.project.id',
- 'scope.OS-INHERIT:inherited_to', 'user.id',
- callback=_check_list_tree_protection)
- def list_role_assignments_for_tree(self, context, filters):
- if not context['query_string'].get('scope.project.id'):
- msg = _('scope.project.id must be specified if include_subtree '
- 'is also specified')
- raise exception.ValidationError(message=msg)
- return self._list_role_assignments(context, filters,
- include_subtree=True)
-
- def list_role_assignments_wrapper(self, context):
- """Main entry point from router for list role assignments.
-
- Since we want different policy file rules to be applicable based on
- whether there the include_subtree query parameter is part of the API
- call, this method checks for this and then calls the appropriate
- protected entry point.
-
- """
- params = context['query_string']
- if 'include_subtree' in params and (
- self.query_filter_is_true(params['include_subtree'])):
- return self.list_role_assignments_for_tree(context)
- else:
- return self.list_role_assignments(context)