diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_resource.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_resource.py | 1434 |
1 files changed, 0 insertions, 1434 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_resource.py b/keystone-moon/keystone/tests/unit/test_v3_resource.py deleted file mode 100644 index f54fcb57..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_resource.py +++ /dev/null @@ -1,1434 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslo_config import cfg -from six.moves import http_client -from six.moves import range -from testtools import matchers - -from keystone.common import controller -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import test_v3 -from keystone.tests.unit import utils as test_utils - - -CONF = cfg.CONF - - -class ResourceTestCase(test_v3.RestfulTestCase, - test_v3.AssignmentTestMixin): - """Test domains and projects.""" - - # Domain CRUD tests - - def test_create_domain(self): - """Call ``POST /domains``.""" - ref = unit.new_domain_ref() - r = self.post( - '/domains', - body={'domain': ref}) - return self.assertValidDomainResponse(r, ref) - - def test_create_domain_case_sensitivity(self): - """Call `POST /domains`` twice with upper() and lower() cased name.""" - ref = unit.new_domain_ref() - - # ensure the name is lowercase - ref['name'] = ref['name'].lower() - r = self.post( - '/domains', - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - # ensure the name is uppercase - ref['name'] = ref['name'].upper() - r = self.post( - '/domains', - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - def test_create_domain_bad_request(self): - """Call ``POST /domains``.""" - self.post('/domains', body={'domain': {}}, - expected_status=http_client.BAD_REQUEST) - - def test_create_domain_unsafe(self): - """Call ``POST /domains with unsafe names``.""" - unsafe_name = 'i am not / safe' - - self.config_fixture.config(group='resource', - domain_name_url_safe='off') - ref = unit.new_domain_ref(name=unsafe_name) - self.post( - '/domains', - body={'domain': ref}) - - for config_setting in ['new', 'strict']: - self.config_fixture.config(group='resource', - domain_name_url_safe=config_setting) - ref = unit.new_domain_ref(name=unsafe_name) - self.post( - '/domains', - body={'domain': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_create_domain_unsafe_default(self): - """Check default for unsafe names for ``POST /domains``.""" - unsafe_name = 'i am not / safe' - - # By default, we should be able to create unsafe names - ref = unit.new_domain_ref(name=unsafe_name) - self.post( - '/domains', - body={'domain': ref}) - - def test_create_domain_creates_is_domain_project(self): - """Check a project that acts as a domain is created. - - Call ``POST /domains``. - """ - # Create a new domain - domain_ref = unit.new_domain_ref() - r = self.post('/domains', body={'domain': domain_ref}) - self.assertValidDomainResponse(r, domain_ref) - - # Retrieve its correspondent project - r = self.get('/projects/%(project_id)s' % { - 'project_id': r.result['domain']['id']}) - self.assertValidProjectResponse(r) - - # The created project has is_domain flag as True - self.assertTrue(r.result['project']['is_domain']) - - # And its parent_id and domain_id attributes are equal - self.assertIsNone(r.result['project']['parent_id']) - self.assertIsNone(r.result['project']['domain_id']) - - def test_create_is_domain_project_creates_domain(self): - """Call ``POST /projects`` is_domain and check a domain is created.""" - # Create a new project that acts as a domain - project_ref = unit.new_project_ref(domain_id=None, is_domain=True) - r = self.post('/projects', body={'project': project_ref}) - self.assertValidProjectResponse(r) - - # Retrieve its correspondent domain - r = self.get('/domains/%(domain_id)s' % { - 'domain_id': r.result['project']['id']}) - self.assertValidDomainResponse(r) - self.assertIsNotNone(r.result['domain']) - - def test_list_domains(self): - """Call ``GET /domains``.""" - resource_url = '/domains' - r = self.get(resource_url) - self.assertValidDomainListResponse(r, ref=self.domain, - resource_url=resource_url) - - def test_get_domain(self): - """Call ``GET /domains/{domain_id}``.""" - r = self.get('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}) - self.assertValidDomainResponse(r, self.domain) - - def test_update_domain(self): - """Call ``PATCH /domains/{domain_id}``.""" - ref = unit.new_domain_ref() - del ref['id'] - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}, - body={'domain': ref}) - self.assertValidDomainResponse(r, ref) - - def test_update_domain_unsafe(self): - """Call ``POST /domains/{domain_id} with unsafe names``.""" - unsafe_name = 'i am not / safe' - - self.config_fixture.config(group='resource', - domain_name_url_safe='off') - ref = unit.new_domain_ref(name=unsafe_name) - del ref['id'] - self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}, - body={'domain': ref}) - - unsafe_name = 'i am still not / safe' - for config_setting in ['new', 'strict']: - self.config_fixture.config(group='resource', - domain_name_url_safe=config_setting) - ref = unit.new_domain_ref(name=unsafe_name) - del ref['id'] - self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}, - body={'domain': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_update_domain_unsafe_default(self): - """Check default for unsafe names for ``POST /domains``.""" - unsafe_name = 'i am not / safe' - - # By default, we should be able to create unsafe names - ref = unit.new_domain_ref(name=unsafe_name) - del ref['id'] - self.patch('/domains/%(domain_id)s' % { - 'domain_id': self.domain_id}, - body={'domain': ref}) - - def test_update_domain_updates_is_domain_project(self): - """Check the project that acts as a domain is updated. - - Call ``PATCH /domains``. - """ - # Create a new domain - domain_ref = unit.new_domain_ref() - r = self.post('/domains', body={'domain': domain_ref}) - self.assertValidDomainResponse(r, domain_ref) - - # Disable it - self.patch('/domains/%s' % r.result['domain']['id'], - body={'domain': {'enabled': False}}) - - # Retrieve its correspondent project - r = self.get('/projects/%(project_id)s' % { - 'project_id': r.result['domain']['id']}) - self.assertValidProjectResponse(r) - - # The created project is disabled as well - self.assertFalse(r.result['project']['enabled']) - - def test_disable_domain(self): - """Call ``PATCH /domains/{domain_id}`` (set enabled=False).""" - # Create a 2nd set of entities in a 2nd domain - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - - project2 = unit.new_project_ref(domain_id=domain2['id']) - self.resource_api.create_project(project2['id'], project2) - - user2 = unit.create_user(self.identity_api, - domain_id=domain2['id'], - project_id=project2['id']) - - self.assignment_api.add_user_to_project(project2['id'], - user2['id']) - - # First check a user in that domain can authenticate.. - body = { - 'auth': { - 'passwordCredentials': { - 'userId': user2['id'], - 'password': user2['password'] - }, - 'tenantId': project2['id'] - } - } - self.admin_request( - path='/v2.0/tokens', method='POST', body=body) - - auth_data = self.build_authentication_request( - user_id=user2['id'], - password=user2['password'], - project_id=project2['id']) - self.v3_create_token(auth_data) - - # Now disable the domain - domain2['enabled'] = False - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': domain2['id']}, - body={'domain': {'enabled': False}}) - self.assertValidDomainResponse(r, domain2) - - # Make sure the user can no longer authenticate, via - # either API - body = { - 'auth': { - 'passwordCredentials': { - 'userId': user2['id'], - 'password': user2['password'] - }, - 'tenantId': project2['id'] - } - } - self.admin_request( - path='/v2.0/tokens', method='POST', body=body, - expected_status=http_client.UNAUTHORIZED) - - # Try looking up in v3 by name and id - auth_data = self.build_authentication_request( - user_id=user2['id'], - password=user2['password'], - project_id=project2['id']) - self.v3_create_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - auth_data = self.build_authentication_request( - username=user2['name'], - user_domain_id=domain2['id'], - password=user2['password'], - project_id=project2['id']) - self.v3_create_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - def test_delete_enabled_domain_fails(self): - """Call ``DELETE /domains/{domain_id}`` (when domain enabled).""" - # Try deleting an enabled domain, which should fail - self.delete('/domains/%(domain_id)s' % { - 'domain_id': self.domain['id']}, - expected_status=exception.ForbiddenAction.code) - - def test_delete_domain(self): - """Call ``DELETE /domains/{domain_id}``. - - The sample data set up already has a user and project that is part of - self.domain. Additionally we will create a group and a credential - within it. Since the user we will authenticate with is in this domain, - we create a another set of entities in a second domain. Deleting this - second domain should delete all these new entities. In addition, - all the entities in the regular self.domain should be unaffected - by the delete. - - Test Plan: - - - Create domain2 and a 2nd set of entities - - Disable domain2 - - Delete domain2 - - Check entities in domain2 have been deleted - - Check entities in self.domain are unaffected - - """ - # Create a group and a credential in the main domain - group = unit.new_group_ref(domain_id=self.domain_id) - group = self.identity_api.create_group(group) - - credential = unit.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - self.credential_api.create_credential(credential['id'], credential) - - # Create a 2nd set of entities in a 2nd domain - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - - project2 = unit.new_project_ref(domain_id=domain2['id']) - project2 = self.resource_api.create_project(project2['id'], project2) - - user2 = unit.new_user_ref(domain_id=domain2['id'], - project_id=project2['id']) - user2 = self.identity_api.create_user(user2) - - group2 = unit.new_group_ref(domain_id=domain2['id']) - group2 = self.identity_api.create_group(group2) - - credential2 = unit.new_credential_ref(user_id=user2['id'], - project_id=project2['id']) - self.credential_api.create_credential(credential2['id'], - credential2) - - # Now disable the new domain and delete it - domain2['enabled'] = False - r = self.patch('/domains/%(domain_id)s' % { - 'domain_id': domain2['id']}, - body={'domain': {'enabled': False}}) - self.assertValidDomainResponse(r, domain2) - self.delete('/domains/%(domain_id)s' % {'domain_id': domain2['id']}) - - # Check all the domain2 relevant entities are gone - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain2['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project2['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - group2['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - user2['id']) - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - credential2['id']) - - # ...and that all self.domain entities are still here - r = self.resource_api.get_domain(self.domain['id']) - self.assertDictEqual(self.domain, r) - r = self.resource_api.get_project(self.project['id']) - self.assertDictEqual(self.project, r) - r = self.identity_api.get_group(group['id']) - self.assertDictEqual(group, r) - r = self.identity_api.get_user(self.user['id']) - self.user.pop('password') - self.assertDictEqual(self.user, r) - r = self.credential_api.get_credential(credential['id']) - self.assertDictEqual(credential, r) - - def test_delete_domain_deletes_is_domain_project(self): - """Check the project that acts as a domain is deleted. - - Call ``DELETE /domains``. - """ - # Create a new domain - domain_ref = unit.new_domain_ref() - r = self.post('/domains', body={'domain': domain_ref}) - self.assertValidDomainResponse(r, domain_ref) - - # Retrieve its correspondent project - self.get('/projects/%(project_id)s' % { - 'project_id': r.result['domain']['id']}) - - # Delete the domain - self.patch('/domains/%s' % r.result['domain']['id'], - body={'domain': {'enabled': False}}) - self.delete('/domains/%s' % r.result['domain']['id']) - - # The created project is deleted as well - self.get('/projects/%(project_id)s' % { - 'project_id': r.result['domain']['id']}, expected_status=404) - - def test_delete_default_domain(self): - # Need to disable it first. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': CONF.identity.default_domain_id}, - body={'domain': {'enabled': False}}) - - self.delete( - '/domains/%(domain_id)s' % { - 'domain_id': CONF.identity.default_domain_id}) - - def test_token_revoked_once_domain_disabled(self): - """Test token from a disabled domain has been invalidated. - - Test that a token that was valid for an enabled domain - becomes invalid once that domain is disabled. - - """ - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - - user2 = unit.create_user(self.identity_api, - domain_id=domain['id']) - - # build a request body - auth_body = self.build_authentication_request( - user_id=user2['id'], - password=user2['password']) - - # sends a request for the user's token - token_resp = self.post('/auth/tokens', body=auth_body) - - subject_token = token_resp.headers.get('x-subject-token') - - # validates the returned token and it should be valid. - self.head('/auth/tokens', - headers={'x-subject-token': subject_token}, - expected_status=http_client.OK) - - # now disable the domain - domain['enabled'] = False - url = "/domains/%(domain_id)s" % {'domain_id': domain['id']} - self.patch(url, - body={'domain': {'enabled': False}}) - - # validates the same token again and it should be 'not found' - # as the domain has already been disabled. - self.head('/auth/tokens', - headers={'x-subject-token': subject_token}, - expected_status=http_client.NOT_FOUND) - - def test_delete_domain_hierarchy(self): - """Call ``DELETE /domains/{domain_id}``.""" - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - - root_project = unit.new_project_ref(domain_id=domain['id']) - root_project = self.resource_api.create_project(root_project['id'], - root_project) - - leaf_project = unit.new_project_ref( - domain_id=domain['id'], - parent_id=root_project['id']) - self.resource_api.create_project(leaf_project['id'], leaf_project) - - # Need to disable it first. - self.patch('/domains/%(domain_id)s' % { - 'domain_id': domain['id']}, - body={'domain': {'enabled': False}}) - - self.delete( - '/domains/%(domain_id)s' % { - 'domain_id': domain['id']}) - - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - def test_forbid_operations_on_federated_domain(self): - """Make sure one cannot operate on federated domain. - - This includes operations like create, update, delete - on domain identified by id and name where difference variations of - id 'Federated' are used. - - """ - def create_domains(): - for variation in ('Federated', 'FEDERATED', - 'federated', 'fEderated'): - domain = unit.new_domain_ref() - domain['id'] = variation - yield domain - - for domain in create_domains(): - self.assertRaises( - AssertionError, self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises( - AssertionError, self.resource_api.update_domain, - domain['id'], domain) - self.assertRaises( - exception.DomainNotFound, self.resource_api.delete_domain, - domain['id']) - - # swap 'name' with 'id' and try again, expecting the request to - # gracefully fail - domain['id'], domain['name'] = domain['name'], domain['id'] - self.assertRaises( - AssertionError, self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises( - AssertionError, self.resource_api.update_domain, - domain['id'], domain) - self.assertRaises( - exception.DomainNotFound, self.resource_api.delete_domain, - domain['id']) - - def test_forbid_operations_on_defined_federated_domain(self): - """Make sure one cannot operate on a user-defined federated domain. - - This includes operations like create, update, delete. - - """ - non_default_name = 'beta_federated_domain' - self.config_fixture.config(group='federation', - federated_domain_name=non_default_name) - domain = unit.new_domain_ref(name=non_default_name) - self.assertRaises(AssertionError, - self.resource_api.create_domain, - domain['id'], domain) - self.assertRaises(exception.DomainNotFound, - self.resource_api.delete_domain, - domain['id']) - self.assertRaises(AssertionError, - self.resource_api.update_domain, - domain['id'], domain) - - # Project CRUD tests - - def test_list_projects(self): - """Call ``GET /projects``.""" - resource_url = '/projects' - r = self.get(resource_url) - self.assertValidProjectListResponse(r, ref=self.project, - resource_url=resource_url) - - def test_create_project(self): - """Call ``POST /projects``.""" - ref = unit.new_project_ref(domain_id=self.domain_id) - r = self.post( - '/projects', - body={'project': ref}) - self.assertValidProjectResponse(r, ref) - - def test_create_project_bad_request(self): - """Call ``POST /projects``.""" - self.post('/projects', body={'project': {}}, - expected_status=http_client.BAD_REQUEST) - - def test_create_project_invalid_domain_id(self): - """Call ``POST /projects``.""" - ref = unit.new_project_ref(domain_id=uuid.uuid4().hex) - self.post('/projects', body={'project': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_create_project_unsafe(self): - """Call ``POST /projects with unsafe names``.""" - unsafe_name = 'i am not / safe' - - self.config_fixture.config(group='resource', - project_name_url_safe='off') - ref = unit.new_project_ref(name=unsafe_name) - self.post( - '/projects', - body={'project': ref}) - - for config_setting in ['new', 'strict']: - self.config_fixture.config(group='resource', - project_name_url_safe=config_setting) - ref = unit.new_project_ref(name=unsafe_name) - self.post( - '/projects', - body={'project': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_create_project_unsafe_default(self): - """Check default for unsafe names for ``POST /projects``.""" - unsafe_name = 'i am not / safe' - - # By default, we should be able to create unsafe names - ref = unit.new_project_ref(name=unsafe_name) - self.post( - '/projects', - body={'project': ref}) - - def test_create_project_with_parent_id_none_and_domain_id_none(self): - """Call ``POST /projects``.""" - # Grant a domain role for the user - collection_url = ( - '/domains/%(domain_id)s/users/%(user_id)s/roles' % { - 'domain_id': self.domain_id, - 'user_id': self.user['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': self.role_id} - self.put(member_url) - - # Create an authentication request for a domain scoped token - auth = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - domain_id=self.domain_id) - - # Without parent_id and domain_id passed as None, the domain_id should - # be normalized to the domain on the token, when using a domain - # scoped token. - ref = unit.new_project_ref() - r = self.post( - '/projects', - auth=auth, - body={'project': ref}) - ref['domain_id'] = self.domain['id'] - self.assertValidProjectResponse(r, ref) - - def test_create_project_without_parent_id_and_without_domain_id(self): - """Call ``POST /projects``.""" - # Grant a domain role for the user - collection_url = ( - '/domains/%(domain_id)s/users/%(user_id)s/roles' % { - 'domain_id': self.domain_id, - 'user_id': self.user['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': self.role_id} - self.put(member_url) - - # Create an authentication request for a domain scoped token - auth = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - domain_id=self.domain_id) - - # Without domain_id and parent_id, the domain_id should be - # normalized to the domain on the token, when using a domain - # scoped token. - ref = unit.new_project_ref() - r = self.post( - '/projects', - auth=auth, - body={'project': ref}) - ref['domain_id'] = self.domain['id'] - self.assertValidProjectResponse(r, ref) - - @test_utils.wip('waiting for support for parent_id to imply domain_id') - def test_create_project_with_parent_id_and_no_domain_id(self): - """Call ``POST /projects``.""" - # With only the parent_id, the domain_id should be - # normalized to the parent's domain_id - ref_child = unit.new_project_ref(parent_id=self.project['id']) - - r = self.post( - '/projects', - body={'project': ref_child}) - self.assertEqual(r.result['project']['domain_id'], - self.project['domain_id']) - ref_child['domain_id'] = self.domain['id'] - self.assertValidProjectResponse(r, ref_child) - - def _create_projects_hierarchy(self, hierarchy_size=1): - """Creates a single-branched project hierarchy with the specified size. - - :param hierarchy_size: the desired hierarchy size, default is 1 - - a project with one child. - - :returns projects: a list of the projects in the created hierarchy. - - """ - new_ref = unit.new_project_ref(domain_id=self.domain_id) - resp = self.post('/projects', body={'project': new_ref}) - - projects = [resp.result] - - for i in range(hierarchy_size): - new_ref = unit.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[i]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - - projects.append(resp.result) - - return projects - - def test_list_projects_filtering_by_parent_id(self): - """Call ``GET /projects?parent_id={project_id}``.""" - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Add another child to projects[1] - it will be projects[3] - new_ref = unit.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[1]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - - projects.append(resp.result) - - # Query for projects[0] immediate children - it will - # be only projects[1] - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[0]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [projects[1]['project']] - - # projects[0] has projects[1] as child - self.assertEqual(expected_list, projects_result) - - # Query for projects[1] immediate children - it will - # be projects[2] and projects[3] - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[1]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [projects[2]['project'], projects[3]['project']] - - # projects[1] has projects[2] and projects[3] as children - self.assertEqual(expected_list, projects_result) - - # Query for projects[2] immediate children - it will be an empty list - r = self.get( - '/projects?parent_id=%(project_id)s' % { - 'project_id': projects[2]['project']['id']}) - self.assertValidProjectListResponse(r) - - projects_result = r.result['projects'] - expected_list = [] - - # projects[2] has no child, projects_result must be an empty list - self.assertEqual(expected_list, projects_result) - - def test_create_hierarchical_project(self): - """Call ``POST /projects``.""" - self._create_projects_hierarchy() - - def test_get_project(self): - """Call ``GET /projects/{project_id}``.""" - r = self.get( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}) - self.assertValidProjectResponse(r, self.project) - - def test_get_project_with_parents_as_list_with_invalid_id(self): - """Call ``GET /projects/{project_id}?parents_as_list``.""" - self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': None}, expected_status=http_client.NOT_FOUND) - - self.get('/projects/%(project_id)s?parents_as_list' % { - 'project_id': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_get_project_with_subtree_as_list_with_invalid_id(self): - """Call ``GET /projects/{project_id}?subtree_as_list``.""" - self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': None}, expected_status=http_client.NOT_FOUND) - - self.get('/projects/%(project_id)s?subtree_as_list' % { - 'project_id': uuid.uuid4().hex}, - expected_status=http_client.NOT_FOUND) - - def test_get_project_with_parents_as_ids(self): - """Call ``GET /projects/{project_id}?parents_as_ids``.""" - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Query for projects[2] parents_as_ids - r = self.get( - '/projects/%(project_id)s?parents_as_ids' % { - 'project_id': projects[2]['project']['id']}) - - self.assertValidProjectResponse(r, projects[2]['project']) - parents_as_ids = r.result['project']['parents'] - - # Assert parents_as_ids is a structured dictionary correctly - # representing the hierarchy. The request was made using projects[2] - # id, hence its parents should be projects[1], projects[0] and the - # is_domain_project, which is the root of the hierarchy. It should - # have the following structure: - # { - # projects[1]: { - # projects[0]: { - # is_domain_project: None - # } - # } - # } - is_domain_project_id = projects[0]['project']['domain_id'] - expected_dict = { - projects[1]['project']['id']: { - projects[0]['project']['id']: {is_domain_project_id: None} - } - } - self.assertDictEqual(expected_dict, parents_as_ids) - - # Query for projects[0] parents_as_ids - r = self.get( - '/projects/%(project_id)s?parents_as_ids' % { - 'project_id': projects[0]['project']['id']}) - - self.assertValidProjectResponse(r, projects[0]['project']) - parents_as_ids = r.result['project']['parents'] - - # projects[0] has only the project that acts as a domain as parent - expected_dict = { - is_domain_project_id: None - } - self.assertDictEqual(expected_dict, parents_as_ids) - - # Query for is_domain_project parents_as_ids - r = self.get( - '/projects/%(project_id)s?parents_as_ids' % { - 'project_id': is_domain_project_id}) - - parents_as_ids = r.result['project']['parents'] - - # the project that acts as a domain has no parents, parents_as_ids - # must be None - self.assertIsNone(parents_as_ids) - - def test_get_project_with_parents_as_list_with_full_access(self): - """``GET /projects/{project_id}?parents_as_list`` with full access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on each one of those projects; - - Check that calling parents_as_list on 'subproject' returns both - 'project' and 'parent'. - - """ - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on all the created projects - for proj in (parent, project, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?parents_as_list' % - {'project_id': subproject['project']['id']}) - self.assertValidProjectResponse(r, subproject['project']) - - # Assert only 'project' and 'parent' are in the parents list - self.assertIn(project, r.result['project']['parents']) - self.assertIn(parent, r.result['project']['parents']) - self.assertEqual(2, len(r.result['project']['parents'])) - - def test_get_project_with_parents_as_list_with_partial_access(self): - """``GET /projects/{project_id}?parents_as_list`` with partial access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on 'parent' and 'subproject'; - - Check that calling parents_as_list on 'subproject' only returns - 'parent'. - - """ - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on parent and subproject - for proj in (parent, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?parents_as_list' % - {'project_id': subproject['project']['id']}) - self.assertValidProjectResponse(r, subproject['project']) - - # Assert only 'parent' is in the parents list - self.assertIn(parent, r.result['project']['parents']) - self.assertEqual(1, len(r.result['project']['parents'])) - - def test_get_project_with_parents_as_list_and_parents_as_ids(self): - """Attempt to list a project's parents as both a list and as IDs. - - This uses ``GET /projects/{project_id}?parents_as_list&parents_as_ids`` - which should fail with a Bad Request due to the conflicting query - strings. - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - self.get( - '/projects/%(project_id)s?parents_as_list&parents_as_ids' % { - 'project_id': projects[1]['project']['id']}, - expected_status=http_client.BAD_REQUEST) - - def test_list_project_is_domain_filter(self): - """Call ``GET /projects?is_domain=True/False``.""" - # Get the initial number of projects, both acting as a domain as well - # as regular. - r = self.get('/projects?is_domain=True', expected_status=200) - initial_number_is_domain_true = len(r.result['projects']) - r = self.get('/projects?is_domain=False', expected_status=200) - initial_number_is_domain_false = len(r.result['projects']) - - # Add some more projects acting as domains - new_is_domain_project = unit.new_project_ref(is_domain=True) - new_is_domain_project = self.resource_api.create_project( - new_is_domain_project['id'], new_is_domain_project) - new_is_domain_project2 = unit.new_project_ref(is_domain=True) - new_is_domain_project2 = self.resource_api.create_project( - new_is_domain_project2['id'], new_is_domain_project2) - number_is_domain_true = initial_number_is_domain_true + 2 - - r = self.get('/projects?is_domain=True', expected_status=200) - self.assertThat(r.result['projects'], - matchers.HasLength(number_is_domain_true)) - self.assertIn(new_is_domain_project['id'], - [p['id'] for p in r.result['projects']]) - self.assertIn(new_is_domain_project2['id'], - [p['id'] for p in r.result['projects']]) - - # Now add a regular project - new_regular_project = unit.new_project_ref(domain_id=self.domain_id) - new_regular_project = self.resource_api.create_project( - new_regular_project['id'], new_regular_project) - number_is_domain_false = initial_number_is_domain_false + 1 - - # Check we still have the same number of projects acting as domains - r = self.get('/projects?is_domain=True', expected_status=200) - self.assertThat(r.result['projects'], - matchers.HasLength(number_is_domain_true)) - - # Check the number of regular projects is correct - r = self.get('/projects?is_domain=False', expected_status=200) - self.assertThat(r.result['projects'], - matchers.HasLength(number_is_domain_false)) - self.assertIn(new_regular_project['id'], - [p['id'] for p in r.result['projects']]) - - def test_list_project_is_domain_filter_default(self): - """Default project list should not see projects acting as domains""" - # Get the initial count of regular projects - r = self.get('/projects?is_domain=False', expected_status=200) - number_is_domain_false = len(r.result['projects']) - - # Make sure we have at least one project acting as a domain - new_is_domain_project = unit.new_project_ref(is_domain=True) - new_is_domain_project = self.resource_api.create_project( - new_is_domain_project['id'], new_is_domain_project) - - r = self.get('/projects', expected_status=200) - self.assertThat(r.result['projects'], - matchers.HasLength(number_is_domain_false)) - self.assertNotIn(new_is_domain_project, r.result['projects']) - - def test_get_project_with_subtree_as_ids(self): - """Call ``GET /projects/{project_id}?subtree_as_ids``. - - This test creates a more complex hierarchy to test if the structured - dictionary returned by using the ``subtree_as_ids`` query param - correctly represents the hierarchy. - - The hierarchy contains 5 projects with the following structure:: - - +--A--+ - | | - +--B--+ C - | | - D E - - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - # Add another child to projects[0] - it will be projects[3] - new_ref = unit.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[0]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - projects.append(resp.result) - - # Add another child to projects[1] - it will be projects[4] - new_ref = unit.new_project_ref( - domain_id=self.domain_id, - parent_id=projects[1]['project']['id']) - resp = self.post('/projects', - body={'project': new_ref}) - self.assertValidProjectResponse(resp, new_ref) - projects.append(resp.result) - - # Query for projects[0] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[0]['project']['id']}) - self.assertValidProjectResponse(r, projects[0]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # The subtree hierarchy from projects[0] should have the following - # structure: - # { - # projects[1]: { - # projects[2]: None, - # projects[4]: None - # }, - # projects[3]: None - # } - expected_dict = { - projects[1]['project']['id']: { - projects[2]['project']['id']: None, - projects[4]['project']['id']: None - }, - projects[3]['project']['id']: None - } - self.assertDictEqual(expected_dict, subtree_as_ids) - - # Now query for projects[1] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[1]['project']['id']}) - self.assertValidProjectResponse(r, projects[1]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # The subtree hierarchy from projects[1] should have the following - # structure: - # { - # projects[2]: None, - # projects[4]: None - # } - expected_dict = { - projects[2]['project']['id']: None, - projects[4]['project']['id']: None - } - self.assertDictEqual(expected_dict, subtree_as_ids) - - # Now query for projects[3] subtree_as_ids - r = self.get( - '/projects/%(project_id)s?subtree_as_ids' % { - 'project_id': projects[3]['project']['id']}) - self.assertValidProjectResponse(r, projects[3]['project']) - subtree_as_ids = r.result['project']['subtree'] - - # projects[3] has no subtree, subtree_as_ids must be None - self.assertIsNone(subtree_as_ids) - - def test_get_project_with_subtree_as_list_with_full_access(self): - """``GET /projects/{project_id}?subtree_as_list`` with full access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on each one of those projects; - - Check that calling subtree_as_list on 'parent' returns both 'parent' - and 'subproject'. - - """ - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on all the created projects - for proj in (parent, project, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?subtree_as_list' % - {'project_id': parent['project']['id']}) - self.assertValidProjectResponse(r, parent['project']) - - # Assert only 'project' and 'subproject' are in the subtree - self.assertIn(project, r.result['project']['subtree']) - self.assertIn(subproject, r.result['project']['subtree']) - self.assertEqual(2, len(r.result['project']['subtree'])) - - def test_get_project_with_subtree_as_list_with_partial_access(self): - """``GET /projects/{project_id}?subtree_as_list`` with partial access. - - Test plan: - - - Create 'parent', 'project' and 'subproject' projects; - - Assign a user a role on 'parent' and 'subproject'; - - Check that calling subtree_as_list on 'parent' returns 'subproject'. - - """ - # Create the project hierarchy - parent, project, subproject = self._create_projects_hierarchy(2) - - # Assign a role for the user on parent and subproject - for proj in (parent, subproject): - self.put(self.build_role_assignment_link( - role_id=self.role_id, user_id=self.user_id, - project_id=proj['project']['id'])) - - # Make the API call - r = self.get('/projects/%(project_id)s?subtree_as_list' % - {'project_id': parent['project']['id']}) - self.assertValidProjectResponse(r, parent['project']) - - # Assert only 'subproject' is in the subtree - self.assertIn(subproject, r.result['project']['subtree']) - self.assertEqual(1, len(r.result['project']['subtree'])) - - def test_get_project_with_subtree_as_list_and_subtree_as_ids(self): - """Attempt to get a project subtree as both a list and as IDs. - - This uses ``GET /projects/{project_id}?subtree_as_list&subtree_as_ids`` - which should fail with a bad request due to the conflicting query - strings. - - """ - projects = self._create_projects_hierarchy(hierarchy_size=2) - - self.get( - '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % { - 'project_id': projects[1]['project']['id']}, - expected_status=http_client.BAD_REQUEST) - - def test_update_project(self): - """Call ``PATCH /projects/{project_id}``.""" - ref = unit.new_project_ref(domain_id=self.domain_id, - parent_id=self.project['parent_id']) - del ref['id'] - r = self.patch( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}, - body={'project': ref}) - self.assertValidProjectResponse(r, ref) - - def test_update_project_unsafe(self): - """Call ``POST /projects/{project_id} with unsafe names``.""" - unsafe_name = 'i am not / safe' - - self.config_fixture.config(group='resource', - project_name_url_safe='off') - ref = unit.new_project_ref(name=unsafe_name, - domain_id=self.domain_id, - parent_id=self.project['parent_id']) - del ref['id'] - self.patch( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}, - body={'project': ref}) - - unsafe_name = 'i am still not / safe' - for config_setting in ['new', 'strict']: - self.config_fixture.config(group='resource', - project_name_url_safe=config_setting) - ref = unit.new_project_ref(name=unsafe_name, - domain_id=self.domain_id, - parent_id=self.project['parent_id']) - del ref['id'] - self.patch( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}, - body={'project': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_update_project_unsafe_default(self): - """Check default for unsafe names for ``POST /projects``.""" - unsafe_name = 'i am not / safe' - - # By default, we should be able to create unsafe names - ref = unit.new_project_ref(name=unsafe_name, - domain_id=self.domain_id, - parent_id=self.project['parent_id']) - del ref['id'] - self.patch( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}, - body={'project': ref}) - - def test_update_project_domain_id(self): - """Call ``PATCH /projects/{project_id}`` with domain_id.""" - project = unit.new_project_ref(domain_id=self.domain['id']) - project = self.resource_api.create_project(project['id'], project) - project['domain_id'] = CONF.identity.default_domain_id - r = self.patch('/projects/%(project_id)s' % { - 'project_id': project['id']}, - body={'project': project}, - expected_status=exception.ValidationError.code) - self.config_fixture.config(domain_id_immutable=False) - project['domain_id'] = self.domain['id'] - r = self.patch('/projects/%(project_id)s' % { - 'project_id': project['id']}, - body={'project': project}) - self.assertValidProjectResponse(r, project) - - def test_update_project_parent_id(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - leaf_project = projects[1]['project'] - leaf_project['parent_id'] = None - self.patch( - '/projects/%(project_id)s' % { - 'project_id': leaf_project['id']}, - body={'project': leaf_project}, - expected_status=http_client.FORBIDDEN) - - def test_update_project_is_domain_not_allowed(self): - """Call ``PATCH /projects/{project_id}`` with is_domain. - - The is_domain flag is immutable. - """ - project = unit.new_project_ref(domain_id=self.domain['id']) - resp = self.post('/projects', - body={'project': project}) - self.assertFalse(resp.result['project']['is_domain']) - - project['parent_id'] = resp.result['project']['parent_id'] - project['is_domain'] = True - self.patch('/projects/%(project_id)s' % { - 'project_id': resp.result['project']['id']}, - body={'project': project}, - expected_status=http_client.BAD_REQUEST) - - def test_disable_leaf_project(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - leaf_project = projects[1]['project'] - leaf_project['enabled'] = False - r = self.patch( - '/projects/%(project_id)s' % { - 'project_id': leaf_project['id']}, - body={'project': leaf_project}) - self.assertEqual( - leaf_project['enabled'], r.result['project']['enabled']) - - def test_disable_not_leaf_project(self): - """Call ``PATCH /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - root_project = projects[0]['project'] - root_project['enabled'] = False - self.patch( - '/projects/%(project_id)s' % { - 'project_id': root_project['id']}, - body={'project': root_project}, - expected_status=http_client.FORBIDDEN) - - def test_delete_project(self): - """Call ``DELETE /projects/{project_id}`` - - As well as making sure the delete succeeds, we ensure - that any credentials that reference this projects are - also deleted, while other credentials are unaffected. - - """ - credential = unit.new_credential_ref(user_id=self.user['id'], - project_id=self.project_id) - self.credential_api.create_credential(credential['id'], credential) - - # First check the credential for this project is present - r = self.credential_api.get_credential(credential['id']) - self.assertDictEqual(credential, r) - # Create a second credential with a different project - project2 = unit.new_project_ref(domain_id=self.domain['id']) - self.resource_api.create_project(project2['id'], project2) - credential2 = unit.new_credential_ref(user_id=self.user['id'], - project_id=project2['id']) - self.credential_api.create_credential(credential2['id'], credential2) - - # Now delete the project - self.delete( - '/projects/%(project_id)s' % { - 'project_id': self.project_id}) - - # Deleting the project should have deleted any credentials - # that reference this project - self.assertRaises(exception.CredentialNotFound, - self.credential_api.get_credential, - credential_id=credential['id']) - # But the credential for project2 is unaffected - r = self.credential_api.get_credential(credential2['id']) - self.assertDictEqual(credential2, r) - - def test_delete_not_leaf_project(self): - """Call ``DELETE /projects/{project_id}``.""" - projects = self._create_projects_hierarchy() - self.delete( - '/projects/%(project_id)s' % { - 'project_id': projects[0]['project']['id']}, - expected_status=http_client.FORBIDDEN) - - -class ResourceV3toV2MethodsTestCase(unit.TestCase): - """Test domain V3 to V2 conversion methods.""" - - def _setup_initial_projects(self): - self.project_id = uuid.uuid4().hex - self.domain_id = CONF.identity.default_domain_id - self.parent_id = uuid.uuid4().hex - # Project with only domain_id in ref - self.project1 = unit.new_project_ref(id=self.project_id, - name=self.project_id, - domain_id=self.domain_id) - # Project with both domain_id and parent_id in ref - self.project2 = unit.new_project_ref(id=self.project_id, - name=self.project_id, - domain_id=self.domain_id, - parent_id=self.parent_id) - # Project with no domain_id and parent_id in ref - self.project3 = unit.new_project_ref(id=self.project_id, - name=self.project_id, - domain_id=self.domain_id, - parent_id=self.parent_id) - # Expected result with no domain_id and parent_id - self.expected_project = {'id': self.project_id, - 'name': self.project_id} - - def test_v2controller_filter_domain_id(self): - # V2.0 is not domain aware, ensure domain_id is popped off the ref. - other_data = uuid.uuid4().hex - domain_id = CONF.identity.default_domain_id - ref = {'domain_id': domain_id, - 'other_data': other_data} - - ref_no_domain = {'other_data': other_data} - expected_ref = ref_no_domain.copy() - - updated_ref = controller.V2Controller.filter_domain_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(expected_ref, ref) - # Make sure we don't error/muck up data if domain_id isn't present - updated_ref = controller.V2Controller.filter_domain_id(ref_no_domain) - self.assertIs(ref_no_domain, updated_ref) - self.assertDictEqual(expected_ref, ref_no_domain) - - def test_v3controller_filter_domain_id(self): - # No data should be filtered out in this case. - other_data = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - ref = {'domain_id': domain_id, - 'other_data': other_data} - - expected_ref = ref.copy() - updated_ref = controller.V3Controller.filter_domain_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(expected_ref, ref) - - def test_v2controller_filter_domain(self): - other_data = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - non_default_domain_ref = {'domain': {'id': domain_id}, - 'other_data': other_data} - default_domain_ref = {'domain': {'id': 'default'}, - 'other_data': other_data} - updated_ref = controller.V2Controller.filter_domain(default_domain_ref) - self.assertNotIn('domain', updated_ref) - self.assertNotIn( - 'domain', - controller.V2Controller.filter_domain(non_default_domain_ref)) - - def test_v2controller_filter_project_parent_id(self): - # V2.0 is not project hierarchy aware, ensure parent_id is popped off. - other_data = uuid.uuid4().hex - parent_id = uuid.uuid4().hex - ref = {'parent_id': parent_id, - 'other_data': other_data} - - ref_no_parent = {'other_data': other_data} - expected_ref = ref_no_parent.copy() - - updated_ref = controller.V2Controller.filter_project_parent_id(ref) - self.assertIs(ref, updated_ref) - self.assertDictEqual(expected_ref, ref) - # Make sure we don't error/muck up data if parent_id isn't present - updated_ref = controller.V2Controller.filter_project_parent_id( - ref_no_parent) - self.assertIs(ref_no_parent, updated_ref) - self.assertDictEqual(expected_ref, ref_no_parent) - - def test_v3_to_v2_project_method(self): - self._setup_initial_projects() - - # TODO(shaleh): these optional fields are not handled well by the - # v3_to_v2 code. Manually remove them for now. Eventually update - # new_project_ref to not return optional values - del self.project1['enabled'] - del self.project1['description'] - del self.project2['enabled'] - del self.project2['description'] - del self.project3['enabled'] - del self.project3['description'] - - updated_project1 = controller.V2Controller.v3_to_v2_project( - self.project1) - self.assertIs(self.project1, updated_project1) - self.assertDictEqual(self.expected_project, self.project1) - updated_project2 = controller.V2Controller.v3_to_v2_project( - self.project2) - self.assertIs(self.project2, updated_project2) - self.assertDictEqual(self.expected_project, self.project2) - updated_project3 = controller.V2Controller.v3_to_v2_project( - self.project3) - self.assertIs(self.project3, updated_project3) - self.assertDictEqual(self.expected_project, self.project2) - - def test_v3_to_v2_project_method_list(self): - self._setup_initial_projects() - project_list = [self.project1, self.project2, self.project3] - - # TODO(shaleh): these optional fields are not handled well by the - # v3_to_v2 code. Manually remove them for now. Eventually update - # new_project_ref to not return optional values - for p in project_list: - del p['enabled'] - del p['description'] - updated_list = controller.V2Controller.v3_to_v2_project(project_list) - - self.assertEqual(len(updated_list), len(project_list)) - - for i, ref in enumerate(updated_list): - # Order should not change. - self.assertIs(ref, project_list[i]) - - self.assertDictEqual(self.expected_project, self.project1) - self.assertDictEqual(self.expected_project, self.project2) - self.assertDictEqual(self.expected_project, self.project3) |