diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_resource.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_resource.py | 1434 |
1 files changed, 1434 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_resource.py b/keystone-moon/keystone/tests/unit/test_v3_resource.py new file mode 100644 index 00000000..f54fcb57 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v3_resource.py @@ -0,0 +1,1434 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo_config import cfg +from six.moves import http_client +from six.moves import range +from testtools import matchers + +from keystone.common import controller +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit import test_v3 +from keystone.tests.unit import utils as test_utils + + +CONF = cfg.CONF + + +class ResourceTestCase(test_v3.RestfulTestCase, + test_v3.AssignmentTestMixin): + """Test domains and projects.""" + + # Domain CRUD tests + + def test_create_domain(self): + """Call ``POST /domains``.""" + ref = unit.new_domain_ref() + r = self.post( + '/domains', + body={'domain': ref}) + return self.assertValidDomainResponse(r, ref) + + def test_create_domain_case_sensitivity(self): + """Call `POST /domains`` twice with upper() and lower() cased name.""" + ref = unit.new_domain_ref() + + # ensure the name is lowercase + ref['name'] = ref['name'].lower() + r = self.post( + '/domains', + body={'domain': ref}) + self.assertValidDomainResponse(r, ref) + + # ensure the name is uppercase + ref['name'] = ref['name'].upper() + r = self.post( + '/domains', + body={'domain': ref}) + self.assertValidDomainResponse(r, ref) + + def test_create_domain_bad_request(self): + """Call ``POST /domains``.""" + self.post('/domains', body={'domain': {}}, + expected_status=http_client.BAD_REQUEST) + + def test_create_domain_unsafe(self): + """Call ``POST /domains with unsafe names``.""" + unsafe_name = 'i am not / safe' + + self.config_fixture.config(group='resource', + domain_name_url_safe='off') + ref = unit.new_domain_ref(name=unsafe_name) + self.post( + '/domains', + body={'domain': ref}) + + for config_setting in ['new', 'strict']: + self.config_fixture.config(group='resource', + domain_name_url_safe=config_setting) + ref = unit.new_domain_ref(name=unsafe_name) + self.post( + '/domains', + body={'domain': ref}, + expected_status=http_client.BAD_REQUEST) + + def test_create_domain_unsafe_default(self): + """Check default for unsafe names for ``POST /domains``.""" + unsafe_name = 'i am not / safe' + + # By default, we should be able to create unsafe names + ref = unit.new_domain_ref(name=unsafe_name) + self.post( + '/domains', + body={'domain': ref}) + + def test_create_domain_creates_is_domain_project(self): + """Check a project that acts as a domain is created. + + Call ``POST /domains``. + """ + # Create a new domain + domain_ref = unit.new_domain_ref() + r = self.post('/domains', body={'domain': domain_ref}) + self.assertValidDomainResponse(r, domain_ref) + + # Retrieve its correspondent project + r = self.get('/projects/%(project_id)s' % { + 'project_id': r.result['domain']['id']}) + self.assertValidProjectResponse(r) + + # The created project has is_domain flag as True + self.assertTrue(r.result['project']['is_domain']) + + # And its parent_id and domain_id attributes are equal + self.assertIsNone(r.result['project']['parent_id']) + self.assertIsNone(r.result['project']['domain_id']) + + def test_create_is_domain_project_creates_domain(self): + """Call ``POST /projects`` is_domain and check a domain is created.""" + # Create a new project that acts as a domain + project_ref = unit.new_project_ref(domain_id=None, is_domain=True) + r = self.post('/projects', body={'project': project_ref}) + self.assertValidProjectResponse(r) + + # Retrieve its correspondent domain + r = self.get('/domains/%(domain_id)s' % { + 'domain_id': r.result['project']['id']}) + self.assertValidDomainResponse(r) + self.assertIsNotNone(r.result['domain']) + + def test_list_domains(self): + """Call ``GET /domains``.""" + resource_url = '/domains' + r = self.get(resource_url) + self.assertValidDomainListResponse(r, ref=self.domain, + resource_url=resource_url) + + def test_get_domain(self): + """Call ``GET /domains/{domain_id}``.""" + r = self.get('/domains/%(domain_id)s' % { + 'domain_id': self.domain_id}) + self.assertValidDomainResponse(r, self.domain) + + def test_update_domain(self): + """Call ``PATCH /domains/{domain_id}``.""" + ref = unit.new_domain_ref() + del ref['id'] + r = self.patch('/domains/%(domain_id)s' % { + 'domain_id': self.domain_id}, + body={'domain': ref}) + self.assertValidDomainResponse(r, ref) + + def test_update_domain_unsafe(self): + """Call ``POST /domains/{domain_id} with unsafe names``.""" + unsafe_name = 'i am not / safe' + + self.config_fixture.config(group='resource', + domain_name_url_safe='off') + ref = unit.new_domain_ref(name=unsafe_name) + del ref['id'] + self.patch('/domains/%(domain_id)s' % { + 'domain_id': self.domain_id}, + body={'domain': ref}) + + unsafe_name = 'i am still not / safe' + for config_setting in ['new', 'strict']: + self.config_fixture.config(group='resource', + domain_name_url_safe=config_setting) + ref = unit.new_domain_ref(name=unsafe_name) + del ref['id'] + self.patch('/domains/%(domain_id)s' % { + 'domain_id': self.domain_id}, + body={'domain': ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_domain_unsafe_default(self): + """Check default for unsafe names for ``POST /domains``.""" + unsafe_name = 'i am not / safe' + + # By default, we should be able to create unsafe names + ref = unit.new_domain_ref(name=unsafe_name) + del ref['id'] + self.patch('/domains/%(domain_id)s' % { + 'domain_id': self.domain_id}, + body={'domain': ref}) + + def test_update_domain_updates_is_domain_project(self): + """Check the project that acts as a domain is updated. + + Call ``PATCH /domains``. + """ + # Create a new domain + domain_ref = unit.new_domain_ref() + r = self.post('/domains', body={'domain': domain_ref}) + self.assertValidDomainResponse(r, domain_ref) + + # Disable it + self.patch('/domains/%s' % r.result['domain']['id'], + body={'domain': {'enabled': False}}) + + # Retrieve its correspondent project + r = self.get('/projects/%(project_id)s' % { + 'project_id': r.result['domain']['id']}) + self.assertValidProjectResponse(r) + + # The created project is disabled as well + self.assertFalse(r.result['project']['enabled']) + + def test_disable_domain(self): + """Call ``PATCH /domains/{domain_id}`` (set enabled=False).""" + # Create a 2nd set of entities in a 2nd domain + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + + project2 = unit.new_project_ref(domain_id=domain2['id']) + self.resource_api.create_project(project2['id'], project2) + + user2 = unit.create_user(self.identity_api, + domain_id=domain2['id'], + project_id=project2['id']) + + self.assignment_api.add_user_to_project(project2['id'], + user2['id']) + + # First check a user in that domain can authenticate.. + body = { + 'auth': { + 'passwordCredentials': { + 'userId': user2['id'], + 'password': user2['password'] + }, + 'tenantId': project2['id'] + } + } + self.admin_request( + path='/v2.0/tokens', method='POST', body=body) + + auth_data = self.build_authentication_request( + user_id=user2['id'], + password=user2['password'], + project_id=project2['id']) + self.v3_create_token(auth_data) + + # Now disable the domain + domain2['enabled'] = False + r = self.patch('/domains/%(domain_id)s' % { + 'domain_id': domain2['id']}, + body={'domain': {'enabled': False}}) + self.assertValidDomainResponse(r, domain2) + + # Make sure the user can no longer authenticate, via + # either API + body = { + 'auth': { + 'passwordCredentials': { + 'userId': user2['id'], + 'password': user2['password'] + }, + 'tenantId': project2['id'] + } + } + self.admin_request( + path='/v2.0/tokens', method='POST', body=body, + expected_status=http_client.UNAUTHORIZED) + + # Try looking up in v3 by name and id + auth_data = self.build_authentication_request( + user_id=user2['id'], + password=user2['password'], + project_id=project2['id']) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + auth_data = self.build_authentication_request( + username=user2['name'], + user_domain_id=domain2['id'], + password=user2['password'], + project_id=project2['id']) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_delete_enabled_domain_fails(self): + """Call ``DELETE /domains/{domain_id}`` (when domain enabled).""" + # Try deleting an enabled domain, which should fail + self.delete('/domains/%(domain_id)s' % { + 'domain_id': self.domain['id']}, + expected_status=exception.ForbiddenAction.code) + + def test_delete_domain(self): + """Call ``DELETE /domains/{domain_id}``. + + The sample data set up already has a user and project that is part of + self.domain. Additionally we will create a group and a credential + within it. Since the user we will authenticate with is in this domain, + we create a another set of entities in a second domain. Deleting this + second domain should delete all these new entities. In addition, + all the entities in the regular self.domain should be unaffected + by the delete. + + Test Plan: + + - Create domain2 and a 2nd set of entities + - Disable domain2 + - Delete domain2 + - Check entities in domain2 have been deleted + - Check entities in self.domain are unaffected + + """ + # Create a group and a credential in the main domain + group = unit.new_group_ref(domain_id=self.domain_id) + group = self.identity_api.create_group(group) + + credential = unit.new_credential_ref(user_id=self.user['id'], + project_id=self.project_id) + self.credential_api.create_credential(credential['id'], credential) + + # Create a 2nd set of entities in a 2nd domain + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + + project2 = unit.new_project_ref(domain_id=domain2['id']) + project2 = self.resource_api.create_project(project2['id'], project2) + + user2 = unit.new_user_ref(domain_id=domain2['id'], + project_id=project2['id']) + user2 = self.identity_api.create_user(user2) + + group2 = unit.new_group_ref(domain_id=domain2['id']) + group2 = self.identity_api.create_group(group2) + + credential2 = unit.new_credential_ref(user_id=user2['id'], + project_id=project2['id']) + self.credential_api.create_credential(credential2['id'], + credential2) + + # Now disable the new domain and delete it + domain2['enabled'] = False + r = self.patch('/domains/%(domain_id)s' % { + 'domain_id': domain2['id']}, + body={'domain': {'enabled': False}}) + self.assertValidDomainResponse(r, domain2) + self.delete('/domains/%(domain_id)s' % {'domain_id': domain2['id']}) + + # Check all the domain2 relevant entities are gone + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + domain2['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project2['id']) + self.assertRaises(exception.GroupNotFound, + self.identity_api.get_group, + group2['id']) + self.assertRaises(exception.UserNotFound, + self.identity_api.get_user, + user2['id']) + self.assertRaises(exception.CredentialNotFound, + self.credential_api.get_credential, + credential2['id']) + + # ...and that all self.domain entities are still here + r = self.resource_api.get_domain(self.domain['id']) + self.assertDictEqual(self.domain, r) + r = self.resource_api.get_project(self.project['id']) + self.assertDictEqual(self.project, r) + r = self.identity_api.get_group(group['id']) + self.assertDictEqual(group, r) + r = self.identity_api.get_user(self.user['id']) + self.user.pop('password') + self.assertDictEqual(self.user, r) + r = self.credential_api.get_credential(credential['id']) + self.assertDictEqual(credential, r) + + def test_delete_domain_deletes_is_domain_project(self): + """Check the project that acts as a domain is deleted. + + Call ``DELETE /domains``. + """ + # Create a new domain + domain_ref = unit.new_domain_ref() + r = self.post('/domains', body={'domain': domain_ref}) + self.assertValidDomainResponse(r, domain_ref) + + # Retrieve its correspondent project + self.get('/projects/%(project_id)s' % { + 'project_id': r.result['domain']['id']}) + + # Delete the domain + self.patch('/domains/%s' % r.result['domain']['id'], + body={'domain': {'enabled': False}}) + self.delete('/domains/%s' % r.result['domain']['id']) + + # The created project is deleted as well + self.get('/projects/%(project_id)s' % { + 'project_id': r.result['domain']['id']}, expected_status=404) + + def test_delete_default_domain(self): + # Need to disable it first. + self.patch('/domains/%(domain_id)s' % { + 'domain_id': CONF.identity.default_domain_id}, + body={'domain': {'enabled': False}}) + + self.delete( + '/domains/%(domain_id)s' % { + 'domain_id': CONF.identity.default_domain_id}) + + def test_token_revoked_once_domain_disabled(self): + """Test token from a disabled domain has been invalidated. + + Test that a token that was valid for an enabled domain + becomes invalid once that domain is disabled. + + """ + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + + user2 = unit.create_user(self.identity_api, + domain_id=domain['id']) + + # build a request body + auth_body = self.build_authentication_request( + user_id=user2['id'], + password=user2['password']) + + # sends a request for the user's token + token_resp = self.post('/auth/tokens', body=auth_body) + + subject_token = token_resp.headers.get('x-subject-token') + + # validates the returned token and it should be valid. + self.head('/auth/tokens', + headers={'x-subject-token': subject_token}, + expected_status=http_client.OK) + + # now disable the domain + domain['enabled'] = False + url = "/domains/%(domain_id)s" % {'domain_id': domain['id']} + self.patch(url, + body={'domain': {'enabled': False}}) + + # validates the same token again and it should be 'not found' + # as the domain has already been disabled. + self.head('/auth/tokens', + headers={'x-subject-token': subject_token}, + expected_status=http_client.NOT_FOUND) + + def test_delete_domain_hierarchy(self): + """Call ``DELETE /domains/{domain_id}``.""" + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + + root_project = unit.new_project_ref(domain_id=domain['id']) + root_project = self.resource_api.create_project(root_project['id'], + root_project) + + leaf_project = unit.new_project_ref( + domain_id=domain['id'], + parent_id=root_project['id']) + self.resource_api.create_project(leaf_project['id'], leaf_project) + + # Need to disable it first. + self.patch('/domains/%(domain_id)s' % { + 'domain_id': domain['id']}, + body={'domain': {'enabled': False}}) + + self.delete( + '/domains/%(domain_id)s' % { + 'domain_id': domain['id']}) + + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + domain['id']) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + root_project['id']) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + leaf_project['id']) + + def test_forbid_operations_on_federated_domain(self): + """Make sure one cannot operate on federated domain. + + This includes operations like create, update, delete + on domain identified by id and name where difference variations of + id 'Federated' are used. + + """ + def create_domains(): + for variation in ('Federated', 'FEDERATED', + 'federated', 'fEderated'): + domain = unit.new_domain_ref() + domain['id'] = variation + yield domain + + for domain in create_domains(): + self.assertRaises( + AssertionError, self.resource_api.create_domain, + domain['id'], domain) + self.assertRaises( + AssertionError, self.resource_api.update_domain, + domain['id'], domain) + self.assertRaises( + exception.DomainNotFound, self.resource_api.delete_domain, + domain['id']) + + # swap 'name' with 'id' and try again, expecting the request to + # gracefully fail + domain['id'], domain['name'] = domain['name'], domain['id'] + self.assertRaises( + AssertionError, self.resource_api.create_domain, + domain['id'], domain) + self.assertRaises( + AssertionError, self.resource_api.update_domain, + domain['id'], domain) + self.assertRaises( + exception.DomainNotFound, self.resource_api.delete_domain, + domain['id']) + + def test_forbid_operations_on_defined_federated_domain(self): + """Make sure one cannot operate on a user-defined federated domain. + + This includes operations like create, update, delete. + + """ + non_default_name = 'beta_federated_domain' + self.config_fixture.config(group='federation', + federated_domain_name=non_default_name) + domain = unit.new_domain_ref(name=non_default_name) + self.assertRaises(AssertionError, + self.resource_api.create_domain, + domain['id'], domain) + self.assertRaises(exception.DomainNotFound, + self.resource_api.delete_domain, + domain['id']) + self.assertRaises(AssertionError, + self.resource_api.update_domain, + domain['id'], domain) + + # Project CRUD tests + + def test_list_projects(self): + """Call ``GET /projects``.""" + resource_url = '/projects' + r = self.get(resource_url) + self.assertValidProjectListResponse(r, ref=self.project, + resource_url=resource_url) + + def test_create_project(self): + """Call ``POST /projects``.""" + ref = unit.new_project_ref(domain_id=self.domain_id) + r = self.post( + '/projects', + body={'project': ref}) + self.assertValidProjectResponse(r, ref) + + def test_create_project_bad_request(self): + """Call ``POST /projects``.""" + self.post('/projects', body={'project': {}}, + expected_status=http_client.BAD_REQUEST) + + def test_create_project_invalid_domain_id(self): + """Call ``POST /projects``.""" + ref = unit.new_project_ref(domain_id=uuid.uuid4().hex) + self.post('/projects', body={'project': ref}, + expected_status=http_client.BAD_REQUEST) + + def test_create_project_unsafe(self): + """Call ``POST /projects with unsafe names``.""" + unsafe_name = 'i am not / safe' + + self.config_fixture.config(group='resource', + project_name_url_safe='off') + ref = unit.new_project_ref(name=unsafe_name) + self.post( + '/projects', + body={'project': ref}) + + for config_setting in ['new', 'strict']: + self.config_fixture.config(group='resource', + project_name_url_safe=config_setting) + ref = unit.new_project_ref(name=unsafe_name) + self.post( + '/projects', + body={'project': ref}, + expected_status=http_client.BAD_REQUEST) + + def test_create_project_unsafe_default(self): + """Check default for unsafe names for ``POST /projects``.""" + unsafe_name = 'i am not / safe' + + # By default, we should be able to create unsafe names + ref = unit.new_project_ref(name=unsafe_name) + self.post( + '/projects', + body={'project': ref}) + + def test_create_project_with_parent_id_none_and_domain_id_none(self): + """Call ``POST /projects``.""" + # Grant a domain role for the user + collection_url = ( + '/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': self.domain_id, + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + self.put(member_url) + + # Create an authentication request for a domain scoped token + auth = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_id=self.domain_id) + + # Without parent_id and domain_id passed as None, the domain_id should + # be normalized to the domain on the token, when using a domain + # scoped token. + ref = unit.new_project_ref() + r = self.post( + '/projects', + auth=auth, + body={'project': ref}) + ref['domain_id'] = self.domain['id'] + self.assertValidProjectResponse(r, ref) + + def test_create_project_without_parent_id_and_without_domain_id(self): + """Call ``POST /projects``.""" + # Grant a domain role for the user + collection_url = ( + '/domains/%(domain_id)s/users/%(user_id)s/roles' % { + 'domain_id': self.domain_id, + 'user_id': self.user['id']}) + member_url = '%(collection_url)s/%(role_id)s' % { + 'collection_url': collection_url, + 'role_id': self.role_id} + self.put(member_url) + + # Create an authentication request for a domain scoped token + auth = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_id=self.domain_id) + + # Without domain_id and parent_id, the domain_id should be + # normalized to the domain on the token, when using a domain + # scoped token. + ref = unit.new_project_ref() + r = self.post( + '/projects', + auth=auth, + body={'project': ref}) + ref['domain_id'] = self.domain['id'] + self.assertValidProjectResponse(r, ref) + + @test_utils.wip('waiting for support for parent_id to imply domain_id') + def test_create_project_with_parent_id_and_no_domain_id(self): + """Call ``POST /projects``.""" + # With only the parent_id, the domain_id should be + # normalized to the parent's domain_id + ref_child = unit.new_project_ref(parent_id=self.project['id']) + + r = self.post( + '/projects', + body={'project': ref_child}) + self.assertEqual(r.result['project']['domain_id'], + self.project['domain_id']) + ref_child['domain_id'] = self.domain['id'] + self.assertValidProjectResponse(r, ref_child) + + def _create_projects_hierarchy(self, hierarchy_size=1): + """Creates a single-branched project hierarchy with the specified size. + + :param hierarchy_size: the desired hierarchy size, default is 1 - + a project with one child. + + :returns projects: a list of the projects in the created hierarchy. + + """ + new_ref = unit.new_project_ref(domain_id=self.domain_id) + resp = self.post('/projects', body={'project': new_ref}) + + projects = [resp.result] + + for i in range(hierarchy_size): + new_ref = unit.new_project_ref( + domain_id=self.domain_id, + parent_id=projects[i]['project']['id']) + resp = self.post('/projects', + body={'project': new_ref}) + self.assertValidProjectResponse(resp, new_ref) + + projects.append(resp.result) + + return projects + + def test_list_projects_filtering_by_parent_id(self): + """Call ``GET /projects?parent_id={project_id}``.""" + projects = self._create_projects_hierarchy(hierarchy_size=2) + + # Add another child to projects[1] - it will be projects[3] + new_ref = unit.new_project_ref( + domain_id=self.domain_id, + parent_id=projects[1]['project']['id']) + resp = self.post('/projects', + body={'project': new_ref}) + self.assertValidProjectResponse(resp, new_ref) + + projects.append(resp.result) + + # Query for projects[0] immediate children - it will + # be only projects[1] + r = self.get( + '/projects?parent_id=%(project_id)s' % { + 'project_id': projects[0]['project']['id']}) + self.assertValidProjectListResponse(r) + + projects_result = r.result['projects'] + expected_list = [projects[1]['project']] + + # projects[0] has projects[1] as child + self.assertEqual(expected_list, projects_result) + + # Query for projects[1] immediate children - it will + # be projects[2] and projects[3] + r = self.get( + '/projects?parent_id=%(project_id)s' % { + 'project_id': projects[1]['project']['id']}) + self.assertValidProjectListResponse(r) + + projects_result = r.result['projects'] + expected_list = [projects[2]['project'], projects[3]['project']] + + # projects[1] has projects[2] and projects[3] as children + self.assertEqual(expected_list, projects_result) + + # Query for projects[2] immediate children - it will be an empty list + r = self.get( + '/projects?parent_id=%(project_id)s' % { + 'project_id': projects[2]['project']['id']}) + self.assertValidProjectListResponse(r) + + projects_result = r.result['projects'] + expected_list = [] + + # projects[2] has no child, projects_result must be an empty list + self.assertEqual(expected_list, projects_result) + + def test_create_hierarchical_project(self): + """Call ``POST /projects``.""" + self._create_projects_hierarchy() + + def test_get_project(self): + """Call ``GET /projects/{project_id}``.""" + r = self.get( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}) + self.assertValidProjectResponse(r, self.project) + + def test_get_project_with_parents_as_list_with_invalid_id(self): + """Call ``GET /projects/{project_id}?parents_as_list``.""" + self.get('/projects/%(project_id)s?parents_as_list' % { + 'project_id': None}, expected_status=http_client.NOT_FOUND) + + self.get('/projects/%(project_id)s?parents_as_list' % { + 'project_id': uuid.uuid4().hex}, + expected_status=http_client.NOT_FOUND) + + def test_get_project_with_subtree_as_list_with_invalid_id(self): + """Call ``GET /projects/{project_id}?subtree_as_list``.""" + self.get('/projects/%(project_id)s?subtree_as_list' % { + 'project_id': None}, expected_status=http_client.NOT_FOUND) + + self.get('/projects/%(project_id)s?subtree_as_list' % { + 'project_id': uuid.uuid4().hex}, + expected_status=http_client.NOT_FOUND) + + def test_get_project_with_parents_as_ids(self): + """Call ``GET /projects/{project_id}?parents_as_ids``.""" + projects = self._create_projects_hierarchy(hierarchy_size=2) + + # Query for projects[2] parents_as_ids + r = self.get( + '/projects/%(project_id)s?parents_as_ids' % { + 'project_id': projects[2]['project']['id']}) + + self.assertValidProjectResponse(r, projects[2]['project']) + parents_as_ids = r.result['project']['parents'] + + # Assert parents_as_ids is a structured dictionary correctly + # representing the hierarchy. The request was made using projects[2] + # id, hence its parents should be projects[1], projects[0] and the + # is_domain_project, which is the root of the hierarchy. It should + # have the following structure: + # { + # projects[1]: { + # projects[0]: { + # is_domain_project: None + # } + # } + # } + is_domain_project_id = projects[0]['project']['domain_id'] + expected_dict = { + projects[1]['project']['id']: { + projects[0]['project']['id']: {is_domain_project_id: None} + } + } + self.assertDictEqual(expected_dict, parents_as_ids) + + # Query for projects[0] parents_as_ids + r = self.get( + '/projects/%(project_id)s?parents_as_ids' % { + 'project_id': projects[0]['project']['id']}) + + self.assertValidProjectResponse(r, projects[0]['project']) + parents_as_ids = r.result['project']['parents'] + + # projects[0] has only the project that acts as a domain as parent + expected_dict = { + is_domain_project_id: None + } + self.assertDictEqual(expected_dict, parents_as_ids) + + # Query for is_domain_project parents_as_ids + r = self.get( + '/projects/%(project_id)s?parents_as_ids' % { + 'project_id': is_domain_project_id}) + + parents_as_ids = r.result['project']['parents'] + + # the project that acts as a domain has no parents, parents_as_ids + # must be None + self.assertIsNone(parents_as_ids) + + def test_get_project_with_parents_as_list_with_full_access(self): + """``GET /projects/{project_id}?parents_as_list`` with full access. + + Test plan: + + - Create 'parent', 'project' and 'subproject' projects; + - Assign a user a role on each one of those projects; + - Check that calling parents_as_list on 'subproject' returns both + 'project' and 'parent'. + + """ + # Create the project hierarchy + parent, project, subproject = self._create_projects_hierarchy(2) + + # Assign a role for the user on all the created projects + for proj in (parent, project, subproject): + self.put(self.build_role_assignment_link( + role_id=self.role_id, user_id=self.user_id, + project_id=proj['project']['id'])) + + # Make the API call + r = self.get('/projects/%(project_id)s?parents_as_list' % + {'project_id': subproject['project']['id']}) + self.assertValidProjectResponse(r, subproject['project']) + + # Assert only 'project' and 'parent' are in the parents list + self.assertIn(project, r.result['project']['parents']) + self.assertIn(parent, r.result['project']['parents']) + self.assertEqual(2, len(r.result['project']['parents'])) + + def test_get_project_with_parents_as_list_with_partial_access(self): + """``GET /projects/{project_id}?parents_as_list`` with partial access. + + Test plan: + + - Create 'parent', 'project' and 'subproject' projects; + - Assign a user a role on 'parent' and 'subproject'; + - Check that calling parents_as_list on 'subproject' only returns + 'parent'. + + """ + # Create the project hierarchy + parent, project, subproject = self._create_projects_hierarchy(2) + + # Assign a role for the user on parent and subproject + for proj in (parent, subproject): + self.put(self.build_role_assignment_link( + role_id=self.role_id, user_id=self.user_id, + project_id=proj['project']['id'])) + + # Make the API call + r = self.get('/projects/%(project_id)s?parents_as_list' % + {'project_id': subproject['project']['id']}) + self.assertValidProjectResponse(r, subproject['project']) + + # Assert only 'parent' is in the parents list + self.assertIn(parent, r.result['project']['parents']) + self.assertEqual(1, len(r.result['project']['parents'])) + + def test_get_project_with_parents_as_list_and_parents_as_ids(self): + """Attempt to list a project's parents as both a list and as IDs. + + This uses ``GET /projects/{project_id}?parents_as_list&parents_as_ids`` + which should fail with a Bad Request due to the conflicting query + strings. + + """ + projects = self._create_projects_hierarchy(hierarchy_size=2) + + self.get( + '/projects/%(project_id)s?parents_as_list&parents_as_ids' % { + 'project_id': projects[1]['project']['id']}, + expected_status=http_client.BAD_REQUEST) + + def test_list_project_is_domain_filter(self): + """Call ``GET /projects?is_domain=True/False``.""" + # Get the initial number of projects, both acting as a domain as well + # as regular. + r = self.get('/projects?is_domain=True', expected_status=200) + initial_number_is_domain_true = len(r.result['projects']) + r = self.get('/projects?is_domain=False', expected_status=200) + initial_number_is_domain_false = len(r.result['projects']) + + # Add some more projects acting as domains + new_is_domain_project = unit.new_project_ref(is_domain=True) + new_is_domain_project = self.resource_api.create_project( + new_is_domain_project['id'], new_is_domain_project) + new_is_domain_project2 = unit.new_project_ref(is_domain=True) + new_is_domain_project2 = self.resource_api.create_project( + new_is_domain_project2['id'], new_is_domain_project2) + number_is_domain_true = initial_number_is_domain_true + 2 + + r = self.get('/projects?is_domain=True', expected_status=200) + self.assertThat(r.result['projects'], + matchers.HasLength(number_is_domain_true)) + self.assertIn(new_is_domain_project['id'], + [p['id'] for p in r.result['projects']]) + self.assertIn(new_is_domain_project2['id'], + [p['id'] for p in r.result['projects']]) + + # Now add a regular project + new_regular_project = unit.new_project_ref(domain_id=self.domain_id) + new_regular_project = self.resource_api.create_project( + new_regular_project['id'], new_regular_project) + number_is_domain_false = initial_number_is_domain_false + 1 + + # Check we still have the same number of projects acting as domains + r = self.get('/projects?is_domain=True', expected_status=200) + self.assertThat(r.result['projects'], + matchers.HasLength(number_is_domain_true)) + + # Check the number of regular projects is correct + r = self.get('/projects?is_domain=False', expected_status=200) + self.assertThat(r.result['projects'], + matchers.HasLength(number_is_domain_false)) + self.assertIn(new_regular_project['id'], + [p['id'] for p in r.result['projects']]) + + def test_list_project_is_domain_filter_default(self): + """Default project list should not see projects acting as domains""" + # Get the initial count of regular projects + r = self.get('/projects?is_domain=False', expected_status=200) + number_is_domain_false = len(r.result['projects']) + + # Make sure we have at least one project acting as a domain + new_is_domain_project = unit.new_project_ref(is_domain=True) + new_is_domain_project = self.resource_api.create_project( + new_is_domain_project['id'], new_is_domain_project) + + r = self.get('/projects', expected_status=200) + self.assertThat(r.result['projects'], + matchers.HasLength(number_is_domain_false)) + self.assertNotIn(new_is_domain_project, r.result['projects']) + + def test_get_project_with_subtree_as_ids(self): + """Call ``GET /projects/{project_id}?subtree_as_ids``. + + This test creates a more complex hierarchy to test if the structured + dictionary returned by using the ``subtree_as_ids`` query param + correctly represents the hierarchy. + + The hierarchy contains 5 projects with the following structure:: + + +--A--+ + | | + +--B--+ C + | | + D E + + + """ + projects = self._create_projects_hierarchy(hierarchy_size=2) + + # Add another child to projects[0] - it will be projects[3] + new_ref = unit.new_project_ref( + domain_id=self.domain_id, + parent_id=projects[0]['project']['id']) + resp = self.post('/projects', + body={'project': new_ref}) + self.assertValidProjectResponse(resp, new_ref) + projects.append(resp.result) + + # Add another child to projects[1] - it will be projects[4] + new_ref = unit.new_project_ref( + domain_id=self.domain_id, + parent_id=projects[1]['project']['id']) + resp = self.post('/projects', + body={'project': new_ref}) + self.assertValidProjectResponse(resp, new_ref) + projects.append(resp.result) + + # Query for projects[0] subtree_as_ids + r = self.get( + '/projects/%(project_id)s?subtree_as_ids' % { + 'project_id': projects[0]['project']['id']}) + self.assertValidProjectResponse(r, projects[0]['project']) + subtree_as_ids = r.result['project']['subtree'] + + # The subtree hierarchy from projects[0] should have the following + # structure: + # { + # projects[1]: { + # projects[2]: None, + # projects[4]: None + # }, + # projects[3]: None + # } + expected_dict = { + projects[1]['project']['id']: { + projects[2]['project']['id']: None, + projects[4]['project']['id']: None + }, + projects[3]['project']['id']: None + } + self.assertDictEqual(expected_dict, subtree_as_ids) + + # Now query for projects[1] subtree_as_ids + r = self.get( + '/projects/%(project_id)s?subtree_as_ids' % { + 'project_id': projects[1]['project']['id']}) + self.assertValidProjectResponse(r, projects[1]['project']) + subtree_as_ids = r.result['project']['subtree'] + + # The subtree hierarchy from projects[1] should have the following + # structure: + # { + # projects[2]: None, + # projects[4]: None + # } + expected_dict = { + projects[2]['project']['id']: None, + projects[4]['project']['id']: None + } + self.assertDictEqual(expected_dict, subtree_as_ids) + + # Now query for projects[3] subtree_as_ids + r = self.get( + '/projects/%(project_id)s?subtree_as_ids' % { + 'project_id': projects[3]['project']['id']}) + self.assertValidProjectResponse(r, projects[3]['project']) + subtree_as_ids = r.result['project']['subtree'] + + # projects[3] has no subtree, subtree_as_ids must be None + self.assertIsNone(subtree_as_ids) + + def test_get_project_with_subtree_as_list_with_full_access(self): + """``GET /projects/{project_id}?subtree_as_list`` with full access. + + Test plan: + + - Create 'parent', 'project' and 'subproject' projects; + - Assign a user a role on each one of those projects; + - Check that calling subtree_as_list on 'parent' returns both 'parent' + and 'subproject'. + + """ + # Create the project hierarchy + parent, project, subproject = self._create_projects_hierarchy(2) + + # Assign a role for the user on all the created projects + for proj in (parent, project, subproject): + self.put(self.build_role_assignment_link( + role_id=self.role_id, user_id=self.user_id, + project_id=proj['project']['id'])) + + # Make the API call + r = self.get('/projects/%(project_id)s?subtree_as_list' % + {'project_id': parent['project']['id']}) + self.assertValidProjectResponse(r, parent['project']) + + # Assert only 'project' and 'subproject' are in the subtree + self.assertIn(project, r.result['project']['subtree']) + self.assertIn(subproject, r.result['project']['subtree']) + self.assertEqual(2, len(r.result['project']['subtree'])) + + def test_get_project_with_subtree_as_list_with_partial_access(self): + """``GET /projects/{project_id}?subtree_as_list`` with partial access. + + Test plan: + + - Create 'parent', 'project' and 'subproject' projects; + - Assign a user a role on 'parent' and 'subproject'; + - Check that calling subtree_as_list on 'parent' returns 'subproject'. + + """ + # Create the project hierarchy + parent, project, subproject = self._create_projects_hierarchy(2) + + # Assign a role for the user on parent and subproject + for proj in (parent, subproject): + self.put(self.build_role_assignment_link( + role_id=self.role_id, user_id=self.user_id, + project_id=proj['project']['id'])) + + # Make the API call + r = self.get('/projects/%(project_id)s?subtree_as_list' % + {'project_id': parent['project']['id']}) + self.assertValidProjectResponse(r, parent['project']) + + # Assert only 'subproject' is in the subtree + self.assertIn(subproject, r.result['project']['subtree']) + self.assertEqual(1, len(r.result['project']['subtree'])) + + def test_get_project_with_subtree_as_list_and_subtree_as_ids(self): + """Attempt to get a project subtree as both a list and as IDs. + + This uses ``GET /projects/{project_id}?subtree_as_list&subtree_as_ids`` + which should fail with a bad request due to the conflicting query + strings. + + """ + projects = self._create_projects_hierarchy(hierarchy_size=2) + + self.get( + '/projects/%(project_id)s?subtree_as_list&subtree_as_ids' % { + 'project_id': projects[1]['project']['id']}, + expected_status=http_client.BAD_REQUEST) + + def test_update_project(self): + """Call ``PATCH /projects/{project_id}``.""" + ref = unit.new_project_ref(domain_id=self.domain_id, + parent_id=self.project['parent_id']) + del ref['id'] + r = self.patch( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}, + body={'project': ref}) + self.assertValidProjectResponse(r, ref) + + def test_update_project_unsafe(self): + """Call ``POST /projects/{project_id} with unsafe names``.""" + unsafe_name = 'i am not / safe' + + self.config_fixture.config(group='resource', + project_name_url_safe='off') + ref = unit.new_project_ref(name=unsafe_name, + domain_id=self.domain_id, + parent_id=self.project['parent_id']) + del ref['id'] + self.patch( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}, + body={'project': ref}) + + unsafe_name = 'i am still not / safe' + for config_setting in ['new', 'strict']: + self.config_fixture.config(group='resource', + project_name_url_safe=config_setting) + ref = unit.new_project_ref(name=unsafe_name, + domain_id=self.domain_id, + parent_id=self.project['parent_id']) + del ref['id'] + self.patch( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}, + body={'project': ref}, + expected_status=http_client.BAD_REQUEST) + + def test_update_project_unsafe_default(self): + """Check default for unsafe names for ``POST /projects``.""" + unsafe_name = 'i am not / safe' + + # By default, we should be able to create unsafe names + ref = unit.new_project_ref(name=unsafe_name, + domain_id=self.domain_id, + parent_id=self.project['parent_id']) + del ref['id'] + self.patch( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}, + body={'project': ref}) + + def test_update_project_domain_id(self): + """Call ``PATCH /projects/{project_id}`` with domain_id.""" + project = unit.new_project_ref(domain_id=self.domain['id']) + project = self.resource_api.create_project(project['id'], project) + project['domain_id'] = CONF.identity.default_domain_id + r = self.patch('/projects/%(project_id)s' % { + 'project_id': project['id']}, + body={'project': project}, + expected_status=exception.ValidationError.code) + self.config_fixture.config(domain_id_immutable=False) + project['domain_id'] = self.domain['id'] + r = self.patch('/projects/%(project_id)s' % { + 'project_id': project['id']}, + body={'project': project}) + self.assertValidProjectResponse(r, project) + + def test_update_project_parent_id(self): + """Call ``PATCH /projects/{project_id}``.""" + projects = self._create_projects_hierarchy() + leaf_project = projects[1]['project'] + leaf_project['parent_id'] = None + self.patch( + '/projects/%(project_id)s' % { + 'project_id': leaf_project['id']}, + body={'project': leaf_project}, + expected_status=http_client.FORBIDDEN) + + def test_update_project_is_domain_not_allowed(self): + """Call ``PATCH /projects/{project_id}`` with is_domain. + + The is_domain flag is immutable. + """ + project = unit.new_project_ref(domain_id=self.domain['id']) + resp = self.post('/projects', + body={'project': project}) + self.assertFalse(resp.result['project']['is_domain']) + + project['parent_id'] = resp.result['project']['parent_id'] + project['is_domain'] = True + self.patch('/projects/%(project_id)s' % { + 'project_id': resp.result['project']['id']}, + body={'project': project}, + expected_status=http_client.BAD_REQUEST) + + def test_disable_leaf_project(self): + """Call ``PATCH /projects/{project_id}``.""" + projects = self._create_projects_hierarchy() + leaf_project = projects[1]['project'] + leaf_project['enabled'] = False + r = self.patch( + '/projects/%(project_id)s' % { + 'project_id': leaf_project['id']}, + body={'project': leaf_project}) + self.assertEqual( + leaf_project['enabled'], r.result['project']['enabled']) + + def test_disable_not_leaf_project(self): + """Call ``PATCH /projects/{project_id}``.""" + projects = self._create_projects_hierarchy() + root_project = projects[0]['project'] + root_project['enabled'] = False + self.patch( + '/projects/%(project_id)s' % { + 'project_id': root_project['id']}, + body={'project': root_project}, + expected_status=http_client.FORBIDDEN) + + def test_delete_project(self): + """Call ``DELETE /projects/{project_id}`` + + As well as making sure the delete succeeds, we ensure + that any credentials that reference this projects are + also deleted, while other credentials are unaffected. + + """ + credential = unit.new_credential_ref(user_id=self.user['id'], + project_id=self.project_id) + self.credential_api.create_credential(credential['id'], credential) + + # First check the credential for this project is present + r = self.credential_api.get_credential(credential['id']) + self.assertDictEqual(credential, r) + # Create a second credential with a different project + project2 = unit.new_project_ref(domain_id=self.domain['id']) + self.resource_api.create_project(project2['id'], project2) + credential2 = unit.new_credential_ref(user_id=self.user['id'], + project_id=project2['id']) + self.credential_api.create_credential(credential2['id'], credential2) + + # Now delete the project + self.delete( + '/projects/%(project_id)s' % { + 'project_id': self.project_id}) + + # Deleting the project should have deleted any credentials + # that reference this project + self.assertRaises(exception.CredentialNotFound, + self.credential_api.get_credential, + credential_id=credential['id']) + # But the credential for project2 is unaffected + r = self.credential_api.get_credential(credential2['id']) + self.assertDictEqual(credential2, r) + + def test_delete_not_leaf_project(self): + """Call ``DELETE /projects/{project_id}``.""" + projects = self._create_projects_hierarchy() + self.delete( + '/projects/%(project_id)s' % { + 'project_id': projects[0]['project']['id']}, + expected_status=http_client.FORBIDDEN) + + +class ResourceV3toV2MethodsTestCase(unit.TestCase): + """Test domain V3 to V2 conversion methods.""" + + def _setup_initial_projects(self): + self.project_id = uuid.uuid4().hex + self.domain_id = CONF.identity.default_domain_id + self.parent_id = uuid.uuid4().hex + # Project with only domain_id in ref + self.project1 = unit.new_project_ref(id=self.project_id, + name=self.project_id, + domain_id=self.domain_id) + # Project with both domain_id and parent_id in ref + self.project2 = unit.new_project_ref(id=self.project_id, + name=self.project_id, + domain_id=self.domain_id, + parent_id=self.parent_id) + # Project with no domain_id and parent_id in ref + self.project3 = unit.new_project_ref(id=self.project_id, + name=self.project_id, + domain_id=self.domain_id, + parent_id=self.parent_id) + # Expected result with no domain_id and parent_id + self.expected_project = {'id': self.project_id, + 'name': self.project_id} + + def test_v2controller_filter_domain_id(self): + # V2.0 is not domain aware, ensure domain_id is popped off the ref. + other_data = uuid.uuid4().hex + domain_id = CONF.identity.default_domain_id + ref = {'domain_id': domain_id, + 'other_data': other_data} + + ref_no_domain = {'other_data': other_data} + expected_ref = ref_no_domain.copy() + + updated_ref = controller.V2Controller.filter_domain_id(ref) + self.assertIs(ref, updated_ref) + self.assertDictEqual(expected_ref, ref) + # Make sure we don't error/muck up data if domain_id isn't present + updated_ref = controller.V2Controller.filter_domain_id(ref_no_domain) + self.assertIs(ref_no_domain, updated_ref) + self.assertDictEqual(expected_ref, ref_no_domain) + + def test_v3controller_filter_domain_id(self): + # No data should be filtered out in this case. + other_data = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + ref = {'domain_id': domain_id, + 'other_data': other_data} + + expected_ref = ref.copy() + updated_ref = controller.V3Controller.filter_domain_id(ref) + self.assertIs(ref, updated_ref) + self.assertDictEqual(expected_ref, ref) + + def test_v2controller_filter_domain(self): + other_data = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + non_default_domain_ref = {'domain': {'id': domain_id}, + 'other_data': other_data} + default_domain_ref = {'domain': {'id': 'default'}, + 'other_data': other_data} + updated_ref = controller.V2Controller.filter_domain(default_domain_ref) + self.assertNotIn('domain', updated_ref) + self.assertNotIn( + 'domain', + controller.V2Controller.filter_domain(non_default_domain_ref)) + + def test_v2controller_filter_project_parent_id(self): + # V2.0 is not project hierarchy aware, ensure parent_id is popped off. + other_data = uuid.uuid4().hex + parent_id = uuid.uuid4().hex + ref = {'parent_id': parent_id, + 'other_data': other_data} + + ref_no_parent = {'other_data': other_data} + expected_ref = ref_no_parent.copy() + + updated_ref = controller.V2Controller.filter_project_parent_id(ref) + self.assertIs(ref, updated_ref) + self.assertDictEqual(expected_ref, ref) + # Make sure we don't error/muck up data if parent_id isn't present + updated_ref = controller.V2Controller.filter_project_parent_id( + ref_no_parent) + self.assertIs(ref_no_parent, updated_ref) + self.assertDictEqual(expected_ref, ref_no_parent) + + def test_v3_to_v2_project_method(self): + self._setup_initial_projects() + + # TODO(shaleh): these optional fields are not handled well by the + # v3_to_v2 code. Manually remove them for now. Eventually update + # new_project_ref to not return optional values + del self.project1['enabled'] + del self.project1['description'] + del self.project2['enabled'] + del self.project2['description'] + del self.project3['enabled'] + del self.project3['description'] + + updated_project1 = controller.V2Controller.v3_to_v2_project( + self.project1) + self.assertIs(self.project1, updated_project1) + self.assertDictEqual(self.expected_project, self.project1) + updated_project2 = controller.V2Controller.v3_to_v2_project( + self.project2) + self.assertIs(self.project2, updated_project2) + self.assertDictEqual(self.expected_project, self.project2) + updated_project3 = controller.V2Controller.v3_to_v2_project( + self.project3) + self.assertIs(self.project3, updated_project3) + self.assertDictEqual(self.expected_project, self.project2) + + def test_v3_to_v2_project_method_list(self): + self._setup_initial_projects() + project_list = [self.project1, self.project2, self.project3] + + # TODO(shaleh): these optional fields are not handled well by the + # v3_to_v2 code. Manually remove them for now. Eventually update + # new_project_ref to not return optional values + for p in project_list: + del p['enabled'] + del p['description'] + updated_list = controller.V2Controller.v3_to_v2_project(project_list) + + self.assertEqual(len(updated_list), len(project_list)) + + for i, ref in enumerate(updated_list): + # Order should not change. + self.assertIs(ref, project_list[i]) + + self.assertDictEqual(self.expected_project, self.project1) + self.assertDictEqual(self.expected_project, self.project2) + self.assertDictEqual(self.expected_project, self.project3) |