aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_assignment.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_assignment.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_assignment.py2943
1 files changed, 2943 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_assignment.py b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
new file mode 100644
index 00000000..add14bfb
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
@@ -0,0 +1,2943 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import random
+import six
+import uuid
+
+from oslo_config import cfg
+
+from keystone.common import controller
+from keystone import exception
+from keystone.tests import unit as tests
+from keystone.tests.unit import test_v3
+
+
+CONF = cfg.CONF
+
+
+def _build_role_assignment_query_url(effective=False, **filters):
+ '''Build and return a role assignment query url with provided params.
+
+ Available filters are: domain_id, project_id, user_id, group_id, role_id
+ and inherited_to_projects.
+
+ '''
+
+ query_params = '?effective' if effective else ''
+
+ for k, v in six.iteritems(filters):
+ query_params += '?' if not query_params else '&'
+
+ if k == 'inherited_to_projects':
+ query_params += 'scope.OS-INHERIT:inherited_to=projects'
+ else:
+ if k in ['domain_id', 'project_id']:
+ query_params += 'scope.'
+ elif k not in ['user_id', 'group_id', 'role_id']:
+ raise ValueError('Invalid key \'%s\' in provided filters.' % k)
+
+ query_params += '%s=%s' % (k.replace('_', '.'), v)
+
+ return '/role_assignments%s' % query_params
+
+
+def _build_role_assignment_link(**attribs):
+ """Build and return a role assignment link with provided attributes.
+
+ Provided attributes are expected to contain: domain_id or project_id,
+ user_id or group_id, role_id and, optionally, inherited_to_projects.
+
+ """
+
+ if attribs.get('domain_id'):
+ link = '/domains/' + attribs['domain_id']
+ else:
+ link = '/projects/' + attribs['project_id']
+
+ if attribs.get('user_id'):
+ link += '/users/' + attribs['user_id']
+ else:
+ link += '/groups/' + attribs['group_id']
+
+ link += '/roles/' + attribs['role_id']
+
+ if attribs.get('inherited_to_projects'):
+ return '/OS-INHERIT%s/inherited_to_projects' % link
+
+ return link
+
+
+def _build_role_assignment_entity(link=None, **attribs):
+ """Build and return a role assignment entity with provided attributes.
+
+ Provided attributes are expected to contain: domain_id or project_id,
+ user_id or group_id, role_id and, optionally, inherited_to_projects.
+
+ """
+
+ entity = {'links': {'assignment': (
+ link or _build_role_assignment_link(**attribs))}}
+
+ if attribs.get('domain_id'):
+ entity['scope'] = {'domain': {'id': attribs['domain_id']}}
+ else:
+ entity['scope'] = {'project': {'id': attribs['project_id']}}
+
+ if attribs.get('user_id'):
+ entity['user'] = {'id': attribs['user_id']}
+
+ if attribs.get('group_id'):
+ entity['links']['membership'] = ('/groups/%s/users/%s' %
+ (attribs['group_id'],
+ attribs['user_id']))
+ else:
+ entity['group'] = {'id': attribs['group_id']}
+
+ entity['role'] = {'id': attribs['role_id']}
+
+ if attribs.get('inherited_to_projects'):
+ entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
+
+ return entity
+
+
+class AssignmentTestCase(test_v3.RestfulTestCase):
+ """Test domains, projects, roles and role assignments."""
+
+ def setUp(self):
+ super(AssignmentTestCase, self).setUp()
+
+ self.group = self.new_group_ref(
+ domain_id=self.domain_id)
+ self.group = self.identity_api.create_group(self.group)
+ self.group_id = self.group['id']
+
+ self.credential_id = uuid.uuid4().hex
+ self.credential = self.new_credential_ref(
+ user_id=self.user['id'],
+ project_id=self.project_id)
+ self.credential['id'] = self.credential_id
+ self.credential_api.create_credential(
+ self.credential_id,
+ self.credential)
+
+ # Domain CRUD tests
+
+ def test_create_domain(self):
+ """Call ``POST /domains``."""
+ ref = self.new_domain_ref()
+ r = self.post(
+ '/domains',
+ body={'domain': ref})
+ return self.assertValidDomainResponse(r, ref)
+
+ def test_create_domain_case_sensitivity(self):
+ """Call `POST /domains`` twice with upper() and lower() cased name."""
+ ref = self.new_domain_ref()
+
+ # ensure the name is lowercase
+ ref['name'] = ref['name'].lower()
+ r = self.post(
+ '/domains',
+ body={'domain': ref})
+ self.assertValidDomainResponse(r, ref)
+
+ # ensure the name is uppercase
+ ref['name'] = ref['name'].upper()
+ r = self.post(
+ '/domains',
+ body={'domain': ref})
+ self.assertValidDomainResponse(r, ref)
+
+ def test_create_domain_400(self):
+ """Call ``POST /domains``."""
+ self.post('/domains', body={'domain': {}}, expected_status=400)
+
+ def test_list_domains(self):
+ """Call ``GET /domains``."""
+ resource_url = '/domains'
+ r = self.get(resource_url)
+ self.assertValidDomainListResponse(r, ref=self.domain,
+ resource_url=resource_url)
+
+ def test_get_domain(self):
+ """Call ``GET /domains/{domain_id}``."""
+ r = self.get('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain_id})
+ self.assertValidDomainResponse(r, self.domain)
+
+ def test_update_domain(self):
+ """Call ``PATCH /domains/{domain_id}``."""
+ ref = self.new_domain_ref()
+ del ref['id']
+ r = self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain_id},
+ body={'domain': ref})
+ self.assertValidDomainResponse(r, ref)
+
+ def test_disable_domain(self):
+ """Call ``PATCH /domains/{domain_id}`` (set enabled=False)."""
+ # Create a 2nd set of entities in a 2nd domain
+ self.domain2 = self.new_domain_ref()
+ self.resource_api.create_domain(self.domain2['id'], self.domain2)
+
+ self.project2 = self.new_project_ref(
+ domain_id=self.domain2['id'])
+ self.resource_api.create_project(self.project2['id'], self.project2)
+
+ self.user2 = self.new_user_ref(
+ domain_id=self.domain2['id'],
+ project_id=self.project2['id'])
+ password = self.user2['password']
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+
+ self.assignment_api.add_user_to_project(self.project2['id'],
+ self.user2['id'])
+
+ # First check a user in that domain can authenticate, via
+ # Both v2 and v3
+ body = {
+ 'auth': {
+ 'passwordCredentials': {
+ 'userId': self.user2['id'],
+ 'password': self.user2['password']
+ },
+ 'tenantId': self.project2['id']
+ }
+ }
+ self.admin_request(path='/v2.0/tokens', method='POST', body=body)
+
+ auth_data = self.build_authentication_request(
+ user_id=self.user2['id'],
+ password=self.user2['password'],
+ project_id=self.project2['id'])
+ self.v3_authenticate_token(auth_data)
+
+ # Now disable the domain
+ self.domain2['enabled'] = False
+ r = self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain2['id']},
+ body={'domain': {'enabled': False}})
+ self.assertValidDomainResponse(r, self.domain2)
+
+ # Make sure the user can no longer authenticate, via
+ # either API
+ body = {
+ 'auth': {
+ 'passwordCredentials': {
+ 'userId': self.user2['id'],
+ 'password': self.user2['password']
+ },
+ 'tenantId': self.project2['id']
+ }
+ }
+ self.admin_request(
+ path='/v2.0/tokens', method='POST', body=body, expected_status=401)
+
+ # Try looking up in v3 by name and id
+ auth_data = self.build_authentication_request(
+ user_id=self.user2['id'],
+ password=self.user2['password'],
+ project_id=self.project2['id'])
+ self.v3_authenticate_token(auth_data, expected_status=401)
+
+ auth_data = self.build_authentication_request(
+ username=self.user2['name'],
+ user_domain_id=self.domain2['id'],
+ password=self.user2['password'],
+ project_id=self.project2['id'])
+ self.v3_authenticate_token(auth_data, expected_status=401)
+
+ def test_delete_enabled_domain_fails(self):
+ """Call ``DELETE /domains/{domain_id}`` (when domain enabled)."""
+
+ # Try deleting an enabled domain, which should fail
+ self.delete('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain['id']},
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_delete_domain(self):
+ """Call ``DELETE /domains/{domain_id}``.
+
+ The sample data set up already has a user, group, project
+ and credential that is part of self.domain. Since the user
+ we will authenticate with is in this domain, we create a
+ another set of entities in a second domain. Deleting this
+ second domain should delete all these new entities. In addition,
+ all the entities in the regular self.domain should be unaffected
+ by the delete.
+
+ Test Plan:
+
+ - Create domain2 and a 2nd set of entities
+ - Disable domain2
+ - Delete domain2
+ - Check entities in domain2 have been deleted
+ - Check entities in self.domain are unaffected
+
+ """
+
+ # Create a 2nd set of entities in a 2nd domain
+ self.domain2 = self.new_domain_ref()
+ self.resource_api.create_domain(self.domain2['id'], self.domain2)
+
+ self.project2 = self.new_project_ref(
+ domain_id=self.domain2['id'])
+ self.resource_api.create_project(self.project2['id'], self.project2)
+
+ self.user2 = self.new_user_ref(
+ domain_id=self.domain2['id'],
+ project_id=self.project2['id'])
+ self.user2 = self.identity_api.create_user(self.user2)
+
+ self.group2 = self.new_group_ref(
+ domain_id=self.domain2['id'])
+ self.group2 = self.identity_api.create_group(self.group2)
+
+ self.credential2 = self.new_credential_ref(
+ user_id=self.user2['id'],
+ project_id=self.project2['id'])
+ self.credential_api.create_credential(
+ self.credential2['id'],
+ self.credential2)
+
+ # Now disable the new domain and delete it
+ self.domain2['enabled'] = False
+ r = self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain2['id']},
+ body={'domain': {'enabled': False}})
+ self.assertValidDomainResponse(r, self.domain2)
+ self.delete('/domains/%(domain_id)s' % {
+ 'domain_id': self.domain2['id']})
+
+ # Check all the domain2 relevant entities are gone
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ self.domain2['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ self.project2['id'])
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.get_group,
+ self.group2['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ self.user2['id'])
+ self.assertRaises(exception.CredentialNotFound,
+ self.credential_api.get_credential,
+ self.credential2['id'])
+
+ # ...and that all self.domain entities are still here
+ r = self.resource_api.get_domain(self.domain['id'])
+ self.assertDictEqual(r, self.domain)
+ r = self.resource_api.get_project(self.project['id'])
+ self.assertDictEqual(r, self.project)
+ r = self.identity_api.get_group(self.group['id'])
+ self.assertDictEqual(r, self.group)
+ r = self.identity_api.get_user(self.user['id'])
+ self.user.pop('password')
+ self.assertDictEqual(r, self.user)
+ r = self.credential_api.get_credential(self.credential['id'])
+ self.assertDictEqual(r, self.credential)
+
+ def test_delete_default_domain_fails(self):
+ # Attempting to delete the default domain results in 403 Forbidden.
+
+ # Need to disable it first.
+ self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': CONF.identity.default_domain_id},
+ body={'domain': {'enabled': False}})
+
+ self.delete('/domains/%(domain_id)s' % {
+ 'domain_id': CONF.identity.default_domain_id},
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_delete_new_default_domain_fails(self):
+ # If change the default domain ID, deleting the new default domain
+ # results in a 403 Forbidden.
+
+ # Create a new domain that's not the default
+ new_domain = self.new_domain_ref()
+ new_domain_id = new_domain['id']
+ self.resource_api.create_domain(new_domain_id, new_domain)
+
+ # Disable the new domain so can delete it later.
+ self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': new_domain_id},
+ body={'domain': {'enabled': False}})
+
+ # Change the default domain
+ self.config_fixture.config(group='identity',
+ default_domain_id=new_domain_id)
+
+ # Attempt to delete the new domain
+
+ self.delete('/domains/%(domain_id)s' % {'domain_id': new_domain_id},
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_delete_old_default_domain(self):
+ # If change the default domain ID, deleting the old default domain
+ # works.
+
+ # Create a new domain that's not the default
+ new_domain = self.new_domain_ref()
+ new_domain_id = new_domain['id']
+ self.resource_api.create_domain(new_domain_id, new_domain)
+
+ old_default_domain_id = CONF.identity.default_domain_id
+
+ # Disable the default domain so we can delete it later.
+ self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': old_default_domain_id},
+ body={'domain': {'enabled': False}})
+
+ # Change the default domain
+ self.config_fixture.config(group='identity',
+ default_domain_id=new_domain_id)
+
+ # Delete the old default domain
+
+ self.delete(
+ '/domains/%(domain_id)s' % {'domain_id': old_default_domain_id})
+
+ def test_token_revoked_once_domain_disabled(self):
+ """Test token from a disabled domain has been invalidated.
+
+ Test that a token that was valid for an enabled domain
+ becomes invalid once that domain is disabled.
+
+ """
+
+ self.domain = self.new_domain_ref()
+ self.resource_api.create_domain(self.domain['id'], self.domain)
+
+ self.user2 = self.new_user_ref(domain_id=self.domain['id'])
+ password = self.user2['password']
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+
+ # build a request body
+ auth_body = self.build_authentication_request(
+ user_id=self.user2['id'],
+ password=self.user2['password'])
+
+ # sends a request for the user's token
+ token_resp = self.post('/auth/tokens', body=auth_body)
+
+ subject_token = token_resp.headers.get('x-subject-token')
+
+ # validates the returned token and it should be valid.
+ self.head('/auth/tokens',
+ headers={'x-subject-token': subject_token},
+ expected_status=200)
+
+ # now disable the domain
+ self.domain['enabled'] = False
+ url = "/domains/%(domain_id)s" % {'domain_id': self.domain['id']}
+ self.patch(url,
+ body={'domain': {'enabled': False}},
+ expected_status=200)
+
+ # validates the same token again and it should be 'not found'
+ # as the domain has already been disabled.
+ self.head('/auth/tokens',
+ headers={'x-subject-token': subject_token},
+ expected_status=404)
+
+ def test_delete_domain_hierarchy(self):
+ """Call ``DELETE /domains/{domain_id}``."""
+ domain = self.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+
+ root_project = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(root_project['id'], root_project)
+
+ leaf_project = self.new_project_ref(
+ domain_id=domain['id'],
+ parent_id=root_project['id'])
+ self.resource_api.create_project(leaf_project['id'], leaf_project)
+
+ # Need to disable it first.
+ self.patch('/domains/%(domain_id)s' % {
+ 'domain_id': domain['id']},
+ body={'domain': {'enabled': False}})
+
+ self.delete(
+ '/domains/%(domain_id)s' % {
+ 'domain_id': domain['id']})
+
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ domain['id'])
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ root_project['id'])
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ leaf_project['id'])
+
+ def test_forbid_operations_on_federated_domain(self):
+ """Make sure one cannot operate on federated domain.
+
+ This includes operations like create, update, delete
+ on domain identified by id and name where difference variations of
+ id 'Federated' are used.
+
+ """
+ def create_domains():
+ for variation in ('Federated', 'FEDERATED',
+ 'federated', 'fEderated'):
+ domain = self.new_domain_ref()
+ domain['id'] = variation
+ yield domain
+
+ for domain in create_domains():
+ self.assertRaises(
+ AssertionError, self.assignment_api.create_domain,
+ domain['id'], domain)
+ self.assertRaises(
+ AssertionError, self.assignment_api.update_domain,
+ domain['id'], domain)
+ self.assertRaises(
+ exception.DomainNotFound, self.assignment_api.delete_domain,
+ domain['id'])
+
+ # swap 'name' with 'id' and try again, expecting the request to
+ # gracefully fail
+ domain['id'], domain['name'] = domain['name'], domain['id']
+ self.assertRaises(
+ AssertionError, self.assignment_api.create_domain,
+ domain['id'], domain)
+ self.assertRaises(
+ AssertionError, self.assignment_api.update_domain,
+ domain['id'], domain)
+ self.assertRaises(
+ exception.DomainNotFound, self.assignment_api.delete_domain,
+ domain['id'])
+
+ def test_forbid_operations_on_defined_federated_domain(self):
+ """Make sure one cannot operate on a user-defined federated domain.
+
+ This includes operations like create, update, delete.
+
+ """
+
+ non_default_name = 'beta_federated_domain'
+ self.config_fixture.config(group='federation',
+ federated_domain_name=non_default_name)
+ domain = self.new_domain_ref()
+ domain['name'] = non_default_name
+ self.assertRaises(AssertionError,
+ self.assignment_api.create_domain,
+ domain['id'], domain)
+ self.assertRaises(exception.DomainNotFound,
+ self.assignment_api.delete_domain,
+ domain['id'])
+ self.assertRaises(AssertionError,
+ self.assignment_api.update_domain,
+ domain['id'], domain)
+
+ def test_set_federated_domain_when_config_empty(self):
+ """Make sure we are operable even if config value is not properly
+ set.
+
+ This includes operations like create, update, delete.
+
+ """
+ federated_name = 'Federated'
+ self.config_fixture.config(group='federation',
+ federated_domain_name='')
+ domain = self.new_domain_ref()
+ domain['id'] = federated_name
+ self.assertRaises(AssertionError,
+ self.assignment_api.create_domain,
+ domain['id'], domain)
+ self.assertRaises(exception.DomainNotFound,
+ self.assignment_api.delete_domain,
+ domain['id'])
+ self.assertRaises(AssertionError,
+ self.assignment_api.update_domain,
+ domain['id'], domain)
+
+ # swap id with name
+ domain['id'], domain['name'] = domain['name'], domain['id']
+ self.assertRaises(AssertionError,
+ self.assignment_api.create_domain,
+ domain['id'], domain)
+ self.assertRaises(exception.DomainNotFound,
+ self.assignment_api.delete_domain,
+ domain['id'])
+ self.assertRaises(AssertionError,
+ self.assignment_api.update_domain,
+ domain['id'], domain)
+
+ # Project CRUD tests
+
+ def test_list_projects(self):
+ """Call ``GET /projects``."""
+ resource_url = '/projects'
+ r = self.get(resource_url)
+ self.assertValidProjectListResponse(r, ref=self.project,
+ resource_url=resource_url)
+
+ def test_create_project(self):
+ """Call ``POST /projects``."""
+ ref = self.new_project_ref(domain_id=self.domain_id)
+ r = self.post(
+ '/projects',
+ body={'project': ref})
+ self.assertValidProjectResponse(r, ref)
+
+ def test_create_project_400(self):
+ """Call ``POST /projects``."""
+ self.post('/projects', body={'project': {}}, expected_status=400)
+
+ def _create_projects_hierarchy(self, hierarchy_size=1):
+ """Creates a project hierarchy with specified size.
+
+ :param hierarchy_size: the desired hierarchy size, default is 1 -
+ a project with one child.
+
+ :returns projects: a list of the projects in the created hierarchy.
+
+ """
+ resp = self.get(
+ '/projects/%(project_id)s' % {
+ 'project_id': self.project_id})
+
+ projects = [resp.result]
+
+ for i in range(hierarchy_size):
+ new_ref = self.new_project_ref(
+ domain_id=self.domain_id,
+ parent_id=projects[i]['project']['id'])
+ resp = self.post('/projects',
+ body={'project': new_ref})
+ self.assertValidProjectResponse(resp, new_ref)
+
+ projects.append(resp.result)
+
+ return projects
+
+ def test_create_hierarchical_project(self):
+ """Call ``POST /projects``."""
+ self._create_projects_hierarchy()
+
+ def test_get_project(self):
+ """Call ``GET /projects/{project_id}``."""
+ r = self.get(
+ '/projects/%(project_id)s' % {
+ 'project_id': self.project_id})
+ self.assertValidProjectResponse(r, self.project)
+
+ def test_get_project_with_parents_as_ids(self):
+ """Call ``GET /projects/{project_id}?parents_as_ids``."""
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ # Query for projects[2] parents_as_ids
+ r = self.get(
+ '/projects/%(project_id)s?parents_as_ids' % {
+ 'project_id': projects[2]['project']['id']})
+
+ self.assertValidProjectResponse(r, projects[2]['project'])
+ parents_as_ids = r.result['project']['parents']
+
+ # Assert parents_as_ids is a structured dictionary correctly
+ # representing the hierarchy. The request was made using projects[2]
+ # id, hence its parents should be projects[1] and projects[0]. It
+ # should have the following structure:
+ # {
+ # projects[1]: {
+ # projects[0]: None
+ # }
+ # }
+ expected_dict = {
+ projects[1]['project']['id']: {
+ projects[0]['project']['id']: None
+ }
+ }
+ self.assertDictEqual(expected_dict, parents_as_ids)
+
+ # Query for projects[0] parents_as_ids
+ r = self.get(
+ '/projects/%(project_id)s?parents_as_ids' % {
+ 'project_id': projects[0]['project']['id']})
+
+ self.assertValidProjectResponse(r, projects[0]['project'])
+ parents_as_ids = r.result['project']['parents']
+
+ # projects[0] has no parents, parents_as_ids must be None
+ self.assertIsNone(parents_as_ids)
+
+ def test_get_project_with_parents_as_list(self):
+ """Call ``GET /projects/{project_id}?parents_as_list``."""
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ r = self.get(
+ '/projects/%(project_id)s?parents_as_list' % {
+ 'project_id': projects[1]['project']['id']})
+
+ self.assertEqual(1, len(r.result['project']['parents']))
+ self.assertValidProjectResponse(r, projects[1]['project'])
+ self.assertIn(projects[0], r.result['project']['parents'])
+ self.assertNotIn(projects[2], r.result['project']['parents'])
+
+ def test_get_project_with_parents_as_list_and_parents_as_ids(self):
+ """Call ``GET /projects/{project_id}?parents_as_list&parents_as_ids``.
+
+ """
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ self.get(
+ '/projects/%(project_id)s?parents_as_list&parents_as_ids' % {
+ 'project_id': projects[1]['project']['id']},
+ expected_status=400)
+
+ def test_get_project_with_subtree_as_ids(self):
+ """Call ``GET /projects/{project_id}?subtree_as_ids``.
+
+ This test creates a more complex hierarchy to test if the structured
+ dictionary returned by using the ``subtree_as_ids`` query param
+ correctly represents the hierarchy.
+
+ The hierarchy contains 5 projects with the following structure::
+
+ +--A--+
+ | |
+ +--B--+ C
+ | |
+ D E
+
+
+ """
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ # Add another child to projects[0] - it will be projects[3]
+ new_ref = self.new_project_ref(
+ domain_id=self.domain_id,
+ parent_id=projects[0]['project']['id'])
+ resp = self.post('/projects',
+ body={'project': new_ref})
+ self.assertValidProjectResponse(resp, new_ref)
+ projects.append(resp.result)
+
+ # Add another child to projects[1] - it will be projects[4]
+ new_ref = self.new_project_ref(
+ domain_id=self.domain_id,
+ parent_id=projects[1]['project']['id'])
+ resp = self.post('/projects',
+ body={'project': new_ref})
+ self.assertValidProjectResponse(resp, new_ref)
+ projects.append(resp.result)
+
+ # Query for projects[0] subtree_as_ids
+ r = self.get(
+ '/projects/%(project_id)s?subtree_as_ids' % {
+ 'project_id': projects[0]['project']['id']})
+ self.assertValidProjectResponse(r, projects[0]['project'])
+ subtree_as_ids = r.result['project']['subtree']
+
+ # The subtree hierarchy from projects[0] should have the following
+ # structure:
+ # {
+ # projects[1]: {
+ # projects[2]: None,
+ # projects[4]: None
+ # },
+ # projects[3]: None
+ # }
+ expected_dict = {
+ projects[1]['project']['id']: {
+ projects[2]['project']['id']: None,
+ projects[4]['project']['id']: None
+ },
+ projects[3]['project']['id']: None
+ }
+ self.assertDictEqual(expected_dict, subtree_as_ids)
+
+ # Now query for projects[1] subtree_as_ids
+ r = self.get(
+ '/projects/%(project_id)s?subtree_as_ids' % {
+ 'project_id': projects[1]['project']['id']})
+ self.assertValidProjectResponse(r, projects[1]['project'])
+ subtree_as_ids = r.result['project']['subtree']
+
+ # The subtree hierarchy from projects[1] should have the following
+ # structure:
+ # {
+ # projects[2]: None,
+ # projects[4]: None
+ # }
+ expected_dict = {
+ projects[2]['project']['id']: None,
+ projects[4]['project']['id']: None
+ }
+ self.assertDictEqual(expected_dict, subtree_as_ids)
+
+ # Now query for projects[3] subtree_as_ids
+ r = self.get(
+ '/projects/%(project_id)s?subtree_as_ids' % {
+ 'project_id': projects[3]['project']['id']})
+ self.assertValidProjectResponse(r, projects[3]['project'])
+ subtree_as_ids = r.result['project']['subtree']
+
+ # projects[3] has no subtree, subtree_as_ids must be None
+ self.assertIsNone(subtree_as_ids)
+
+ def test_get_project_with_subtree_as_list(self):
+ """Call ``GET /projects/{project_id}?subtree_as_list``."""
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ r = self.get(
+ '/projects/%(project_id)s?subtree_as_list' % {
+ 'project_id': projects[1]['project']['id']})
+
+ self.assertEqual(1, len(r.result['project']['subtree']))
+ self.assertValidProjectResponse(r, projects[1]['project'])
+ self.assertNotIn(projects[0], r.result['project']['subtree'])
+ self.assertIn(projects[2], r.result['project']['subtree'])
+
+ def test_get_project_with_subtree_as_list_and_subtree_as_ids(self):
+ """Call ``GET /projects/{project_id}?subtree_as_list&subtree_as_ids``.
+
+ """
+ projects = self._create_projects_hierarchy(hierarchy_size=2)
+
+ self.get(
+ '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % {
+ 'project_id': projects[1]['project']['id']},
+ expected_status=400)
+
+ def test_update_project(self):
+ """Call ``PATCH /projects/{project_id}``."""
+ ref = self.new_project_ref(domain_id=self.domain_id)
+ del ref['id']
+ r = self.patch(
+ '/projects/%(project_id)s' % {
+ 'project_id': self.project_id},
+ body={'project': ref})
+ self.assertValidProjectResponse(r, ref)
+
+ def test_update_project_domain_id(self):
+ """Call ``PATCH /projects/{project_id}`` with domain_id."""
+ project = self.new_project_ref(domain_id=self.domain['id'])
+ self.resource_api.create_project(project['id'], project)
+ project['domain_id'] = CONF.identity.default_domain_id
+ r = self.patch('/projects/%(project_id)s' % {
+ 'project_id': project['id']},
+ body={'project': project},
+ expected_status=exception.ValidationError.code)
+ self.config_fixture.config(domain_id_immutable=False)
+ project['domain_id'] = self.domain['id']
+ r = self.patch('/projects/%(project_id)s' % {
+ 'project_id': project['id']},
+ body={'project': project})
+ self.assertValidProjectResponse(r, project)
+
+ def test_update_project_parent_id(self):
+ """Call ``PATCH /projects/{project_id}``."""
+ projects = self._create_projects_hierarchy()
+ leaf_project = projects[1]['project']
+ leaf_project['parent_id'] = None
+ self.patch(
+ '/projects/%(project_id)s' % {
+ 'project_id': leaf_project['id']},
+ body={'project': leaf_project},
+ expected_status=403)
+
+ def test_disable_leaf_project(self):
+ """Call ``PATCH /projects/{project_id}``."""
+ projects = self._create_projects_hierarchy()
+ leaf_project = projects[1]['project']
+ leaf_project['enabled'] = False
+ r = self.patch(
+ '/projects/%(project_id)s' % {
+ 'project_id': leaf_project['id']},
+ body={'project': leaf_project})
+ self.assertEqual(
+ leaf_project['enabled'], r.result['project']['enabled'])
+
+ def test_disable_not_leaf_project(self):
+ """Call ``PATCH /projects/{project_id}``."""
+ projects = self._create_projects_hierarchy()
+ root_project = projects[0]['project']
+ root_project['enabled'] = False
+ self.patch(
+ '/projects/%(project_id)s' % {
+ 'project_id': root_project['id']},
+ body={'project': root_project},
+ expected_status=403)
+
+ def test_delete_project(self):
+ """Call ``DELETE /projects/{project_id}``
+
+ As well as making sure the delete succeeds, we ensure
+ that any credentials that reference this projects are
+ also deleted, while other credentials are unaffected.
+
+ """
+ # First check the credential for this project is present
+ r = self.credential_api.get_credential(self.credential['id'])
+ self.assertDictEqual(r, self.credential)
+ # Create a second credential with a different project
+ self.project2 = self.new_project_ref(
+ domain_id=self.domain['id'])
+ self.resource_api.create_project(self.project2['id'], self.project2)
+ self.credential2 = self.new_credential_ref(
+ user_id=self.user['id'],
+ project_id=self.project2['id'])
+ self.credential_api.create_credential(
+ self.credential2['id'],
+ self.credential2)
+
+ # Now delete the project
+ self.delete(
+ '/projects/%(project_id)s' % {
+ 'project_id': self.project_id})
+
+ # Deleting the project should have deleted any credentials
+ # that reference this project
+ self.assertRaises(exception.CredentialNotFound,
+ self.credential_api.get_credential,
+ credential_id=self.credential['id'])
+ # But the credential for project2 is unaffected
+ r = self.credential_api.get_credential(self.credential2['id'])
+ self.assertDictEqual(r, self.credential2)
+
+ def test_delete_not_leaf_project(self):
+ """Call ``DELETE /projects/{project_id}``."""
+ self._create_projects_hierarchy()
+ self.delete(
+ '/projects/%(project_id)s' % {
+ 'project_id': self.project_id},
+ expected_status=403)
+
+ # Role CRUD tests
+
+ def test_create_role(self):
+ """Call ``POST /roles``."""
+ ref = self.new_role_ref()
+ r = self.post(
+ '/roles',
+ body={'role': ref})
+ return self.assertValidRoleResponse(r, ref)
+
+ def test_create_role_400(self):
+ """Call ``POST /roles``."""
+ self.post('/roles', body={'role': {}}, expected_status=400)
+
+ def test_list_roles(self):
+ """Call ``GET /roles``."""
+ resource_url = '/roles'
+ r = self.get(resource_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ resource_url=resource_url)
+
+ def test_get_role(self):
+ """Call ``GET /roles/{role_id}``."""
+ r = self.get('/roles/%(role_id)s' % {
+ 'role_id': self.role_id})
+ self.assertValidRoleResponse(r, self.role)
+
+ def test_update_role(self):
+ """Call ``PATCH /roles/{role_id}``."""
+ ref = self.new_role_ref()
+ del ref['id']
+ r = self.patch('/roles/%(role_id)s' % {
+ 'role_id': self.role_id},
+ body={'role': ref})
+ self.assertValidRoleResponse(r, ref)
+
+ def test_delete_role(self):
+ """Call ``DELETE /roles/{role_id}``."""
+ self.delete('/roles/%(role_id)s' % {
+ 'role_id': self.role_id})
+
+ # Role Grants tests
+
+ def test_crud_user_project_role_grants(self):
+ collection_url = (
+ '/projects/%(project_id)s/users/%(user_id)s/roles' % {
+ 'project_id': self.project['id'],
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ resource_url=collection_url)
+
+ # FIXME(gyee): this test is no longer valid as user
+ # have no role in the project. Can't get a scoped token
+ # self.delete(member_url)
+ # r = self.get(collection_url)
+ # self.assertValidRoleListResponse(r, expected_length=0)
+ # self.assertIn(collection_url, r.result['links']['self'])
+
+ def test_crud_user_project_role_grants_no_user(self):
+ """Grant role on a project to a user that doesn't exist, 404 result.
+
+ When grant a role on a project to a user that doesn't exist, the server
+ returns 404 Not Found for the user.
+
+ """
+
+ user_id = uuid.uuid4().hex
+
+ collection_url = (
+ '/projects/%(project_id)s/users/%(user_id)s/roles' % {
+ 'project_id': self.project['id'], 'user_id': user_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url, expected_status=404)
+
+ def test_crud_user_domain_role_grants(self):
+ collection_url = (
+ '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ resource_url=collection_url)
+
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, expected_length=0,
+ resource_url=collection_url)
+
+ def test_crud_user_domain_role_grants_no_user(self):
+ """Grant role on a domain to a user that doesn't exist, 404 result.
+
+ When grant a role on a domain to a user that doesn't exist, the server
+ returns 404 Not Found for the user.
+
+ """
+
+ user_id = uuid.uuid4().hex
+
+ collection_url = (
+ '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id, 'user_id': user_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url, expected_status=404)
+
+ def test_crud_group_project_role_grants(self):
+ collection_url = (
+ '/projects/%(project_id)s/groups/%(group_id)s/roles' % {
+ 'project_id': self.project_id,
+ 'group_id': self.group_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ resource_url=collection_url)
+
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, expected_length=0,
+ resource_url=collection_url)
+
+ def test_crud_group_project_role_grants_no_group(self):
+ """Grant role on a project to a group that doesn't exist, 404 result.
+
+ When grant a role on a project to a group that doesn't exist, the
+ server returns 404 Not Found for the group.
+
+ """
+
+ group_id = uuid.uuid4().hex
+
+ collection_url = (
+ '/projects/%(project_id)s/groups/%(group_id)s/roles' % {
+ 'project_id': self.project_id,
+ 'group_id': group_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url, expected_status=404)
+
+ def test_crud_group_domain_role_grants(self):
+ collection_url = (
+ '/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'group_id': self.group_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ resource_url=collection_url)
+
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, expected_length=0,
+ resource_url=collection_url)
+
+ def test_crud_group_domain_role_grants_no_group(self):
+ """Grant role on a domain to a group that doesn't exist, 404 result.
+
+ When grant a role on a domain to a group that doesn't exist, the server
+ returns 404 Not Found for the group.
+
+ """
+
+ group_id = uuid.uuid4().hex
+
+ collection_url = (
+ '/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'group_id': group_id})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ self.put(member_url, expected_status=404)
+
+ def _create_new_user_and_assign_role_on_project(self):
+ """Create a new user and assign user a role on a project."""
+ # Create a new user
+ new_user = self.new_user_ref(domain_id=self.domain_id)
+ user_ref = self.identity_api.create_user(new_user)
+ # Assign the user a role on the project
+ collection_url = (
+ '/projects/%(project_id)s/users/%(user_id)s/roles' % {
+ 'project_id': self.project_id,
+ 'user_id': user_ref['id']})
+ member_url = ('%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id})
+ self.put(member_url, expected_status=204)
+ # Check the user has the role assigned
+ self.head(member_url, expected_status=204)
+ return member_url, user_ref
+
+ def test_delete_user_before_removing_role_assignment_succeeds(self):
+ """Call ``DELETE`` on the user before the role assignment."""
+ member_url, user = self._create_new_user_and_assign_role_on_project()
+ # Delete the user from identity backend
+ self.identity_api.driver.delete_user(user['id'])
+ # Clean up the role assignment
+ self.delete(member_url, expected_status=204)
+ # Make sure the role is gone
+ self.head(member_url, expected_status=404)
+
+ def test_delete_user_and_check_role_assignment_fails(self):
+ """Call ``DELETE`` on the user and check the role assignment."""
+ member_url, user = self._create_new_user_and_assign_role_on_project()
+ # Delete the user from identity backend
+ self.identity_api.delete_user(user['id'])
+ # We should get a 404 when looking for the user in the identity
+ # backend because we're not performing a delete operation on the role.
+ self.head(member_url, expected_status=404)
+
+ def test_token_revoked_once_group_role_grant_revoked(self):
+ """Test token is revoked when group role grant is revoked
+
+ When a role granted to a group is revoked for a given scope,
+ all tokens related to this scope and belonging to one of the members
+ of this group should be revoked.
+
+ The revocation should be independently to the presence
+ of the revoke API.
+ """
+ # creates grant from group on project.
+ self.assignment_api.create_grant(role_id=self.role['id'],
+ project_id=self.project['id'],
+ group_id=self.group['id'])
+
+ # adds user to the group.
+ self.identity_api.add_user_to_group(user_id=self.user['id'],
+ group_id=self.group['id'])
+
+ # creates a token for the user
+ auth_body = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id'])
+ token_resp = self.post('/auth/tokens', body=auth_body)
+ token = token_resp.headers.get('x-subject-token')
+
+ # validates the returned token; it should be valid.
+ self.head('/auth/tokens',
+ headers={'x-subject-token': token},
+ expected_status=200)
+
+ # revokes the grant from group on project.
+ self.assignment_api.delete_grant(role_id=self.role['id'],
+ project_id=self.project['id'],
+ group_id=self.group['id'])
+
+ # validates the same token again; it should not longer be valid.
+ self.head('/auth/tokens',
+ headers={'x-subject-token': token},
+ expected_status=404)
+
+ # Role Assignments tests
+
+ def test_get_role_assignments(self):
+ """Call ``GET /role_assignments``.
+
+ The sample data set up already has a user, group and project
+ that is part of self.domain. We use these plus a new user
+ we create as our data set, making sure we ignore any
+ role assignments that are already in existence.
+
+ Since we don't yet support a first class entity for role
+ assignments, we are only testing the LIST API. To create
+ and delete the role assignments we use the old grant APIs.
+
+ Test Plan:
+
+ - Create extra user for tests
+ - Get a list of all existing role assignments
+ - Add a new assignment for each of the four combinations, i.e.
+ group+domain, user+domain, group+project, user+project, using
+ the same role each time
+ - Get a new list of all role assignments, checking these four new
+ ones have been added
+ - Then delete the four we added
+ - Get a new list of all role assignments, checking the four have
+ been removed
+
+ """
+
+ # Since the default fixtures already assign some roles to the
+ # user it creates, we also need a new user that will not have any
+ # existing assignments
+ self.user1 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ self.user1 = self.identity_api.create_user(self.user1)
+
+ collection_url = '/role_assignments'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+ existing_assignments = len(r.result.get('role_assignments'))
+
+ # Now add one of each of the four types of assignment, making sure
+ # that we get them all back.
+ gd_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ group_id=self.group_id,
+ role_id=self.role_id)
+ self.put(gd_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 1,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ ud_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ user_id=self.user1['id'],
+ role_id=self.role_id)
+ self.put(ud_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+
+ gp_entity = _build_role_assignment_entity(project_id=self.project_id,
+ group_id=self.group_id,
+ role_id=self.role_id)
+ self.put(gp_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 3,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gp_entity)
+
+ up_entity = _build_role_assignment_entity(project_id=self.project_id,
+ user_id=self.user1['id'],
+ role_id=self.role_id)
+ self.put(up_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 4,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+
+ # Now delete the four we added and make sure they are removed
+ # from the collection.
+
+ self.delete(gd_entity['links']['assignment'])
+ self.delete(ud_entity['links']['assignment'])
+ self.delete(gp_entity['links']['assignment'])
+ self.delete(up_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments,
+ resource_url=collection_url)
+ self.assertRoleAssignmentNotInListResponse(r, gd_entity)
+ self.assertRoleAssignmentNotInListResponse(r, ud_entity)
+ self.assertRoleAssignmentNotInListResponse(r, gp_entity)
+ self.assertRoleAssignmentNotInListResponse(r, up_entity)
+
+ def test_get_effective_role_assignments(self):
+ """Call ``GET /role_assignments?effective``.
+
+ Test Plan:
+
+ - Create two extra user for tests
+ - Add these users to a group
+ - Add a role assignment for the group on a domain
+ - Get a list of all role assignments, checking one has been added
+ - Then get a list of all effective role assignments - the group
+ assignment should have turned into assignments on the domain
+ for each of the group members.
+
+ """
+ self.user1 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user1['password']
+ self.user1 = self.identity_api.create_user(self.user1)
+ self.user1['password'] = password
+ self.user2 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user2['password']
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+ self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
+ self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
+
+ collection_url = '/role_assignments'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+ existing_assignments = len(r.result.get('role_assignments'))
+
+ gd_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ group_id=self.group_id,
+ role_id=self.role_id)
+ self.put(gd_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 1,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ # Now re-read the collection asking for effective roles - this
+ # should mean the group assignment is translated into the two
+ # member user assignments
+ collection_url = '/role_assignments?effective'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 2,
+ resource_url=collection_url)
+ ud_entity = _build_role_assignment_entity(
+ link=gd_entity['links']['assignment'], domain_id=self.domain_id,
+ user_id=self.user1['id'], role_id=self.role_id)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+ ud_entity = _build_role_assignment_entity(
+ link=gd_entity['links']['assignment'], domain_id=self.domain_id,
+ user_id=self.user2['id'], role_id=self.role_id)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+
+ def test_check_effective_values_for_role_assignments(self):
+ """Call ``GET /role_assignments?effective=value``.
+
+ Check the various ways of specifying the 'effective'
+ query parameter. If the 'effective' query parameter
+ is included then this should always be treated as meaning 'True'
+ unless it is specified as:
+
+ {url}?effective=0
+
+ This is by design to match the agreed way of handling
+ policy checking on query/filter parameters.
+
+ Test Plan:
+
+ - Create two extra user for tests
+ - Add these users to a group
+ - Add a role assignment for the group on a domain
+ - Get a list of all role assignments, checking one has been added
+ - Then issue various request with different ways of defining
+ the 'effective' query parameter. As we have tested the
+ correctness of the data coming back when we get effective roles
+ in other tests, here we just use the count of entities to
+ know if we are getting effective roles or not
+
+ """
+ self.user1 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user1['password']
+ self.user1 = self.identity_api.create_user(self.user1)
+ self.user1['password'] = password
+ self.user2 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user2['password']
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+ self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
+ self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
+
+ collection_url = '/role_assignments'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+ existing_assignments = len(r.result.get('role_assignments'))
+
+ gd_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ group_id=self.group_id,
+ role_id=self.role_id)
+ self.put(gd_entity['links']['assignment'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 1,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ # Now re-read the collection asking for effective roles,
+ # using the most common way of defining "effective'. This
+ # should mean the group assignment is translated into the two
+ # member user assignments
+ collection_url = '/role_assignments?effective'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 2,
+ resource_url=collection_url)
+ # Now set 'effective' to false explicitly - should get
+ # back the regular roles
+ collection_url = '/role_assignments?effective=0'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 1,
+ resource_url=collection_url)
+ # Now try setting 'effective' to 'False' explicitly- this is
+ # NOT supported as a way of setting a query or filter
+ # parameter to false by design. Hence we should get back
+ # effective roles.
+ collection_url = '/role_assignments?effective=False'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 2,
+ resource_url=collection_url)
+ # Now set 'effective' to True explicitly
+ collection_url = '/role_assignments?effective=True'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r,
+ expected_length=existing_assignments + 2,
+ resource_url=collection_url)
+
+ def test_filtered_role_assignments(self):
+ """Call ``GET /role_assignments?filters``.
+
+ Test Plan:
+
+ - Create extra users, group, role and project for tests
+ - Make the following assignments:
+ Give group1, role1 on project1 and domain
+ Give user1, role2 on project1 and domain
+ Make User1 a member of Group1
+ - Test a series of single filter list calls, checking that
+ the correct results are obtained
+ - Test a multi-filtered list call
+ - Test listing all effective roles for a given user
+ - Test the equivalent of the list of roles in a project scoped
+ token (all effective roles for a user on a project)
+
+ """
+
+ # Since the default fixtures already assign some roles to the
+ # user it creates, we also need a new user that will not have any
+ # existing assignments
+ self.user1 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user1['password']
+ self.user1 = self.identity_api.create_user(self.user1)
+ self.user1['password'] = password
+ self.user2 = self.new_user_ref(
+ domain_id=self.domain['id'])
+ password = self.user2['password']
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+ self.group1 = self.new_group_ref(
+ domain_id=self.domain['id'])
+ self.group1 = self.identity_api.create_group(self.group1)
+ self.identity_api.add_user_to_group(self.user1['id'],
+ self.group1['id'])
+ self.identity_api.add_user_to_group(self.user2['id'],
+ self.group1['id'])
+ self.project1 = self.new_project_ref(
+ domain_id=self.domain['id'])
+ self.resource_api.create_project(self.project1['id'], self.project1)
+ self.role1 = self.new_role_ref()
+ self.role_api.create_role(self.role1['id'], self.role1)
+ self.role2 = self.new_role_ref()
+ self.role_api.create_role(self.role2['id'], self.role2)
+
+ # Now add one of each of the four types of assignment
+
+ gd_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ group_id=self.group1['id'],
+ role_id=self.role1['id'])
+ self.put(gd_entity['links']['assignment'])
+
+ ud_entity = _build_role_assignment_entity(domain_id=self.domain_id,
+ user_id=self.user1['id'],
+ role_id=self.role2['id'])
+ self.put(ud_entity['links']['assignment'])
+
+ gp_entity = _build_role_assignment_entity(
+ project_id=self.project1['id'], group_id=self.group1['id'],
+ role_id=self.role1['id'])
+ self.put(gp_entity['links']['assignment'])
+
+ up_entity = _build_role_assignment_entity(
+ project_id=self.project1['id'], user_id=self.user1['id'],
+ role_id=self.role2['id'])
+ self.put(up_entity['links']['assignment'])
+
+ # Now list by various filters to make sure we get back the right ones
+
+ collection_url = ('/role_assignments?scope.project.id=%s' %
+ self.project1['id'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+ self.assertRoleAssignmentInListResponse(r, gp_entity)
+
+ collection_url = ('/role_assignments?scope.domain.id=%s' %
+ self.domain['id'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ collection_url = '/role_assignments?user.id=%s' % self.user1['id']
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+
+ collection_url = '/role_assignments?group.id=%s' % self.group1['id']
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+ self.assertRoleAssignmentInListResponse(r, gp_entity)
+
+ collection_url = '/role_assignments?role.id=%s' % self.role1['id']
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+ self.assertRoleAssignmentInListResponse(r, gp_entity)
+
+ # Let's try combining two filers together....
+
+ collection_url = (
+ '/role_assignments?user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': self.user1['id'],
+ 'project_id': self.project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=1,
+ resource_url=collection_url)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+
+ # Now for a harder one - filter for user with effective
+ # roles - this should return role assignment that were directly
+ # assigned as well as by virtue of group membership
+
+ collection_url = ('/role_assignments?effective&user.id=%s' %
+ self.user1['id'])
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=4,
+ resource_url=collection_url)
+ # Should have the two direct roles...
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+ # ...and the two via group membership...
+ gp1_link = _build_role_assignment_link(project_id=self.project1['id'],
+ group_id=self.group1['id'],
+ role_id=self.role1['id'])
+ gd1_link = _build_role_assignment_link(domain_id=self.domain_id,
+ group_id=self.group1['id'],
+ role_id=self.role1['id'])
+
+ up1_entity = _build_role_assignment_entity(
+ link=gp1_link, project_id=self.project1['id'],
+ user_id=self.user1['id'], role_id=self.role1['id'])
+ ud1_entity = _build_role_assignment_entity(
+ link=gd1_link, domain_id=self.domain_id, user_id=self.user1['id'],
+ role_id=self.role1['id'])
+ self.assertRoleAssignmentInListResponse(r, up1_entity)
+ self.assertRoleAssignmentInListResponse(r, ud1_entity)
+
+ # ...and for the grand-daddy of them all, simulate the request
+ # that would generate the list of effective roles in a project
+ # scoped token.
+
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': self.user1['id'],
+ 'project_id': self.project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ # Should have one direct role and one from group membership...
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+ self.assertRoleAssignmentInListResponse(r, up1_entity)
+
+
+class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase):
+ """Base class for testing /v3/role_assignments API behavior."""
+
+ MAX_HIERARCHY_BREADTH = 3
+ MAX_HIERARCHY_DEPTH = CONF.max_project_tree_depth - 1
+
+ def load_sample_data(self):
+ """Creates sample data to be used on tests.
+
+ Created data are i) a role and ii) a domain containing: a project
+ hierarchy and 3 users within 3 groups.
+
+ """
+ def create_project_hierarchy(parent_id, depth):
+ "Creates a random project hierarchy."
+ if depth == 0:
+ return
+
+ breadth = random.randint(1, self.MAX_HIERARCHY_BREADTH)
+
+ subprojects = []
+ for i in range(breadth):
+ subprojects.append(self.new_project_ref(
+ domain_id=self.domain_id, parent_id=parent_id))
+ self.assignment_api.create_project(subprojects[-1]['id'],
+ subprojects[-1])
+
+ new_parent = subprojects[random.randint(0, breadth - 1)]
+ create_project_hierarchy(new_parent['id'], depth - 1)
+
+ super(RoleAssignmentBaseTestCase, self).load_sample_data()
+
+ # Create a domain
+ self.domain = self.new_domain_ref()
+ self.domain_id = self.domain['id']
+ self.assignment_api.create_domain(self.domain_id, self.domain)
+
+ # Create a project hierarchy
+ self.project = self.new_project_ref(domain_id=self.domain_id)
+ self.project_id = self.project['id']
+ self.assignment_api.create_project(self.project_id, self.project)
+
+ # Create a random project hierarchy
+ create_project_hierarchy(self.project_id,
+ random.randint(1, self.MAX_HIERARCHY_DEPTH))
+
+ # Create 3 users
+ self.user_ids = []
+ for i in range(3):
+ user = self.new_user_ref(domain_id=self.domain_id)
+ user = self.identity_api.create_user(user)
+ self.user_ids.append(user['id'])
+
+ # Create 3 groups
+ self.group_ids = []
+ for i in range(3):
+ group = self.new_group_ref(domain_id=self.domain_id)
+ group = self.identity_api.create_group(group)
+ self.group_ids.append(group['id'])
+
+ # Put 2 members on each group
+ self.identity_api.add_user_to_group(user_id=self.user_ids[i],
+ group_id=group['id'])
+ self.identity_api.add_user_to_group(user_id=self.user_ids[i % 2],
+ group_id=group['id'])
+
+ self.assignment_api.create_grant(user_id=self.user_id,
+ project_id=self.project_id,
+ role_id=self.role_id)
+
+ # Create a role
+ self.role = self.new_role_ref()
+ self.role_id = self.role['id']
+ self.assignment_api.create_role(self.role_id, self.role)
+
+ # Set default user and group to be used on tests
+ self.default_user_id = self.user_ids[0]
+ self.default_group_id = self.group_ids[0]
+
+ def get_role_assignments(self, expected_status=200, **filters):
+ """Returns the result from querying role assignment API + queried URL.
+
+ Calls GET /v3/role_assignments?<params> and returns its result, where
+ <params> is the HTTP query parameters form of effective option plus
+ filters, if provided. Queried URL is returned as well.
+
+ :returns: a tuple containing the list role assignments API response and
+ queried URL.
+
+ """
+
+ query_url = self._get_role_assignments_query_url(**filters)
+ response = self.get(query_url, expected_status=expected_status)
+
+ return (response, query_url)
+
+ def _get_role_assignments_query_url(self, **filters):
+ """Returns non-effective role assignments query URL from given filters.
+
+ :param filters: query parameters are created with the provided filters
+ on role assignments attributes. Valid filters are:
+ role_id, domain_id, project_id, group_id, user_id and
+ inherited_to_projects.
+
+ :returns: role assignments query URL.
+
+ """
+ return _build_role_assignment_query_url(**filters)
+
+
+class RoleAssignmentFailureTestCase(RoleAssignmentBaseTestCase):
+ """Class for testing invalid query params on /v3/role_assignments API.
+
+ Querying domain and project, or user and group results in a HTTP 400, since
+ a role assignment must contain only a single pair of (actor, target). In
+ addition, since filtering on role assignments applies only to the final
+ result, effective mode cannot be combined with i) group or ii) domain and
+ inherited, because it would always result in an empty list.
+
+ """
+
+ def test_get_role_assignments_by_domain_and_project(self):
+ self.get_role_assignments(domain_id=self.domain_id,
+ project_id=self.project_id,
+ expected_status=400)
+
+ def test_get_role_assignments_by_user_and_group(self):
+ self.get_role_assignments(user_id=self.default_user_id,
+ group_id=self.default_group_id,
+ expected_status=400)
+
+ def test_get_role_assignments_by_effective_and_inherited(self):
+ self.config_fixture.config(group='os_inherit', enabled=True)
+
+ self.get_role_assignments(domain_id=self.domain_id, effective=True,
+ inherited_to_projects=True,
+ expected_status=400)
+
+ def test_get_role_assignments_by_effective_and_group(self):
+ self.get_role_assignments(effective=True,
+ group_id=self.default_group_id,
+ expected_status=400)
+
+
+class RoleAssignmentDirectTestCase(RoleAssignmentBaseTestCase):
+ """Class for testing direct assignments on /v3/role_assignments API.
+
+ Direct assignments on a domain or project have effect on them directly,
+ instead of on their project hierarchy, i.e they are non-inherited. In
+ addition, group direct assignments are not expanded to group's users.
+
+ Tests on this class make assertions on the representation and API filtering
+ of direct assignments.
+
+ """
+
+ def _test_get_role_assignments(self, **filters):
+ """Generic filtering test method.
+
+ According to the provided filters, this method:
+ - creates a new role assignment;
+ - asserts that list role assignments API reponds correctly;
+ - deletes the created role assignment.
+
+ :param filters: filters to be considered when listing role assignments.
+ Valid filters are: role_id, domain_id, project_id,
+ group_id, user_id and inherited_to_projects.
+
+ """
+
+ # Fills default assignment with provided filters
+ test_assignment = self._set_default_assignment_attributes(**filters)
+
+ # Create new role assignment for this test
+ self.assignment_api.create_grant(**test_assignment)
+
+ # Get expected role assignments
+ expected_assignments = self._list_expected_role_assignments(
+ **test_assignment)
+
+ # Get role assignments from API
+ response, query_url = self.get_role_assignments(**test_assignment)
+ self.assertValidRoleAssignmentListResponse(response,
+ resource_url=query_url)
+ self.assertEqual(len(expected_assignments),
+ len(response.result.get('role_assignments')))
+
+ # Assert that expected role assignments were returned by the API call
+ for assignment in expected_assignments:
+ self.assertRoleAssignmentInListResponse(response, assignment)
+
+ # Delete created role assignment
+ self.assignment_api.delete_grant(**test_assignment)
+
+ def _set_default_assignment_attributes(self, **attribs):
+ """Inserts default values for missing attributes of role assignment.
+
+ If no actor, target or role are provided, they will default to values
+ from sample data.
+
+ :param attribs: info from a role assignment entity. Valid attributes
+ are: role_id, domain_id, project_id, group_id, user_id
+ and inherited_to_projects.
+
+ """
+ if not any(target in attribs
+ for target in ('domain_id', 'projects_id')):
+ attribs['project_id'] = self.project_id
+
+ if not any(actor in attribs for actor in ('user_id', 'group_id')):
+ attribs['user_id'] = self.default_user_id
+
+ if 'role_id' not in attribs:
+ attribs['role_id'] = self.role_id
+
+ return attribs
+
+ def _list_expected_role_assignments(self, **filters):
+ """Given the filters, it returns expected direct role assignments.
+
+ :param filters: filters that will be considered when listing role
+ assignments. Valid filters are: role_id, domain_id,
+ project_id, group_id, user_id and
+ inherited_to_projects.
+
+ :returns: the list of the expected role assignments.
+
+ """
+ return [_build_role_assignment_entity(**filters)]
+
+ # Test cases below call the generic test method, providing different filter
+ # combinations. Filters are provided as specified in the method name, after
+ # 'by'. For example, test_get_role_assignments_by_project_user_and_role
+ # calls the generic test method with project_id, user_id and role_id.
+
+ def test_get_role_assignments_by_domain(self, **filters):
+ self._test_get_role_assignments(domain_id=self.domain_id, **filters)
+
+ def test_get_role_assignments_by_project(self, **filters):
+ self._test_get_role_assignments(project_id=self.project_id, **filters)
+
+ def test_get_role_assignments_by_user(self, **filters):
+ self._test_get_role_assignments(user_id=self.default_user_id,
+ **filters)
+
+ def test_get_role_assignments_by_group(self, **filters):
+ self._test_get_role_assignments(group_id=self.default_group_id,
+ **filters)
+
+ def test_get_role_assignments_by_role(self, **filters):
+ self._test_get_role_assignments(role_id=self.role_id, **filters)
+
+ def test_get_role_assignments_by_domain_and_user(self, **filters):
+ self.test_get_role_assignments_by_domain(user_id=self.default_user_id,
+ **filters)
+
+ def test_get_role_assignments_by_domain_and_group(self, **filters):
+ self.test_get_role_assignments_by_domain(
+ group_id=self.default_group_id, **filters)
+
+ def test_get_role_assignments_by_project_and_user(self, **filters):
+ self.test_get_role_assignments_by_project(user_id=self.default_user_id,
+ **filters)
+
+ def test_get_role_assignments_by_project_and_group(self, **filters):
+ self.test_get_role_assignments_by_project(
+ group_id=self.default_group_id, **filters)
+
+ def test_get_role_assignments_by_domain_user_and_role(self, **filters):
+ self.test_get_role_assignments_by_domain_and_user(role_id=self.role_id,
+ **filters)
+
+ def test_get_role_assignments_by_domain_group_and_role(self, **filters):
+ self.test_get_role_assignments_by_domain_and_group(
+ role_id=self.role_id, **filters)
+
+ def test_get_role_assignments_by_project_user_and_role(self, **filters):
+ self.test_get_role_assignments_by_project_and_user(
+ role_id=self.role_id, **filters)
+
+ def test_get_role_assignments_by_project_group_and_role(self, **filters):
+ self.test_get_role_assignments_by_project_and_group(
+ role_id=self.role_id, **filters)
+
+
+class RoleAssignmentInheritedTestCase(RoleAssignmentDirectTestCase):
+ """Class for testing inherited assignments on /v3/role_assignments API.
+
+ Inherited assignments on a domain or project have no effect on them
+ directly, but on the projects under them instead.
+
+ Tests on this class do not make assertions on the effect of inherited
+ assignments, but in their representation and API filtering.
+
+ """
+
+ def config_overrides(self):
+ super(RoleAssignmentBaseTestCase, self).config_overrides()
+ self.config_fixture.config(group='os_inherit', enabled=True)
+
+ def _test_get_role_assignments(self, **filters):
+ """Adds inherited_to_project filter to expected entity in tests."""
+ super(RoleAssignmentInheritedTestCase,
+ self)._test_get_role_assignments(inherited_to_projects=True,
+ **filters)
+
+
+class RoleAssignmentEffectiveTestCase(RoleAssignmentInheritedTestCase):
+ """Class for testing inheritance effects on /v3/role_assignments API.
+
+ Inherited assignments on a domain or project have no effect on them
+ directly, but on the projects under them instead.
+
+ Tests on this class make assertions on the effect of inherited assignments
+ and API filtering.
+
+ """
+
+ def _get_role_assignments_query_url(self, **filters):
+ """Returns effective role assignments query URL from given filters.
+
+ For test methods in this class, effetive will always be true. As in
+ effective mode, inherited_to_projects, group_id, domain_id and
+ project_id will always be desconsidered from provided filters.
+
+ :param filters: query parameters are created with the provided filters.
+ Valid filters are: role_id, domain_id, project_id,
+ group_id, user_id and inherited_to_projects.
+
+ :returns: role assignments query URL.
+
+ """
+ query_filters = filters.copy()
+ query_filters.pop('inherited_to_projects')
+
+ query_filters.pop('group_id', None)
+ query_filters.pop('domain_id', None)
+ query_filters.pop('project_id', None)
+
+ return _build_role_assignment_query_url(effective=True,
+ **query_filters)
+
+ def _list_expected_role_assignments(self, **filters):
+ """Given the filters, it returns expected direct role assignments.
+
+ :param filters: filters that will be considered when listing role
+ assignments. Valid filters are: role_id, domain_id,
+ project_id, group_id, user_id and
+ inherited_to_projects.
+
+ :returns: the list of the expected role assignments.
+
+ """
+ # Get assignment link, to be put on 'links': {'assignment': link}
+ assignment_link = _build_role_assignment_link(**filters)
+
+ # Expand group membership
+ user_ids = [None]
+ if filters.get('group_id'):
+ user_ids = [user['id'] for user in
+ self.identity_api.list_users_in_group(
+ filters['group_id'])]
+ else:
+ user_ids = [self.default_user_id]
+
+ # Expand role inheritance
+ project_ids = [None]
+ if filters.get('domain_id'):
+ project_ids = [project['id'] for project in
+ self.assignment_api.list_projects_in_domain(
+ filters.pop('domain_id'))]
+ else:
+ project_ids = [project['id'] for project in
+ self.assignment_api.list_projects_in_subtree(
+ self.project_id)]
+
+ # Compute expected role assignments
+ assignments = []
+ for project_id in project_ids:
+ filters['project_id'] = project_id
+ for user_id in user_ids:
+ filters['user_id'] = user_id
+ assignments.append(_build_role_assignment_entity(
+ link=assignment_link, **filters))
+
+ return assignments
+
+
+class AssignmentInheritanceTestCase(test_v3.RestfulTestCase):
+ """Test inheritance crud and its effects."""
+
+ def config_overrides(self):
+ super(AssignmentInheritanceTestCase, self).config_overrides()
+ self.config_fixture.config(group='os_inherit', enabled=True)
+
+ def test_get_token_from_inherited_user_domain_role_grants(self):
+ # Create a new user to ensure that no grant is loaded from sample data
+ user = self.new_user_ref(domain_id=self.domain_id)
+ password = user['password']
+ user = self.identity_api.create_user(user)
+ user['password'] = password
+
+ # Define domain and project authentication data
+ domain_auth_data = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password'],
+ domain_id=self.domain_id)
+ project_auth_data = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password'],
+ project_id=self.project_id)
+
+ # Check the user cannot get a domain nor a project token
+ self.v3_authenticate_token(domain_auth_data, expected_status=401)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Grant non-inherited role for user on domain
+ non_inher_ud_link = _build_role_assignment_link(
+ domain_id=self.domain_id, user_id=user['id'], role_id=self.role_id)
+ self.put(non_inher_ud_link)
+
+ # Check the user can get only a domain token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Create inherited role
+ inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ self.role_api.create_role(inherited_role['id'], inherited_role)
+
+ # Grant inherited role for user on domain
+ inher_ud_link = _build_role_assignment_link(
+ domain_id=self.domain_id, user_id=user['id'],
+ role_id=inherited_role['id'], inherited_to_projects=True)
+ self.put(inher_ud_link)
+
+ # Check the user can get both a domain and a project token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data)
+
+ # Delete inherited grant
+ self.delete(inher_ud_link)
+
+ # Check the user can only get a domain token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Delete non-inherited grant
+ self.delete(non_inher_ud_link)
+
+ # Check the user cannot get a domain token anymore
+ self.v3_authenticate_token(domain_auth_data, expected_status=401)
+
+ def test_get_token_from_inherited_group_domain_role_grants(self):
+ # Create a new group and put a new user in it to
+ # ensure that no grant is loaded from sample data
+ user = self.new_user_ref(domain_id=self.domain_id)
+ password = user['password']
+ user = self.identity_api.create_user(user)
+ user['password'] = password
+
+ group = self.new_group_ref(domain_id=self.domain['id'])
+ group = self.identity_api.create_group(group)
+ self.identity_api.add_user_to_group(user['id'], group['id'])
+
+ # Define domain and project authentication data
+ domain_auth_data = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password'],
+ domain_id=self.domain_id)
+ project_auth_data = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password'],
+ project_id=self.project_id)
+
+ # Check the user cannot get a domain nor a project token
+ self.v3_authenticate_token(domain_auth_data, expected_status=401)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Grant non-inherited role for user on domain
+ non_inher_gd_link = _build_role_assignment_link(
+ domain_id=self.domain_id, user_id=user['id'], role_id=self.role_id)
+ self.put(non_inher_gd_link)
+
+ # Check the user can get only a domain token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Create inherited role
+ inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ self.role_api.create_role(inherited_role['id'], inherited_role)
+
+ # Grant inherited role for user on domain
+ inher_gd_link = _build_role_assignment_link(
+ domain_id=self.domain_id, user_id=user['id'],
+ role_id=inherited_role['id'], inherited_to_projects=True)
+ self.put(inher_gd_link)
+
+ # Check the user can get both a domain and a project token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data)
+
+ # Delete inherited grant
+ self.delete(inher_gd_link)
+
+ # Check the user can only get a domain token
+ self.v3_authenticate_token(domain_auth_data)
+ self.v3_authenticate_token(project_auth_data, expected_status=401)
+
+ # Delete non-inherited grant
+ self.delete(non_inher_gd_link)
+
+ # Check the user cannot get a domain token anymore
+ self.v3_authenticate_token(domain_auth_data, expected_status=401)
+
+ def test_crud_user_inherited_domain_role_grants(self):
+ role_list = []
+ for _ in range(2):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ # Create a non-inherited role as a spoiler
+ self.assignment_api.create_grant(
+ role_list[1]['id'], user_id=self.user['id'],
+ domain_id=self.domain_id)
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[0]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+
+ # Check we can read it back
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[0],
+ resource_url=collection_url)
+
+ # Now delete and check its gone
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, expected_length=0,
+ resource_url=collection_url)
+
+ def test_list_role_assignments_for_inherited_domain_grants(self):
+ """Call ``GET /role_assignments with inherited domain grants``.
+
+ Test Plan:
+
+ - Create 4 roles
+ - Create a domain with a user and two projects
+ - Assign two direct roles to project1
+ - Assign a spoiler role to project2
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check it is indeed on the domain
+ - Issue the URL to check effective roles on project1 - this
+ should return 3 roles.
+
+ """
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ password = user1['password']
+ user1 = self.identity_api.create_user(user1)
+ user1['password'] = password
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3],
+ resource_url=collection_url)
+
+ # Now use the list domain role assignments api to check if this
+ # is included
+ collection_url = (
+ '/role_assignments?user.id=%(user_id)s'
+ '&scope.domain.id=%(domain_id)s' % {
+ 'user_id': user1['id'],
+ 'domain_id': domain['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=1,
+ resource_url=collection_url)
+ ud_entity = _build_role_assignment_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+
+ # Now ask for effective list role assignments - the role should
+ # turn into a project role, along with the two direct roles that are
+ # on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=3,
+ resource_url=collection_url)
+ # An effective role for an inherited role will be a project
+ # entity, with a domain link to the inherited assignment
+ ud_url = _build_role_assignment_link(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ up_entity = _build_role_assignment_entity(link=ud_url,
+ project_id=project1['id'],
+ user_id=user1['id'],
+ role_id=role_list[3]['id'],
+ inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+
+ def test_list_role_assignments_for_disabled_inheritance_extension(self):
+ """Call ``GET /role_assignments with inherited domain grants``.
+
+ Test Plan:
+
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check effective roles on project include the
+ inherited role
+ - Disable the extension
+ - Re-check the effective roles, proving the inherited role no longer
+ shows up.
+
+ """
+
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ password = user1['password']
+ user1 = self.identity_api.create_user(user1)
+ user1['password'] = password
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3],
+ resource_url=collection_url)
+
+ # Get effective list role assignments - the role should
+ # turn into a project role, along with the two direct roles that are
+ # on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=3,
+ resource_url=collection_url)
+
+ ud_url = _build_role_assignment_link(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ up_entity = _build_role_assignment_entity(link=ud_url,
+ project_id=project1['id'],
+ user_id=user1['id'],
+ role_id=role_list[3]['id'],
+ inherited_to_projects=True)
+
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+
+ # Disable the extension and re-check the list, the role inherited
+ # from the project should no longer show up
+ self.config_fixture.config(group='os_inherit', enabled=False)
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+
+ self.assertRoleAssignmentNotInListResponse(r, up_entity)
+
+ def test_list_role_assignments_for_inherited_group_domain_grants(self):
+ """Call ``GET /role_assignments with inherited group domain grants``.
+
+ Test Plan:
+
+ - Create 4 roles
+ - Create a domain with a user and two projects
+ - Assign two direct roles to project1
+ - Assign a spoiler role to project2
+ - Issue the URL to add inherited role to the domain
+ - Issue the URL to check it is indeed on the domain
+ - Issue the URL to check effective roles on project1 - this
+ should return 3 roles.
+
+ """
+ role_list = []
+ for _ in range(4):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ password = user1['password']
+ user1 = self.identity_api.create_user(user1)
+ user1['password'] = password
+ user2 = self.new_user_ref(
+ domain_id=domain['id'])
+ password = user2['password']
+ user2 = self.identity_api.create_user(user2)
+ user2['password'] = password
+ group1 = self.new_group_ref(
+ domain_id=domain['id'])
+ group1 = self.identity_api.create_group(group1)
+ self.identity_api.add_user_to_group(user1['id'],
+ group1['id'])
+ self.identity_api.add_user_to_group(user2['id'],
+ group1['id'])
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ # Add some roles to the project
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[1]['id'])
+ # ..and one on a different project as a spoiler
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[2]['id'])
+
+ # Now create our inherited role on the domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'group_id': group1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3],
+ resource_url=collection_url)
+
+ # Now use the list domain role assignments api to check if this
+ # is included
+ collection_url = (
+ '/role_assignments?group.id=%(group_id)s'
+ '&scope.domain.id=%(domain_id)s' % {
+ 'group_id': group1['id'],
+ 'domain_id': domain['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=1,
+ resource_url=collection_url)
+ gd_entity = _build_role_assignment_entity(
+ domain_id=domain['id'], group_id=group1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ # Now ask for effective list role assignments - the role should
+ # turn into a user project role, along with the two direct roles
+ # that are on the project
+ collection_url = (
+ '/role_assignments?effective&user.id=%(user_id)s'
+ '&scope.project.id=%(project_id)s' % {
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=3,
+ resource_url=collection_url)
+ # An effective role for an inherited role will be a project
+ # entity, with a domain link to the inherited assignment
+ up_entity = _build_role_assignment_entity(
+ link=gd_entity['links']['assignment'], project_id=project1['id'],
+ user_id=user1['id'], role_id=role_list[3]['id'],
+ inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, up_entity)
+
+ def test_filtered_role_assignments_for_inherited_grants(self):
+ """Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``.
+
+ Test Plan:
+
+ - Create 5 roles
+ - Create a domain with a user, group and two projects
+ - Assign three direct spoiler roles to projects
+ - Issue the URL to add an inherited user role to the domain
+ - Issue the URL to add an inherited group role to the domain
+ - Issue the URL to filter by inherited roles - this should
+ return just the 2 inherited roles.
+
+ """
+ role_list = []
+ for _ in range(5):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+ role_list.append(role)
+
+ domain = self.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ user1 = self.new_user_ref(
+ domain_id=domain['id'])
+ password = user1['password']
+ user1 = self.identity_api.create_user(user1)
+ user1['password'] = password
+ group1 = self.new_group_ref(
+ domain_id=domain['id'])
+ group1 = self.identity_api.create_group(group1)
+ project1 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = self.new_project_ref(
+ domain_id=domain['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ # Add some spoiler roles to the projects
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project1['id'], role_list[0]['id'])
+ self.assignment_api.add_role_to_user_and_project(
+ user1['id'], project2['id'], role_list[1]['id'])
+ # Create a non-inherited role as a spoiler
+ self.assignment_api.create_grant(
+ role_list[2]['id'], user_id=user1['id'], domain_id=domain['id'])
+
+ # Now create two inherited roles on the domain, one for a user
+ # and one for a domain
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'user_id': user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[3]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[3],
+ resource_url=collection_url)
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': domain['id'],
+ 'group_id': group1['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role_list[4]['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url)
+ self.head(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=role_list[4],
+ resource_url=collection_url)
+
+ # Now use the list role assignments api to get a list of inherited
+ # roles on the domain - should get back the two roles
+ collection_url = (
+ '/role_assignments?scope.OS-INHERIT:inherited_to=projects')
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ expected_length=2,
+ resource_url=collection_url)
+ ud_entity = _build_role_assignment_entity(
+ domain_id=domain['id'], user_id=user1['id'],
+ role_id=role_list[3]['id'], inherited_to_projects=True)
+ gd_entity = _build_role_assignment_entity(
+ domain_id=domain['id'], group_id=group1['id'],
+ role_id=role_list[4]['id'], inherited_to_projects=True)
+ self.assertRoleAssignmentInListResponse(r, ud_entity)
+ self.assertRoleAssignmentInListResponse(r, gd_entity)
+
+ def _setup_hierarchical_projects_scenario(self):
+ """Creates basic hierarchical projects scenario.
+
+ This basic scenario contains a root with one leaf project and
+ two roles with the following names: non-inherited and inherited.
+
+ """
+ # Create project hierarchy
+ root = self.new_project_ref(domain_id=self.domain['id'])
+ leaf = self.new_project_ref(domain_id=self.domain['id'],
+ parent_id=root['id'])
+
+ self.resource_api.create_project(root['id'], root)
+ self.resource_api.create_project(leaf['id'], leaf)
+
+ # Create 'non-inherited' and 'inherited' roles
+ non_inherited_role = {'id': uuid.uuid4().hex, 'name': 'non-inherited'}
+ self.role_api.create_role(non_inherited_role['id'], non_inherited_role)
+ inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ self.role_api.create_role(inherited_role['id'], inherited_role)
+
+ return (root['id'], leaf['id'],
+ non_inherited_role['id'], inherited_role['id'])
+
+ def test_get_token_from_inherited_user_project_role_grants(self):
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Define root and leaf projects authentication data
+ root_project_auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=root_id)
+ leaf_project_auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=leaf_id)
+
+ # Check the user cannot get a token on root nor leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
+
+ # Grant non-inherited role for user on leaf project
+ non_inher_up_link = _build_role_assignment_link(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_up_link)
+
+ # Check the user can only get a token on leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Grant inherited role for user on root project
+ inher_up_link = _build_role_assignment_link(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_up_link)
+
+ # Check the user still can get a token only on leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Delete non-inherited grant
+ self.delete(non_inher_up_link)
+
+ # Check the inherited role still applies for leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Delete inherited grant
+ self.delete(inher_up_link)
+
+ # Check the user cannot get a token on leaf project anymore
+ self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
+
+ def test_get_token_from_inherited_group_project_role_grants(self):
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Create group and add user to it
+ group = self.new_group_ref(domain_id=self.domain['id'])
+ group = self.identity_api.create_group(group)
+ self.identity_api.add_user_to_group(self.user['id'], group['id'])
+
+ # Define root and leaf projects authentication data
+ root_project_auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=root_id)
+ leaf_project_auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=leaf_id)
+
+ # Check the user cannot get a token on root nor leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
+
+ # Grant non-inherited role for group on leaf project
+ non_inher_gp_link = _build_role_assignment_link(
+ project_id=leaf_id, group_id=group['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_gp_link)
+
+ # Check the user can only get a token on leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Grant inherited role for group on root project
+ inher_gp_link = _build_role_assignment_link(
+ project_id=root_id, group_id=group['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_gp_link)
+
+ # Check the user still can get a token only on leaf project
+ self.v3_authenticate_token(root_project_auth_data, expected_status=401)
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Delete no-inherited grant
+ self.delete(non_inher_gp_link)
+
+ # Check the inherited role still applies for leaf project
+ self.v3_authenticate_token(leaf_project_auth_data)
+
+ # Delete inherited grant
+ self.delete(inher_gp_link)
+
+ # Check the user cannot get a token on leaf project anymore
+ self.v3_authenticate_token(leaf_project_auth_data, expected_status=401)
+
+ def test_get_role_assignments_for_project_hierarchy(self):
+ """Call ``GET /role_assignments``.
+
+ Test Plan:
+
+ - Create 2 roles
+ - Create a hierarchy of projects with one root and one leaf project
+ - Issue the URL to add a non-inherited user role to the root project
+ - Issue the URL to add an inherited user role to the root project
+ - Issue the URL to get all role assignments - this should return just
+ 2 roles (non-inherited and inherited) in the root project.
+
+ """
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Grant non-inherited role
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_up_entity['links']['assignment'])
+
+ # Grant inherited role
+ inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_up_entity['links']['assignment'])
+
+ # Get role assignments
+ collection_url = '/role_assignments'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+
+ # Assert that the user has non-inherited role on root project
+ self.assertRoleAssignmentInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user has inherited role on root project
+ self.assertRoleAssignmentInListResponse(r, inher_up_entity)
+
+ # Assert that the user does not have non-inherited role on leaf project
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.assertRoleAssignmentNotInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user does not have inherited role on leaf project
+ inher_up_entity['scope']['project']['id'] = leaf_id
+ self.assertRoleAssignmentNotInListResponse(r, inher_up_entity)
+
+ def test_get_effective_role_assignments_for_project_hierarchy(self):
+ """Call ``GET /role_assignments?effective``.
+
+ Test Plan:
+
+ - Create 2 roles
+ - Create a hierarchy of projects with one root and one leaf project
+ - Issue the URL to add a non-inherited user role to the root project
+ - Issue the URL to add an inherited user role to the root project
+ - Issue the URL to get effective role assignments - this should return
+ 1 role (non-inherited) on the root project and 1 role (inherited) on
+ the leaf project.
+
+ """
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Grant non-inherited role
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_up_entity['links']['assignment'])
+
+ # Grant inherited role
+ inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_up_entity['links']['assignment'])
+
+ # Get effective role assignments
+ collection_url = '/role_assignments?effective'
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+
+ # Assert that the user has non-inherited role on root project
+ self.assertRoleAssignmentInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user does not have inherited role on root project
+ self.assertRoleAssignmentNotInListResponse(r, inher_up_entity)
+
+ # Assert that the user does not have non-inherited role on leaf project
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.assertRoleAssignmentNotInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user has inherited role on leaf project
+ inher_up_entity['scope']['project']['id'] = leaf_id
+ self.assertRoleAssignmentInListResponse(r, inher_up_entity)
+
+ def test_get_inherited_role_assignments_for_project_hierarchy(self):
+ """Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``.
+
+ Test Plan:
+
+ - Create 2 roles
+ - Create a hierarchy of projects with one root and one leaf project
+ - Issue the URL to add a non-inherited user role to the root project
+ - Issue the URL to add an inherited user role to the root project
+ - Issue the URL to filter inherited to projects role assignments - this
+ should return 1 role (inherited) on the root project.
+
+ """
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Grant non-inherited role
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_up_entity['links']['assignment'])
+
+ # Grant inherited role
+ inher_up_entity = _build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_up_entity['links']['assignment'])
+
+ # Get inherited role assignments
+ collection_url = ('/role_assignments'
+ '?scope.OS-INHERIT:inherited_to=projects')
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(r,
+ resource_url=collection_url)
+
+ # Assert that the user does not have non-inherited role on root project
+ self.assertRoleAssignmentNotInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user has inherited role on root project
+ self.assertRoleAssignmentInListResponse(r, inher_up_entity)
+
+ # Assert that the user does not have non-inherited role on leaf project
+ non_inher_up_entity = _build_role_assignment_entity(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.assertRoleAssignmentNotInListResponse(r, non_inher_up_entity)
+
+ # Assert that the user does not have inherited role on leaf project
+ inher_up_entity['scope']['project']['id'] = leaf_id
+ self.assertRoleAssignmentNotInListResponse(r, inher_up_entity)
+
+
+class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase):
+ """Test inheritance crud and its effects."""
+
+ def config_overrides(self):
+ super(AssignmentInheritanceDisabledTestCase, self).config_overrides()
+ self.config_fixture.config(group='os_inherit', enabled=False)
+
+ def test_crud_inherited_role_grants_failed_if_disabled(self):
+ role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role['id'], role)
+
+ base_collection_url = (
+ '/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domain_id,
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
+ 'collection_url': base_collection_url,
+ 'role_id': role['id']}
+ collection_url = base_collection_url + '/inherited_to_projects'
+
+ self.put(member_url, expected_status=404)
+ self.head(member_url, expected_status=404)
+ self.get(collection_url, expected_status=404)
+ self.delete(member_url, expected_status=404)
+
+
+class AssignmentV3toV2MethodsTestCase(tests.TestCase):
+ """Test domain V3 to V2 conversion methods."""
+
+ def test_v2controller_filter_domain_id(self):
+ # V2.0 is not domain aware, ensure domain_id is popped off the ref.
+ other_data = uuid.uuid4().hex
+ domain_id = uuid.uuid4().hex
+ ref = {'domain_id': domain_id,
+ 'other_data': other_data}
+
+ ref_no_domain = {'other_data': other_data}
+ expected_ref = ref_no_domain.copy()
+
+ updated_ref = controller.V2Controller.filter_domain_id(ref)
+ self.assertIs(ref, updated_ref)
+ self.assertDictEqual(ref, expected_ref)
+ # Make sure we don't error/muck up data if domain_id isn't present
+ updated_ref = controller.V2Controller.filter_domain_id(ref_no_domain)
+ self.assertIs(ref_no_domain, updated_ref)
+ self.assertDictEqual(ref_no_domain, expected_ref)
+
+ def test_v3controller_filter_domain_id(self):
+ # No data should be filtered out in this case.
+ other_data = uuid.uuid4().hex
+ domain_id = uuid.uuid4().hex
+ ref = {'domain_id': domain_id,
+ 'other_data': other_data}
+
+ expected_ref = ref.copy()
+ updated_ref = controller.V3Controller.filter_domain_id(ref)
+ self.assertIs(ref, updated_ref)
+ self.assertDictEqual(ref, expected_ref)
+
+ def test_v2controller_filter_domain(self):
+ other_data = uuid.uuid4().hex
+ domain_id = uuid.uuid4().hex
+ non_default_domain_ref = {'domain': {'id': domain_id},
+ 'other_data': other_data}
+ default_domain_ref = {'domain': {'id': 'default'},
+ 'other_data': other_data}
+ updated_ref = controller.V2Controller.filter_domain(default_domain_ref)
+ self.assertNotIn('domain', updated_ref)
+ self.assertRaises(exception.Unauthorized,
+ controller.V2Controller.filter_domain,
+ non_default_domain_ref)