From 920a49cfa055733d575282973e23558c33087a4a Mon Sep 17 00:00:00 2001 From: RHE Date: Fri, 24 Nov 2017 13:54:26 +0100 Subject: remove keystone-moon Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE --- .../keystone/tests/unit/resource/test_backends.py | 1669 -------------------- 1 file changed, 1669 deletions(-) delete mode 100644 keystone-moon/keystone/tests/unit/resource/test_backends.py (limited to 'keystone-moon/keystone/tests/unit/resource/test_backends.py') diff --git a/keystone-moon/keystone/tests/unit/resource/test_backends.py b/keystone-moon/keystone/tests/unit/resource/test_backends.py deleted file mode 100644 index eed4c6ba..00000000 --- a/keystone-moon/keystone/tests/unit/resource/test_backends.py +++ /dev/null @@ -1,1669 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from oslo_config import cfg -from six.moves import range -from testtools import matchers - -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import utils as test_utils - - -CONF = cfg.CONF - - -class ResourceTests(object): - - domain_count = len(default_fixtures.DOMAINS) - - def test_get_project(self): - tenant_ref = self.resource_api.get_project(self.tenant_bar['id']) - self.assertDictEqual(self.tenant_bar, tenant_ref) - - def test_get_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - uuid.uuid4().hex) - - def test_get_project_by_name(self): - tenant_ref = self.resource_api.get_project_by_name( - self.tenant_bar['name'], - CONF.identity.default_domain_id) - self.assertDictEqual(self.tenant_bar, tenant_ref) - - @unit.skip_if_no_multiple_domains_support - def test_get_project_by_name_for_project_acting_as_a_domain(self): - """Tests get_project_by_name works when the domain_id is None.""" - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=False) - project = self.resource_api.create_project(project['id'], project) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - project['name'], - None) - - # Test that querying with domain_id as None will find the project - # acting as a domain, even if it's name is the same as the regular - # project above. - project2 = unit.new_project_ref(is_domain=True, - name=project['name']) - project2 = self.resource_api.create_project(project2['id'], project2) - - project_ref = self.resource_api.get_project_by_name( - project2['name'], None) - - self.assertEqual(project2, project_ref) - - def test_get_project_by_name_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - def test_create_duplicate_project_id_fails(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project_id, project) - project['name'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - project_id, - project) - - def test_create_duplicate_project_name_fails(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project_id, project) - project['id'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - project['id'], - project) - - def test_create_duplicate_project_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2 = unit.new_project_ref(name=project1['name'], - domain_id=new_domain['id']) - self.resource_api.create_project(project1['id'], project1) - self.resource_api.create_project(project2['id'], project2) - - def test_move_project_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - project['domain_id'] = domain2['id'] - # Update the project asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.resource_api.update_project(project['id'], project) - self.assertTrue(mock_dep.called) - - updated_project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(domain2['id'], updated_project_ref['domain_id']) - - def test_move_project_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a project in domain1 - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - # Now create a project in domain2 with a potentially clashing - # name - which should work since we have domain separation - project2 = unit.new_project_ref(name=project1['name'], - domain_id=domain2['id']) - self.resource_api.create_project(project2['id'], project2) - # Now try and move project1 into the 2nd domain - which should - # fail since the names clash - project1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project1['id'], - project1) - - @unit.skip_if_no_multiple_domains_support - def test_move_project_with_children_between_domains_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - child_project = unit.new_project_ref(domain_id=domain1['id'], - parent_id=project['id']) - self.resource_api.create_project(child_project['id'], child_project) - project['domain_id'] = domain2['id'] - - # Update is not allowed, since updating the whole subtree would be - # necessary - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - @unit.skip_if_no_multiple_domains_support - def test_move_project_not_root_between_domains_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - child_project = unit.new_project_ref(domain_id=domain1['id'], - parent_id=project['id']) - self.resource_api.create_project(child_project['id'], child_project) - child_project['domain_id'] = domain2['id'] - - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - child_project['id'], - child_project) - - @unit.skip_if_no_multiple_domains_support - def test_move_root_project_between_domains_succeeds(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - root_project = unit.new_project_ref(domain_id=domain1['id']) - root_project = self.resource_api.create_project(root_project['id'], - root_project) - - root_project['domain_id'] = domain2['id'] - self.resource_api.update_project(root_project['id'], root_project) - project_from_db = self.resource_api.get_project(root_project['id']) - - self.assertEqual(domain2['id'], project_from_db['domain_id']) - - @unit.skip_if_no_multiple_domains_support - def test_update_domain_id_project_is_domain_fails(self): - other_domain = unit.new_domain_ref() - self.resource_api.create_domain(other_domain['id'], other_domain) - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - project['domain_id'] = other_domain['id'] - - # Update of domain_id of projects acting as domains is not allowed - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_rename_duplicate_project_name_fails(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project1['id'], project1) - self.resource_api.create_project(project2['id'], project2) - project2['name'] = project1['name'] - self.assertRaises(exception.Error, - self.resource_api.update_project, - project2['id'], - project2) - - def test_update_project_id_does_nothing(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project['id'], project) - project['id'] = 'fake2' - self.resource_api.update_project(project_id, project) - project_ref = self.resource_api.get_project(project_id) - self.assertEqual(project_id, project_ref['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - 'fake2') - - def test_delete_domain_with_user_group_project_links(self): - # TODO(chungg):add test case once expected behaviour defined - pass - - def test_update_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.update_project, - uuid.uuid4().hex, - dict()) - - def test_delete_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.delete_project, - uuid.uuid4().hex) - - def test_create_update_delete_unicode_project(self): - unicode_project_name = u'name \u540d\u5b57' - project = unit.new_project_ref( - name=unicode_project_name, - domain_id=CONF.identity.default_domain_id) - project = self.resource_api.create_project(project['id'], project) - self.resource_api.update_project(project['id'], project) - self.resource_api.delete_project(project['id']) - - def test_create_project_with_no_enabled_field(self): - ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id) - del ref['enabled'] - self.resource_api.create_project(ref['id'], ref) - - project = self.resource_api.get_project(ref['id']) - self.assertIs(project['enabled'], True) - - def test_create_project_long_name_fails(self): - project = unit.new_project_ref( - name='a' * 65, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_blank_name_fails(self): - project = unit.new_project_ref( - name='', domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_invalid_name_fails(self): - project = unit.new_project_ref( - name=None, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - project = unit.new_project_ref( - name=123, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_update_project_blank_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = '' - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_long_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = 'a' * 65 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_invalid_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = None - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - project['name'] = 123 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_invalid_enabled_type_string(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - # Strings are not valid boolean values - project['enabled'] = "false" - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_create_project_invalid_enabled_type_string(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - # invalid string value - enabled="true") - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_invalid_domain_id(self): - project = unit.new_project_ref(domain_id=uuid.uuid4().hex) - self.assertRaises(exception.DomainNotFound, - self.resource_api.create_project, - project['id'], - project) - - def test_list_domains(self): - domain1 = unit.new_domain_ref() - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - self.resource_api.create_domain(domain2['id'], domain2) - domains = self.resource_api.list_domains() - self.assertEqual(3, len(domains)) - domain_ids = [] - for domain in domains: - domain_ids.append(domain.get('id')) - self.assertIn(CONF.identity.default_domain_id, domain_ids) - self.assertIn(domain1['id'], domain_ids) - self.assertIn(domain2['id'], domain_ids) - - def test_list_projects(self): - project_refs = self.resource_api.list_projects() - project_count = len(default_fixtures.TENANTS) + self.domain_count - self.assertEqual(project_count, len(project_refs)) - for project in default_fixtures.TENANTS: - self.assertIn(project, project_refs) - - def test_list_projects_with_multiple_filters(self): - # Create a project - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project = self.resource_api.create_project(project['id'], project) - - # Build driver hints with the project's name and inexistent description - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', uuid.uuid4().hex) - - # Retrieve projects based on hints and check an empty list is returned - projects = self.resource_api.list_projects(hints) - self.assertEqual([], projects) - - # Build correct driver hints - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', project['description']) - - # Retrieve projects based on hints - projects = self.resource_api.list_projects(hints) - - # Check that the returned list contains only the first project - self.assertEqual(1, len(projects)) - self.assertEqual(project, projects[0]) - - def test_list_projects_for_domain(self): - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - CONF.identity.default_domain_id)]) - # Only the projects from the default fixtures are expected, since - # filtering by domain does not include any project that acts as a - # domain. - self.assertThat( - project_ids, matchers.HasLength(len(default_fixtures.TENANTS))) - self.assertIn(self.tenant_bar['id'], project_ids) - self.assertIn(self.tenant_baz['id'], project_ids) - self.assertIn(self.tenant_mtu['id'], project_ids) - self.assertIn(self.tenant_service['id'], project_ids) - - @unit.skip_if_no_multiple_domains_support - def test_list_projects_acting_as_domain(self): - initial_domains = self.resource_api.list_domains() - - # Creating 5 projects that act as domains - new_projects_acting_as_domains = [] - for i in range(5): - project = unit.new_project_ref(is_domain=True) - project = self.resource_api.create_project(project['id'], project) - new_projects_acting_as_domains.append(project) - - # Creating a few regular project to ensure it doesn't mess with the - # ones that act as domains - self._create_projects_hierarchy(hierarchy_size=2) - - projects = self.resource_api.list_projects_acting_as_domain() - expected_number_projects = ( - len(initial_domains) + len(new_projects_acting_as_domains)) - self.assertEqual(expected_number_projects, len(projects)) - for project in new_projects_acting_as_domains: - self.assertIn(project, projects) - for domain in initial_domains: - self.assertIn(domain['id'], [p['id'] for p in projects]) - - @unit.skip_if_no_multiple_domains_support - def test_list_projects_for_alternate_domain(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - project2 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project2['id'], project2) - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - domain1['id'])]) - self.assertEqual(2, len(project_ids)) - self.assertIn(project1['id'], project_ids) - self.assertIn(project2['id'], project_ids) - - def _create_projects_hierarchy(self, hierarchy_size=2, - domain_id=None, - is_domain=False, - parent_project_id=None): - """Creates a project hierarchy with specified size. - - :param hierarchy_size: the desired hierarchy size, default is 2 - - a project with one child. - :param domain_id: domain where the projects hierarchy will be created. - :param is_domain: if the hierarchy will have the is_domain flag active - or not. - :param parent_project_id: if the intention is to create a - sub-hierarchy, sets the sub-hierarchy root. Defaults to creating - a new hierarchy, i.e. a new root project. - - :returns projects: a list of the projects in the created hierarchy. - - """ - if domain_id is None: - domain_id = CONF.identity.default_domain_id - if parent_project_id: - project = unit.new_project_ref(parent_id=parent_project_id, - domain_id=domain_id, - is_domain=is_domain) - else: - project = unit.new_project_ref(domain_id=domain_id, - is_domain=is_domain) - project_id = project['id'] - project = self.resource_api.create_project(project_id, project) - - projects = [project] - for i in range(1, hierarchy_size): - new_project = unit.new_project_ref(parent_id=project_id, - domain_id=domain_id) - - self.resource_api.create_project(new_project['id'], new_project) - projects.append(new_project) - project_id = new_project['id'] - - return projects - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_with_project_api(self): - project = unit.new_project_ref(is_domain=True) - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - self.resource_api.get_domain(ref['id']) - - @unit.skip_if_no_multiple_domains_support - def test_project_as_a_domain_uniqueness_constraints(self): - """Tests project uniqueness for those acting as domains. - - If it is a project acting as a domain, we can't have two or more with - the same name. - - """ - # Create two projects acting as a domain - project = unit.new_project_ref(is_domain=True) - project = self.resource_api.create_project(project['id'], project) - project2 = unit.new_project_ref(is_domain=True) - project2 = self.resource_api.create_project(project2['id'], project2) - - # All projects acting as domains have a null domain_id, so should not - # be able to create another with the same name but a different - # project ID. - new_project = project.copy() - new_project['id'] = uuid.uuid4().hex - - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - new_project['id'], - new_project) - - # We also should not be able to update one to have a name clash - project2['name'] = project['name'] - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project2['id'], - project2) - - # But updating it to a unique name is OK - project2['name'] = uuid.uuid4().hex - self.resource_api.update_project(project2['id'], project2) - - # Finally, it should be OK to create a project with same name as one of - # these acting as a domain, as long as it is a regular project - project3 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, name=project2['name']) - self.resource_api.create_project(project3['id'], project3) - # In fact, it should be OK to create such a project in the domain which - # has the matching name. - # TODO(henry-nash): Once we fully support projects acting as a domain, - # add a test here to create a sub-project with a name that matches its - # project acting as a domain - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for sub projects acting as domains support') - def test_is_domain_sub_project_has_parent_domain_id(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=True) - self.resource_api.create_project(project['id'], project) - - sub_project = unit.new_project_ref(domain_id=project['id'], - parent_id=project['id'], - is_domain=True) - - ref = self.resource_api.create_project(sub_project['id'], sub_project) - self.assertTrue(ref['is_domain']) - self.assertEqual(project['id'], ref['parent_id']) - self.assertEqual(project['id'], ref['domain_id']) - - @unit.skip_if_no_multiple_domains_support - def test_delete_domain_with_project_api(self): - project = unit.new_project_ref(domain_id=None, - is_domain=True) - self.resource_api.create_project(project['id'], project) - - # Check that a corresponding domain was created - self.resource_api.get_domain(project['id']) - - # Try to delete the enabled project that acts as a domain - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - project['id']) - - # Disable the project - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - # Successfully delete the project - self.resource_api.delete_project(project['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - project['id']) - - @unit.skip_if_no_multiple_domains_support - def test_create_subproject_acting_as_domain_fails(self): - root_project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(root_project['id'], root_project) - - sub_project = unit.new_project_ref(is_domain=True, - parent_id=root_project['id']) - - # Creation of sub projects acting as domains is not allowed yet - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - sub_project['id'], sub_project) - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_under_regular_project_hierarchy_fails(self): - # Projects acting as domains can't have a regular project as parent - projects_hierarchy = self._create_projects_hierarchy() - parent = projects_hierarchy[1] - project = unit.new_project_ref(domain_id=parent['id'], - parent_id=parent['id'], - is_domain=True) - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], project) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for sub projects acting as domains support') - def test_create_project_under_domain_hierarchy(self): - projects_hierarchy = self._create_projects_hierarchy(is_domain=True) - parent = projects_hierarchy[1] - project = unit.new_project_ref(domain_id=parent['id'], - parent_id=parent['id'], - is_domain=False) - - ref = self.resource_api.create_project(project['id'], project) - self.assertFalse(ref['is_domain']) - self.assertEqual(parent['id'], ref['parent_id']) - self.assertEqual(parent['id'], ref['domain_id']) - - def test_create_project_without_is_domain_flag(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['is_domain'] - ref = self.resource_api.create_project(project['id'], project) - # The is_domain flag should be False by default - self.assertFalse(ref['is_domain']) - - @unit.skip_if_no_multiple_domains_support - def test_create_project_passing_is_domain_flag_true(self): - project = unit.new_project_ref(is_domain=True) - - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - - def test_create_project_passing_is_domain_flag_false(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=False) - - ref = self.resource_api.create_project(project['id'], project) - self.assertIs(False, ref['is_domain']) - - @test_utils.wip('waiting for support for parent_id to imply domain_id') - def test_create_project_with_parent_id_and_without_domain_id(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now create a child by just naming the parent_id - sub_project = unit.new_project_ref(parent_id=project['id']) - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The domain_id should be set to the parent domain_id - self.assertEqual(project['domain_id'], ref['domain_id']) - - def test_create_project_with_domain_id_and_without_parent_id(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now create a child by just naming the domain_id - sub_project = unit.new_project_ref(domain_id=project['id']) - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The parent_id and domain_id should be set to the id of the project - # acting as a domain - self.assertEqual(project['id'], ref['parent_id']) - self.assertEqual(project['id'], ref['domain_id']) - - def test_create_project_with_domain_id_mismatch_to_parent_domain(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now try to create a child with the above as its parent, but - # specifying a different domain. - sub_project = unit.new_project_ref( - parent_id=project['id'], domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - sub_project['id'], sub_project) - - def test_check_leaf_projects(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.assertFalse(self.resource_api.is_leaf_project( - root_project['id'])) - self.assertTrue(self.resource_api.is_leaf_project( - leaf_project['id'])) - - # Delete leaf_project - self.resource_api.delete_project(leaf_project['id']) - - # Now, root_project should be leaf - self.assertTrue(self.resource_api.is_leaf_project( - root_project['id'])) - - def test_list_projects_in_subtree(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project2['id']) - self.resource_api.create_project(project4['id'], project4) - - subtree = self.resource_api.list_projects_in_subtree(project1['id']) - self.assertEqual(3, len(subtree)) - self.assertIn(project2, subtree) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project2['id']) - self.assertEqual(2, len(subtree)) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project3['id']) - self.assertEqual(0, len(subtree)) - - def test_list_projects_in_subtree_with_circular_reference(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project1 = self.resource_api.create_project(project1['id'], project1) - - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project1['id']) - self.resource_api.create_project(project2['id'], project2) - - project1['parent_id'] = project2['id'] # Adds cyclic reference - - # NOTE(dstanek): The manager does not allow parent_id to be updated. - # Instead will directly use the driver to create the cyclic - # reference. - self.resource_api.driver.update_project(project1['id'], project1) - - subtree = self.resource_api.list_projects_in_subtree(project1['id']) - - # NOTE(dstanek): If a cyclic reference is detected the code bails - # and returns None instead of falling into the infinite - # recursion trap. - self.assertIsNone(subtree) - - def test_list_projects_in_subtree_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_projects_in_subtree, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_projects_in_subtree, - uuid.uuid4().hex) - - def test_list_project_parents(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project2['id']) - self.resource_api.create_project(project4['id'], project4) - - parents1 = self.resource_api.list_project_parents(project3['id']) - self.assertEqual(3, len(parents1)) - self.assertIn(project1, parents1) - self.assertIn(project2, parents1) - - parents2 = self.resource_api.list_project_parents(project4['id']) - self.assertEqual(parents1, parents2) - - parents = self.resource_api.list_project_parents(project1['id']) - # It has the default domain as parent - self.assertEqual(1, len(parents)) - - def test_update_project_enabled_cascade(self): - """Test update_project_cascade - - Ensures the enabled attribute is correctly updated across - a simple 3-level projects hierarchy. - """ - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - parent = projects_hierarchy[0] - - # Disable in parent project disables the whole subtree - parent['enabled'] = False - # Store the ref from backend in another variable so we don't bother - # to remove other attributes that were not originally provided and - # were set in the manager, like parent_id and domain_id. - parent_ref = self.resource_api.update_project(parent['id'], - parent, - cascade=True) - - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertEqual(2, len(subtree)) - self.assertFalse(parent_ref['enabled']) - self.assertFalse(subtree[0]['enabled']) - self.assertFalse(subtree[1]['enabled']) - - # Enable parent project enables the whole subtree - parent['enabled'] = True - parent_ref = self.resource_api.update_project(parent['id'], - parent, - cascade=True) - - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertEqual(2, len(subtree)) - self.assertTrue(parent_ref['enabled']) - self.assertTrue(subtree[0]['enabled']) - self.assertTrue(subtree[1]['enabled']) - - def test_cannot_enable_cascade_with_parent_disabled(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - grandparent = projects_hierarchy[0] - parent = projects_hierarchy[1] - - grandparent['enabled'] = False - self.resource_api.update_project(grandparent['id'], - grandparent, - cascade=True) - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertFalse(subtree[0]['enabled']) - - parent['enabled'] = True - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - parent['id'], - parent, - cascade=True) - - def test_update_cascade_only_accepts_enabled(self): - # Update cascade does not accept any other attribute but 'enabled' - new_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(new_project['id'], new_project) - - new_project['name'] = 'project1' - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - new_project['id'], - new_project, - cascade=True) - - def test_list_project_parents_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_project_parents, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_project_parents, - uuid.uuid4().hex) - - def test_create_project_doesnt_modify_passed_in_dict(self): - new_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - original_project = new_project.copy() - self.resource_api.create_project(new_project['id'], new_project) - self.assertDictEqual(original_project, new_project) - - def test_update_project_enable(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(project['enabled'], project_ref['enabled']) - - # If not present, enabled field should not be updated - del project['enabled'] - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertFalse(project_ref['enabled']) - - project['enabled'] = True - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(project['enabled'], project_ref['enabled']) - - del project['enabled'] - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - def test_create_invalid_domain_fails(self): - new_group = unit.new_group_ref(domain_id="doesnotexist") - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_group, - new_group) - new_user = unit.new_user_ref(domain_id="doesnotexist") - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_user, - new_user) - - @unit.skip_if_no_multiple_domains_support - def test_project_crud(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - project = unit.new_project_ref(domain_id=domain['id']) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - self.resource_api.delete_project(project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_domain_delete_hierarchy(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - - # Creating a root and a leaf project inside the domain - projects_hierarchy = self._create_projects_hierarchy( - domain_id=domain['id']) - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[0] - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - # Make sure the root project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - # Make sure the leaf project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - def test_delete_projects_from_ids(self): - """Tests the resource backend call delete_projects_from_ids. - - Tests the normal flow of the delete_projects_from_ids backend call, - that ensures no project on the list exists after it is succesfully - called. - """ - project1_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - projects = (project1_ref, project2_ref) - for project in projects: - self.resource_api.create_project(project['id'], project) - - # Setting up the ID's list - projects_ids = [p['id'] for p in projects] - self.resource_api.driver.delete_projects_from_ids(projects_ids) - - # Ensuring projects no longer exist at backend level - for project_id in projects_ids: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.driver.get_project, - project_id) - - # Passing an empty list is silently ignored - self.resource_api.driver.delete_projects_from_ids([]) - - def test_delete_projects_from_ids_with_no_existing_project_id(self): - """Tests delete_projects_from_ids issues warning if not found. - - Tests the resource backend call delete_projects_from_ids passing a - non existing ID in project_ids, which is logged and ignored by - the backend. - """ - project_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - - # Setting up the ID's list - projects_ids = (project_ref['id'], uuid.uuid4().hex) - with mock.patch('keystone.resource.backends.sql.LOG') as mock_log: - self.resource_api.delete_projects_from_ids(projects_ids) - self.assertTrue(mock_log.warning.called) - # The existing project was deleted. - self.assertRaises(exception.ProjectNotFound, - self.resource_api.driver.get_project, - project_ref['id']) - - # Even if we only have one project, and it does not exist, it returns - # no error. - self.resource_api.driver.delete_projects_from_ids([uuid.uuid4().hex]) - - def test_delete_project_cascade(self): - # create a hierarchy with 3 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - root_project = projects_hierarchy[0] - project1 = projects_hierarchy[1] - project2 = projects_hierarchy[2] - - # Disabling all projects before attempting to delete - for project in (project2, project1, root_project): - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - self.resource_api.delete_project(root_project['id'], cascade=True) - - for project in projects_hierarchy: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_delete_large_project_cascade(self): - """Try delete a large project with cascade true. - - Tree we will create:: - - +-p1-+ - | | - p5 p2 - | | - p6 +-p3-+ - | | - p7 p4 - """ - # create a hierarchy with 4 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=4) - p1 = projects_hierarchy[0] - # Add the left branch to the hierarchy (p5, p6) - self._create_projects_hierarchy(hierarchy_size=2, - parent_project_id=p1['id']) - # Add p7 to the hierarchy - p3_id = projects_hierarchy[2]['id'] - self._create_projects_hierarchy(hierarchy_size=1, - parent_project_id=p3_id) - # Reverse the hierarchy to disable the leaf first - prjs_hierarchy = ([p1] + self.resource_api.list_projects_in_subtree( - p1['id']))[::-1] - - # Disabling all projects before attempting to delete - for project in prjs_hierarchy: - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - self.resource_api.delete_project(p1['id'], cascade=True) - for project in prjs_hierarchy: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_cannot_delete_project_cascade_with_enabled_child(self): - # create a hierarchy with 3 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - root_project = projects_hierarchy[0] - project1 = projects_hierarchy[1] - project2 = projects_hierarchy[2] - - project2['enabled'] = False - self.resource_api.update_project(project2['id'], project2) - - # Cannot cascade delete root_project, since project1 is enabled - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project['id'], - cascade=True) - - # Ensuring no project was deleted, not even project2 - self.resource_api.get_project(root_project['id']) - self.resource_api.get_project(project1['id']) - self.resource_api.get_project(project2['id']) - - def test_hierarchical_projects_crud(self): - # create a hierarchy with just a root project (which is a leaf as well) - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1) - root_project1 = projects_hierarchy[0] - - # create a hierarchy with one root project and one leaf project - projects_hierarchy = self._create_projects_hierarchy() - root_project2 = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # update description from leaf_project - leaf_project['description'] = 'new description' - self.resource_api.update_project(leaf_project['id'], leaf_project) - proj_ref = self.resource_api.get_project(leaf_project['id']) - self.assertDictEqual(leaf_project, proj_ref) - - # update the parent_id is not allowed - leaf_project['parent_id'] = root_project1['id'] - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - # delete root_project1 - self.resource_api.delete_project(root_project1['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project1['id']) - - # delete root_project2 is not allowed since it is not a leaf project - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project2['id']) - - def test_create_project_with_invalid_parent(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, parent_id='fake') - self.assertRaises(exception.ProjectNotFound, - self.resource_api.create_project, - project['id'], - project) - - @unit.skip_if_no_multiple_domains_support - def test_create_leaf_project_with_different_domain(self): - root_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(root_project['id'], root_project) - - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - leaf_project = unit.new_project_ref(domain_id=domain['id'], - parent_id=root_project['id']) - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - leaf_project['id'], - leaf_project) - - def test_delete_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.resource_api.delete_project(leaf_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - self.resource_api.delete_project(root_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - def test_delete_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project['id']) - - def test_update_project_parent(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - - # project2 is the parent from project3 - self.assertEqual(project3.get('parent_id'), project2['id']) - - # try to update project3 parent to parent1 - project3['parent_id'] = project1['id'] - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - project3['id'], - project3) - - def test_create_project_under_disabled_one(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, enabled=False) - self.resource_api.create_project(project1['id'], project1) - - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project1['id']) - - # It's not possible to create a project under a disabled one in the - # hierarchy - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project2['id'], - project2) - - def test_disable_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - leaf_project = projects_hierarchy[1] - - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - - project_ref = self.resource_api.get_project(leaf_project['id']) - self.assertEqual(leaf_project['enabled'], project_ref['enabled']) - - def test_disable_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - root_project['enabled'] = False - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - root_project['id'], - root_project) - - def test_enable_project_with_disabled_parent(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # Disable leaf and root - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - root_project['enabled'] = False - self.resource_api.update_project(root_project['id'], root_project) - - # Try to enable the leaf project, it's not possible since it has - # a disabled parent - leaf_project['enabled'] = True - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - def _get_hierarchy_depth(self, project_id): - return len(self.resource_api.list_project_parents(project_id)) + 1 - - def test_check_hierarchy_depth(self): - # Should be allowed to have a hierarchy of the max depth specified - # in the config option plus one (to allow for the additional project - # acting as a domain after an upgrade) - projects_hierarchy = self._create_projects_hierarchy( - CONF.max_project_tree_depth) - leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1] - - depth = self._get_hierarchy_depth(leaf_project['id']) - self.assertEqual(CONF.max_project_tree_depth + 1, depth) - - # Creating another project in the hierarchy shouldn't be allowed - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=leaf_project['id']) - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.create_project, - project['id'], - project) - - def test_project_update_missing_attrs_with_a_value(self): - # Creating a project with no description attribute. - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['description'] - project = self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project, project_ref) - - def test_project_update_missing_attrs_with_a_falsey_value(self): - # Creating a project with no description attribute. - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['description'] - project = self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = '' - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project, project_ref) - - def test_domain_crud(self): - domain = unit.new_domain_ref() - domain_ref = self.resource_api.create_domain(domain['id'], domain) - self.assertDictEqual(domain, domain_ref) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain, domain_ref) - - domain['name'] = uuid.uuid4().hex - domain_ref = self.resource_api.update_domain(domain['id'], domain) - self.assertDictEqual(domain, domain_ref) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain, domain_ref) - - # Ensure an 'enabled' domain cannot be deleted - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_domain, - domain_id=domain['id']) - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - @unit.skip_if_no_multiple_domains_support - def test_domain_name_case_sensitivity(self): - # create a ref with a lowercase name - domain_name = 'test_domain' - ref = unit.new_domain_ref(name=domain_name) - - lower_case_domain = self.resource_api.create_domain(ref['id'], ref) - - # assign a new ID to the ref with the same name, but in uppercase - ref['id'] = uuid.uuid4().hex - ref['name'] = domain_name.upper() - upper_case_domain = self.resource_api.create_domain(ref['id'], ref) - - # We can get each domain by name - lower_case_domain_ref = self.resource_api.get_domain_by_name( - domain_name) - self.assertDictEqual(lower_case_domain, lower_case_domain_ref) - - upper_case_domain_ref = self.resource_api.get_domain_by_name( - domain_name.upper()) - self.assertDictEqual(upper_case_domain, upper_case_domain_ref) - - def test_project_attribute_update(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - - # pick a key known to be non-existent - key = 'description' - - def assert_key_equals(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertEqual(value, project_ref[key]) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(value, project_ref[key]) - - def assert_get_key_is(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertIs(project_ref.get(key), value) - project_ref = self.resource_api.get_project(project['id']) - self.assertIs(project_ref.get(key), value) - - # add an attribute that doesn't exist, set it to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set an attribute with a falsey value to null - value = None - project[key] = value - assert_get_key_is(value) - - # do it again, in case updating from this situation is handled oddly - value = None - project[key] = value - assert_get_key_is(value) - - # set a possibly-null value to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set a falsey value to a truthy value - value = uuid.uuid4().hex - project[key] = value - assert_key_equals(value) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_domain_rename_invalidates_get_domain_by_name_cache(self): - domain = unit.new_domain_ref() - domain_id = domain['id'] - domain_name = domain['name'] - self.resource_api.create_domain(domain_id, domain) - domain_ref = self.resource_api.get_domain_by_name(domain_name) - domain_ref['name'] = uuid.uuid4().hex - self.resource_api.update_domain(domain_id, domain_ref) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - @unit.skip_if_cache_disabled('resource') - def test_cache_layer_domain_crud(self): - domain = unit.new_domain_ref() - domain_id = domain['id'] - # Create Domain - self.resource_api.create_domain(domain_id, domain) - project_domain_ref = self.resource_api.get_project(domain_id) - domain_ref = self.resource_api.get_domain(domain_id) - updated_project_domain_ref = copy.deepcopy(project_domain_ref) - updated_project_domain_ref['name'] = uuid.uuid4().hex - updated_domain_ref = copy.deepcopy(domain_ref) - updated_domain_ref['name'] = updated_project_domain_ref['name'] - # Update domain, bypassing resource api manager - self.resource_api.driver.update_project(domain_id, - updated_project_domain_ref) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain returns the updated domain - self.assertDictContainsSubset( - updated_domain_ref, self.resource_api.get_domain(domain_id)) - # Update the domain back to original ref, using the assignment api - # manager - self.resource_api.update_domain(domain_id, domain_ref) - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Make sure domain is 'disabled', bypass resource api manager - project_domain_ref_disabled = project_domain_ref.copy() - project_domain_ref_disabled['enabled'] = False - self.resource_api.driver.update_project(domain_id, - project_domain_ref_disabled) - self.resource_api.driver.update_project(domain_id, {'enabled': False}) - # Delete domain, bypassing resource api manager - self.resource_api.driver.delete_project(domain_id) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain now raises DomainNotFound - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, domain_id) - # Recreate Domain - self.resource_api.create_domain(domain_id, domain) - self.resource_api.get_domain(domain_id) - # Make sure domain is 'disabled', bypass resource api manager - domain['enabled'] = False - self.resource_api.driver.update_project(domain_id, domain) - self.resource_api.driver.update_project(domain_id, {'enabled': False}) - # Delete domain - self.resource_api.delete_domain(domain_id) - # verify DomainNotFound raised - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain_id) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_project_rename_invalidates_get_project_by_name_cache(self): - domain = unit.new_domain_ref() - project = unit.new_project_ref(domain_id=domain['id']) - project_id = project['id'] - project_name = project['name'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project_by_name(project_name, domain['id']) - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project_id, project) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - project_name, - domain['id']) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_cache_layer_project_crud(self): - domain = unit.new_domain_ref() - project = unit.new_project_ref(domain_id=domain['id']) - project_id = project['id'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - updated_project = copy.deepcopy(project) - updated_project['name'] = uuid.uuid4().hex - # Update project, bypassing resource manager - self.resource_api.driver.update_project(project_id, - updated_project) - # Verify get_project still returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify get_project now returns the new project - self.assertDictContainsSubset( - updated_project, - self.resource_api.get_project(project_id)) - # Update project using the resource_api manager back to original - self.resource_api.update_project(project['id'], project) - # Verify get_project returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Delete project bypassing resource - self.resource_api.driver.delete_project(project_id) - # Verify get_project still returns the project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify ProjectNotFound now raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - # recreate project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - # delete project - self.resource_api.delete_project(project_id) - # Verify ProjectNotFound is raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - - @unit.skip_if_no_multiple_domains_support - def test_get_default_domain_by_name(self): - domain_name = 'default' - - domain = unit.new_domain_ref(name=domain_name) - self.resource_api.create_domain(domain['id'], domain) - - domain_ref = self.resource_api.get_domain_by_name(domain_name) - self.assertEqual(domain, domain_ref) - - def test_get_not_default_domain_by_name(self): - domain_name = 'foo' - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - def test_project_update_and_project_get_return_same_response(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - - self.resource_api.create_project(project['id'], project) - - updated_project = {'enabled': False} - updated_project_ref = self.resource_api.update_project( - project['id'], updated_project) - - # SQL backend adds 'extra' field - updated_project_ref.pop('extra', None) - - self.assertIs(False, updated_project_ref['enabled']) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(updated_project_ref, project_ref) - - -class ResourceDriverTests(object): - """Tests for the resource driver. - - Subclasses must set self.driver to the driver instance. - - """ - - def test_create_project(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.driver.create_project(project_id, project) - - def test_create_project_all_defined_properties(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True, - 'parent_id': uuid.uuid4().hex, - 'is_domain': True, - } - self.driver.create_project(project_id, project) - - def test_create_project_null_domain(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': None, - } - self.driver.create_project(project_id, project) - - def test_create_project_same_name_same_domain_conflict(self): - name = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - - project_id = uuid.uuid4().hex - project = { - 'name': name, - 'id': project_id, - 'domain_id': domain_id, - } - self.driver.create_project(project_id, project) - - project_id = uuid.uuid4().hex - project = { - 'name': name, - 'id': project_id, - 'domain_id': domain_id, - } - self.assertRaises(exception.Conflict, self.driver.create_project, - project_id, project) - - def test_create_project_same_id_conflict(self): - project_id = uuid.uuid4().hex - - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.driver.create_project(project_id, project) - - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.assertRaises(exception.Conflict, self.driver.create_project, - project_id, project) -- cgit 1.2.3-korg