diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_assignment.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_assignment.py | 2419 |
1 files changed, 1033 insertions, 1386 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_assignment.py b/keystone-moon/keystone/tests/unit/test_v3_assignment.py index 6b15b1c3..86fb9f74 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_assignment.py +++ b/keystone-moon/keystone/tests/unit/test_v3_assignment.py @@ -16,12 +16,10 @@ import uuid from oslo_config import cfg from six.moves import http_client from six.moves import range +from testtools import matchers -from keystone.common import controller -from keystone import exception from keystone.tests import unit from keystone.tests.unit import test_v3 -from keystone.tests.unit import utils CONF = cfg.CONF @@ -29,1042 +27,20 @@ CONF = cfg.CONF class AssignmentTestCase(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin): - """Test domains, projects, roles and role assignments.""" + """Test roles and role assignments.""" def setUp(self): super(AssignmentTestCase, self).setUp() - self.group = self.new_group_ref( - domain_id=self.domain_id) + self.group = unit.new_group_ref(domain_id=self.domain_id) self.group = self.identity_api.create_group(self.group) self.group_id = self.group['id'] - self.credential_id = uuid.uuid4().hex - self.credential = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project_id) - self.credential['id'] = self.credential_id - self.credential_api.create_credential( - self.credential_id, - self.credential) - - # Domain CRUD tests - - def test_create_domain(self): - """Call ``POST /domains``.""" - ref = self.new_domain_ref() - r = self.post( - '/domains', - body={'domain': ref}) - return self.assertValidDomainResponse(r, ref) - - def test_create_domain_case_sensitivity(self): - """Call `POST /domains`` twice with upper() and lower() cased name.""" - ref = self.new_domain_ref() - - # ensure the name is lowercase - ref['name'] = ref['name'].lower() - r = self.post( - '/domains', - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - # ensure the name is uppercase - ref['name'] = ref['name'].upper() - r = self.post( - '/domains', - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - def test_create_domain_bad_request(self): - """Call ``POST /domains``.""" - self.post('/domains', body={'domain': {}}, - expected_status=http_client.BAD_REQUEST) - - def test_list_domains(self): - """Call ``GET /domains``.""" - resource_url = '/domains' - r = self.get(resource_url) - self.assertValidDomainListResponse(r, ref=self.domain, - resource_url=resource_url) - - def test_get_domain(self): - """Call ``GET /domains/{domain_id}``.""" - r = self.get('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}) - self.assertValidDomainResponse(r, self.domain) - - def test_update_domain(self): - """Call ``PATCH /domains/{domain_id}``.""" - ref = self.new_domain_ref() - del ref['id'] - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}, - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - def test_disable_domain(self): - """Call ``PATCH /domains/{domain_id}`` (set enabled=False).""" - # Create a 2nd set of entities in a 2nd domain - self.domain2 = self.new_domain_ref() - self.resource_api.create_domain(self.domain2['id'], self.domain2) - - self.project2 = self.new_project_ref( - domain_id=self.domain2['id']) - self.resource_api.create_project(self.project2['id'], self.project2) - - self.user2 = self.new_user_ref( - domain_id=self.domain2['id'], - project_id=self.project2['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - - self.assignment_api.add_user_to_project(self.project2['id'], - self.user2['id']) - - # First check a user in that domain can authenticate. The v2 user - # cannot authenticate because they exist outside the default domain. - body = { - 'auth': { - 'passwordCredentials': { - 'userId': self.user2['id'], - 'password': self.user2['password'] - }, - 'tenantId': self.project2['id'] - } - } - self.admin_request( - path='/v2.0/tokens', method='POST', body=body, - expected_status=http_client.UNAUTHORIZED) - - auth_data = self.build_authentication_request( - user_id=self.user2['id'], - password=self.user2['password'], - project_id=self.project2['id']) - self.v3_authenticate_token(auth_data) - - # Now disable the domain - self.domain2['enabled'] = False - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain2['id']}, - body={'domain': {'enabled': False}}) - self.assertValidDomainResponse(r, self.domain2) - - # Make sure the user can no longer authenticate, via - # either API - body = { - 'auth': { - 'passwordCredentials': { - 'userId': self.user2['id'], - 'password': self.user2['password'] - }, - 'tenantId': self.project2['id'] - } - } - self.admin_request( - path='/v2.0/tokens', method='POST', body=body, - expected_status=http_client.UNAUTHORIZED) - - # Try looking up in v3 by name and id - auth_data = self.build_authentication_request( - user_id=self.user2['id'], - password=self.user2['password'], - project_id=self.project2['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - auth_data = self.build_authentication_request( - username=self.user2['name'], - user_domain_id=self.domain2['id'], - password=self.user2['password'], - project_id=self.project2['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - def test_delete_enabled_domain_fails(self): - """Call ``DELETE /domains/{domain_id}`` (when domain enabled).""" - - # Try deleting an enabled domain, which should fail - self.delete('/domains/%(domain_id)s' % { - 'domain_id': self.domain['id']}, - expected_status=exception.ForbiddenAction.code) - - def test_delete_domain(self): - """Call ``DELETE /domains/{domain_id}``. - - The sample data set up already has a user, group, project - and credential that is part of self.domain. Since the user - we will authenticate with is in this domain, we create a - another set of entities in a second domain. Deleting this - second domain should delete all these new entities. In addition, - all the entities in the regular self.domain should be unaffected - by the delete. - - Test Plan: - - - Create domain2 and a 2nd set of entities - - Disable domain2 - - Delete domain2 - - Check entities in domain2 have been deleted - - Check entities in self.domain are unaffected - - """ - - # Create a 2nd set of entities in a 2nd domain - self.domain2 = self.new_domain_ref() - self.resource_api.create_domain(self.domain2['id'], self.domain2) - - self.project2 = self.new_project_ref( - domain_id=self.domain2['id']) - self.resource_api.create_project(self.project2['id'], self.project2) - - self.user2 = self.new_user_ref( - domain_id=self.domain2['id'], - project_id=self.project2['id']) - self.user2 = self.identity_api.create_user(self.user2) - - self.group2 = self.new_group_ref( - domain_id=self.domain2['id']) - self.group2 = self.identity_api.create_group(self.group2) - - self.credential2 = self.new_credential_ref( - user_id=self.user2['id'], - project_id=self.project2['id']) - self.credential_api.create_credential( - self.credential2['id'], - self.credential2) - - # Now disable the new domain and delete it - self.domain2['enabled'] = False - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain2['id']}, - body={'domain': {'enabled': False}}) - self.assertValidDomainResponse(r, self.domain2) - self.delete('/domains/%(domain_id)s' % { - 'domain_id': self.domain2['id']}) - - # Check all the domain2 relevant entities are gone - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - self.domain2['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - self.project2['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - self.group2['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - self.user2['id']) - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - self.credential2['id']) - - # ...and that all self.domain entities are still here - r = self.resource_api.get_domain(self.domain['id']) - self.assertDictEqual(r, self.domain) - r = self.resource_api.get_project(self.project['id']) - self.assertDictEqual(r, self.project) - r = self.identity_api.get_group(self.group['id']) - self.assertDictEqual(r, self.group) - r = self.identity_api.get_user(self.user['id']) - self.user.pop('password') - self.assertDictEqual(r, self.user) - r = self.credential_api.get_credential(self.credential['id']) - self.assertDictEqual(r, self.credential) - - def test_delete_default_domain_fails(self): - # Attempting to delete the default domain results in 403 Forbidden. - - # Need to disable it first. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': CONF.identity.default_domain_id}, - body={'domain': {'enabled': False}}) - - self.delete('/domains/%(domain_id)s' % { - 'domain_id': CONF.identity.default_domain_id}, - expected_status=exception.ForbiddenAction.code) - - def test_delete_new_default_domain_fails(self): - # If change the default domain ID, deleting the new default domain - # results in a 403 Forbidden. - - # Create a new domain that's not the default - new_domain = self.new_domain_ref() - new_domain_id = new_domain['id'] - self.resource_api.create_domain(new_domain_id, new_domain) - - # Disable the new domain so can delete it later. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': new_domain_id}, - body={'domain': {'enabled': False}}) - - # Change the default domain - self.config_fixture.config(group='identity', - default_domain_id=new_domain_id) - - # Attempt to delete the new domain - - self.delete('/domains/%(domain_id)s' % {'domain_id': new_domain_id}, - expected_status=exception.ForbiddenAction.code) - - def test_delete_old_default_domain(self): - # If change the default domain ID, deleting the old default domain - # works. - - # Create a new domain that's not the default - new_domain = self.new_domain_ref() - new_domain_id = new_domain['id'] - self.resource_api.create_domain(new_domain_id, new_domain) - - old_default_domain_id = CONF.identity.default_domain_id - - # Disable the default domain so we can delete it later. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': old_default_domain_id}, - body={'domain': {'enabled': False}}) - - # Change the default domain - self.config_fixture.config(group='identity', - default_domain_id=new_domain_id) - - # Delete the old default domain - - self.delete( - '/domains/%(domain_id)s' % {'domain_id': old_default_domain_id}) - - def test_token_revoked_once_domain_disabled(self): - """Test token from a disabled domain has been invalidated. - - Test that a token that was valid for an enabled domain - becomes invalid once that domain is disabled. - - """ - - self.domain = self.new_domain_ref() - self.resource_api.create_domain(self.domain['id'], self.domain) - - self.user2 = self.new_user_ref(domain_id=self.domain['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - - # build a request body - auth_body = self.build_authentication_request( - user_id=self.user2['id'], - password=self.user2['password']) - - # sends a request for the user's token - token_resp = self.post('/auth/tokens', body=auth_body) - - subject_token = token_resp.headers.get('x-subject-token') - - # validates the returned token and it should be valid. - self.head('/auth/tokens', - headers={'x-subject-token': subject_token}, - expected_status=200) - - # now disable the domain - self.domain['enabled'] = False - url = "/domains/%(domain_id)s" % {'domain_id': self.domain['id']} - self.patch(url, - body={'domain': {'enabled': False}}, - expected_status=200) - - # validates the same token again and it should be 'not found' - # as the domain has already been disabled. - self.head('/auth/tokens', - headers={'x-subject-token': subject_token}, - expected_status=http_client.NOT_FOUND) - - def test_delete_domain_hierarchy(self): - """Call ``DELETE /domains/{domain_id}``.""" - domain = self.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - - root_project = self.new_project_ref( - domain_id=domain['id']) - self.resource_api.create_project(root_project['id'], root_project) - - leaf_project = self.new_project_ref( - domain_id=domain['id'], - parent_id=root_project['id']) - self.resource_api.create_project(leaf_project['id'], leaf_project) - - # Need to disable it first. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': domain['id']}, - body={'domain': {'enabled': False}}) - - self.delete( - '/domains/%(domain_id)s' % { - 'domain_id': domain['id']}) - - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - def test_forbid_operations_on_federated_domain(self): - """Make sure one cannot operate on federated domain. - - This includes operations like create, update, delete - on domain identified by id and name where difference variations of - id 'Federated' are used. - - """ - def create_domains(): - for variation in ('Federated', 'FEDERATED', - 'federated', 'fEderated'): - domain = self.new_domain_ref() - domain['id'] = variation - yield domain - - for domain in create_domains(): - self.assertRaises( - AssertionError, self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises( - AssertionError, self.resource_api.update_domain, - domain['id'], domain) - self.assertRaises( - exception.DomainNotFound, self.resource_api.delete_domain, - domain['id']) - - # swap 'name' with 'id' and try again, expecting the request to - # gracefully fail - domain['id'], domain['name'] = domain['name'], domain['id'] - self.assertRaises( - AssertionError, self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises( - AssertionError, self.resource_api.update_domain, - domain['id'], domain) - self.assertRaises( - exception.DomainNotFound, self.resource_api.delete_domain, - domain['id']) - - def test_forbid_operations_on_defined_federated_domain(self): - """Make sure one cannot operate on a user-defined federated domain. - - This includes operations like create, update, delete. - - """ - - non_default_name = 'beta_federated_domain' - self.config_fixture.config(group='federation', - federated_domain_name=non_default_name) - domain = self.new_domain_ref() - domain['name'] = non_default_name - self.assertRaises(AssertionError, - self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises(exception.DomainNotFound, - self.resource_api.delete_domain, - domain['id']) - self.assertRaises(AssertionError, - self.resource_api.update_domain, - domain['id'], domain) - - # Project CRUD tests - - def test_list_projects(self): - """Call ``GET /projects``.""" - resource_url = '/projects' - r = self.get(resource_url) - self.assertValidProjectListResponse(r, ref=self.project, - resource_url=resource_url) - - def test_create_project(self): - """Call ``POST /projects``.""" - ref = self.new_project_ref(domain_id=self.domain_id) - r = self.post( - '/projects', - body={'project': ref}) - self.assertValidProjectResponse(r, ref) - - def test_create_project_bad_request(self): - """Call ``POST /projects``.""" - self.post('/projects', body={'project': {}}, - expected_status=http_client.BAD_REQUEST) - - def test_create_project_invalid_domain_id(self): - """Call ``POST /projects``.""" - ref = self.new_project_ref(domain_id=uuid.uuid4().hex) - self.post('/projects', body={'project': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_create_project_is_domain_not_allowed(self): - """Call ``POST /projects``. - - Setting is_domain=True is not supported yet and should raise - NotImplemented. - - """ - ref = self.new_project_ref(domain_id=self.domain_id, is_domain=True) - self.post('/projects', - body={'project': ref}, - expected_status=501) - - @utils.wip('waiting for projects acting as domains implementation') - def test_create_project_without_parent_id_and_without_domain_id(self): - """Call ``POST /projects``.""" - - # Grant a domain role for the user - collection_url = ( - '/domains/%(domain_id)s/users/%(user_id)s/roles' % { - 'domain_id': self.domain_id, - 'user_id': self.user['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': self.role_id} - self.put(member_url) - - # Create an authentication request for a domain scoped token - auth = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - domain_id=self.domain_id) - - # Without domain_id and parent_id, the domain_id should be - # normalized to the domain on the token, when using a domain - # scoped token. - ref = self.new_project_ref() - r = self.post( - '/projects', - auth=auth, - body={'project': ref}) - ref['domain_id'] = self.domain['id'] - self.assertValidProjectResponse(r, ref) - - @utils.wip('waiting for projects acting as domains implementation') - def test_create_project_with_parent_id_and_no_domain_id(self): - """Call ``POST /projects``.""" - # With only the parent_id, the domain_id should be - # normalized to the parent's domain_id - ref_child = self.new_project_ref(parent_id=self.project['id']) - - r = self.post( - '/projects', - body={'project': ref_child}) - self.assertEqual(r.result['project']['domain_id'], - self.project['domain_id']) - ref_child['domain_id'] = self.domain['id'] - self.assertValidProjectResponse(r, ref_child) - - def _create_projects_hierarchy(self, hierarchy_size=1): - """Creates a single-branched project hierarchy with the specified size. - - :param hierarchy_size: the desired hierarchy size, default is 1 - - a project with one child. - - :returns projects: a list of the projects in the created hierarchy. - - """ - new_ref = self.new_project_ref(domain_id=self.domain_id) - resp = self.post('/projects', body={'project': new_ref}) - - projects = [resp.result] - - for i in range(hierarchy_size): - new_ref = self.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[i]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - - projects.append(resp.result) - - return projects - - def test_list_projects_filtering_by_parent_id(self): - """Call ``GET /projects?parent_id={project_id}``.""" - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Add another child to projects[1] - it will be projects[3] - new_ref = self.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[1]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - - projects.append(resp.result) - - # Query for projects[0] immediate children - it will - # be only projects[1] - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[0]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [projects[1]['project']] - - # projects[0] has projects[1] as child - self.assertEqual(expected_list, projects_result) - - # Query for projects[1] immediate children - it will - # be projects[2] and projects[3] - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[1]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [projects[2]['project'], projects[3]['project']] - - # projects[1] has projects[2] and projects[3] as children - self.assertEqual(expected_list, projects_result) - - # Query for projects[2] immediate children - it will be an empty list - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[2]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [] - - # projects[2] has no child, projects_result must be an empty list - self.assertEqual(expected_list, projects_result) - - def test_create_hierarchical_project(self): - """Call ``POST /projects``.""" - self._create_projects_hierarchy() - - def test_get_project(self): - """Call ``GET /projects/{project_id}``.""" - r = self.get( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}) - self.assertValidProjectResponse(r, self.project) - - def test_get_project_with_parents_as_list_with_invalid_id(self): - """Call ``GET /projects/{project_id}?parents_as_list``.""" - self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': None}, expected_status=http_client.NOT_FOUND) - - self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_get_project_with_subtree_as_list_with_invalid_id(self): - """Call ``GET /projects/{project_id}?subtree_as_list``.""" - self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': None}, expected_status=http_client.NOT_FOUND) - - self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_get_project_with_parents_as_ids(self): - """Call ``GET /projects/{project_id}?parents_as_ids``.""" - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Query for projects[2] parents_as_ids - r = self.get( - '/projects/%(project_id)s?parents_as_ids' % { - 'project_id': projects[2]['project']['id']}) - - self.assertValidProjectResponse(r, projects[2]['project']) - parents_as_ids = r.result['project']['parents'] - - # Assert parents_as_ids is a structured dictionary correctly - # representing the hierarchy. The request was made using projects[2] - # id, hence its parents should be projects[1] and projects[0]. It - # should have the following structure: - # { - # projects[1]: { - # projects[0]: None - # } - # } - expected_dict = { - projects[1]['project']['id']: { - projects[0]['project']['id']: None - } - } - self.assertDictEqual(expected_dict, parents_as_ids) - - # Query for projects[0] parents_as_ids - r = self.get( - '/projects/%(project_id)s?parents_as_ids' % { - 'project_id': projects[0]['project']['id']}) - - self.assertValidProjectResponse(r, projects[0]['project']) - parents_as_ids = r.result['project']['parents'] - - # projects[0] has no parents, parents_as_ids must be None - self.assertIsNone(parents_as_ids) - - def test_get_project_with_parents_as_list_with_full_access(self): - """``GET /projects/{project_id}?parents_as_list`` with full access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on each one of those projects; - - Check that calling parents_as_list on 'subproject' returns both - 'project' and 'parent'. - - """ - - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on all the created projects - for proj in (parent, project, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?parents_as_list' % - {'project_id': subproject['project']['id']}) - self.assertValidProjectResponse(r, subproject['project']) - - # Assert only 'project' and 'parent' are in the parents list - self.assertIn(project, r.result['project']['parents']) - self.assertIn(parent, r.result['project']['parents']) - self.assertEqual(2, len(r.result['project']['parents'])) - - def test_get_project_with_parents_as_list_with_partial_access(self): - """``GET /projects/{project_id}?parents_as_list`` with partial access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on 'parent' and 'subproject'; - - Check that calling parents_as_list on 'subproject' only returns - 'parent'. - - """ - - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on parent and subproject - for proj in (parent, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?parents_as_list' % - {'project_id': subproject['project']['id']}) - self.assertValidProjectResponse(r, subproject['project']) - - # Assert only 'parent' is in the parents list - self.assertIn(parent, r.result['project']['parents']) - self.assertEqual(1, len(r.result['project']['parents'])) - - def test_get_project_with_parents_as_list_and_parents_as_ids(self): - """Call ``GET /projects/{project_id}?parents_as_list&parents_as_ids``. - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - self.get( - '/projects/%(project_id)s?parents_as_list&parents_as_ids' % { - 'project_id': projects[1]['project']['id']}, - expected_status=http_client.BAD_REQUEST) - - def test_get_project_with_subtree_as_ids(self): - """Call ``GET /projects/{project_id}?subtree_as_ids``. - - This test creates a more complex hierarchy to test if the structured - dictionary returned by using the ``subtree_as_ids`` query param - correctly represents the hierarchy. - - The hierarchy contains 5 projects with the following structure:: - - +--A--+ - | | - +--B--+ C - | | - D E - - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Add another child to projects[0] - it will be projects[3] - new_ref = self.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[0]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - projects.append(resp.result) - - # Add another child to projects[1] - it will be projects[4] - new_ref = self.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[1]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - projects.append(resp.result) - - # Query for projects[0] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[0]['project']['id']}) - self.assertValidProjectResponse(r, projects[0]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # The subtree hierarchy from projects[0] should have the following - # structure: - # { - # projects[1]: { - # projects[2]: None, - # projects[4]: None - # }, - # projects[3]: None - # } - expected_dict = { - projects[1]['project']['id']: { - projects[2]['project']['id']: None, - projects[4]['project']['id']: None - }, - projects[3]['project']['id']: None - } - self.assertDictEqual(expected_dict, subtree_as_ids) - - # Now query for projects[1] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[1]['project']['id']}) - self.assertValidProjectResponse(r, projects[1]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # The subtree hierarchy from projects[1] should have the following - # structure: - # { - # projects[2]: None, - # projects[4]: None - # } - expected_dict = { - projects[2]['project']['id']: None, - projects[4]['project']['id']: None - } - self.assertDictEqual(expected_dict, subtree_as_ids) - - # Now query for projects[3] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[3]['project']['id']}) - self.assertValidProjectResponse(r, projects[3]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # projects[3] has no subtree, subtree_as_ids must be None - self.assertIsNone(subtree_as_ids) - - def test_get_project_with_subtree_as_list_with_full_access(self): - """``GET /projects/{project_id}?subtree_as_list`` with full access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on each one of those projects; - - Check that calling subtree_as_list on 'parent' returns both 'parent' - and 'subproject'. - - """ - - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on all the created projects - for proj in (parent, project, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?subtree_as_list' % - {'project_id': parent['project']['id']}) - self.assertValidProjectResponse(r, parent['project']) - - # Assert only 'project' and 'subproject' are in the subtree - self.assertIn(project, r.result['project']['subtree']) - self.assertIn(subproject, r.result['project']['subtree']) - self.assertEqual(2, len(r.result['project']['subtree'])) - - def test_get_project_with_subtree_as_list_with_partial_access(self): - """``GET /projects/{project_id}?subtree_as_list`` with partial access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on 'parent' and 'subproject'; - - Check that calling subtree_as_list on 'parent' returns 'subproject'. - - """ - - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on parent and subproject - for proj in (parent, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?subtree_as_list' % - {'project_id': parent['project']['id']}) - self.assertValidProjectResponse(r, parent['project']) - - # Assert only 'subproject' is in the subtree - self.assertIn(subproject, r.result['project']['subtree']) - self.assertEqual(1, len(r.result['project']['subtree'])) - - def test_get_project_with_subtree_as_list_and_subtree_as_ids(self): - """Call ``GET /projects/{project_id}?subtree_as_list&subtree_as_ids``. - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - self.get( - '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % { - 'project_id': projects[1]['project']['id']}, - expected_status=http_client.BAD_REQUEST) - - def test_update_project(self): - """Call ``PATCH /projects/{project_id}``.""" - ref = self.new_project_ref(domain_id=self.domain_id) - del ref['id'] - r = self.patch( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}, - body={'project': ref}) - self.assertValidProjectResponse(r, ref) - - def test_update_project_domain_id(self): - """Call ``PATCH /projects/{project_id}`` with domain_id.""" - project = self.new_project_ref(domain_id=self.domain['id']) - self.resource_api.create_project(project['id'], project) - project['domain_id'] = CONF.identity.default_domain_id - r = self.patch('/projects/%(project_id)s' % { - 'project_id': project['id']}, - body={'project': project}, - expected_status=exception.ValidationError.code) - self.config_fixture.config(domain_id_immutable=False) - project['domain_id'] = self.domain['id'] - r = self.patch('/projects/%(project_id)s' % { - 'project_id': project['id']}, - body={'project': project}) - self.assertValidProjectResponse(r, project) - - def test_update_project_parent_id(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - leaf_project = projects[1]['project'] - leaf_project['parent_id'] = None - self.patch( - '/projects/%(project_id)s' % { - 'project_id': leaf_project['id']}, - body={'project': leaf_project}, - expected_status=http_client.FORBIDDEN) - - def test_update_project_is_domain_not_allowed(self): - """Call ``PATCH /projects/{project_id}`` with is_domain. - - The is_domain flag is immutable. - """ - project = self.new_project_ref(domain_id=self.domain['id']) - resp = self.post('/projects', - body={'project': project}) - self.assertFalse(resp.result['project']['is_domain']) - - project['is_domain'] = True - self.patch('/projects/%(project_id)s' % { - 'project_id': resp.result['project']['id']}, - body={'project': project}, - expected_status=http_client.BAD_REQUEST) - - def test_disable_leaf_project(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - leaf_project = projects[1]['project'] - leaf_project['enabled'] = False - r = self.patch( - '/projects/%(project_id)s' % { - 'project_id': leaf_project['id']}, - body={'project': leaf_project}) - self.assertEqual( - leaf_project['enabled'], r.result['project']['enabled']) - - def test_disable_not_leaf_project(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - root_project = projects[0]['project'] - root_project['enabled'] = False - self.patch( - '/projects/%(project_id)s' % { - 'project_id': root_project['id']}, - body={'project': root_project}, - expected_status=http_client.FORBIDDEN) - - def test_delete_project(self): - """Call ``DELETE /projects/{project_id}`` - - As well as making sure the delete succeeds, we ensure - that any credentials that reference this projects are - also deleted, while other credentials are unaffected. - - """ - # First check the credential for this project is present - r = self.credential_api.get_credential(self.credential['id']) - self.assertDictEqual(r, self.credential) - # Create a second credential with a different project - self.project2 = self.new_project_ref( - domain_id=self.domain['id']) - self.resource_api.create_project(self.project2['id'], self.project2) - self.credential2 = self.new_credential_ref( - user_id=self.user['id'], - project_id=self.project2['id']) - self.credential_api.create_credential( - self.credential2['id'], - self.credential2) - - # Now delete the project - self.delete( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}) - - # Deleting the project should have deleted any credentials - # that reference this project - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - credential_id=self.credential['id']) - # But the credential for project2 is unaffected - r = self.credential_api.get_credential(self.credential2['id']) - self.assertDictEqual(r, self.credential2) - - def test_delete_not_leaf_project(self): - """Call ``DELETE /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - self.delete( - '/projects/%(project_id)s' % { - 'project_id': projects[0]['project']['id']}, - expected_status=http_client.FORBIDDEN) - # Role CRUD tests def test_create_role(self): """Call ``POST /roles``.""" - ref = self.new_role_ref() + ref = unit.new_role_ref() r = self.post( '/roles', body={'role': ref}) @@ -1090,7 +66,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, def test_update_role(self): """Call ``PATCH /roles/{role_id}``.""" - ref = self.new_role_ref() + ref = unit.new_role_ref() del ref['id'] r = self.patch('/roles/%(role_id)s' % { 'role_id': self.role_id}, @@ -1105,8 +81,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, def test_create_member_role(self): """Call ``POST /roles``.""" # specify only the name on creation - ref = self.new_role_ref() - ref['name'] = CONF.member_role_name + ref = unit.new_role_ref(name=CONF.member_role_name) r = self.post( '/roles', body={'role': ref}) @@ -1118,35 +93,41 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # Role Grants tests def test_crud_user_project_role_grants(self): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + collection_url = ( '/projects/%(project_id)s/users/%(user_id)s/roles' % { 'project_id': self.project['id'], 'user_id': self.user['id']}) member_url = '%(collection_url)s/%(role_id)s' % { 'collection_url': collection_url, - 'role_id': self.role_id} + 'role_id': role['id']} + + # There is a role assignment for self.user on self.project + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=self.role, + expected_length=1) self.put(member_url) self.head(member_url) r = self.get(collection_url) - self.assertValidRoleListResponse(r, ref=self.role, - resource_url=collection_url) + self.assertValidRoleListResponse(r, ref=role, + resource_url=collection_url, + expected_length=2) - # FIXME(gyee): this test is no longer valid as user - # have no role in the project. Can't get a scoped token - # self.delete(member_url) - # r = self.get(collection_url) - # self.assertValidRoleListResponse(r, expected_length=0) - # self.assertIn(collection_url, r.result['links']['self']) + self.delete(member_url) + r = self.get(collection_url) + self.assertValidRoleListResponse(r, ref=self.role, expected_length=1) + self.assertIn(collection_url, r.result['links']['self']) def test_crud_user_project_role_grants_no_user(self): - """Grant role on a project to a user that doesn't exist, 404 result. + """Grant role on a project to a user that doesn't exist. When grant a role on a project to a user that doesn't exist, the server returns Not Found for the user. """ - user_id = uuid.uuid4().hex collection_url = ( @@ -1179,13 +160,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase, resource_url=collection_url) def test_crud_user_domain_role_grants_no_user(self): - """Grant role on a domain to a user that doesn't exist, 404 result. + """Grant role on a domain to a user that doesn't exist. When grant a role on a domain to a user that doesn't exist, the server returns 404 Not Found for the user. """ - user_id = uuid.uuid4().hex collection_url = ( @@ -1218,13 +198,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase, resource_url=collection_url) def test_crud_group_project_role_grants_no_group(self): - """Grant role on a project to a group that doesn't exist, 404 result. + """Grant role on a project to a group that doesn't exist. When grant a role on a project to a group that doesn't exist, the server returns 404 Not Found for the group. """ - group_id = uuid.uuid4().hex collection_url = ( @@ -1258,13 +237,12 @@ class AssignmentTestCase(test_v3.RestfulTestCase, resource_url=collection_url) def test_crud_group_domain_role_grants_no_group(self): - """Grant role on a domain to a group that doesn't exist, 404 result. + """Grant role on a domain to a group that doesn't exist. When grant a role on a domain to a group that doesn't exist, the server returns 404 Not Found for the group. """ - group_id = uuid.uuid4().hex collection_url = ( @@ -1280,7 +258,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, def _create_new_user_and_assign_role_on_project(self): """Create a new user and assign user a role on a project.""" # Create a new user - new_user = self.new_user_ref(domain_id=self.domain_id) + new_user = unit.new_user_ref(domain_id=self.domain_id) user_ref = self.identity_api.create_user(new_user) # Assign the user a role on the project collection_url = ( @@ -1290,9 +268,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase, member_url = ('%(collection_url)s/%(role_id)s' % { 'collection_url': collection_url, 'role_id': self.role_id}) - self.put(member_url, expected_status=204) + self.put(member_url) # Check the user has the role assigned - self.head(member_url, expected_status=204) + self.head(member_url) return member_url, user_ref def test_delete_user_before_removing_role_assignment_succeeds(self): @@ -1301,7 +279,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # Delete the user from identity backend self.identity_api.driver.delete_user(user['id']) # Clean up the role assignment - self.delete(member_url, expected_status=204) + self.delete(member_url) # Make sure the role is gone self.head(member_url, expected_status=http_client.NOT_FOUND) @@ -1310,8 +288,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase, member_url, user = self._create_new_user_and_assign_role_on_project() # Delete the user from identity backend self.identity_api.delete_user(user['id']) - # We should get a 404 when looking for the user in the identity - # backend because we're not performing a delete operation on the role. + # We should get a 404 Not Found when looking for the user in the + # identity backend because we're not performing a delete operation on + # the role. self.head(member_url, expected_status=http_client.NOT_FOUND) def test_token_revoked_once_group_role_grant_revoked(self): @@ -1344,7 +323,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # validates the returned token; it should be valid. self.head('/auth/tokens', headers={'x-subject-token': token}, - expected_status=200) + expected_status=http_client.OK) # revokes the grant from group on project. self.assignment_api.delete_grant(role_id=self.role['id'], @@ -1356,6 +335,126 @@ class AssignmentTestCase(test_v3.RestfulTestCase, headers={'x-subject-token': token}, expected_status=http_client.NOT_FOUND) + @unit.skip_if_cache_disabled('assignment') + def test_delete_grant_from_user_and_project_invalidate_cache(self): + # create a new project + new_project = unit.new_project_ref(domain_id=self.domain_id) + self.resource_api.create_project(new_project['id'], new_project) + + collection_url = ( + '/projects/%(project_id)s/users/%(user_id)s/roles' % { + 'project_id': new_project['id'], + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + + # create the user a grant on the new project + self.put(member_url) + + # check the grant that was just created + self.head(member_url) + resp = self.get(collection_url) + self.assertValidRoleListResponse(resp, ref=self.role, + resource_url=collection_url) + + # delete the grant + self.delete(member_url) + + # get the collection and ensure there are no roles on the project + resp = self.get(collection_url) + self.assertListEqual(resp.json_body['roles'], []) + + @unit.skip_if_cache_disabled('assignment') + def test_delete_grant_from_user_and_domain_invalidates_cache(self): + # create a new domain + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + + collection_url = ( + '/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': new_domain['id'], + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + + # create the user a grant on the new domain + self.put(member_url) + + # check the grant that was just created + self.head(member_url) + resp = self.get(collection_url) + self.assertValidRoleListResponse(resp, ref=self.role, + resource_url=collection_url) + + # delete the grant + self.delete(member_url) + + # get the collection and ensure there are no roles on the domain + resp = self.get(collection_url) + self.assertListEqual(resp.json_body['roles'], []) + + @unit.skip_if_cache_disabled('assignment') + def test_delete_grant_from_group_and_project_invalidates_cache(self): + # create a new project + new_project = unit.new_project_ref(domain_id=self.domain_id) + self.resource_api.create_project(new_project['id'], new_project) + + collection_url = ( + '/projects/%(project_id)s/groups/%(group_id)s/roles' % { + 'project_id': new_project['id'], + 'group_id': self.group['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + + # create the group a grant on the new project + self.put(member_url) + + # check the grant that was just created + self.head(member_url) + resp = self.get(collection_url) + self.assertValidRoleListResponse(resp, ref=self.role, + resource_url=collection_url) + + # delete the grant + self.delete(member_url) + + # get the collection and ensure there are no roles on the project + resp = self.get(collection_url) + self.assertListEqual(resp.json_body['roles'], []) + + @unit.skip_if_cache_disabled('assignment') + def test_delete_grant_from_group_and_domain_invalidates_cache(self): + # create a new domain + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + + collection_url = ( + '/domains/%(domain_id)s/groups/%(group_id)s/roles' % { + 'domain_id': new_domain['id'], + 'group_id': self.group['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + + # create the group a grant on the new domain + self.put(member_url) + + # check the grant that was just created + self.head(member_url) + resp = self.get(collection_url) + self.assertValidRoleListResponse(resp, ref=self.role, + resource_url=collection_url) + + # delete the grant + self.delete(member_url) + + # get the collection and ensure there are no roles on the domain + resp = self.get(collection_url) + self.assertListEqual(resp.json_body['roles'], []) + # Role Assignments tests def test_get_role_assignments(self): @@ -1384,13 +483,11 @@ class AssignmentTestCase(test_v3.RestfulTestCase, been removed """ - # Since the default fixtures already assign some roles to the # user it creates, we also need a new user that will not have any # existing assignments - self.user1 = self.new_user_ref( - domain_id=self.domain['id']) - self.user1 = self.identity_api.create_user(self.user1) + user1 = unit.new_user_ref(domain_id=self.domain['id']) + user1 = self.identity_api.create_user(user1) collection_url = '/role_assignments' r = self.get(collection_url) @@ -1412,7 +509,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, gd_entity) ud_entity = self.build_role_assignment_entity(domain_id=self.domain_id, - user_id=self.user1['id'], + user_id=user1['id'], role_id=self.role_id) self.put(ud_entity['links']['assignment']) r = self.get(collection_url) @@ -1434,7 +531,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, gp_entity) up_entity = self.build_role_assignment_entity( - project_id=self.project_id, user_id=self.user1['id'], + project_id=self.project_id, user_id=user1['id'], role_id=self.role_id) self.put(up_entity['links']['assignment']) r = self.get(collection_url) @@ -1475,18 +572,13 @@ class AssignmentTestCase(test_v3.RestfulTestCase, for each of the group members. """ - self.user1 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user1['password'] - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - self.user2 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) - self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + user1 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + user2 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + + self.identity_api.add_user_to_group(user1['id'], self.group['id']) + self.identity_api.add_user_to_group(user2['id'], self.group['id']) collection_url = '/role_assignments' r = self.get(collection_url) @@ -1516,11 +608,11 @@ class AssignmentTestCase(test_v3.RestfulTestCase, resource_url=collection_url) ud_entity = self.build_role_assignment_entity( link=gd_entity['links']['assignment'], domain_id=self.domain_id, - user_id=self.user1['id'], role_id=self.role_id) + user_id=user1['id'], role_id=self.role_id) self.assertRoleAssignmentInListResponse(r, ud_entity) ud_entity = self.build_role_assignment_entity( link=gd_entity['links']['assignment'], domain_id=self.domain_id, - user_id=self.user2['id'], role_id=self.role_id) + user_id=user2['id'], role_id=self.role_id) self.assertRoleAssignmentInListResponse(r, ud_entity) def test_check_effective_values_for_role_assignments(self): @@ -1549,18 +641,13 @@ class AssignmentTestCase(test_v3.RestfulTestCase, know if we are getting effective roles or not """ - self.user1 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user1['password'] - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - self.user2 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - self.identity_api.add_user_to_group(self.user1['id'], self.group['id']) - self.identity_api.add_user_to_group(self.user2['id'], self.group['id']) + user1 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + user2 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + + self.identity_api.add_user_to_group(user1['id'], self.group['id']) + self.identity_api.add_user_to_group(user2['id'], self.group['id']) collection_url = '/role_assignments' r = self.get(collection_url) @@ -1633,61 +720,53 @@ class AssignmentTestCase(test_v3.RestfulTestCase, token (all effective roles for a user on a project) """ - # Since the default fixtures already assign some roles to the # user it creates, we also need a new user that will not have any # existing assignments - self.user1 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user1['password'] - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - self.user2 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - self.group1 = self.new_group_ref( - domain_id=self.domain['id']) - self.group1 = self.identity_api.create_group(self.group1) - self.identity_api.add_user_to_group(self.user1['id'], - self.group1['id']) - self.identity_api.add_user_to_group(self.user2['id'], - self.group1['id']) - self.project1 = self.new_project_ref( - domain_id=self.domain['id']) - self.resource_api.create_project(self.project1['id'], self.project1) - self.role1 = self.new_role_ref() + user1 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + user2 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + + group1 = unit.new_group_ref(domain_id=self.domain['id']) + group1 = self.identity_api.create_group(group1) + self.identity_api.add_user_to_group(user1['id'], group1['id']) + self.identity_api.add_user_to_group(user2['id'], group1['id']) + project1 = unit.new_project_ref(domain_id=self.domain['id']) + self.resource_api.create_project(project1['id'], project1) + self.role1 = unit.new_role_ref() self.role_api.create_role(self.role1['id'], self.role1) - self.role2 = self.new_role_ref() + self.role2 = unit.new_role_ref() self.role_api.create_role(self.role2['id'], self.role2) # Now add one of each of the four types of assignment gd_entity = self.build_role_assignment_entity( - domain_id=self.domain_id, group_id=self.group1['id'], + domain_id=self.domain_id, group_id=group1['id'], role_id=self.role1['id']) self.put(gd_entity['links']['assignment']) ud_entity = self.build_role_assignment_entity(domain_id=self.domain_id, - user_id=self.user1['id'], + user_id=user1['id'], role_id=self.role2['id']) self.put(ud_entity['links']['assignment']) gp_entity = self.build_role_assignment_entity( - project_id=self.project1['id'], group_id=self.group1['id'], + project_id=project1['id'], + group_id=group1['id'], role_id=self.role1['id']) self.put(gp_entity['links']['assignment']) up_entity = self.build_role_assignment_entity( - project_id=self.project1['id'], user_id=self.user1['id'], + project_id=project1['id'], + user_id=user1['id'], role_id=self.role2['id']) self.put(up_entity['links']['assignment']) # Now list by various filters to make sure we get back the right ones collection_url = ('/role_assignments?scope.project.id=%s' % - self.project1['id']) + project1['id']) r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=2, @@ -1704,7 +783,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, ud_entity) self.assertRoleAssignmentInListResponse(r, gd_entity) - collection_url = '/role_assignments?user.id=%s' % self.user1['id'] + collection_url = '/role_assignments?user.id=%s' % user1['id'] r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=2, @@ -1712,7 +791,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, up_entity) self.assertRoleAssignmentInListResponse(r, ud_entity) - collection_url = '/role_assignments?group.id=%s' % self.group1['id'] + collection_url = '/role_assignments?group.id=%s' % group1['id'] r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=2, @@ -1733,8 +812,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase, collection_url = ( '/role_assignments?user.id=%(user_id)s' '&scope.project.id=%(project_id)s' % { - 'user_id': self.user1['id'], - 'project_id': self.project1['id']}) + 'user_id': user1['id'], + 'project_id': project1['id']}) r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=1, @@ -1746,7 +825,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # assigned as well as by virtue of group membership collection_url = ('/role_assignments?effective&user.id=%s' % - self.user1['id']) + user1['id']) r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=4, @@ -1756,17 +835,18 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, ud_entity) # ...and the two via group membership... gp1_link = self.build_role_assignment_link( - project_id=self.project1['id'], group_id=self.group1['id'], + project_id=project1['id'], + group_id=group1['id'], role_id=self.role1['id']) gd1_link = self.build_role_assignment_link(domain_id=self.domain_id, - group_id=self.group1['id'], + group_id=group1['id'], role_id=self.role1['id']) up1_entity = self.build_role_assignment_entity( - link=gp1_link, project_id=self.project1['id'], - user_id=self.user1['id'], role_id=self.role1['id']) + link=gp1_link, project_id=project1['id'], + user_id=user1['id'], role_id=self.role1['id']) ud1_entity = self.build_role_assignment_entity( - link=gd1_link, domain_id=self.domain_id, user_id=self.user1['id'], + link=gd1_link, domain_id=self.domain_id, user_id=user1['id'], role_id=self.role1['id']) self.assertRoleAssignmentInListResponse(r, up1_entity) self.assertRoleAssignmentInListResponse(r, ud1_entity) @@ -1778,8 +858,8 @@ class AssignmentTestCase(test_v3.RestfulTestCase, collection_url = ( '/role_assignments?effective&user.id=%(user_id)s' '&scope.project.id=%(project_id)s' % { - 'user_id': self.user1['id'], - 'project_id': self.project1['id']}) + 'user_id': user1['id'], + 'project_id': project1['id']}) r = self.get(collection_url) self.assertValidRoleAssignmentListResponse(r, expected_length=2, @@ -1804,7 +884,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, """ def create_project_hierarchy(parent_id, depth): - "Creates a random project hierarchy." + """Creates a random project hierarchy.""" if depth == 0: return @@ -1812,7 +892,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, subprojects = [] for i in range(breadth): - subprojects.append(self.new_project_ref( + subprojects.append(unit.new_project_ref( domain_id=self.domain_id, parent_id=parent_id)) self.resource_api.create_project(subprojects[-1]['id'], subprojects[-1]) @@ -1823,12 +903,12 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, super(RoleAssignmentBaseTestCase, self).load_sample_data() # Create a domain - self.domain = self.new_domain_ref() + self.domain = unit.new_domain_ref() self.domain_id = self.domain['id'] self.resource_api.create_domain(self.domain_id, self.domain) # Create a project hierarchy - self.project = self.new_project_ref(domain_id=self.domain_id) + self.project = unit.new_project_ref(domain_id=self.domain_id) self.project_id = self.project['id'] self.resource_api.create_project(self.project_id, self.project) @@ -1839,14 +919,14 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, # Create 3 users self.user_ids = [] for i in range(3): - user = self.new_user_ref(domain_id=self.domain_id) + user = unit.new_user_ref(domain_id=self.domain_id) user = self.identity_api.create_user(user) self.user_ids.append(user['id']) # Create 3 groups self.group_ids = [] for i in range(3): - group = self.new_group_ref(domain_id=self.domain_id) + group = unit.new_group_ref(domain_id=self.domain_id) group = self.identity_api.create_group(group) self.group_ids.append(group['id']) @@ -1861,7 +941,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, role_id=self.role_id) # Create a role - self.role = self.new_role_ref() + self.role = unit.new_role_ref() self.role_id = self.role['id'] self.role_api.create_role(self.role_id, self.role) @@ -1869,7 +949,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, self.default_user_id = self.user_ids[0] self.default_group_id = self.group_ids[0] - def get_role_assignments(self, expected_status=200, **filters): + def get_role_assignments(self, expected_status=http_client.OK, **filters): """Returns the result from querying role assignment API + queried URL. Calls GET /v3/role_assignments?<params> and returns its result, where @@ -1880,7 +960,6 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, queried URL. """ - query_url = self._get_role_assignments_query_url(**filters) response = self.get(query_url, expected_status=expected_status) @@ -1903,11 +982,11 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, class RoleAssignmentFailureTestCase(RoleAssignmentBaseTestCase): """Class for testing invalid query params on /v3/role_assignments API. - Querying domain and project, or user and group results in a HTTP 400, since - a role assignment must contain only a single pair of (actor, target). In - addition, since filtering on role assignments applies only to the final - result, effective mode cannot be combined with i) group or ii) domain and - inherited, because it would always result in an empty list. + Querying domain and project, or user and group results in a HTTP 400 Bad + Request, since a role assignment must contain only a single pair of (actor, + target). In addition, since filtering on role assignments applies only to + the final result, effective mode cannot be combined with i) group or ii) + domain and inherited, because it would always result in an empty list. """ @@ -1959,7 +1038,6 @@ class RoleAssignmentDirectTestCase(RoleAssignmentBaseTestCase): group_id, user_id and inherited_to_projects. """ - # Fills default assignment with provided filters test_assignment = self._set_default_assignment_attributes(**filters) @@ -2188,10 +1266,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, def test_get_token_from_inherited_user_domain_role_grants(self): # Create a new user to ensure that no grant is loaded from sample data - user = self.new_user_ref(domain_id=self.domain_id) - password = user['password'] - user = self.identity_api.create_user(user) - user['password'] = password + user = unit.create_user(self.identity_api, domain_id=self.domain_id) # Define domain and project authentication data domain_auth_data = self.build_authentication_request( @@ -2204,10 +1279,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=self.project_id) # Check the user cannot get a domain nor a project token - self.v3_authenticate_token(domain_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on domain non_inher_ud_link = self.build_role_assignment_link( @@ -2215,12 +1290,12 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_ud_link) # Check the user can get only a domain token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Create inherited role - inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'} + inherited_role = unit.new_role_ref(name='inherited') self.role_api.create_role(inherited_role['id'], inherited_role) # Grant inherited role for user on domain @@ -2230,33 +1305,30 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_ud_link) # Check the user can get both a domain and a project token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data) # Delete inherited grant self.delete(inher_ud_link) # Check the user can only get a domain token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Delete non-inherited grant self.delete(non_inher_ud_link) # Check the user cannot get a domain token anymore - self.v3_authenticate_token(domain_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_token_from_inherited_group_domain_role_grants(self): # Create a new group and put a new user in it to # ensure that no grant is loaded from sample data - user = self.new_user_ref(domain_id=self.domain_id) - password = user['password'] - user = self.identity_api.create_user(user) - user['password'] = password + user = unit.create_user(self.identity_api, domain_id=self.domain_id) - group = self.new_group_ref(domain_id=self.domain['id']) + group = unit.new_group_ref(domain_id=self.domain['id']) group = self.identity_api.create_group(group) self.identity_api.add_user_to_group(user['id'], group['id']) @@ -2271,10 +1343,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=self.project_id) # Check the user cannot get a domain nor a project token - self.v3_authenticate_token(domain_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on domain non_inher_gd_link = self.build_role_assignment_link( @@ -2282,12 +1354,12 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_gd_link) # Check the user can get only a domain token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Create inherited role - inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'} + inherited_role = unit.new_role_ref(name='inherited') self.role_api.create_role(inherited_role['id'], inherited_role) # Grant inherited role for user on domain @@ -2297,27 +1369,27 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_gd_link) # Check the user can get both a domain and a project token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data) # Delete inherited grant self.delete(inher_gd_link) # Check the user can only get a domain token - self.v3_authenticate_token(domain_auth_data) - self.v3_authenticate_token(project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data) + self.v3_create_token(project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Delete non-inherited grant self.delete(non_inher_gd_link) # Check the user cannot get a domain token anymore - self.v3_authenticate_token(domain_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(domain_auth_data, + expected_status=http_client.UNAUTHORIZED) def _test_crud_inherited_and_direct_assignment_on_target(self, target_url): # Create a new role to avoid assignments loaded from sample data - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # Define URLs @@ -2360,7 +1432,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, def test_crud_user_inherited_domain_role_grants(self): role_list = [] for _ in range(2): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) @@ -2409,22 +1481,16 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, """ role_list = [] for _ in range(4): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) - domain = self.new_domain_ref() + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user1 = self.new_user_ref( - domain_id=domain['id']) - password = user1['password'] - user1 = self.identity_api.create_user(user1) - user1['password'] = password - project1 = self.new_project_ref( - domain_id=domain['id']) + user1 = unit.create_user(self.identity_api, domain_id=domain['id']) + project1 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project1['id'], project1) - project2 = self.new_project_ref( - domain_id=domain['id']) + project2 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project2['id'], project2) # Add some roles to the project self.assignment_api.add_role_to_user_and_project( @@ -2490,6 +1556,98 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, inherited_to_projects=True) self.assertRoleAssignmentInListResponse(r, up_entity) + def test_list_role_assignments_include_names(self): + """Call ``GET /role_assignments with include names``. + + Test Plan: + + - Create a domain with a group and a user + - Create a project with a group and a user + + """ + role1 = unit.new_role_ref() + self.role_api.create_role(role1['id'], role1) + user1 = unit.create_user(self.identity_api, domain_id=self.domain_id) + group = unit.new_group_ref(domain_id=self.domain_id) + group = self.identity_api.create_group(group) + project1 = unit.new_project_ref(domain_id=self.domain_id) + self.resource_api.create_project(project1['id'], project1) + + expected_entity1 = self.build_role_assignment_entity_include_names( + role_ref=role1, + project_ref=project1, + user_ref=user1) + self.put(expected_entity1['links']['assignment']) + expected_entity2 = self.build_role_assignment_entity_include_names( + role_ref=role1, + domain_ref=self.domain, + group_ref=group) + self.put(expected_entity2['links']['assignment']) + expected_entity3 = self.build_role_assignment_entity_include_names( + role_ref=role1, + domain_ref=self.domain, + user_ref=user1) + self.put(expected_entity3['links']['assignment']) + expected_entity4 = self.build_role_assignment_entity_include_names( + role_ref=role1, + project_ref=project1, + group_ref=group) + self.put(expected_entity4['links']['assignment']) + + collection_url_domain = ( + '/role_assignments?include_names&scope.domain.id=%(domain_id)s' % { + 'domain_id': self.domain_id}) + rs_domain = self.get(collection_url_domain) + collection_url_project = ( + '/role_assignments?include_names&' + 'scope.project.id=%(project_id)s' % { + 'project_id': project1['id']}) + rs_project = self.get(collection_url_project) + collection_url_group = ( + '/role_assignments?include_names&group.id=%(group_id)s' % { + 'group_id': group['id']}) + rs_group = self.get(collection_url_group) + collection_url_user = ( + '/role_assignments?include_names&user.id=%(user_id)s' % { + 'user_id': user1['id']}) + rs_user = self.get(collection_url_user) + collection_url_role = ( + '/role_assignments?include_names&role.id=%(role_id)s' % { + 'role_id': role1['id']}) + rs_role = self.get(collection_url_role) + # Make sure all entities were created successfully + self.assertEqual(rs_domain.status_int, http_client.OK) + self.assertEqual(rs_project.status_int, http_client.OK) + self.assertEqual(rs_group.status_int, http_client.OK) + self.assertEqual(rs_user.status_int, http_client.OK) + # Make sure we can get back the correct number of entities + self.assertValidRoleAssignmentListResponse( + rs_domain, + expected_length=2, + resource_url=collection_url_domain) + self.assertValidRoleAssignmentListResponse( + rs_project, + expected_length=2, + resource_url=collection_url_project) + self.assertValidRoleAssignmentListResponse( + rs_group, + expected_length=2, + resource_url=collection_url_group) + self.assertValidRoleAssignmentListResponse( + rs_user, + expected_length=2, + resource_url=collection_url_user) + self.assertValidRoleAssignmentListResponse( + rs_role, + expected_length=4, + resource_url=collection_url_role) + # Verify all types of entities have the correct format + self.assertRoleAssignmentInListResponse(rs_domain, expected_entity2) + self.assertRoleAssignmentInListResponse(rs_project, expected_entity1) + self.assertRoleAssignmentInListResponse(rs_group, expected_entity4) + self.assertRoleAssignmentInListResponse(rs_user, expected_entity3) + self.assertRoleAssignmentInListResponse(rs_role, expected_entity1) + def test_list_role_assignments_for_disabled_inheritance_extension(self): """Call ``GET /role_assignments with inherited domain grants``. @@ -2503,25 +1661,18 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, shows up. """ - role_list = [] for _ in range(4): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) - domain = self.new_domain_ref() + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user1 = self.new_user_ref( - domain_id=domain['id']) - password = user1['password'] - user1 = self.identity_api.create_user(user1) - user1['password'] = password - project1 = self.new_project_ref( - domain_id=domain['id']) + user1 = unit.create_user(self.identity_api, domain_id=domain['id']) + project1 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project1['id'], project1) - project2 = self.new_project_ref( - domain_id=domain['id']) + project2 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project2['id'], project2) # Add some roles to the project self.assignment_api.add_role_to_user_and_project( @@ -2598,34 +1749,23 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, """ role_list = [] for _ in range(4): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) - domain = self.new_domain_ref() + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user1 = self.new_user_ref( - domain_id=domain['id']) - password = user1['password'] - user1 = self.identity_api.create_user(user1) - user1['password'] = password - user2 = self.new_user_ref( - domain_id=domain['id']) - password = user2['password'] - user2 = self.identity_api.create_user(user2) - user2['password'] = password - group1 = self.new_group_ref( - domain_id=domain['id']) + user1 = unit.create_user(self.identity_api, domain_id=domain['id']) + user2 = unit.create_user(self.identity_api, domain_id=domain['id']) + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) self.identity_api.add_user_to_group(user1['id'], group1['id']) self.identity_api.add_user_to_group(user2['id'], group1['id']) - project1 = self.new_project_ref( - domain_id=domain['id']) + project1 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project1['id'], project1) - project2 = self.new_project_ref( - domain_id=domain['id']) + project2 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project2['id'], project2) # Add some roles to the project self.assignment_api.add_role_to_user_and_project( @@ -2704,25 +1844,18 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, """ role_list = [] for _ in range(5): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) - domain = self.new_domain_ref() + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user1 = self.new_user_ref( - domain_id=domain['id']) - password = user1['password'] - user1 = self.identity_api.create_user(user1) - user1['password'] = password - group1 = self.new_group_ref( - domain_id=domain['id']) + user1 = unit.create_user(self.identity_api, domain_id=domain['id']) + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) - project1 = self.new_project_ref( - domain_id=domain['id']) + project1 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project1['id'], project1) - project2 = self.new_project_ref( - domain_id=domain['id']) + project2 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project2['id'], project2) # Add some spoiler roles to the projects self.assignment_api.add_role_to_user_and_project( @@ -2790,17 +1923,17 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, """ # Create project hierarchy - root = self.new_project_ref(domain_id=self.domain['id']) - leaf = self.new_project_ref(domain_id=self.domain['id'], + root = unit.new_project_ref(domain_id=self.domain['id']) + leaf = unit.new_project_ref(domain_id=self.domain['id'], parent_id=root['id']) self.resource_api.create_project(root['id'], root) self.resource_api.create_project(leaf['id'], leaf) # Create 'non-inherited' and 'inherited' roles - non_inherited_role = {'id': uuid.uuid4().hex, 'name': 'non-inherited'} + non_inherited_role = unit.new_role_ref(name='non-inherited') self.role_api.create_role(non_inherited_role['id'], non_inherited_role) - inherited_role = {'id': uuid.uuid4().hex, 'name': 'inherited'} + inherited_role = unit.new_role_ref(name='inherited') self.role_api.create_role(inherited_role['id'], inherited_role) return (root['id'], leaf['id'], @@ -2822,10 +1955,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=leaf_id) # Check the user cannot get a token on root nor leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for user on leaf project non_inher_up_link = self.build_role_assignment_link( @@ -2834,9 +1967,9 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_up_link) # Check the user can only get a token on leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data) # Grant inherited role for user on root project inher_up_link = self.build_role_assignment_link( @@ -2845,24 +1978,24 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_up_link) # Check the user still can get a token only on leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data) # Delete non-inherited grant self.delete(non_inher_up_link) # Check the inherited role still applies for leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data) # Delete inherited grant self.delete(inher_up_link) # Check the user cannot get a token on leaf project anymore - self.v3_authenticate_token(leaf_project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_token_from_inherited_group_project_role_grants(self): # Create default scenario @@ -2870,7 +2003,7 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self._setup_hierarchical_projects_scenario()) # Create group and add user to it - group = self.new_group_ref(domain_id=self.domain['id']) + group = unit.new_group_ref(domain_id=self.domain['id']) group = self.identity_api.create_group(group) self.identity_api.add_user_to_group(self.user['id'], group['id']) @@ -2885,10 +2018,10 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, project_id=leaf_id) # Check the user cannot get a token on root nor leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) # Grant non-inherited role for group on leaf project non_inher_gp_link = self.build_role_assignment_link( @@ -2897,9 +2030,9 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(non_inher_gp_link) # Check the user can only get a token on leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data) # Grant inherited role for group on root project inher_gp_link = self.build_role_assignment_link( @@ -2908,22 +2041,22 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, self.put(inher_gp_link) # Check the user still can get a token only on leaf project - self.v3_authenticate_token(root_project_auth_data, - expected_status=http_client.UNAUTHORIZED) - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(root_project_auth_data, + expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data) # Delete no-inherited grant self.delete(non_inher_gp_link) # Check the inherited role still applies for leaf project - self.v3_authenticate_token(leaf_project_auth_data) + self.v3_create_token(leaf_project_auth_data) # Delete inherited grant self.delete(inher_gp_link) # Check the user cannot get a token on leaf project anymore - self.v3_authenticate_token(leaf_project_auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(leaf_project_auth_data, + expected_status=http_client.UNAUTHORIZED) def test_get_role_assignments_for_project_hierarchy(self): """Call ``GET /role_assignments``. @@ -3028,6 +2161,154 @@ class AssignmentInheritanceTestCase(test_v3.RestfulTestCase, inher_up_entity['scope']['project']['id'] = leaf_id self.assertRoleAssignmentInListResponse(r, inher_up_entity) + def test_project_id_specified_if_include_subtree_specified(self): + """When using include_subtree, you must specify a project ID.""" + self.get('/role_assignments?include_subtree=True', + expected_status=http_client.BAD_REQUEST) + self.get('/role_assignments?scope.project.id&' + 'include_subtree=True', + expected_status=http_client.BAD_REQUEST) + + def test_get_role_assignments_for_project_tree(self): + """Get role_assignment?scope.project.id=X?include_subtree``. + + Test Plan: + + - Create 2 roles and a hierarchy of projects with one root and one leaf + - Issue the URL to add a non-inherited user role to the root project + and the leaf project + - Issue the URL to get role assignments for the root project but + not the subtree - this should return just the root assignment + - Issue the URL to get role assignments for the root project and + it's subtree - this should return both assignments + - Check that explicitly setting include_subtree to False is the + equivalent to not including it at all in the query. + + """ + # Create default scenario + root_id, leaf_id, non_inherited_role_id, unused_role_id = ( + self._setup_hierarchical_projects_scenario()) + + # Grant non-inherited role to root and leaf projects + non_inher_entity_root = self.build_role_assignment_entity( + project_id=root_id, user_id=self.user['id'], + role_id=non_inherited_role_id) + self.put(non_inher_entity_root['links']['assignment']) + non_inher_entity_leaf = self.build_role_assignment_entity( + project_id=leaf_id, user_id=self.user['id'], + role_id=non_inherited_role_id) + self.put(non_inher_entity_leaf['links']['assignment']) + + # Without the subtree, we should get the one assignment on the + # root project + collection_url = ( + '/role_assignments?scope.project.id=%(project)s' % { + 'project': root_id}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse( + r, resource_url=collection_url) + + self.assertThat(r.result['role_assignments'], matchers.HasLength(1)) + self.assertRoleAssignmentInListResponse(r, non_inher_entity_root) + + # With the subtree, we should get both assignments + collection_url = ( + '/role_assignments?scope.project.id=%(project)s' + '&include_subtree=True' % { + 'project': root_id}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse( + r, resource_url=collection_url) + + self.assertThat(r.result['role_assignments'], matchers.HasLength(2)) + self.assertRoleAssignmentInListResponse(r, non_inher_entity_root) + self.assertRoleAssignmentInListResponse(r, non_inher_entity_leaf) + + # With subtree=0, we should also only get the one assignment on the + # root project + collection_url = ( + '/role_assignments?scope.project.id=%(project)s' + '&include_subtree=0' % { + 'project': root_id}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse( + r, resource_url=collection_url) + + self.assertThat(r.result['role_assignments'], matchers.HasLength(1)) + self.assertRoleAssignmentInListResponse(r, non_inher_entity_root) + + def test_get_effective_role_assignments_for_project_tree(self): + """Get role_assignment ?project_id=X?include_subtree=True?effective``. + + Test Plan: + + - Create 2 roles and a hierarchy of projects with one root and 4 levels + of child project + - Issue the URL to add a non-inherited user role to the root project + and a level 1 project + - Issue the URL to add an inherited user role on the level 2 project + - Issue the URL to get effective role assignments for the level 1 + project and it's subtree - this should return a role (non-inherited) + on the level 1 project and roles (inherited) on each of the level + 2, 3 and 4 projects + + """ + # Create default scenario + root_id, leaf_id, non_inherited_role_id, inherited_role_id = ( + self._setup_hierarchical_projects_scenario()) + + # Add some extra projects to the project hierarchy + level2 = unit.new_project_ref(domain_id=self.domain['id'], + parent_id=leaf_id) + level3 = unit.new_project_ref(domain_id=self.domain['id'], + parent_id=level2['id']) + level4 = unit.new_project_ref(domain_id=self.domain['id'], + parent_id=level3['id']) + self.resource_api.create_project(level2['id'], level2) + self.resource_api.create_project(level3['id'], level3) + self.resource_api.create_project(level4['id'], level4) + + # Grant non-inherited role to root (as a spoiler) and to + # the level 1 (leaf) project + non_inher_entity_root = self.build_role_assignment_entity( + project_id=root_id, user_id=self.user['id'], + role_id=non_inherited_role_id) + self.put(non_inher_entity_root['links']['assignment']) + non_inher_entity_leaf = self.build_role_assignment_entity( + project_id=leaf_id, user_id=self.user['id'], + role_id=non_inherited_role_id) + self.put(non_inher_entity_leaf['links']['assignment']) + + # Grant inherited role to level 2 + inher_entity = self.build_role_assignment_entity( + project_id=level2['id'], user_id=self.user['id'], + role_id=inherited_role_id, inherited_to_projects=True) + self.put(inher_entity['links']['assignment']) + + # Get effective role assignments + collection_url = ( + '/role_assignments?scope.project.id=%(project)s' + '&include_subtree=True&effective' % { + 'project': leaf_id}) + r = self.get(collection_url) + self.assertValidRoleAssignmentListResponse( + r, resource_url=collection_url) + + # There should be three assignments returned in total + self.assertThat(r.result['role_assignments'], matchers.HasLength(3)) + + # Assert that the user does not non-inherited role on root project + self.assertRoleAssignmentNotInListResponse(r, non_inher_entity_root) + + # Assert that the user does have non-inherited role on leaf project + self.assertRoleAssignmentInListResponse(r, non_inher_entity_leaf) + + # Assert that the user has inherited role on levels 3 and 4 + inher_entity['scope']['project']['id'] = level3['id'] + self.assertRoleAssignmentInListResponse(r, inher_entity) + inher_entity['scope']['project']['id'] = level4['id'] + self.assertRoleAssignmentInListResponse(r, inher_entity) + def test_get_inherited_role_assignments_for_project_hierarchy(self): """Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``. @@ -3089,7 +2370,7 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase): self.config_fixture.config(group='os_inherit', enabled=False) def test_crud_inherited_role_grants_failed_if_disabled(self): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) base_collection_url = ( @@ -3107,118 +2388,484 @@ class AssignmentInheritanceDisabledTestCase(test_v3.RestfulTestCase): self.delete(member_url, expected_status=http_client.NOT_FOUND) -class AssignmentV3toV2MethodsTestCase(unit.TestCase): - """Test domain V3 to V2 conversion methods.""" - def _setup_initial_projects(self): - self.project_id = uuid.uuid4().hex - self.domain_id = CONF.identity.default_domain_id - self.parent_id = uuid.uuid4().hex - # Project with only domain_id in ref - self.project1 = {'id': self.project_id, - 'name': self.project_id, - 'domain_id': self.domain_id} - # Project with both domain_id and parent_id in ref - self.project2 = {'id': self.project_id, - 'name': self.project_id, - 'domain_id': self.domain_id, - 'parent_id': self.parent_id} - # Project with no domain_id and parent_id in ref - self.project3 = {'id': self.project_id, - 'name': self.project_id, - 'domain_id': self.domain_id, - 'parent_id': self.parent_id} - # Expected result with no domain_id and parent_id - self.expected_project = {'id': self.project_id, - 'name': self.project_id} - - def test_v2controller_filter_domain_id(self): - # V2.0 is not domain aware, ensure domain_id is popped off the ref. - other_data = uuid.uuid4().hex - domain_id = CONF.identity.default_domain_id - ref = {'domain_id': domain_id, - 'other_data': other_data} - - ref_no_domain = {'other_data': other_data} - expected_ref = ref_no_domain.copy() - - updated_ref = controller.V2Controller.filter_domain_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(ref, expected_ref) - # Make sure we don't error/muck up data if domain_id isn't present - updated_ref = controller.V2Controller.filter_domain_id(ref_no_domain) - self.assertIs(ref_no_domain, updated_ref) - self.assertDictEqual(ref_no_domain, expected_ref) - - def test_v3controller_filter_domain_id(self): - # No data should be filtered out in this case. - other_data = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - ref = {'domain_id': domain_id, - 'other_data': other_data} - - expected_ref = ref.copy() - updated_ref = controller.V3Controller.filter_domain_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(ref, expected_ref) - - def test_v2controller_filter_domain(self): - other_data = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - non_default_domain_ref = {'domain': {'id': domain_id}, - 'other_data': other_data} - default_domain_ref = {'domain': {'id': 'default'}, - 'other_data': other_data} - updated_ref = controller.V2Controller.filter_domain(default_domain_ref) - self.assertNotIn('domain', updated_ref) - self.assertRaises(exception.Unauthorized, - controller.V2Controller.filter_domain, - non_default_domain_ref) - - def test_v2controller_filter_project_parent_id(self): - # V2.0 is not project hierarchy aware, ensure parent_id is popped off. - other_data = uuid.uuid4().hex - parent_id = uuid.uuid4().hex - ref = {'parent_id': parent_id, - 'other_data': other_data} - - ref_no_parent = {'other_data': other_data} - expected_ref = ref_no_parent.copy() - - updated_ref = controller.V2Controller.filter_project_parent_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(ref, expected_ref) - # Make sure we don't error/muck up data if parent_id isn't present - updated_ref = controller.V2Controller.filter_project_parent_id( - ref_no_parent) - self.assertIs(ref_no_parent, updated_ref) - self.assertDictEqual(ref_no_parent, expected_ref) - - def test_v3_to_v2_project_method(self): - self._setup_initial_projects() - updated_project1 = controller.V2Controller.v3_to_v2_project( - self.project1) - self.assertIs(self.project1, updated_project1) - self.assertDictEqual(self.project1, self.expected_project) - updated_project2 = controller.V2Controller.v3_to_v2_project( - self.project2) - self.assertIs(self.project2, updated_project2) - self.assertDictEqual(self.project2, self.expected_project) - updated_project3 = controller.V2Controller.v3_to_v2_project( - self.project3) - self.assertIs(self.project3, updated_project3) - self.assertDictEqual(self.project3, self.expected_project) - - def test_v3_to_v2_project_method_list(self): - self._setup_initial_projects() - project_list = [self.project1, self.project2, self.project3] - updated_list = controller.V2Controller.v3_to_v2_project(project_list) - - self.assertEqual(len(updated_list), len(project_list)) - - for i, ref in enumerate(updated_list): - # Order should not change. - self.assertIs(ref, project_list[i]) - - self.assertDictEqual(self.project1, self.expected_project) - self.assertDictEqual(self.project2, self.expected_project) - self.assertDictEqual(self.project3, self.expected_project) +class ImpliedRolesTests(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin, + unit.TestCase): + def _create_role(self): + """Call ``POST /roles``.""" + ref = unit.new_role_ref() + r = self.post('/roles', body={'role': ref}) + return self.assertValidRoleResponse(r, ref) + + def test_list_implied_roles_none(self): + self.prior = self._create_role() + url = '/roles/%s/implies' % (self.prior['id']) + response = self.get(url).json["role_inference"] + self.assertEqual(self.prior['id'], response['prior_role']['id']) + self.assertEqual(0, len(response['implies'])) + + def _create_implied_role(self, prior, implied): + self.put('/roles/%s/implies/%s' % (prior['id'], implied['id']), + expected_status=http_client.CREATED) + + def _delete_implied_role(self, prior, implied): + self.delete('/roles/%s/implies/%s' % (prior['id'], implied['id'])) + + def _setup_prior_two_implied(self): + self.prior = self._create_role() + self.implied1 = self._create_role() + self._create_implied_role(self.prior, self.implied1) + self.implied2 = self._create_role() + self._create_implied_role(self.prior, self.implied2) + + def _assert_expected_implied_role_response( + self, expected_prior_id, expected_implied_ids): + r = self.get('/roles/%s/implies' % expected_prior_id) + response = r.json["role_inference"] + self.assertEqual(expected_prior_id, response['prior_role']['id']) + + actual_implied_ids = [implied['id'] for implied in response['implies']] + + for expected_id in expected_implied_ids: + self.assertIn(expected_id, actual_implied_ids) + self.assertEqual(len(expected_implied_ids), len(response['implies'])) + + self.assertIsNotNone(response['prior_role']['links']['self']) + for implied in response['implies']: + self.assertIsNotNone(implied['links']['self']) + + def _assert_two_roles_implied(self): + self._assert_expected_implied_role_response( + self.prior['id'], [self.implied1['id'], self.implied2['id']]) + + def _assert_one_role_implied(self): + self._assert_expected_implied_role_response( + self.prior['id'], [self.implied1['id']]) + + self.get('/roles/%s/implies/%s' % + (self.prior['id'], self.implied2['id']), + expected_status=http_client.NOT_FOUND) + + def _assert_two_rules_defined(self): + r = self.get('/role_inferences/') + + rules = r.result['role_inferences'] + + self.assertEqual(self.prior['id'], rules[0]['prior_role']['id']) + self.assertEqual(2, len(rules[0]['implies'])) + implied_ids = [implied['id'] for implied in rules[0]['implies']] + implied_names = [implied['name'] for implied in rules[0]['implies']] + + self.assertIn(self.implied1['id'], implied_ids) + self.assertIn(self.implied2['id'], implied_ids) + self.assertIn(self.implied1['name'], implied_names) + self.assertIn(self.implied2['name'], implied_names) + + def _assert_one_rule_defined(self): + r = self.get('/role_inferences/') + rules = r.result['role_inferences'] + self.assertEqual(self.prior['id'], rules[0]['prior_role']['id']) + self.assertEqual(self.implied1['id'], rules[0]['implies'][0]['id']) + self.assertEqual(self.implied1['name'], rules[0]['implies'][0]['name']) + self.assertEqual(1, len(rules[0]['implies'])) + + def test_list_all_rules(self): + self._setup_prior_two_implied() + self._assert_two_rules_defined() + + self._delete_implied_role(self.prior, self.implied2) + self._assert_one_rule_defined() + + def test_CRD_implied_roles(self): + + self._setup_prior_two_implied() + self._assert_two_roles_implied() + + self._delete_implied_role(self.prior, self.implied2) + self._assert_one_role_implied() + + def _create_three_roles(self): + self.role_list = [] + for _ in range(3): + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + self.role_list.append(role) + + def _create_test_domain_user_project(self): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user = unit.create_user(self.identity_api, domain_id=domain['id']) + project = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project['id'], project) + return domain, user, project + + def _assign_top_role_to_user_on_project(self, user, project): + self.assignment_api.add_role_to_user_and_project( + user['id'], project['id'], self.role_list[0]['id']) + + def _build_effective_role_assignments_url(self, user): + return '/role_assignments?effective&user.id=%(user_id)s' % { + 'user_id': user['id']} + + def _assert_all_roles_in_assignment(self, response, user): + # Now use the list role assignments api to check that all three roles + # appear in the collection + self.assertValidRoleAssignmentListResponse( + response, + expected_length=len(self.role_list), + resource_url=self._build_effective_role_assignments_url(user)) + + def _assert_initial_assignment_in_effective(self, response, user, project): + # The initial assignment should be there (the link url will be + # generated and checked automatically since it matches the assignment) + entity = self.build_role_assignment_entity( + project_id=project['id'], + user_id=user['id'], role_id=self.role_list[0]['id']) + self.assertRoleAssignmentInListResponse(response, entity) + + def _assert_effective_role_for_implied_has_prior_in_links( + self, response, user, project, prior_index, implied_index): + # An effective role for an implied role will have the prior role + # assignment in the links + prior_link = '/prior_roles/%(prior)s/implies/%(implied)s' % { + 'prior': self.role_list[prior_index]['id'], + 'implied': self.role_list[implied_index]['id']} + link = self.build_role_assignment_link( + project_id=project['id'], user_id=user['id'], + role_id=self.role_list[prior_index]['id']) + entity = self.build_role_assignment_entity( + link=link, project_id=project['id'], + user_id=user['id'], role_id=self.role_list[implied_index]['id'], + prior_link=prior_link) + self.assertRoleAssignmentInListResponse(response, entity) + + def test_list_role_assignments_with_implied_roles(self): + """Call ``GET /role_assignments`` with implied role grant. + + Test Plan: + + - Create a domain with a user and a project + - Create 3 roles + - Role 0 implies role 1 and role 1 implies role 2 + - Assign the top role to the project + - Issue the URL to check effective roles on project - this + should return all 3 roles. + - Check the links of the 3 roles indicate the prior role where + appropriate + + """ + (domain, user, project) = self._create_test_domain_user_project() + self._create_three_roles() + self._create_implied_role(self.role_list[0], self.role_list[1]) + self._create_implied_role(self.role_list[1], self.role_list[2]) + self._assign_top_role_to_user_on_project(user, project) + + response = self.get(self._build_effective_role_assignments_url(user)) + r = response + + self._assert_all_roles_in_assignment(r, user) + self._assert_initial_assignment_in_effective(response, user, project) + self._assert_effective_role_for_implied_has_prior_in_links( + response, user, project, 0, 1) + self._assert_effective_role_for_implied_has_prior_in_links( + response, user, project, 1, 2) + + def _create_named_role(self, name): + role = unit.new_role_ref() + role['name'] = name + self.role_api.create_role(role['id'], role) + return role + + def test_root_role_as_implied_role_forbidden(self): + """Test root role is forbidden to be set as an implied role. + + Create 2 roles that are prohibited from being an implied role. + Create 1 additional role which should be accepted as an implied + role. Assure the prohibited role names cannot be set as an implied + role. Assure the accepted role name which is not a member of the + prohibited implied role list can be successfully set an implied + role. + """ + prohibited_name1 = 'root1' + prohibited_name2 = 'root2' + accepted_name1 = 'implied1' + + prohibited_names = [prohibited_name1, prohibited_name2] + self.config_fixture.config(group='assignment', + prohibited_implied_role=prohibited_names) + + prior_role = self._create_role() + + prohibited_role1 = self._create_named_role(prohibited_name1) + url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format( + prior_role_id=prior_role['id'], + implied_role_id=prohibited_role1['id']) + self.put(url, expected_status=http_client.FORBIDDEN) + + prohibited_role2 = self._create_named_role(prohibited_name2) + url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format( + prior_role_id=prior_role['id'], + implied_role_id=prohibited_role2['id']) + self.put(url, expected_status=http_client.FORBIDDEN) + + accepted_role1 = self._create_named_role(accepted_name1) + url = '/roles/{prior_role_id}/implies/{implied_role_id}'.format( + prior_role_id=prior_role['id'], + implied_role_id=accepted_role1['id']) + self.put(url, expected_status=http_client.CREATED) + + def test_trusts_from_implied_role(self): + self._create_three_roles() + self._create_implied_role(self.role_list[0], self.role_list[1]) + self._create_implied_role(self.role_list[1], self.role_list[2]) + self._assign_top_role_to_user_on_project(self.user, self.project) + + # Create a trustee and assign the prior role to her + trustee = unit.create_user(self.identity_api, domain_id=self.domain_id) + ref = unit.new_trust_ref( + trustor_user_id=self.user['id'], + trustee_user_id=trustee['id'], + project_id=self.project['id'], + role_ids=[self.role_list[0]['id']]) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = r.result['trust'] + + # Only the role that was specified is in the trust, NOT implied roles + self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id']) + self.assertThat(trust['roles'], matchers.HasLength(1)) + + # Authenticate as the trustee + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id']) + r = self.v3_create_token(auth_data) + token = r.result['token'] + self.assertThat(token['roles'], + matchers.HasLength(len(self.role_list))) + for role in token['roles']: + self.assertIn(role, self.role_list) + for role in self.role_list: + self.assertIn(role, token['roles']) + + def test_trusts_from_domain_specific_implied_role(self): + self._create_three_roles() + # Overwrite the first role with a domain specific role + role = unit.new_role_ref(domain_id=self.domain_id) + self.role_list[0] = self.role_api.create_role(role['id'], role) + self._create_implied_role(self.role_list[0], self.role_list[1]) + self._create_implied_role(self.role_list[1], self.role_list[2]) + self._assign_top_role_to_user_on_project(self.user, self.project) + + # Create a trustee and assign the prior role to her + trustee = unit.create_user(self.identity_api, domain_id=self.domain_id) + ref = unit.new_trust_ref( + trustor_user_id=self.user['id'], + trustee_user_id=trustee['id'], + project_id=self.project['id'], + role_ids=[self.role_list[0]['id']]) + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = r.result['trust'] + + # Only the role that was specified is in the trust, NOT implied roles + self.assertEqual(self.role_list[0]['id'], trust['roles'][0]['id']) + self.assertThat(trust['roles'], matchers.HasLength(1)) + + # Authenticate as the trustee + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id']) + r = self.v3_create_token(auth_data) + token = r.result['token'] + + # The token should have the roles implies by the domain specific role, + # but not the domain specific role itself. + self.assertThat(token['roles'], + matchers.HasLength(len(self.role_list) - 1)) + for role in token['roles']: + self.assertIn(role, self.role_list) + for role in [self.role_list[1], self.role_list[2]]: + self.assertIn(role, token['roles']) + self.assertNotIn(self.role_list[0], token['roles']) + + +class DomainSpecificRoleTests(test_v3.RestfulTestCase, unit.TestCase): + def setUp(self): + def create_role(domain_id=None): + """Call ``POST /roles``.""" + ref = unit.new_role_ref(domain_id=domain_id) + r = self.post( + '/roles', + body={'role': ref}) + return self.assertValidRoleResponse(r, ref) + + super(DomainSpecificRoleTests, self).setUp() + self.domainA = unit.new_domain_ref() + self.resource_api.create_domain(self.domainA['id'], self.domainA) + self.domainB = unit.new_domain_ref() + self.resource_api.create_domain(self.domainB['id'], self.domainB) + + self.global_role1 = create_role() + self.global_role2 = create_role() + # Since there maybe other global roles already created, let's count + # them, so we can ensure we can check subsequent list responses + # are correct + r = self.get('/roles') + self.existing_global_roles = len(r.result['roles']) + + # And now create some domain specific roles + self.domainA_role1 = create_role(domain_id=self.domainA['id']) + self.domainA_role2 = create_role(domain_id=self.domainA['id']) + self.domainB_role = create_role(domain_id=self.domainB['id']) + + def test_get_and_list_domain_specific_roles(self): + # Check we can get a domain specific role + r = self.get('/roles/%s' % self.domainA_role1['id']) + self.assertValidRoleResponse(r, self.domainA_role1) + + # If we list without specifying a domain, we should only get global + # roles back. + r = self.get('/roles') + self.assertValidRoleListResponse( + r, expected_length=self.existing_global_roles) + self.assertRoleInListResponse(r, self.global_role1) + self.assertRoleInListResponse(r, self.global_role2) + self.assertRoleNotInListResponse(r, self.domainA_role1) + self.assertRoleNotInListResponse(r, self.domainA_role2) + self.assertRoleNotInListResponse(r, self.domainB_role) + + # Now list those in domainA, making sure that's all we get back + r = self.get('/roles?domain_id=%s' % self.domainA['id']) + self.assertValidRoleListResponse(r, expected_length=2) + self.assertRoleInListResponse(r, self.domainA_role1) + self.assertRoleInListResponse(r, self.domainA_role2) + + def test_update_domain_specific_roles(self): + self.domainA_role1['name'] = uuid.uuid4().hex + self.patch('/roles/%(role_id)s' % { + 'role_id': self.domainA_role1['id']}, + body={'role': self.domainA_role1}) + r = self.get('/roles/%s' % self.domainA_role1['id']) + self.assertValidRoleResponse(r, self.domainA_role1) + + def test_delete_domain_specific_roles(self): + # Check delete only removes that one domain role + self.delete('/roles/%(role_id)s' % { + 'role_id': self.domainA_role1['id']}) + + self.get('/roles/%s' % self.domainA_role1['id'], + expected_status=http_client.NOT_FOUND) + # Now re-list those in domainA, making sure there's only one left + r = self.get('/roles?domain_id=%s' % self.domainA['id']) + self.assertValidRoleListResponse(r, expected_length=1) + self.assertRoleInListResponse(r, self.domainA_role2) + + +class ListUserProjectsTestCase(test_v3.RestfulTestCase): + """Tests for /users/<user>/projects""" + + def load_sample_data(self): + # do not load base class's data, keep it focused on the tests + + self.auths = [] + self.domains = [] + self.projects = [] + self.roles = [] + self.users = [] + + # Create 3 sets of domain, roles, projects, and users to demonstrate + # the right user's data is loaded and only projects they can access + # are returned. + + for _ in range(3): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + + user = unit.create_user(self.identity_api, domain_id=domain['id']) + + role = unit.new_role_ref() + self.role_api.create_role(role['id'], role) + + self.assignment_api.create_grant(role['id'], + user_id=user['id'], + domain_id=domain['id']) + + project = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project['id'], project) + + self.assignment_api.create_grant(role['id'], + user_id=user['id'], + project_id=project['id']) + + auth = self.build_authentication_request( + user_id=user['id'], + password=user['password'], + domain_id=domain['id']) + + self.auths.append(auth) + self.domains.append(domain) + self.projects.append(project) + self.roles.append(role) + self.users.append(user) + + def test_list_all(self): + for i in range(len(self.users)): + user = self.users[i] + auth = self.auths[i] + + url = '/users/%s/projects' % user['id'] + result = self.get(url, auth=auth) + projects_result = result.json['projects'] + self.assertEqual(1, len(projects_result)) + self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) + + def test_list_enabled(self): + for i in range(len(self.users)): + user = self.users[i] + auth = self.auths[i] + + # There are no disabled projects + url = '/users/%s/projects?enabled=True' % user['id'] + result = self.get(url, auth=auth) + projects_result = result.json['projects'] + self.assertEqual(1, len(projects_result)) + self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) + + def test_list_disabled(self): + for i in range(len(self.users)): + user = self.users[i] + auth = self.auths[i] + project = self.projects[i] + + # There are no disabled projects + url = '/users/%s/projects?enabled=False' % user['id'] + result = self.get(url, auth=auth) + self.assertEqual(0, len(result.json['projects'])) + + # disable this one and check again + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + result = self.get(url, auth=auth) + projects_result = result.json['projects'] + self.assertEqual(1, len(projects_result)) + self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) + + def test_list_by_domain_id(self): + for i in range(len(self.users)): + user = self.users[i] + domain = self.domains[i] + auth = self.auths[i] + + # Try looking for projects with a non-existent domain_id + url = '/users/%s/projects?domain_id=%s' % (user['id'], + uuid.uuid4().hex) + result = self.get(url, auth=auth) + self.assertEqual(0, len(result.json['projects'])) + + # Now try a valid one + url = '/users/%s/projects?domain_id=%s' % (user['id'], + domain['id']) + result = self.get(url, auth=auth) + projects_result = result.json['projects'] + self.assertEqual(1, len(projects_result)) + self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) |