aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_assignment.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_assignment.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_assignment.py2419
1 files changed, 1033 insertions, 1386 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_assignment.py b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
index 6b15b1c3..86fb9f74 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_assignment.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
@@ -16,12 +16,10 @@ import uuid
from oslo_config import cfg
from six.moves import http_client
from six.moves import range
+from testtools import matchers
-from keystone.common import controller
-from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import test_v3
-from keystone.tests.unit import utils
CONF = cfg.CONF
@@ -29,1042 +27,20 @@ CONF = cfg.CONF
class AssignmentTestCase(test_v3.RestfulTestCase,
test_v3.AssignmentTestMixin):
- """Test domains, projects, roles and role assignments."""
+ """Test roles and role assignments."""
def setUp(self):
super(AssignmentTestCase, self).setUp()
- self.group = self.new_group_ref(
- domain_id=self.domain_id)
+ self.group = unit.new_group_ref(domain_id=self.domain_id)
self.group = self.identity_api.create_group(self.group)
self.group_id = self.group['id']
- self.credential_id = uuid.uuid4().hex
- self.credential = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project_id)
- self.credential['id'] = self.credential_id
- self.credential_api.create_credential(
- self.credential_id,
- self.credential)
-
- # Domain CRUD tests
-
- def test_create_domain(self):
- """Call ``POST /domains``."""
- ref = self.new_domain_ref()
- r = self.post(
- '/domains',
- body={'domain': ref})
- return self.assertValidDomainResponse(r, ref)
-
- def test_create_domain_case_sensitivity(self):
- """Call `POST /domains`` twice with upper() and lower() cased name."""
- ref = self.new_domain_ref()
-
- # ensure the name is lowercase
- ref['name'] = ref['name'].lower()
- r = self.post(
- '/domains',
- body={'domain': ref})
- self.assertValidDomainResponse(r, ref)
-
- # ensure the name is uppercase
- ref['name'] = ref['name'].upper()
- r = self.post(
- '/domains',
- body={'domain': ref})
- self.assertValidDomainResponse(r, ref)
-
- def test_create_domain_bad_request(self):
- """Call ``POST /domains``."""
- self.post('/domains', body={'domain': {}},
- expected_status=http_client.BAD_REQUEST)
-
- def test_list_domains(self):
- """Call ``GET /domains``."""
- resource_url = '/domains'
- r = self.get(resource_url)
- self.assertValidDomainListResponse(r, ref=self.domain,
- resource_url=resource_url)
-
- def test_get_domain(self):
- """Call ``GET /domains/{domain_id}``."""
- r = self.get('/domains/%(domain_id)s' % {
- 'domain_id': self.domain_id})
- self.assertValidDomainResponse(r, self.domain)
-
- def test_update_domain(self):
- """Call ``PATCH /domains/{domain_id}``."""
- ref = self.new_domain_ref()
- del ref['id']
- r = self.patch('/domains/%(domain_id)s' % {
- 'domain_id': self.domain_id},
- body={'domain': ref})
- self.assertValidDomainResponse(r, ref)
-
- def test_disable_domain(self):
- """Call ``PATCH /domains/{domain_id}`` (set enabled=False)."""
- # Create a 2nd set of entities in a 2nd domain
- self.domain2 = self.new_domain_ref()
- self.resource_api.create_domain(self.domain2['id'], self.domain2)
-
- self.project2 = self.new_project_ref(
- domain_id=self.domain2['id'])
- self.resource_api.create_project(self.project2['id'], self.project2)
-
- self.user2 = self.new_user_ref(
- domain_id=self.domain2['id'],
- project_id=self.project2['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
-
- self.assignment_api.add_user_to_project(self.project2['id'],
- self.user2['id'])
-
- # First check a user in that domain can authenticate. The v2 user
- # cannot authenticate because they exist outside the default domain.
- body = {
- 'auth': {
- 'passwordCredentials': {
- 'userId': self.user2['id'],
- 'password': self.user2['password']
- },
- 'tenantId': self.project2['id']
- }
- }
- self.admin_request(
- path='/v2.0/tokens', method='POST', body=body,
- expected_status=http_client.UNAUTHORIZED)
-
- auth_data = self.build_authentication_request(
- user_id=self.user2['id'],
- password=self.user2['password'],
- project_id=self.project2['id'])
- self.v3_authenticate_token(auth_data)
-
- # Now disable the domain
- self.domain2['enabled'] = False
- r = self.patch('/domains/%(domain_id)s' % {
- 'domain_id': self.domain2['id']},
- body={'domain': {'enabled': False}})
- self.assertValidDomainResponse(r, self.domain2)
-
- # Make sure the user can no longer authenticate, via
- # either API
- body = {
- 'auth': {
- 'passwordCredentials': {
- 'userId': self.user2['id'],
- 'password': self.user2['password']
- },
- 'tenantId': self.project2['id']
- }
- }
- self.admin_request(
- path='/v2.0/tokens', method='POST', body=body,
- expected_status=http_client.UNAUTHORIZED)
-
- # Try looking up in v3 by name and id
- auth_data = self.build_authentication_request(
- user_id=self.user2['id'],
- password=self.user2['password'],
- project_id=self.project2['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
-
- auth_data = self.build_authentication_request(
- username=self.user2['name'],
- user_domain_id=self.domain2['id'],
- password=self.user2['password'],
- project_id=self.project2['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_delete_enabled_domain_fails(self):
- """Call ``DELETE /domains/{domain_id}`` (when domain enabled)."""
-
- # Try deleting an enabled domain, which should fail
- self.delete('/domains/%(domain_id)s' % {
- 'domain_id': self.domain['id']},
- expected_status=exception.ForbiddenAction.code)
-
- def test_delete_domain(self):
- """Call ``DELETE /domains/{domain_id}``.
-
- The sample data set up already has a user, group, project
- and credential that is part of self.domain. Since the user
- we will authenticate with is in this domain, we create a
- another set of entities in a second domain. Deleting this
- second domain should delete all these new entities. In addition,
- all the entities in the regular self.domain should be unaffected
- by the delete.
-
- Test Plan:
-
- - Create domain2 and a 2nd set of entities
- - Disable domain2
- - Delete domain2
- - Check entities in domain2 have been deleted
- - Check entities in self.domain are unaffected
-
- """
-
- # Create a 2nd set of entities in a 2nd domain
- self.domain2 = self.new_domain_ref()
- self.resource_api.create_domain(self.domain2['id'], self.domain2)
-
- self.project2 = self.new_project_ref(
- domain_id=self.domain2['id'])
- self.resource_api.create_project(self.project2['id'], self.project2)
-
- self.user2 = self.new_user_ref(
- domain_id=self.domain2['id'],
- project_id=self.project2['id'])
- self.user2 = self.identity_api.create_user(self.user2)
-
- self.group2 = self.new_group_ref(
- domain_id=self.domain2['id'])
- self.group2 = self.identity_api.create_group(self.group2)
-
- self.credential2 = self.new_credential_ref(
- user_id=self.user2['id'],
- project_id=self.project2['id'])
- self.credential_api.create_credential(
- self.credential2['id'],
- self.credential2)
-
- # Now disable the new domain and delete it
- self.domain2['enabled'] = False
- r = self.patch('/domains/%(domain_id)s' % {
- 'domain_id': self.domain2['id']},
- body={'domain': {'enabled': False}})
- self.assertValidDomainResponse(r, self.domain2)
- self.delete('/domains/%(domain_id)s' % {
- 'domain_id': self.domain2['id']})
-
- # Check all the domain2 relevant entities are gone
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- self.domain2['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- self.project2['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group,
- self.group2['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- self.user2['id'])
- self.assertRaises(exception.CredentialNotFound,
- self.credential_api.get_credential,
- self.credential2['id'])
-
- # ...and that all self.domain entities are still here
- r = self.resource_api.get_domain(self.domain['id'])
- self.assertDictEqual(r, self.domain)
- r = self.resource_api.get_project(self.project['id'])
- self.assertDictEqual(r, self.project)
- r = self.identity_api.get_group(self.group['id'])
- self.assertDictEqual(r, self.group)
- r = self.identity_api.get_user(self.user['id'])
- self.user.pop('password')
- self.assertDictEqual(r, self.user)
- r = self.credential_api.get_credential(self.credential['id'])
- self.assertDictEqual(r, self.credential)
-
- def test_delete_default_domain_fails(self):
- # Attempting to delete the default domain results in 403 Forbidden.
-
- # Need to disable it first.
- self.patch('/domains/%(domain_id)s' % {
- 'domain_id': CONF.identity.default_domain_id},
- body={'domain': {'enabled': False}})
-
- self.delete('/domains/%(domain_id)s' % {
- 'domain_id': CONF.identity.default_domain_id},
- expected_status=exception.ForbiddenAction.code)
-
- def test_delete_new_default_domain_fails(self):
- # If change the default domain ID, deleting the new default domain
- # results in a 403 Forbidden.
-
- # Create a new domain that's not the default
- new_domain = self.new_domain_ref()
- new_domain_id = new_domain['id']
- self.resource_api.create_domain(new_domain_id, new_domain)
-
- # Disable the new domain so can delete it later.
- self.patch('/domains/%(domain_id)s' % {
- 'domain_id': new_domain_id},
- body={'domain': {'enabled': False}})
-
- # Change the default domain
- self.config_fixture.config(group='identity',
- default_domain_id=new_domain_id)
-
- # Attempt to delete the new domain
-
- self.delete('/domains/%(domain_id)s' % {'domain_id': new_domain_id},
- expected_status=exception.ForbiddenAction.code)
-
- def test_delete_old_default_domain(self):
- # If change the default domain ID, deleting the old default domain
- # works.
-
- # Create a new domain that's not the default
- new_domain = self.new_domain_ref()
- new_domain_id = new_domain['id']
- self.resource_api.create_domain(new_domain_id, new_domain)
-
- old_default_domain_id = CONF.identity.default_domain_id
-
- # Disable the default domain so we can delete it later.
- self.patch('/domains/%(domain_id)s' % {
- 'domain_id': old_default_domain_id},
- body={'domain': {'enabled': False}})
-
- # Change the default domain
- self.config_fixture.config(group='identity',
- default_domain_id=new_domain_id)
-
- # Delete the old default domain
-
- self.delete(
- '/domains/%(domain_id)s' % {'domain_id': old_default_domain_id})
-
- def test_token_revoked_once_domain_disabled(self):
- """Test token from a disabled domain has been invalidated.
-
- Test that a token that was valid for an enabled domain
- becomes invalid once that domain is disabled.
-
- """
-
- self.domain = self.new_domain_ref()
- self.resource_api.create_domain(self.domain['id'], self.domain)
-
- self.user2 = self.new_user_ref(domain_id=self.domain['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
-
- # build a request body
- auth_body = self.build_authentication_request(
- user_id=self.user2['id'],
- password=self.user2['password'])
-
- # sends a request for the user's token
- token_resp = self.post('/auth/tokens', body=auth_body)
-
- subject_token = token_resp.headers.get('x-subject-token')
-
- # validates the returned token and it should be valid.
- self.head('/auth/tokens',
- headers={'x-subject-token': subject_token},
- expected_status=200)
-
- # now disable the domain
- self.domain['enabled'] = False
- url = "/domains/%(domain_id)s" % {'domain_id': self.domain['id']}
- self.patch(url,
- body={'domain': {'enabled': False}},
- expected_status=200)
-
- # validates the same token again and it should be 'not found'
- # as the domain has already been disabled.
- self.head('/auth/tokens',
- headers={'x-subject-token': subject_token},
- expected_status=http_client.NOT_FOUND)
-
- def test_delete_domain_hierarchy(self):
- """Call ``DELETE /domains/{domain_id}``."""
- domain = self.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
-
- root_project = self.new_project_ref(
- domain_id=domain['id'])
- self.resource_api.create_project(root_project['id'], root_project)
-
- leaf_project = self.new_project_ref(
- domain_id=domain['id'],
- parent_id=root_project['id'])
- self.resource_api.create_project(leaf_project['id'], leaf_project)
-
- # Need to disable it first.
- self.patch('/domains/%(domain_id)s' % {
- 'domain_id': domain['id']},
- body={'domain': {'enabled': False}})
-
- self.delete(
- '/domains/%(domain_id)s' % {
- 'domain_id': domain['id']})
-
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- root_project['id'])
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- leaf_project['id'])
-
- def test_forbid_operations_on_federated_domain(self):
- """Make sure one cannot operate on federated domain.
-
- This includes operations like create, update, delete
- on domain identified by id and name where difference variations of
- id 'Federated' are used.
-
- """
- def create_domains():
- for variation in ('Federated', 'FEDERATED',
- 'federated', 'fEderated'):
- domain = self.new_domain_ref()
- domain['id'] = variation
- yield domain
-
- for domain in create_domains():
- self.assertRaises(
- AssertionError, self.resource_api.create_domain,
- domain['id'], domain)
- self.assertRaises(
- AssertionError, self.resource_api.update_domain,
- domain['id'], domain)
- self.assertRaises(
- exception.DomainNotFound, self.resource_api.delete_domain,
- domain['id'])
-
- # swap 'name' with 'id' and try again, expecting the request to
- # gracefully fail
- domain['id'], domain['name'] = domain['name'], domain['id']
- self.assertRaises(
- AssertionError, self.resource_api.create_domain,
- domain['id'], domain)
- self.assertRaises(
- AssertionError, self.resource_api.update_domain,
- domain['id'], domain)
- self.assertRaises(
- exception.DomainNotFound, self.resource_api.delete_domain,
- domain['id'])
-
- def test_forbid_operations_on_defined_federated_domain(self):
- """Make sure one cannot operate on a user-defined federated domain.
-
- This includes operations like create, update, delete.
-
- """
-
- non_default_name = 'beta_federated_domain'
- self.config_fixture.config(group='federation',
- federated_domain_name=non_default_name)
- domain = self.new_domain_ref()
- domain['name'] = non_default_name
- self.assertRaises(AssertionError,
- self.resource_api.create_domain,
- domain['id'], domain)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.delete_domain,
- domain['id'])
- self.assertRaises(AssertionError,
- self.resource_api.update_domain,
- domain['id'], domain)
-
- # Project CRUD tests
-
- def test_list_projects(self):
- """Call ``GET /projects``."""
- resource_url = '/projects'
- r = self.get(resource_url)
- self.assertValidProjectListResponse(r, ref=self.project,
- resource_url=resource_url)
-
- def test_create_project(self):
- """Call ``POST /projects``."""
- ref = self.new_project_ref(domain_id=self.domain_id)
- r = self.post(
- '/projects',
- body={'project': ref})
- self.assertValidProjectResponse(r, ref)
-
- def test_create_project_bad_request(self):
- """Call ``POST /projects``."""
- self.post('/projects', body={'project': {}},
- expected_status=http_client.BAD_REQUEST)
-
- def test_create_project_invalid_domain_id(self):
- """Call ``POST /projects``."""
- ref = self.new_project_ref(domain_id=uuid.uuid4().hex)
- self.post('/projects', body={'project': ref},
- expected_status=http_client.BAD_REQUEST)
-
- def test_create_project_is_domain_not_allowed(self):
- """Call ``POST /projects``.
-
- Setting is_domain=True is not supported yet and should raise
- NotImplemented.
-
- """
- ref = self.new_project_ref(domain_id=self.domain_id, is_domain=True)
- self.post('/projects',
- body={'project': ref},
- expected_status=501)
-
- @utils.wip('waiting for projects acting as domains implementation')
- def test_create_project_without_parent_id_and_without_domain_id(self):
- """Call ``POST /projects``."""
-
- # Grant a domain role for the user
- collection_url = (
- '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
- 'domain_id': self.domain_id,
- 'user_id': self.user['id']})
- member_url = '%(collection_url)s/%(role_id)s' % {
- 'collection_url': collection_url,
- 'role_id': self.role_id}
- self.put(member_url)
-
- # Create an authentication request for a domain scoped token
- auth = self.build_authentication_request(
- user_id=self.user['id'],
- password=self.user['password'],
- domain_id=self.domain_id)
-
- # Without domain_id and parent_id, the domain_id should be
- # normalized to the domain on the token, when using a domain
- # scoped token.
- ref = self.new_project_ref()
- r = self.post(
- '/projects',
- auth=auth,
- body={'project': ref})
- ref['domain_id'] = self.domain['id']
- self.assertValidProjectResponse(r, ref)
-
- @utils.wip('waiting for projects acting as domains implementation')
- def test_create_project_with_parent_id_and_no_domain_id(self):
- """Call ``POST /projects``."""
- # With only the parent_id, the domain_id should be
- # normalized to the parent's domain_id
- ref_child = self.new_project_ref(parent_id=self.project['id'])
-
- r = self.post(
- '/projects',
- body={'project': ref_child})
- self.assertEqual(r.result['project']['domain_id'],
- self.project['domain_id'])
- ref_child['domain_id'] = self.domain['id']
- self.assertValidProjectResponse(r, ref_child)
-
- def _create_projects_hierarchy(self, hierarchy_size=1):
- """Creates a single-branched project hierarchy with the specified size.
-
- :param hierarchy_size: the desired hierarchy size, default is 1 -
- a project with one child.
-
- :returns projects: a list of the projects in the created hierarchy.
-
- """
- new_ref = self.new_project_ref(domain_id=self.domain_id)
- resp = self.post('/projects', body={'project': new_ref})
-
- projects = [resp.result]
-
- for i in range(hierarchy_size):
- new_ref = self.new_project_ref(
- domain_id=self.domain_id,
- parent_id=projects[i]['project']['id'])
- resp = self.post('/projects',
- body={'project': new_ref})
- self.assertValidProjectResponse(resp, new_ref)
-
- projects.append(resp.result)
-
- return projects
-
- def test_list_projects_filtering_by_parent_id(self):
- """Call ``GET /projects?parent_id={project_id}``."""
- projects = self._create_projects_hierarchy(hierarchy_size=2)
-
- # Add another child to projects[1] - it will be projects[3]
- new_ref = self.new_project_ref(
- domain_id=self.domain_id,
- parent_id=projects[1]['project']['id'])
- resp = self.post('/projects',
- body={'project': new_ref})
- self.assertValidProjectResponse(resp, new_ref)
-
- projects.append(resp.result)
-
- # Query for projects[0] immediate children - it will
- # be only projects[1]
- r = self.get(
- '/projects?parent_id=%(project_id)s' % {
- 'project_id': projects[0]['project']['id']})
- self.assertValidProjectListResponse(r)
-
- projects_result = r.result['projects']
- expected_list = [projects[1]['project']]
-
- # projects[0] has projects[1] as child
- self.assertEqual(expected_list, projects_result)
-
- # Query for projects[1] immediate children - it will
- # be projects[2] and projects[3]
- r = self.get(
- '/projects?parent_id=%(project_id)s' % {
- 'project_id': projects[1]['project']['id']})
- self.assertValidProjectListResponse(r)
-
- projects_result = r.result['projects']
- expected_list = [projects[2]['project'], projects[3]['project']]
-
- # projects[1] has projects[2] and projects[3] as children
- self.assertEqual(expected_list, projects_result)
-
- # Query for projects[2] immediate children - it will be an empty list
- r = self.get(
- '/projects?parent_id=%(project_id)s' % {
- 'project_id': projects[2]['project']['id']})
- self.assertValidProjectListResponse(r)
-
- projects_result = r.result['projects']
- expected_list = []
-
- # projects[2] has no child, projects_result must be an empty list
- self.assertEqual(expected_list, projects_result)
-
- def test_create_hierarchical_project(self):
- """Call ``POST /projects``."""
- self._create_projects_hierarchy()
-
- def test_get_project(self):
- """Call ``GET /projects/{project_id}``."""
- r = self.get(
- '/projects/%(project_id)s' % {
- 'project_id': self.project_id})
- self.assertValidProjectResponse(r, self.project)
-
- def test_get_project_with_parents_as_list_with_invalid_id(self):
- """Call ``GET /projects/{project_id}?parents_as_list``."""
- self.get('/projects/%(project_id)s?parents_as_list' % {
- 'project_id': None}, expected_status=http_client.NOT_FOUND)
-
- self.get('/projects/%(project_id)s?parents_as_list' % {
- 'project_id': uuid.uuid4().hex},
- expected_status=http_client.NOT_FOUND)
-
- def test_get_project_with_subtree_as_list_with_invalid_id(self):
- """Call ``GET /projects/{project_id}?subtree_as_list``."""
- self.get('/projects/%(project_id)s?subtree_as_list' % {
- 'project_id': None}, expected_status=http_client.NOT_FOUND)
-
- self.get('/projects/%(project_id)s?subtree_as_list' % {
- 'project_id': uuid.uuid4().hex},
- expected_status=http_client.NOT_FOUND)
-
- def test_get_project_with_parents_as_ids(self):
- """Call ``GET /projects/{project_id}?parents_as_ids``."""
- projects = self._create_projects_hierarchy(hierarchy_size=2)
-
- # Query for projects[2] parents_as_ids
- r = self.get(
- '/projects/%(project_id)s?parents_as_ids' % {
- 'project_id': projects[2]['project']['id']})
-
- self.assertValidProjectResponse(r, projects[2]['project'])
- parents_as_ids = r.result['project']['parents']
-
- # Assert parents_as_ids is a structured dictionary correctly
- # representing the hierarchy. The request was made using projects[2]
- # id, hence its parents should be projects[1] and projects[0]. It
- # should have the following structure:
- # {
- # projects[1]: {
- # projects[0]: None
- # }
- # }
- expected_dict = {
- projects[1]['project']['id']: {
- projects[0]['project']['id']: None
- }
- }
- self.assertDictEqual(expected_dict, parents_as_ids)
-
- # Query for projects[0] parents_as_ids
- r = self.get(
- '/projects/%(project_id)s?parents_as_ids' % {
- 'project_id': projects[0]['project']['id']})
-
- self.assertValidProjectResponse(r, projects[0]['project'])
- parents_as_ids = r.result['project']['parents']
-
- # projects[0] has no parents, parents_as_ids must be None
- self.assertIsNone(parents_as_ids)
-
- def test_get_project_with_parents_as_list_with_full_access(self):
- """``GET /projects/{project_id}?parents_as_list`` with full access.
-
- Test plan:
-
- - Create 'parent', 'project' and 'subproject' projects;
- - Assign a user a role on each one of those projects;
- - Check that calling parents_as_list on 'subproject' returns both
- 'project' and 'parent'.
-
- """
-
- # Create the project hierarchy
- parent, project, subproject = self._create_projects_hierarchy(2)
-
- # Assign a role for the user on all the created projects
- for proj in (parent, project, subproject):
- self.put(self.build_role_assignment_link(
- role_id=self.role_id, user_id=self.user_id,
- project_id=proj['project']['id']))
-
- # Make the API call
- r = self.get('/projects/%(project_id)s?parents_as_list' %
- {'project_id': subproject['project']['id']})
- self.assertValidProjectResponse(r, subproject['project'])
-
- # Assert only 'project' and 'parent' are in the parents list
- self.assertIn(project, r.result['project']['parents'])
- self.assertIn(parent, r.result['project']['parents'])
- self.assertEqual(2, len(r.result['project']['parents']))
-
- def test_get_project_with_parents_as_list_with_partial_access(self):
- """``GET /projects/{project_id}?parents_as_list`` with partial access.
-
- Test plan:
-
- - Create 'parent', 'project' and 'subproject' projects;
- - Assign a user a role on 'parent' and 'subproject';
- - Check that calling parents_as_list on 'subproject' only returns
- 'parent'.
-
- """
-
- # Create the project hierarchy
- parent, project, subproject = self._create_projects_hierarchy(2)
-
- # Assign a role for the user on parent and subproject
- for proj in (parent, subproject):
- self.put(self.build_role_assignment_link(
- role_id=self.role_id, user_id=self.user_id,
- project_id=proj['project']['id']))
-
- # Make the API call
- r = self.get('/projects/%(project_id)s?parents_as_list' %
- {'project_id': subproject['project']['id']})
- self.assertValidProjectResponse(r, subproject['project'])
-
- # Assert only 'parent' is in the parents list
- self.assertIn(parent, r.result['project']['parents'])
- self.assertEqual(1, len(r.result['project']['parents']))
-
- def test_get_project_with_parents_as_list_and_parents_as_ids(self):
- """Call ``GET /projects/{project_id}?parents_as_list&parents_as_ids``.
-
- """
- projects = self._create_projects_hierarchy(hierarchy_size=2)
-
- self.get(
- '/projects/%(project_id)s?parents_as_list&parents_as_ids' % {
- 'project_id': projects[1]['project']['id']},
- expected_status=http_client.BAD_REQUEST)
-
- def test_get_project_with_subtree_as_ids(self):
- """Call ``GET /projects/{project_id}?subtree_as_ids``.
-
- This test creates a more complex hierarchy to test if the structured
- dictionary returned by using the ``subtree_as_ids`` query param
- correctly represents the hierarchy.
-
- The hierarchy contains 5 projects with the following structure::
-
- +--A--+
- | |
- +--B--+ C
- | |
- D E
-
-
- """
- projects = self._create_projects_hierarchy(hierarchy_size=2)
-
- # Add another child to projects[0] - it will be projects[3]
- new_ref = self.new_project_ref(
- domain_id=self.domain_id,
- parent_id=projects[0]['project']['id'])
- resp = self.post('/projects',
- body={'project': new_ref})
- self.assertValidProjectResponse(resp, new_ref)
- projects.append(resp.result)
-
- # Add another child to projects[1] - it will be projects[4]
- new_ref = self.new_project_ref(
- domain_id=self.domain_id,
- parent_id=projects[1]['project']['id'])
- resp = self.post('/projects',
- body={'project': new_ref})
- self.assertValidProjectResponse(resp, new_ref)
- projects.append(resp.result)
-
- # Query for projects[0] subtree_as_ids
- r = self.get(
- '/projects/%(project_id)s?subtree_as_ids' % {
- 'project_id': projects[0]['project']['id']})
- self.assertValidProjectResponse(r, projects[0]['project'])
- subtree_as_ids = r.result['project']['subtree']
-
- # The subtree hierarchy from projects[0] should have the following
- # structure:
- # {
- # projects[1]: {
- # projects[2]: None,
- # projects[4]: None
- # },
- # projects[3]: None
- # }
- expected_dict = {
- projects[1]['project']['id']: {
- projects[2]['project']['id']: None,
- projects[4]['project']['id']: None
- },
- projects[3]['project']['id']: None
- }
- self.assertDictEqual(expected_dict, subtree_as_ids)
-
- # Now query for projects[1] subtree_as_ids
- r = self.get(
- '/projects/%(project_id)s?subtree_as_ids' % {
- 'project_id': projects[1]['project']['id']})
- self.assertValidProjectResponse(r, projects[1]['project'])
- subtree_as_ids = r.result['project']['subtree']
-
- # The subtree hierarchy from projects[1] should have the following
- # structure:
- # {
- # projects[2]: None,
- # projects[4]: None
- # }
- expected_dict = {
- projects[2]['project']['id']: None,
- projects[4]['project']['id']: None
- }
- self.assertDictEqual(expected_dict, subtree_as_ids)
-
- # Now query for projects[3] subtree_as_ids
- r = self.get(
- '/projects/%(project_id)s?subtree_as_ids' % {
- 'project_id': projects[3]['project']['id']})
- self.assertValidProjectResponse(r, projects[3]['project'])
- subtree_as_ids = r.result['project']['subtree']
-
- # projects[3] has no subtree, subtree_as_ids must be None
- self.assertIsNone(subtree_as_ids)
-
- def test_get_project_with_subtree_as_list_with_full_access(self):
- """``GET /projects/{project_id}?subtree_as_list`` with full access.
-
- Test plan:
-
- - Create 'parent', 'project' and 'subproject' projects;
- - Assign a user a role on each one of those projects;
- - Check that calling subtree_as_list on 'parent' returns both 'parent'
- and 'subproject'.
-
- """
-
- # Create the project hierarchy
- parent, project, subproject = self._create_projects_hierarchy(2)
-
- # Assign a role for the user on all the created projects
- for proj in (parent, project, subproject):
- self.put(self.build_role_assignment_link(
- role_id=self.role_id, user_id=self.user_id,
- project_id=proj['project']['id']))
-
- # Make the API call
- r = self.get('/projects/%(project_id)s?subtree_as_list' %
- {'project_id': parent['project']['id']})
- self.assertValidProjectResponse(r, parent['project'])
-
- # Assert only 'project' and 'subproject' are in the subtree
- self.assertIn(project, r.result['project']['subtree'])
- self.assertIn(subproject, r.result['project']['subtree'])
- self.assertEqual(2, len(r.result['project']['subtree']))
-
- def test_get_project_with_subtree_as_list_with_partial_access(self):
- """``GET /projects/{project_id}?subtree_as_list`` with partial access.
-
- Test plan:
-
- - Create 'parent', 'project' and 'subproject' projects;
- - Assign a user a role on 'parent' and 'subproject';
- - Check that calling subtree_as_list on 'parent' returns 'subproject'.
-
- """
-
- # Create the project hierarchy
- parent, project, subproject = self._create_projects_hierarchy(2)
-
- # Assign a role for the user on parent and subproject
- for proj in (parent, subproject):
- self.put(self.build_role_assignment_link(
- role_id=self.role_id, user_id=self.user_id,
- project_id=proj['project']['id']))
-
- # Make the API call
- r = self.get('/projects/%(project_id)s?subtree_as_list' %
- {'project_id': parent['project']['id']})
- self.assertValidProjectResponse(r, parent['project'])
-
- # Assert only 'subproject' is in the subtree
- self.assertIn(subproject, r.result['project']['subtree'])
- self.assertEqual(1, len(r.result['project']['subtree']))
-
- def test_get_project_with_subtree_as_list_and_subtree_as_ids(self):
- """Call ``GET /projects/{project_id}?subtree_as_list&subtree_as_ids``.
-
- """
- projects = self._create_projects_hierarchy(hierarchy_size=2)
-
- self.get(
- '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % {
- 'project_id': projects[1]['project']['id']},
- expected_status=http_client.BAD_REQUEST)
-
- def test_update_project(self):
- """Call ``PATCH /projects/{project_id}``."""
- ref = self.new_project_ref(domain_id=self.domain_id)
- del ref['id']
- r = self.patch(
- '/projects/%(project_id)s' % {
- 'project_id': self.project_id},
- body={'project': ref})
- self.assertValidProjectResponse(r, ref)
-
- def test_update_project_domain_id(self):
- """Call ``PATCH /projects/{project_id}`` with domain_id."""
- project = self.new_project_ref(domain_id=self.domain['id'])
- self.resource_api.create_project(project['id'], project)
- project['domain_id'] = CONF.identity.default_domain_id
- r = self.patch('/projects/%(project_id)s' % {
- 'project_id': project['id']},
- body={'project': project},
- expected_status=exception.ValidationError.code)
- self.config_fixture.config(domain_id_immutable=False)
- project['domain_id'] = self.domain['id']
- r = self.patch('/projects/%(project_id)s' % {
- 'project_id': project['id']},
- body={'project': project})
- self.assertValidProjectResponse(r, project)
-
- def test_update_project_parent_id(self):
- """Call ``PATCH /projects/{project_id}``."""
- projects = self._create_projects_hierarchy()
- leaf_project = projects[1]['project']
- leaf_project['parent_id'] = None
- self.patch(
- '/projects/%(project_id)s' % {
- 'project_id': leaf_project['id']},
- body={'project': leaf_project},
- expected_status=http_client.FORBIDDEN)
-
- def test_update_project_is_domain_not_allowed(self):
- """Call ``PATCH /projects/{project_id}`` with is_domain.
-
- The is_domain flag is immutable.
- """
- project = self.new_project_ref(domain_id=self.domain['id'])
- resp = self.post('/projects',
- body={'project': project})
- self.assertFalse(resp.result['project']['is_domain'])
-
- project['is_domain'] = True
- self.patch('/projects/%(project_id)s' % {
- 'project_id': resp.result['project']['id']},
- body={'project': project},
- expected_status=http_client.BAD_REQUEST)
-
- def test_disable_leaf_project(self):
- """Call ``PATCH /projects/{project_id}``."""
- projects = self._create_projects_hierarchy()
- leaf_project = projects[1]['project']
- leaf_project['enabled'] = False
- r = self.patch(
- '/projects/%(project_id)s' % {
- 'project_id': leaf_project['id']},
- body={'project': leaf_project})
- self.assertEqual(
- leaf_project['enabled'], r.result['project']['enabled'])
-
- def test_disable_not_leaf_project(self):
- """Call ``PATCH /projects/{project_id}``."""
- projects = self._create_projects_hierarchy()
- root_project = projects[0]['project']
- root_project['enabled'] = False
- self.patch(
- '/projects/%(project_id)s' % {
- 'project_id': root_project['id']},
- body={'project': root_project},
- expected_status=http_client.FORBIDDEN)
-
- def test_delete_project(self):
- """Call ``DELETE /projects/{project_id}``
-
- As well as making sure the delete succeeds, we ensure
- that any credentials that reference this projects are
- also deleted, while other credentials are unaffected.
-
- """
- # First check the credential for this project is present
- r = self.credential_api.get_credential(self.credential['id'])
- self.assertDictEqual(r, self.credential)
- # Create a second credential with a different project
- self.project2 = self.new_project_ref(
- domain_id=self.domain['id'])
- self.resource_api.create_project(self.project2['id'], self.project2)
- self.credential2 = self.new_credential_ref(
- user_id=self.user['id'],
- project_id=self.project2['id'])
- self.credential_api.create_credential(
- self.credential2['id'],
- self.credential2)
-
- # Now delete the project
- self.delete(
- '/projects/%(project_id)s' % {
- 'project_id': self.project_id})
-
- # Deleting the project should have deleted any credentials
- # that reference this project
- self.assertRaises(exception.CredentialNotFound,
- self.credential_api.get_credential,
- credential_id=self.credential['id'])
- # But the credential for project2 is unaffected
- r = self.credential_api.get_credential(self.credential2['id'])
- self.assertDictEqual(r, self.credential2)
-
- def test_delete_not_leaf_project(self):
- """Call ``DELETE /projects/{project_id}``."""
- projects = self._create_projects_hierarchy()
- self.delete(
- '/projects/%(project_id)s' % {
- 'project_id': projects[0]['project']['id']},
- expected_status=http_client.FORBIDDEN)
-
# Role CRUD tests
def test_create_role(self):
"""Call ``POST /roles``."""
- ref = self.new_role_ref()
+ ref = unit.new_role_ref()
r = self.post(
'/roles',
body={'role': ref})
@@ -1090,7 +66,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
def test_update_role(self):
"""Call ``PATCH /roles/{role_id}``."""
- ref = self.new_role_ref()
+ ref = unit.new_role_ref()
del ref['id']
r = self.patch('/roles/%(role_id)s' % {
'role_id': self.role_id},
@@ -1105,8 +81,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
def test_create_member_role(self):
"""Call ``POST /roles``."""
# specify only the name on creation
- ref = self.new_role_ref()
- ref['name'] = CONF.member_role_name
+ ref = unit.new_role_ref(name=CONF.member_role_name)
r = self.post(
'/roles',
body={'role': ref})
@@ -1118,35 +93,41 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# Role Grants tests
def test_crud_user_project_role_grants(self):
+ role = unit.new_role_ref()
+ self.role_api.create_role(role['id'], role)
+
collection_url = (
'/projects/%(project_id)s/users/%(user_id)s/roles' % {
'project_id': self.project['id'],
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
- 'role_id': self.role_id}
+ 'role_id': role['id']}
+
+ # There is a role assignment for self.user on self.project
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role,
+ expected_length=1)
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
- self.assertValidRoleListResponse(r, ref=self.role,
- resource_url=collection_url)
+ self.assertValidRoleListResponse(r, ref=role,
+ resource_url=collection_url,
+ expected_length=2)
- # FIXME(gyee): this test is no longer valid as user
- # have no role in the project. Can't get a scoped token
- # self.delete(member_url)
- # r = self.get(collection_url)
- # self.assertValidRoleListResponse(r, expected_length=0)
- # self.assertIn(collection_url, r.result['links']['self'])
+ self.delete(member_url)
+ r = self.get(collection_url)
+ self.assertValidRoleListResponse(r, ref=self.role, expected_length=1)
+ self.assertIn(collection_url, r.result['links']['self'])
def test_crud_user_project_role_grants_no_user(self):
- """Grant role on a project to a user that doesn't exist, 404 result.
+ """Grant role on a project to a user that doesn't exist.
When grant a role on a project to a user that doesn't exist, the server
returns Not Found for the user.
"""
-
user_id = uuid.uuid4().hex
collection_url = (
@@ -1179,13 +160,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
resource_url=collection_url)
def test_crud_user_domain_role_grants_no_user(self):
- """Grant role on a domain to a user that doesn't exist, 404 result.
+ """Grant role on a domain to a user that doesn't exist.
When grant a role on a domain to a user that doesn't exist, the server
returns 404 Not Found for the user.
"""
-
user_id = uuid.uuid4().hex
collection_url = (
@@ -1218,13 +198,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
resource_url=collection_url)
def test_crud_group_project_role_grants_no_group(self):
- """Grant role on a project to a group that doesn't exist, 404 result.
+ """Grant role on a project to a group that doesn't exist.
When grant a role on a project to a group that doesn't exist, the
server returns 404 Not Found for the group.
"""
-
group_id = uuid.uuid4().hex
collection_url = (
@@ -1258,13 +237,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
resource_url=collection_url)
def test_crud_group_domain_role_grants_no_group(self):
- """Grant role on a domain to a group that doesn't exist, 404 result.
+ """Grant role on a domain to a group that doesn't exist.
When grant a role on a domain to a group that doesn't exist, the server
returns 404 Not Found for the group.
"""
-
group_id = uuid.uuid4().hex
collection_url = (
@@ -1280,7 +258,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
def _create_new_user_and_assign_role_on_project(self):
"""Create a new user and assign user a role on a project."""
# Create a new user
- new_user = self.new_user_ref(domain_id=self.domain_id)
+ new_user = unit.new_user_ref(domain_id=self.domain_id)
user_ref = self.identity_api.create_user(new_user)
# Assign the user a role on the project
collection_url = (
@@ -1290,9 +268,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
member_url = ('%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id})
- self.put(member_url, expected_status=204)
+ self.put(member_url)
# Check the user has the role assigned
- self.head(member_url, expected_status=204)
+ self.head(member_url)
return member_url, user_ref
def test_delete_user_before_removing_role_assignment_succeeds(self):
@@ -1301,7 +279,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# Delete the user from identity backend
self.identity_api.driver.delete_user(user['id'])
# Clean up the role assignment
- self.delete(member_url, expected_status=204)
+ self.delete(member_url)
# Make sure the role is gone
self.head(member_url, expected_status=http_client.NOT_FOUND)
@@ -1310,8 +288,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
member_url, user = self._create_new_user_and_assign_role_on_project()
# Delete the user from identity backend
self.identity_api.delete_user(user['id'])
- # We should get a 404 when looking for the user in the identity
- # backend because we're not performing a delete operation on the role.
+ # We should get a 404 Not Found when looking for the user in the
+ # identity backend because we're not performing a delete operation on
+ # the role.
self.head(member_url, expected_status=http_client.NOT_FOUND)
def test_token_revoked_once_group_role_grant_revoked(self):
@@ -1344,7 +323,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# validates the returned token; it should be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# revokes the grant from group on project.
self.assignment_api.delete_grant(role_id=self.role['id'],
@@ -1356,6 +335,126 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
headers={'x-subject-token': token},
expected_status=http_client.NOT_FOUND)
+ @unit.skip_if_cache_disabled('assignment')
+ def test_delete_grant_from_user_and_project_invalidate_cache(self):
+ # create a new project
+ new_project = unit.new_project_ref(domain_id=self.domain_id)
+ self.resource_api.create_project(new_project['id'], new_project)
+
+ collection_url = (
+ '/projects/%(project_id)s/users/%(user_id)s/roles' % {
+ 'project_id': new_project['id'],
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ # create the user a grant on the new project
+ self.put(member_url)
+
+ # check the grant that was just created
+ self.head(member_url)
+ resp = self.get(collection_url)
+ self.assertValidRoleListResponse(resp, ref=self.role,
+ resource_url=collection_url)
+
+ # delete the grant
+ self.delete(member_url)
+
+ # get the collection and ensure there are no roles on the project
+ resp = self.get(collection_url)
+ self.assertListEqual(resp.json_body['roles'], [])
+
+ @unit.skip_if_cache_disabled('assignment')
+ def test_delete_grant_from_user_and_domain_invalidates_cache(self):
+ # create a new domain
+ new_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(new_domain['id'], new_domain)
+
+ collection_url = (
+ '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': new_domain['id'],
+ 'user_id': self.user['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ # create the user a grant on the new domain
+ self.put(member_url)
+
+ # check the grant that was just created
+ self.head(member_url)
+ resp = self.get(collection_url)
+ self.assertValidRoleListResponse(resp, ref=self.role,
+ resource_url=collection_url)
+
+ # delete the grant
+ self.delete(member_url)
+
+ # get the collection and ensure there are no roles on the domain
+ resp = self.get(collection_url)
+ self.assertListEqual(resp.json_body['roles'], [])
+
+ @unit.skip_if_cache_disabled('assignment')
+ def test_delete_grant_from_group_and_project_invalidates_cache(self):
+ # create a new project
+ new_project = unit.new_project_ref(domain_id=self.domain_id)
+ self.resource_api.create_project(new_project['id'], new_project)
+
+ collection_url = (
+ '/projects/%(project_id)s/groups/%(group_id)s/roles' % {
+ 'project_id': new_project['id'],
+ 'group_id': self.group['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ # create the group a grant on the new project
+ self.put(member_url)
+
+ # check the grant that was just created
+ self.head(member_url)
+ resp = self.get(collection_url)
+ self.assertValidRoleListResponse(resp, ref=self.role,
+ resource_url=collection_url)
+
+ # delete the grant
+ self.delete(member_url)
+
+ # get the collection and ensure there are no roles on the project
+ resp = self.get(collection_url)
+ self.assertListEqual(resp.json_body['roles'], [])
+
+ @unit.skip_if_cache_disabled('assignment')
+ def test_delete_grant_from_group_and_domain_invalidates_cache(self):
+ # create a new domain
+ new_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(new_domain['id'], new_domain)
+
+ collection_url = (
+ '/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
+ 'domain_id': new_domain['id'],
+ 'group_id': self.group['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role_id}
+
+ # create the group a grant on the new domain
+ self.put(member_url)
+
+ # check the grant that was just created
+ self.head(member_url)
+ resp = self.get(collection_url)
+ self.assertValidRoleListResponse(resp, ref=self.role,
+ resource_url=collection_url)
+
+ # delete the grant
+ self.delete(member_url)
+
+ # get the collection and ensure there are no roles on the domain
+ resp = self.get(collection_url)
+ self.assertListEqual(resp.json_body['roles'], [])
+
# Role Assignments tests
def test_get_role_assignments(self):
@@ -1384,13 +483,11 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
been removed
"""
-
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
- self.user1 = self.new_user_ref(
- domain_id=self.domain['id'])
- self.user1 = self.identity_api.create_user(self.user1)
+ user1 = unit.new_user_ref(domain_id=self.domain['id'])
+ user1 = self.identity_api.create_user(user1)
collection_url = '/role_assignments'
r = self.get(collection_url)
@@ -1412,7 +509,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, gd_entity)
ud_entity = self.build_role_assignment_entity(domain_id=self.domain_id,
- user_id=self.user1['id'],
+ user_id=user1['id'],
role_id=self.role_id)
self.put(ud_entity['links']['assignment'])
r = self.get(collection_url)
@@ -1434,7 +531,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, gp_entity)
up_entity = self.build_role_assignment_entity(
- project_id=self.project_id, user_id=self.user1['id'],
+ project_id=self.project_id, user_id=user1['id'],
role_id=self.role_id)
self.put(up_entity['links']['assignment'])
r = self.get(collection_url)
@@ -1475,18 +572,13 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
for each of the group members.
"""
- self.user1 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user1['password']
- self.user1 = self.identity_api.create_user(self.user1)
- self.user1['password'] = password
- self.user2 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
- self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
- self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
+ user1 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+ user2 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+
+ self.identity_api.add_user_to_group(user1['id'], self.group['id'])
+ self.identity_api.add_user_to_group(user2['id'], self.group['id'])
collection_url = '/role_assignments'
r = self.get(collection_url)
@@ -1516,11 +608,11 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
resource_url=collection_url)
ud_entity = self.build_role_assignment_entity(
link=gd_entity['links']['assignment'], domain_id=self.domain_id,
- user_id=self.user1['id'], role_id=self.role_id)
+ user_id=user1['id'], role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity)
ud_entity = self.build_role_assignment_entity(
link=gd_entity['links']['assignment'], domain_id=self.domain_id,
- user_id=self.user2['id'], role_id=self.role_id)
+ user_id=user2['id'], role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity)
def test_check_effective_values_for_role_assignments(self):
@@ -1549,18 +641,13 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
know if we are getting effective roles or not
"""
- self.user1 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user1['password']
- self.user1 = self.identity_api.create_user(self.user1)
- self.user1['password'] = password
- self.user2 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
- self.identity_api.add_user_to_group(self.user1['id'], self.group['id'])
- self.identity_api.add_user_to_group(self.user2['id'], self.group['id'])
+ user1 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+ user2 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+
+ self.identity_api.add_user_to_group(user1['id'], self.group['id'])
+ self.identity_api.add_user_to_group(user2['id'], self.group['id'])
collection_url = '/role_assignments'
r = self.get(collection_url)
@@ -1633,61 +720,53 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
token (all effective roles for a user on a project)
"""
-
# Since the default fixtures already assign some roles to the
# user it creates, we also need a new user that will not have any
# existing assignments
- self.user1 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user1['password']
- self.user1 = self.identity_api.create_user(self.user1)
- self.user1['password'] = password
- self.user2 = self.new_user_ref(
- domain_id=self.domain['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
- self.group1 = self.new_group_ref(
- domain_id=self.domain['id'])
- self.group1 = self.identity_api.create_group(self.group1)
- self.identity_api.add_user_to_group(self.user1['id'],
- self.group1['id'])
- self.identity_api.add_user_to_group(self.user2['id'],
- self.group1['id'])
- self.project1 = self.new_project_ref(
- domain_id=self.domain['id'])
- self.resource_api.create_project(self.project1['id'], self.project1)
- self.role1 = self.new_role_ref()
+ user1 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+ user2 = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+
+ group1 = unit.new_group_ref(domain_id=self.domain['id'])
+ group1 = self.identity_api.create_group(group1)
+ self.identity_api.add_user_to_group(user1['id'], group1['id'])
+ self.identity_api.add_user_to_group(user2['id'], group1['id'])
+ project1 = unit.new_project_ref(domain_id=self.domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ self.role1 = unit.new_role_ref()
self.role_api.create_role(self.role1['id'], self.role1)
- self.role2 = self.new_role_ref()
+ self.role2 = unit.new_role_ref()
self.role_api.create_role(self.role2['id'], self.role2)
# Now add one of each of the four types of assignment
gd_entity = self.build_role_assignment_entity(
- domain_id=self.domain_id, group_id=self.group1['id'],
+ domain_id=self.domain_id, group_id=group1['id'],
role_id=self.role1['id'])
self.put(gd_entity['links']['assignment'])
ud_entity = self.build_role_assignment_entity(domain_id=self.domain_id,
- user_id=self.user1['id'],
+ user_id=user1['id'],
role_id=self.role2['id'])
self.put(ud_entity['links']['assignment'])
gp_entity = self.build_role_assignment_entity(
- project_id=self.project1['id'], group_id=self.group1['id'],
+ project_id=project1['id'],
+ group_id=group1['id'],
role_id=self.role1['id'])
self.put(gp_entity['links']['assignment'])
up_entity = self.build_role_assignment_entity(
- project_id=self.project1['id'], user_id=self.user1['id'],
+ project_id=project1['id'],
+ user_id=user1['id'],
role_id=self.role2['id'])
self.put(up_entity['links']['assignment'])
# Now list by various filters to make sure we get back the right ones
collection_url = ('/role_assignments?scope.project.id=%s' %
- self.project1['id'])
+ project1['id'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=2,
@@ -1704,7 +783,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, ud_entity)
self.assertRoleAssignmentInListResponse(r, gd_entity)
- collection_url = '/role_assignments?user.id=%s' % self.user1['id']
+ collection_url = '/role_assignments?user.id=%s' % user1['id']
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=2,
@@ -1712,7 +791,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, up_entity)
self.assertRoleAssignmentInListResponse(r, ud_entity)
- collection_url = '/role_assignments?group.id=%s' % self.group1['id']
+ collection_url = '/role_assignments?group.id=%s' % group1['id']
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=2,
@@ -1733,8 +812,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
collection_url = (
'/role_assignments?user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
- 'user_id': self.user1['id'],
- 'project_id': self.project1['id']})
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=1,
@@ -1746,7 +825,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# assigned as well as by virtue of group membership
collection_url = ('/role_assignments?effective&user.id=%s' %
- self.user1['id'])
+ user1['id'])
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=4,
@@ -1756,17 +835,18 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, ud_entity)
# ...and the two via group membership...
gp1_link = self.build_role_assignment_link(
- project_id=self.project1['id'], group_id=self.group1['id'],
+ project_id=project1['id'],
+ group_id=group1['id'],
role_id=self.role1['id'])
gd1_link = self.build_role_assignment_link(domain_id=self.domain_id,
- group_id=self.group1['id'],
+ group_id=group1['id'],
role_id=self.role1['id'])
up1_entity = self.build_role_assignment_entity(
- link=gp1_link, project_id=self.project1['id'],
- user_id=self.user1['id'], role_id=self.role1['id'])
+ link=gp1_link, project_id=project1['id'],
+ user_id=user1['id'], role_id=self.role1['id'])
ud1_entity = self.build_role_assignment_entity(
- link=gd1_link, domain_id=self.domain_id, user_id=self.user1['id'],
+ link=gd1_link, domain_id=self.domain_id, user_id=user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up1_entity)
self.assertRoleAssignmentInListResponse(r, ud1_entity)
@@ -1778,8 +858,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
collection_url = (
'/role_assignments?effective&user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
- 'user_id': self.user1['id'],
- 'project_id': self.project1['id']})
+ 'user_id': user1['id'],
+ 'project_id': project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r,
expected_length=2,
@@ -1804,7 +884,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
"""
def create_project_hierarchy(parent_id, depth):
- "Creates a random project hierarchy."
+ """Creates a random project hierarchy."""
if depth == 0:
return
@@ -1812,7 +892,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
subprojects = []
for i in range(breadth):
- subprojects.append(self.new_project_ref(
+ subprojects.append(unit.new_project_ref(
domain_id=self.domain_id, parent_id=parent_id))
self.resource_api.create_project(subprojects[-1]['id'],
subprojects[-1])
@@ -1823,12 +903,12 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
super(RoleAssignmentBaseTestCase, self).load_sample_data()
# Create a domain
- self.domain = self.new_domain_ref()
+ self.domain = unit.new_domain_ref()
self.domain_id = self.domain['id']
self.resource_api.create_domain(self.domain_id, self.domain)
# Create a project hierarchy
- self.project = self.new_project_ref(domain_id=self.domain_id)
+ self.project = unit.new_project_ref(domain_id=self.domain_id)
self.project_id = self.project['id']
self.resource_api.create_project(self.project_id, self.project)
@@ -1839,14 +919,14 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
# Create 3 users
self.user_ids = []
for i in range(3):
- user = self.new_user_ref(domain_id=self.domain_id)
+ user = unit.new_user_ref(domain_id=self.domain_id)
user = self.identity_api.create_user(user)
self.user_ids.append(user['id'])
# Create 3 groups
self.group_ids = []
for i in range(3):
- group = self.new_group_ref(domain_id=self.domain_id)
+ group = unit.new_group_ref(domain_id=self.domain_id)
group = self.identity_api.create_group(group)
self.group_ids.append(group['id'])
@@ -1861,7 +941,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
role_id=self.role_id)
# Create a role
- self.role = self.new_role_ref()
+ self.role = unit.new_role_ref()
self.role_id = self.role['id']
self.role_api.create_role(self.role_id, self.role)
@@ -1869,7 +949,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
self.default_user_id = self.user_ids[0]
self.default_group_id = self.group_ids[0]
- def get_role_assignments(self, expected_status=200, **filters):
+ def get_role_assignments(self, expected_status=http_client.OK, **filters):
"""Returns the result from querying role assignment API + queried URL.
Calls GET /v3/role_assignments?<params> and returns its result, where
@@ -1880,7 +960,6 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
queried URL.
"""
-
query_url = self._get_role_assignments_query_url(**filters)
response = self.get(query_url, expected_status=expected_status)
@@ -1903,11 +982,11 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
class RoleAssignmentFailureTestCase(RoleAssignmentBaseTestCase):
"""Class for testing invalid query params on /v3/role_assignments API.
- Querying domain and project, or user and group results in a HTTP 400, since
- a role assignment must contain only a single pair of (actor, target). In
- addition, since filtering on role assignments applies only to the final
- result, effective mode cannot be combined with i) group or ii) domain and
- inherited, because it would always result in an empty list.
+ Querying domain and project, or user and group results in a HTTP 400 Bad
+ Request, since a role assignment must contain only a single pair of (actor,
+ target). In addition, since filtering on role assignments applies only to
+ the final result, effective mode cannot be combined with i) group or ii)
+ domain and inherited, because it would always result in an empty list.
"""
@@ -1959,7 +1038,6 @@ class RoleAssignmentDirectTestCase(RoleAssignmentBaseTestCase):
group_id, user_id and inherited_to_projects.
"""
-
# Fills default assignment with provided filters
test_assignment = self._set_default_assignment_attributes(**filters)
@@ -2188,10 +1266,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
def test_get_token_from_inherited_user_domain_role_grants(self):
# Create a new user to ensure that no grant is loaded from sample data
- user = self.new_user_ref(domain_id=self.domain_id)
- password = user['password']
- user = self.identity_api.create_user(user)
- user['password'] = password
+ user = unit.create_user(self.identity_api, domain_id=self.domain_id)
# Define domain and project authentication data
domain_auth_data = self.build_authentication_request(
@@ -2204,10 +1279,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=self.project_id)
# Check the user cannot get a domain nor a project token
- self.v3_authenticate_token(domain_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on domain
non_inher_ud_link = self.build_role_assignment_link(
@@ -2215,12 +1290,12 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_ud_link)
# Check the user can get only a domain token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Create inherited role
- inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ inherited_role = unit.new_role_ref(name='inherited')
self.role_api.create_role(inherited_role['id'], inherited_role)
# Grant inherited role for user on domain
@@ -2230,33 +1305,30 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_ud_link)
# Check the user can get both a domain and a project token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data)
# Delete inherited grant
self.delete(inher_ud_link)
# Check the user can only get a domain token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Delete non-inherited grant
self.delete(non_inher_ud_link)
# Check the user cannot get a domain token anymore
- self.v3_authenticate_token(domain_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_get_token_from_inherited_group_domain_role_grants(self):
# Create a new group and put a new user in it to
# ensure that no grant is loaded from sample data
- user = self.new_user_ref(domain_id=self.domain_id)
- password = user['password']
- user = self.identity_api.create_user(user)
- user['password'] = password
+ user = unit.create_user(self.identity_api, domain_id=self.domain_id)
- group = self.new_group_ref(domain_id=self.domain['id'])
+ group = unit.new_group_ref(domain_id=self.domain['id'])
group = self.identity_api.create_group(group)
self.identity_api.add_user_to_group(user['id'], group['id'])
@@ -2271,10 +1343,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=self.project_id)
# Check the user cannot get a domain nor a project token
- self.v3_authenticate_token(domain_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on domain
non_inher_gd_link = self.build_role_assignment_link(
@@ -2282,12 +1354,12 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_gd_link)
# Check the user can get only a domain token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Create inherited role
- inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ inherited_role = unit.new_role_ref(name='inherited')
self.role_api.create_role(inherited_role['id'], inherited_role)
# Grant inherited role for user on domain
@@ -2297,27 +1369,27 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_gd_link)
# Check the user can get both a domain and a project token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data)
# Delete inherited grant
self.delete(inher_gd_link)
# Check the user can only get a domain token
- self.v3_authenticate_token(domain_auth_data)
- self.v3_authenticate_token(project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data)
+ self.v3_create_token(project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Delete non-inherited grant
self.delete(non_inher_gd_link)
# Check the user cannot get a domain token anymore
- self.v3_authenticate_token(domain_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(domain_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def _test_crud_inherited_and_direct_assignment_on_target(self, target_url):
# Create a new role to avoid assignments loaded from sample data
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# Define URLs
@@ -2360,7 +1432,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
def test_crud_user_inherited_domain_role_grants(self):
role_list = []
for _ in range(2):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
@@ -2409,22 +1481,16 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
"""
role_list = []
for _ in range(4):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
- domain = self.new_domain_ref()
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user1 = self.new_user_ref(
- domain_id=domain['id'])
- password = user1['password']
- user1 = self.identity_api.create_user(user1)
- user1['password'] = password
- project1 = self.new_project_ref(
- domain_id=domain['id'])
+ user1 = unit.create_user(self.identity_api, domain_id=domain['id'])
+ project1 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project1['id'], project1)
- project2 = self.new_project_ref(
- domain_id=domain['id'])
+ project2 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
@@ -2490,6 +1556,98 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, up_entity)
+ def test_list_role_assignments_include_names(self):
+ """Call ``GET /role_assignments with include names``.
+
+ Test Plan:
+
+ - Create a domain with a group and a user
+ - Create a project with a group and a user
+
+ """
+ role1 = unit.new_role_ref()
+ self.role_api.create_role(role1['id'], role1)
+ user1 = unit.create_user(self.identity_api, domain_id=self.domain_id)
+ group = unit.new_group_ref(domain_id=self.domain_id)
+ group = self.identity_api.create_group(group)
+ project1 = unit.new_project_ref(domain_id=self.domain_id)
+ self.resource_api.create_project(project1['id'], project1)
+
+ expected_entity1 = self.build_role_assignment_entity_include_names(
+ role_ref=role1,
+ project_ref=project1,
+ user_ref=user1)
+ self.put(expected_entity1['links']['assignment'])
+ expected_entity2 = self.build_role_assignment_entity_include_names(
+ role_ref=role1,
+ domain_ref=self.domain,
+ group_ref=group)
+ self.put(expected_entity2['links']['assignment'])
+ expected_entity3 = self.build_role_assignment_entity_include_names(
+ role_ref=role1,
+ domain_ref=self.domain,
+ user_ref=user1)
+ self.put(expected_entity3['links']['assignment'])
+ expected_entity4 = self.build_role_assignment_entity_include_names(
+ role_ref=role1,
+ project_ref=project1,
+ group_ref=group)
+ self.put(expected_entity4['links']['assignment'])
+
+ collection_url_domain = (
+ '/role_assignments?include_names&scope.domain.id=%(domain_id)s' % {
+ 'domain_id': self.domain_id})
+ rs_domain = self.get(collection_url_domain)
+ collection_url_project = (
+ '/role_assignments?include_names&'
+ 'scope.project.id=%(project_id)s' % {
+ 'project_id': project1['id']})
+ rs_project = self.get(collection_url_project)
+ collection_url_group = (
+ '/role_assignments?include_names&group.id=%(group_id)s' % {
+ 'group_id': group['id']})
+ rs_group = self.get(collection_url_group)
+ collection_url_user = (
+ '/role_assignments?include_names&user.id=%(user_id)s' % {
+ 'user_id': user1['id']})
+ rs_user = self.get(collection_url_user)
+ collection_url_role = (
+ '/role_assignments?include_names&role.id=%(role_id)s' % {
+ 'role_id': role1['id']})
+ rs_role = self.get(collection_url_role)
+ # Make sure all entities were created successfully
+ self.assertEqual(rs_domain.status_int, http_client.OK)
+ self.assertEqual(rs_project.status_int, http_client.OK)
+ self.assertEqual(rs_group.status_int, http_client.OK)
+ self.assertEqual(rs_user.status_int, http_client.OK)
+ # Make sure we can get back the correct number of entities
+ self.assertValidRoleAssignmentListResponse(
+ rs_domain,
+ expected_length=2,
+ resource_url=collection_url_domain)
+ self.assertValidRoleAssignmentListResponse(
+ rs_project,
+ expected_length=2,
+ resource_url=collection_url_project)
+ self.assertValidRoleAssignmentListResponse(
+ rs_group,
+ expected_length=2,
+ resource_url=collection_url_group)
+ self.assertValidRoleAssignmentListResponse(
+ rs_user,
+ expected_length=2,
+ resource_url=collection_url_user)
+ self.assertValidRoleAssignmentListResponse(
+ rs_role,
+ expected_length=4,
+ resource_url=collection_url_role)
+ # Verify all types of entities have the correct format
+ self.assertRoleAssignmentInListResponse(rs_domain, expected_entity2)
+ self.assertRoleAssignmentInListResponse(rs_project, expected_entity1)
+ self.assertRoleAssignmentInListResponse(rs_group, expected_entity4)
+ self.assertRoleAssignmentInListResponse(rs_user, expected_entity3)
+ self.assertRoleAssignmentInListResponse(rs_role, expected_entity1)
+
def test_list_role_assignments_for_disabled_inheritance_extension(self):
"""Call ``GET /role_assignments with inherited domain grants``.
@@ -2503,25 +1661,18 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
shows up.
"""
-
role_list = []
for _ in range(4):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
- domain = self.new_domain_ref()
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user1 = self.new_user_ref(
- domain_id=domain['id'])
- password = user1['password']
- user1 = self.identity_api.create_user(user1)
- user1['password'] = password
- project1 = self.new_project_ref(
- domain_id=domain['id'])
+ user1 = unit.create_user(self.identity_api, domain_id=domain['id'])
+ project1 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project1['id'], project1)
- project2 = self.new_project_ref(
- domain_id=domain['id'])
+ project2 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
@@ -2598,34 +1749,23 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
"""
role_list = []
for _ in range(4):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
- domain = self.new_domain_ref()
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user1 = self.new_user_ref(
- domain_id=domain['id'])
- password = user1['password']
- user1 = self.identity_api.create_user(user1)
- user1['password'] = password
- user2 = self.new_user_ref(
- domain_id=domain['id'])
- password = user2['password']
- user2 = self.identity_api.create_user(user2)
- user2['password'] = password
- group1 = self.new_group_ref(
- domain_id=domain['id'])
+ user1 = unit.create_user(self.identity_api, domain_id=domain['id'])
+ user2 = unit.create_user(self.identity_api, domain_id=domain['id'])
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user2['id'],
group1['id'])
- project1 = self.new_project_ref(
- domain_id=domain['id'])
+ project1 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project1['id'], project1)
- project2 = self.new_project_ref(
- domain_id=domain['id'])
+ project2 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
@@ -2704,25 +1844,18 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
"""
role_list = []
for _ in range(5):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
- domain = self.new_domain_ref()
+ domain = unit.new_domain_ref()
self.resource_api.create_domain(domain['id'], domain)
- user1 = self.new_user_ref(
- domain_id=domain['id'])
- password = user1['password']
- user1 = self.identity_api.create_user(user1)
- user1['password'] = password
- group1 = self.new_group_ref(
- domain_id=domain['id'])
+ user1 = unit.create_user(self.identity_api, domain_id=domain['id'])
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
- project1 = self.new_project_ref(
- domain_id=domain['id'])
+ project1 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project1['id'], project1)
- project2 = self.new_project_ref(
- domain_id=domain['id'])
+ project2 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project2['id'], project2)
# Add some spoiler roles to the projects
self.assignment_api.add_role_to_user_and_project(
@@ -2790,17 +1923,17 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
"""
# Create project hierarchy
- root = self.new_project_ref(domain_id=self.domain['id'])
- leaf = self.new_project_ref(domain_id=self.domain['id'],
+ root = unit.new_project_ref(domain_id=self.domain['id'])
+ leaf = unit.new_project_ref(domain_id=self.domain['id'],
parent_id=root['id'])
self.resource_api.create_project(root['id'], root)
self.resource_api.create_project(leaf['id'], leaf)
# Create 'non-inherited' and 'inherited' roles
- non_inherited_role = {'id': uuid.uuid4().hex, 'name': 'non-inherited'}
+ non_inherited_role = unit.new_role_ref(name='non-inherited')
self.role_api.create_role(non_inherited_role['id'], non_inherited_role)
- inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'}
+ inherited_role = unit.new_role_ref(name='inherited')
self.role_api.create_role(inherited_role['id'], inherited_role)
return (root['id'], leaf['id'],
@@ -2822,10 +1955,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=leaf_id)
# Check the user cannot get a token on root nor leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for user on leaf project
non_inher_up_link = self.build_role_assignment_link(
@@ -2834,9 +1967,9 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_up_link)
# Check the user can only get a token on leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data)
# Grant inherited role for user on root project
inher_up_link = self.build_role_assignment_link(
@@ -2845,24 +1978,24 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_up_link)
# Check the user still can get a token only on leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data)
# Delete non-inherited grant
self.delete(non_inher_up_link)
# Check the inherited role still applies for leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data)
# Delete inherited grant
self.delete(inher_up_link)
# Check the user cannot get a token on leaf project anymore
- self.v3_authenticate_token(leaf_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_get_token_from_inherited_group_project_role_grants(self):
# Create default scenario
@@ -2870,7 +2003,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self._setup_hierarchical_projects_scenario())
# Create group and add user to it
- group = self.new_group_ref(domain_id=self.domain['id'])
+ group = unit.new_group_ref(domain_id=self.domain['id'])
group = self.identity_api.create_group(group)
self.identity_api.add_user_to_group(self.user['id'], group['id'])
@@ -2885,10 +2018,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
project_id=leaf_id)
# Check the user cannot get a token on root nor leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# Grant non-inherited role for group on leaf project
non_inher_gp_link = self.build_role_assignment_link(
@@ -2897,9 +2030,9 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(non_inher_gp_link)
# Check the user can only get a token on leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data)
# Grant inherited role for group on root project
inher_gp_link = self.build_role_assignment_link(
@@ -2908,22 +2041,22 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
self.put(inher_gp_link)
# Check the user still can get a token only on leaf project
- self.v3_authenticate_token(root_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(root_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data)
# Delete no-inherited grant
self.delete(non_inher_gp_link)
# Check the inherited role still applies for leaf project
- self.v3_authenticate_token(leaf_project_auth_data)
+ self.v3_create_token(leaf_project_auth_data)
# Delete inherited grant
self.delete(inher_gp_link)
# Check the user cannot get a token on leaf project anymore
- self.v3_authenticate_token(leaf_project_auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(leaf_project_auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_get_role_assignments_for_project_hierarchy(self):
"""Call ``GET /role_assignments``.
@@ -3028,6 +2161,154 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase,
inher_up_entity['scope']['project']['id'] = leaf_id
self.assertRoleAssignmentInListResponse(r, inher_up_entity)
+ def test_project_id_specified_if_include_subtree_specified(self):
+ """When using include_subtree, you must specify a project ID."""
+ self.get('/role_assignments?include_subtree=True',
+ expected_status=http_client.BAD_REQUEST)
+ self.get('/role_assignments?scope.project.id&'
+ 'include_subtree=True',
+ expected_status=http_client.BAD_REQUEST)
+
+ def test_get_role_assignments_for_project_tree(self):
+ """Get role_assignment?scope.project.id=X?include_subtree``.
+
+ Test Plan:
+
+ - Create 2 roles and a hierarchy of projects with one root and one leaf
+ - Issue the URL to add a non-inherited user role to the root project
+ and the leaf project
+ - Issue the URL to get role assignments for the root project but
+ not the subtree - this should return just the root assignment
+ - Issue the URL to get role assignments for the root project and
+ it's subtree - this should return both assignments
+ - Check that explicitly setting include_subtree to False is the
+ equivalent to not including it at all in the query.
+
+ """
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, unused_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Grant non-inherited role to root and leaf projects
+ non_inher_entity_root = self.build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_entity_root['links']['assignment'])
+ non_inher_entity_leaf = self.build_role_assignment_entity(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_entity_leaf['links']['assignment'])
+
+ # Without the subtree, we should get the one assignment on the
+ # root project
+ collection_url = (
+ '/role_assignments?scope.project.id=%(project)s' % {
+ 'project': root_id})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r, resource_url=collection_url)
+
+ self.assertThat(r.result['role_assignments'], matchers.HasLength(1))
+ self.assertRoleAssignmentInListResponse(r, non_inher_entity_root)
+
+ # With the subtree, we should get both assignments
+ collection_url = (
+ '/role_assignments?scope.project.id=%(project)s'
+ '&include_subtree=True' % {
+ 'project': root_id})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r, resource_url=collection_url)
+
+ self.assertThat(r.result['role_assignments'], matchers.HasLength(2))
+ self.assertRoleAssignmentInListResponse(r, non_inher_entity_root)
+ self.assertRoleAssignmentInListResponse(r, non_inher_entity_leaf)
+
+ # With subtree=0, we should also only get the one assignment on the
+ # root project
+ collection_url = (
+ '/role_assignments?scope.project.id=%(project)s'
+ '&include_subtree=0' % {
+ 'project': root_id})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r, resource_url=collection_url)
+
+ self.assertThat(r.result['role_assignments'], matchers.HasLength(1))
+ self.assertRoleAssignmentInListResponse(r, non_inher_entity_root)
+
+ def test_get_effective_role_assignments_for_project_tree(self):
+ """Get role_assignment ?project_id=X?include_subtree=True?effective``.
+
+ Test Plan:
+
+ - Create 2 roles and a hierarchy of projects with one root and 4 levels
+ of child project
+ - Issue the URL to add a non-inherited user role to the root project
+ and a level 1 project
+ - Issue the URL to add an inherited user role on the level 2 project
+ - Issue the URL to get effective role assignments for the level 1
+ project and it's subtree - this should return a role (non-inherited)
+ on the level 1 project and roles (inherited) on each of the level
+ 2, 3 and 4 projects
+
+ """
+ # Create default scenario
+ root_id, leaf_id, non_inherited_role_id, inherited_role_id = (
+ self._setup_hierarchical_projects_scenario())
+
+ # Add some extra projects to the project hierarchy
+ level2 = unit.new_project_ref(domain_id=self.domain['id'],
+ parent_id=leaf_id)
+ level3 = unit.new_project_ref(domain_id=self.domain['id'],
+ parent_id=level2['id'])
+ level4 = unit.new_project_ref(domain_id=self.domain['id'],
+ parent_id=level3['id'])
+ self.resource_api.create_project(level2['id'], level2)
+ self.resource_api.create_project(level3['id'], level3)
+ self.resource_api.create_project(level4['id'], level4)
+
+ # Grant non-inherited role to root (as a spoiler) and to
+ # the level 1 (leaf) project
+ non_inher_entity_root = self.build_role_assignment_entity(
+ project_id=root_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_entity_root['links']['assignment'])
+ non_inher_entity_leaf = self.build_role_assignment_entity(
+ project_id=leaf_id, user_id=self.user['id'],
+ role_id=non_inherited_role_id)
+ self.put(non_inher_entity_leaf['links']['assignment'])
+
+ # Grant inherited role to level 2
+ inher_entity = self.build_role_assignment_entity(
+ project_id=level2['id'], user_id=self.user['id'],
+ role_id=inherited_role_id, inherited_to_projects=True)
+ self.put(inher_entity['links']['assignment'])
+
+ # Get effective role assignments
+ collection_url = (
+ '/role_assignments?scope.project.id=%(project)s'
+ '&include_subtree=True&effective' % {
+ 'project': leaf_id})
+ r = self.get(collection_url)
+ self.assertValidRoleAssignmentListResponse(
+ r, resource_url=collection_url)
+
+ # There should be three assignments returned in total
+ self.assertThat(r.result['role_assignments'], matchers.HasLength(3))
+
+ # Assert that the user does not non-inherited role on root project
+ self.assertRoleAssignmentNotInListResponse(r, non_inher_entity_root)
+
+ # Assert that the user does have non-inherited role on leaf project
+ self.assertRoleAssignmentInListResponse(r, non_inher_entity_leaf)
+
+ # Assert that the user has inherited role on levels 3 and 4
+ inher_entity['scope']['project']['id'] = level3['id']
+ self.assertRoleAssignmentInListResponse(r, inher_entity)
+ inher_entity['scope']['project']['id'] = level4['id']
+ self.assertRoleAssignmentInListResponse(r, inher_entity)
+
def test_get_inherited_role_assignments_for_project_hierarchy(self):
"""Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``.
@@ -3089,7 +2370,7 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase):
self.config_fixture.config(group='os_inherit', enabled=False)
def test_crud_inherited_role_grants_failed_if_disabled(self):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
base_collection_url = (
@@ -3107,118 +2388,484 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase):
self.delete(member_url, expected_status=http_client.NOT_FOUND)
-class AssignmentV3toV2MethodsTestCase(unit.TestCase):
- """Test domain V3 to V2 conversion methods."""
- def _setup_initial_projects(self):
- self.project_id = uuid.uuid4().hex
- self.domain_id = CONF.identity.default_domain_id
- self.parent_id = uuid.uuid4().hex
- # Project with only domain_id in ref
- self.project1 = {'id': self.project_id,
- 'name': self.project_id,
- 'domain_id': self.domain_id}
- # Project with both domain_id and parent_id in ref
- self.project2 = {'id': self.project_id,
- 'name': self.project_id,
- 'domain_id': self.domain_id,
- 'parent_id': self.parent_id}
- # Project with no domain_id and parent_id in ref
- self.project3 = {'id': self.project_id,
- 'name': self.project_id,
- 'domain_id': self.domain_id,
- 'parent_id': self.parent_id}
- # Expected result with no domain_id and parent_id
- self.expected_project = {'id': self.project_id,
- 'name': self.project_id}
-
- def test_v2controller_filter_domain_id(self):
- # V2.0 is not domain aware, ensure domain_id is popped off the ref.
- other_data = uuid.uuid4().hex
- domain_id = CONF.identity.default_domain_id
- ref = {'domain_id': domain_id,
- 'other_data': other_data}
-
- ref_no_domain = {'other_data': other_data}
- expected_ref = ref_no_domain.copy()
-
- updated_ref = controller.V2Controller.filter_domain_id(ref)
- self.assertIs(ref, updated_ref)
- self.assertDictEqual(ref, expected_ref)
- # Make sure we don't error/muck up data if domain_id isn't present
- updated_ref = controller.V2Controller.filter_domain_id(ref_no_domain)
- self.assertIs(ref_no_domain, updated_ref)
- self.assertDictEqual(ref_no_domain, expected_ref)
-
- def test_v3controller_filter_domain_id(self):
- # No data should be filtered out in this case.
- other_data = uuid.uuid4().hex
- domain_id = uuid.uuid4().hex
- ref = {'domain_id': domain_id,
- 'other_data': other_data}
-
- expected_ref = ref.copy()
- updated_ref = controller.V3Controller.filter_domain_id(ref)
- self.assertIs(ref, updated_ref)
- self.assertDictEqual(ref, expected_ref)
-
- def test_v2controller_filter_domain(self):
- other_data = uuid.uuid4().hex
- domain_id = uuid.uuid4().hex
- non_default_domain_ref = {'domain': {'id': domain_id},
- 'other_data': other_data}
- default_domain_ref = {'domain': {'id': 'default'},
- 'other_data': other_data}
- updated_ref = controller.V2Controller.filter_domain(default_domain_ref)
- self.assertNotIn('domain', updated_ref)
- self.assertRaises(exception.Unauthorized,
- controller.V2Controller.filter_domain,
- non_default_domain_ref)
-
- def test_v2controller_filter_project_parent_id(self):
- # V2.0 is not project hierarchy aware, ensure parent_id is popped off.
- other_data = uuid.uuid4().hex
- parent_id = uuid.uuid4().hex
- ref = {'parent_id': parent_id,
- 'other_data': other_data}
-
- ref_no_parent = {'other_data': other_data}
- expected_ref = ref_no_parent.copy()
-
- updated_ref = controller.V2Controller.filter_project_parent_id(ref)
- self.assertIs(ref, updated_ref)
- self.assertDictEqual(ref, expected_ref)
- # Make sure we don't error/muck up data if parent_id isn't present
- updated_ref = controller.V2Controller.filter_project_parent_id(
- ref_no_parent)
- self.assertIs(ref_no_parent, updated_ref)
- self.assertDictEqual(ref_no_parent, expected_ref)
-
- def test_v3_to_v2_project_method(self):
- self._setup_initial_projects()
- updated_project1 = controller.V2Controller.v3_to_v2_project(
- self.project1)
- self.assertIs(self.project1, updated_project1)
- self.assertDictEqual(self.project1, self.expected_project)
- updated_project2 = controller.V2Controller.v3_to_v2_project(
- self.project2)
- self.assertIs(self.project2, updated_project2)
- self.assertDictEqual(self.project2, self.expected_project)
- updated_project3 = controller.V2Controller.v3_to_v2_project(
- self.project3)
- self.assertIs(self.project3, updated_project3)
- self.assertDictEqual(self.project3, self.expected_project)
-
- def test_v3_to_v2_project_method_list(self):
- self._setup_initial_projects()
- project_list = [self.project1, self.project2, self.project3]
- updated_list = controller.V2Controller.v3_to_v2_project(project_list)
-
- self.assertEqual(len(updated_list), len(project_list))
-
- for i, ref in enumerate(updated_list):
- # Order should not change.
- self.assertIs(ref, project_list[i])
-
- self.assertDictEqual(self.project1, self.expected_project)
- self.assertDictEqual(self.project2, self.expected_project)
- self.assertDictEqual(self.project3, self.expected_project)
+class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin,
+ unit.TestCase):
+ def _create_role(self):
+ """Call ``POST /roles``."""
+ ref = unit.new_role_ref()
+ r = self.post('/roles', body={'role': ref})
+ return self.assertValidRoleResponse(r, ref)
+
+ def test_list_implied_roles_none(self):
+ self.prior = self._create_role()
+ url = '/roles/%s/implies' % (self.prior['id'])
+ response = self.get(url).json["role_inference"]
+ self.assertEqual(self.prior['id'], response['prior_role']['id'])
+ self.assertEqual(0, len(response['implies']))
+
+ def _create_implied_role(self, prior, implied):
+ self.put('/roles/%s/implies/%s' % (prior['id'], implied['id']),
+ expected_status=http_client.CREATED)
+
+ def _delete_implied_role(self, prior, implied):
+ self.delete('/roles/%s/implies/%s' % (prior['id'], implied['id']))
+
+ def _setup_prior_two_implied(self):
+ self.prior = self._create_role()
+ self.implied1 = self._create_role()
+ self._create_implied_role(self.prior, self.implied1)
+ self.implied2 = self._create_role()
+ self._create_implied_role(self.prior, self.implied2)
+
+ def _assert_expected_implied_role_response(
+ self, expected_prior_id, expected_implied_ids):
+ r = self.get('/roles/%s/implies' % expected_prior_id)
+ response = r.json["role_inference"]
+ self.assertEqual(expected_prior_id, response['prior_role']['id'])
+
+ actual_implied_ids = [implied['id'] for implied in response['implies']]
+
+ for expected_id in expected_implied_ids:
+ self.assertIn(expected_id, actual_implied_ids)
+ self.assertEqual(len(expected_implied_ids), len(response['implies']))
+
+ self.assertIsNotNone(response['prior_role']['links']['self'])
+ for implied in response['implies']:
+ self.assertIsNotNone(implied['links']['self'])
+
+ def _assert_two_roles_implied(self):
+ self._assert_expected_implied_role_response(
+ self.prior['id'], [self.implied1['id'], self.implied2['id']])
+
+ def _assert_one_role_implied(self):
+ self._assert_expected_implied_role_response(
+ self.prior['id'], [self.implied1['id']])
+
+ self.get('/roles/%s/implies/%s' %
+ (self.prior['id'], self.implied2['id']),
+ expected_status=http_client.NOT_FOUND)
+
+ def _assert_two_rules_defined(self):
+ r = self.get('/role_inferences/')
+
+ rules = r.result['role_inferences']
+
+ self.assertEqual(self.prior['id'], rules[0]['prior_role']['id'])
+ self.assertEqual(2, len(rules[0]['implies']))
+ implied_ids = [implied['id'] for implied in rules[0]['implies']]
+ implied_names = [implied['name'] for implied in rules[0]['implies']]
+
+ self.assertIn(self.implied1['id'], implied_ids)
+ self.assertIn(self.implied2['id'], implied_ids)
+ self.assertIn(self.implied1['name'], implied_names)
+ self.assertIn(self.implied2['name'], implied_names)
+
+ def _assert_one_rule_defined(self):
+ r = self.get('/role_inferences/')
+ rules = r.result['role_inferences']
+ self.assertEqual(self.prior['id'], rules[0]['prior_role']['id'])
+ self.assertEqual(self.implied1['id'], rules[0]['implies'][0]['id'])
+ self.assertEqual(self.implied1['name'], rules[0]['implies'][0]['name'])
+ self.assertEqual(1, len(rules[0]['implies']))
+
+ def test_list_all_rules(self):
+ self._setup_prior_two_implied()
+ self._assert_two_rules_defined()
+
+ self._delete_implied_role(self.prior, self.implied2)
+ self._assert_one_rule_defined()
+
+ def test_CRD_implied_roles(self):
+
+ self._setup_prior_two_implied()
+ self._assert_two_roles_implied()
+
+ self._delete_implied_role(self.prior, self.implied2)
+ self._assert_one_role_implied()
+
+ def _create_three_roles(self):
+ self.role_list = []
+ for _ in range(3):
+ role = unit.new_role_ref()
+ self.role_api.create_role(role['id'], role)
+ self.role_list.append(role)
+
+ def _create_test_domain_user_project(self):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ user = unit.create_user(self.identity_api, domain_id=domain['id'])
+ project = unit.new_project_ref(domain_id=domain['id'])
+ self.resource_api.create_project(project['id'], project)
+ return domain, user, project
+
+ def _assign_top_role_to_user_on_project(self, user, project):
+ self.assignment_api.add_role_to_user_and_project(
+ user['id'], project['id'], self.role_list[0]['id'])
+
+ def _build_effective_role_assignments_url(self, user):
+ return '/role_assignments?effective&user.id=%(user_id)s' % {
+ 'user_id': user['id']}
+
+ def _assert_all_roles_in_assignment(self, response, user):
+ # Now use the list role assignments api to check that all three roles
+ # appear in the collection
+ self.assertValidRoleAssignmentListResponse(
+ response,
+ expected_length=len(self.role_list),
+ resource_url=self._build_effective_role_assignments_url(user))
+
+ def _assert_initial_assignment_in_effective(self, response, user, project):
+ # The initial assignment should be there (the link url will be
+ # generated and checked automatically since it matches the assignment)
+ entity = self.build_role_assignment_entity(
+ project_id=project['id'],
+ user_id=user['id'], role_id=self.role_list[0]['id'])
+ self.assertRoleAssignmentInListResponse(response, entity)
+
+ def _assert_effective_role_for_implied_has_prior_in_links(
+ self, response, user, project, prior_index, implied_index):
+ # An effective role for an implied role will have the prior role
+ # assignment in the links
+ prior_link = '/prior_roles/%(prior)s/implies/%(implied)s' % {
+ 'prior': self.role_list[prior_index]['id'],
+ 'implied': self.role_list[implied_index]['id']}
+ link = self.build_role_assignment_link(
+ project_id=project['id'], user_id=user['id'],
+ role_id=self.role_list[prior_index]['id'])
+ entity = self.build_role_assignment_entity(
+ link=link, project_id=project['id'],
+ user_id=user['id'], role_id=self.role_list[implied_index]['id'],
+ prior_link=prior_link)
+ self.assertRoleAssignmentInListResponse(response, entity)
+
+ def test_list_role_assignments_with_implied_roles(self):
+ """Call ``GET /role_assignments`` with implied role grant.
+
+ Test Plan:
+
+ - Create a domain with a user and a project
+ - Create 3 roles
+ - Role 0 implies role 1 and role 1 implies role 2
+ - Assign the top role to the project
+ - Issue the URL to check effective roles on project - this
+ should return all 3 roles.
+ - Check the links of the 3 roles indicate the prior role where
+ appropriate
+
+ """
+ (domain, user, project) = self._create_test_domain_user_project()
+ self._create_three_roles()
+ self._create_implied_role(self.role_list[0], self.role_list[1])
+ self._create_implied_role(self.role_list[1], self.role_list[2])
+ self._assign_top_role_to_user_on_project(user, project)
+
+ response = self.get(self._build_effective_role_assignments_url(user))
+ r = response
+
+ self._assert_all_roles_in_assignment(r, user)
+ self._assert_initial_assignment_in_effective(response, user, project)
+ self._assert_effective_role_for_implied_has_prior_in_links(
+ response, user, project, 0, 1)
+ self._assert_effective_role_for_implied_has_prior_in_links(
+ response, user, project, 1, 2)
+
+ def _create_named_role(self, name):
+ role = unit.new_role_ref()
+ role['name'] = name
+ self.role_api.create_role(role['id'], role)
+ return role
+
+ def test_root_role_as_implied_role_forbidden(self):
+ """Test root role is forbidden to be set as an implied role.
+
+ Create 2 roles that are prohibited from being an implied role.
+ Create 1 additional role which should be accepted as an implied
+ role. Assure the prohibited role names cannot be set as an implied
+ role. Assure the accepted role name which is not a member of the
+ prohibited implied role list can be successfully set an implied
+ role.
+ """
+ prohibited_name1 = 'root1'
+ prohibited_name2 = 'root2'
+ accepted_name1 = 'implied1'
+
+ prohibited_names = [prohibited_name1, prohibited_name2]
+ self.config_fixture.config(group='assignment',
+ prohibited_implied_role=prohibited_names)
+
+ prior_role = self._create_role()
+
+ prohibited_role1 = self._create_named_role(prohibited_name1)
+ url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format(
+ prior_role_id=prior_role['id'],
+ implied_role_id=prohibited_role1['id'])
+ self.put(url, expected_status=http_client.FORBIDDEN)
+
+ prohibited_role2 = self._create_named_role(prohibited_name2)
+ url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format(
+ prior_role_id=prior_role['id'],
+ implied_role_id=prohibited_role2['id'])
+ self.put(url, expected_status=http_client.FORBIDDEN)
+
+ accepted_role1 = self._create_named_role(accepted_name1)
+ url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format(
+ prior_role_id=prior_role['id'],
+ implied_role_id=accepted_role1['id'])
+ self.put(url, expected_status=http_client.CREATED)
+
+ def test_trusts_from_implied_role(self):
+ self._create_three_roles()
+ self._create_implied_role(self.role_list[0], self.role_list[1])
+ self._create_implied_role(self.role_list[1], self.role_list[2])
+ self._assign_top_role_to_user_on_project(self.user, self.project)
+
+ # Create a trustee and assign the prior role to her
+ trustee = unit.create_user(self.identity_api, domain_id=self.domain_id)
+ ref = unit.new_trust_ref(
+ trustor_user_id=self.user['id'],
+ trustee_user_id=trustee['id'],
+ project_id=self.project['id'],
+ role_ids=[self.role_list[0]['id']])
+ r = self.post('/OS-TRUST/trusts', body={'trust': ref})
+ trust = r.result['trust']
+
+ # Only the role that was specified is in the trust, NOT implied roles
+ self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id'])
+ self.assertThat(trust['roles'], matchers.HasLength(1))
+
+ # Authenticate as the trustee
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password'],
+ trust_id=trust['id'])
+ r = self.v3_create_token(auth_data)
+ token = r.result['token']
+ self.assertThat(token['roles'],
+ matchers.HasLength(len(self.role_list)))
+ for role in token['roles']:
+ self.assertIn(role, self.role_list)
+ for role in self.role_list:
+ self.assertIn(role, token['roles'])
+
+ def test_trusts_from_domain_specific_implied_role(self):
+ self._create_three_roles()
+ # Overwrite the first role with a domain specific role
+ role = unit.new_role_ref(domain_id=self.domain_id)
+ self.role_list[0] = self.role_api.create_role(role['id'], role)
+ self._create_implied_role(self.role_list[0], self.role_list[1])
+ self._create_implied_role(self.role_list[1], self.role_list[2])
+ self._assign_top_role_to_user_on_project(self.user, self.project)
+
+ # Create a trustee and assign the prior role to her
+ trustee = unit.create_user(self.identity_api, domain_id=self.domain_id)
+ ref = unit.new_trust_ref(
+ trustor_user_id=self.user['id'],
+ trustee_user_id=trustee['id'],
+ project_id=self.project['id'],
+ role_ids=[self.role_list[0]['id']])
+ r = self.post('/OS-TRUST/trusts', body={'trust': ref})
+ trust = r.result['trust']
+
+ # Only the role that was specified is in the trust, NOT implied roles
+ self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id'])
+ self.assertThat(trust['roles'], matchers.HasLength(1))
+
+ # Authenticate as the trustee
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password'],
+ trust_id=trust['id'])
+ r = self.v3_create_token(auth_data)
+ token = r.result['token']
+
+ # The token should have the roles implies by the domain specific role,
+ # but not the domain specific role itself.
+ self.assertThat(token['roles'],
+ matchers.HasLength(len(self.role_list) - 1))
+ for role in token['roles']:
+ self.assertIn(role, self.role_list)
+ for role in [self.role_list[1], self.role_list[2]]:
+ self.assertIn(role, token['roles'])
+ self.assertNotIn(self.role_list[0], token['roles'])
+
+
+class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase):
+ def setUp(self):
+ def create_role(domain_id=None):
+ """Call ``POST /roles``."""
+ ref = unit.new_role_ref(domain_id=domain_id)
+ r = self.post(
+ '/roles',
+ body={'role': ref})
+ return self.assertValidRoleResponse(r, ref)
+
+ super(DomainSpecificRoleTests, self).setUp()
+ self.domainA = unit.new_domain_ref()
+ self.resource_api.create_domain(self.domainA['id'], self.domainA)
+ self.domainB = unit.new_domain_ref()
+ self.resource_api.create_domain(self.domainB['id'], self.domainB)
+
+ self.global_role1 = create_role()
+ self.global_role2 = create_role()
+ # Since there maybe other global roles already created, let's count
+ # them, so we can ensure we can check subsequent list responses
+ # are correct
+ r = self.get('/roles')
+ self.existing_global_roles = len(r.result['roles'])
+
+ # And now create some domain specific roles
+ self.domainA_role1 = create_role(domain_id=self.domainA['id'])
+ self.domainA_role2 = create_role(domain_id=self.domainA['id'])
+ self.domainB_role = create_role(domain_id=self.domainB['id'])
+
+ def test_get_and_list_domain_specific_roles(self):
+ # Check we can get a domain specific role
+ r = self.get('/roles/%s' % self.domainA_role1['id'])
+ self.assertValidRoleResponse(r, self.domainA_role1)
+
+ # If we list without specifying a domain, we should only get global
+ # roles back.
+ r = self.get('/roles')
+ self.assertValidRoleListResponse(
+ r, expected_length=self.existing_global_roles)
+ self.assertRoleInListResponse(r, self.global_role1)
+ self.assertRoleInListResponse(r, self.global_role2)
+ self.assertRoleNotInListResponse(r, self.domainA_role1)
+ self.assertRoleNotInListResponse(r, self.domainA_role2)
+ self.assertRoleNotInListResponse(r, self.domainB_role)
+
+ # Now list those in domainA, making sure that's all we get back
+ r = self.get('/roles?domain_id=%s' % self.domainA['id'])
+ self.assertValidRoleListResponse(r, expected_length=2)
+ self.assertRoleInListResponse(r, self.domainA_role1)
+ self.assertRoleInListResponse(r, self.domainA_role2)
+
+ def test_update_domain_specific_roles(self):
+ self.domainA_role1['name'] = uuid.uuid4().hex
+ self.patch('/roles/%(role_id)s' % {
+ 'role_id': self.domainA_role1['id']},
+ body={'role': self.domainA_role1})
+ r = self.get('/roles/%s' % self.domainA_role1['id'])
+ self.assertValidRoleResponse(r, self.domainA_role1)
+
+ def test_delete_domain_specific_roles(self):
+ # Check delete only removes that one domain role
+ self.delete('/roles/%(role_id)s' % {
+ 'role_id': self.domainA_role1['id']})
+
+ self.get('/roles/%s' % self.domainA_role1['id'],
+ expected_status=http_client.NOT_FOUND)
+ # Now re-list those in domainA, making sure there's only one left
+ r = self.get('/roles?domain_id=%s' % self.domainA['id'])
+ self.assertValidRoleListResponse(r, expected_length=1)
+ self.assertRoleInListResponse(r, self.domainA_role2)
+
+
+class ListUserProjectsTestCase(test_v3.RestfulTestCase):
+ """Tests for /users/<user>/projects"""
+
+ def load_sample_data(self):
+ # do not load base class's data, keep it focused on the tests
+
+ self.auths = []
+ self.domains = []
+ self.projects = []
+ self.roles = []
+ self.users = []
+
+ # Create 3 sets of domain, roles, projects, and users to demonstrate
+ # the right user's data is loaded and only projects they can access
+ # are returned.
+
+ for _ in range(3):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+
+ user = unit.create_user(self.identity_api, domain_id=domain['id'])
+
+ role = unit.new_role_ref()
+ self.role_api.create_role(role['id'], role)
+
+ self.assignment_api.create_grant(role['id'],
+ user_id=user['id'],
+ domain_id=domain['id'])
+
+ project = unit.new_project_ref(domain_id=domain['id'])
+ self.resource_api.create_project(project['id'], project)
+
+ self.assignment_api.create_grant(role['id'],
+ user_id=user['id'],
+ project_id=project['id'])
+
+ auth = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password'],
+ domain_id=domain['id'])
+
+ self.auths.append(auth)
+ self.domains.append(domain)
+ self.projects.append(project)
+ self.roles.append(role)
+ self.users.append(user)
+
+ def test_list_all(self):
+ for i in range(len(self.users)):
+ user = self.users[i]
+ auth = self.auths[i]
+
+ url = '/users/%s/projects' % user['id']
+ result = self.get(url, auth=auth)
+ projects_result = result.json['projects']
+ self.assertEqual(1, len(projects_result))
+ self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])
+
+ def test_list_enabled(self):
+ for i in range(len(self.users)):
+ user = self.users[i]
+ auth = self.auths[i]
+
+ # There are no disabled projects
+ url = '/users/%s/projects?enabled=True' % user['id']
+ result = self.get(url, auth=auth)
+ projects_result = result.json['projects']
+ self.assertEqual(1, len(projects_result))
+ self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])
+
+ def test_list_disabled(self):
+ for i in range(len(self.users)):
+ user = self.users[i]
+ auth = self.auths[i]
+ project = self.projects[i]
+
+ # There are no disabled projects
+ url = '/users/%s/projects?enabled=False' % user['id']
+ result = self.get(url, auth=auth)
+ self.assertEqual(0, len(result.json['projects']))
+
+ # disable this one and check again
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+ result = self.get(url, auth=auth)
+ projects_result = result.json['projects']
+ self.assertEqual(1, len(projects_result))
+ self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])
+
+ def test_list_by_domain_id(self):
+ for i in range(len(self.users)):
+ user = self.users[i]
+ domain = self.domains[i]
+ auth = self.auths[i]
+
+ # Try looking for projects with a non-existent domain_id
+ url = '/users/%s/projects?domain_id=%s' % (user['id'],
+ uuid.uuid4().hex)
+ result = self.get(url, auth=auth)
+ self.assertEqual(0, len(result.json['projects']))
+
+ # Now try a valid one
+ url = '/users/%s/projects?domain_id=%s' % (user['id'],
+ domain['id'])
+ result = self.get(url, auth=auth)
+ projects_result = result.json['projects']
+ self.assertEqual(1, len(projects_result))
+ self.assertEqual(self.projects[i]['id'], projects_result[0]['id'])