diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/resource | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/resource')
8 files changed, 0 insertions, 2495 deletions
diff --git a/keystone-moon/keystone/tests/unit/resource/__init__.py b/keystone-moon/keystone/tests/unit/resource/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/resource/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py deleted file mode 100644 index 79ad3df2..00000000 --- a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.resource.backends import sql -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database -from keystone.tests.unit.resource import test_backends - - -class TestSqlResourceDriver(unit.BaseTestCase, - test_backends.ResourceDriverTests): - def setUp(self): - super(TestSqlResourceDriver, self).setUp() - self.useFixture(database.Database()) - self.driver = sql.Resource() diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py deleted file mode 100644 index b4c5f262..00000000 --- a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py +++ /dev/null @@ -1,53 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -from keystone.common import sql -from keystone.resource.config_backends import sql as config_sql -from keystone.tests import unit -from keystone.tests.unit.backend import core_sql -from keystone.tests.unit.ksfixtures import database -from keystone.tests.unit.resource import test_core - - -class SqlDomainConfigModels(core_sql.BaseBackendSqlModels): - - def test_whitelisted_model(self): - cols = (('domain_id', sql.String, 64), - ('group', sql.String, 255), - ('option', sql.String, 255), - ('value', sql.JsonBlob, None)) - self.assertExpectedSchema('whitelisted_config', cols) - - def test_sensitive_model(self): - cols = (('domain_id', sql.String, 64), - ('group', sql.String, 255), - ('option', sql.String, 255), - ('value', sql.JsonBlob, None)) - self.assertExpectedSchema('sensitive_config', cols) - - -class SqlDomainConfigDriver(unit.BaseTestCase, - test_core.DomainConfigDriverTests): - def setUp(self): - super(SqlDomainConfigDriver, self).setUp() - self.useFixture(database.Database()) - self.driver = config_sql.DomainConfig() - - -class SqlDomainConfig(core_sql.BaseBackendSqlTests, - test_core.DomainConfigTests): - def setUp(self): - super(SqlDomainConfig, self).setUp() - # test_core.DomainConfigTests is effectively a mixin class, so make - # sure we call its setup - test_core.DomainConfigTests.setUp(self) diff --git a/keystone-moon/keystone/tests/unit/resource/test_backends.py b/keystone-moon/keystone/tests/unit/resource/test_backends.py deleted file mode 100644 index eed4c6ba..00000000 --- a/keystone-moon/keystone/tests/unit/resource/test_backends.py +++ /dev/null @@ -1,1669 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from oslo_config import cfg -from six.moves import range -from testtools import matchers - -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import utils as test_utils - - -CONF = cfg.CONF - - -class ResourceTests(object): - - domain_count = len(default_fixtures.DOMAINS) - - def test_get_project(self): - tenant_ref = self.resource_api.get_project(self.tenant_bar['id']) - self.assertDictEqual(self.tenant_bar, tenant_ref) - - def test_get_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - uuid.uuid4().hex) - - def test_get_project_by_name(self): - tenant_ref = self.resource_api.get_project_by_name( - self.tenant_bar['name'], - CONF.identity.default_domain_id) - self.assertDictEqual(self.tenant_bar, tenant_ref) - - @unit.skip_if_no_multiple_domains_support - def test_get_project_by_name_for_project_acting_as_a_domain(self): - """Tests get_project_by_name works when the domain_id is None.""" - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=False) - project = self.resource_api.create_project(project['id'], project) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - project['name'], - None) - - # Test that querying with domain_id as None will find the project - # acting as a domain, even if it's name is the same as the regular - # project above. - project2 = unit.new_project_ref(is_domain=True, - name=project['name']) - project2 = self.resource_api.create_project(project2['id'], project2) - - project_ref = self.resource_api.get_project_by_name( - project2['name'], None) - - self.assertEqual(project2, project_ref) - - def test_get_project_by_name_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - def test_create_duplicate_project_id_fails(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project_id, project) - project['name'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - project_id, - project) - - def test_create_duplicate_project_name_fails(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project_id, project) - project['id'] = 'fake2' - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - project['id'], - project) - - def test_create_duplicate_project_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2 = unit.new_project_ref(name=project1['name'], - domain_id=new_domain['id']) - self.resource_api.create_project(project1['id'], project1) - self.resource_api.create_project(project2['id'], project2) - - def test_move_project_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - project['domain_id'] = domain2['id'] - # Update the project asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.resource_api.update_project(project['id'], project) - self.assertTrue(mock_dep.called) - - updated_project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(domain2['id'], updated_project_ref['domain_id']) - - def test_move_project_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a project in domain1 - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - # Now create a project in domain2 with a potentially clashing - # name - which should work since we have domain separation - project2 = unit.new_project_ref(name=project1['name'], - domain_id=domain2['id']) - self.resource_api.create_project(project2['id'], project2) - # Now try and move project1 into the 2nd domain - which should - # fail since the names clash - project1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project1['id'], - project1) - - @unit.skip_if_no_multiple_domains_support - def test_move_project_with_children_between_domains_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - child_project = unit.new_project_ref(domain_id=domain1['id'], - parent_id=project['id']) - self.resource_api.create_project(child_project['id'], child_project) - project['domain_id'] = domain2['id'] - - # Update is not allowed, since updating the whole subtree would be - # necessary - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - @unit.skip_if_no_multiple_domains_support - def test_move_project_not_root_between_domains_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - project = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project['id'], project) - child_project = unit.new_project_ref(domain_id=domain1['id'], - parent_id=project['id']) - self.resource_api.create_project(child_project['id'], child_project) - child_project['domain_id'] = domain2['id'] - - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - child_project['id'], - child_project) - - @unit.skip_if_no_multiple_domains_support - def test_move_root_project_between_domains_succeeds(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - root_project = unit.new_project_ref(domain_id=domain1['id']) - root_project = self.resource_api.create_project(root_project['id'], - root_project) - - root_project['domain_id'] = domain2['id'] - self.resource_api.update_project(root_project['id'], root_project) - project_from_db = self.resource_api.get_project(root_project['id']) - - self.assertEqual(domain2['id'], project_from_db['domain_id']) - - @unit.skip_if_no_multiple_domains_support - def test_update_domain_id_project_is_domain_fails(self): - other_domain = unit.new_domain_ref() - self.resource_api.create_domain(other_domain['id'], other_domain) - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - project['domain_id'] = other_domain['id'] - - # Update of domain_id of projects acting as domains is not allowed - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_rename_duplicate_project_name_fails(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project1['id'], project1) - self.resource_api.create_project(project2['id'], project2) - project2['name'] = project1['name'] - self.assertRaises(exception.Error, - self.resource_api.update_project, - project2['id'], - project2) - - def test_update_project_id_does_nothing(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project_id = project['id'] - self.resource_api.create_project(project['id'], project) - project['id'] = 'fake2' - self.resource_api.update_project(project_id, project) - project_ref = self.resource_api.get_project(project_id) - self.assertEqual(project_id, project_ref['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - 'fake2') - - def test_delete_domain_with_user_group_project_links(self): - # TODO(chungg):add test case once expected behaviour defined - pass - - def test_update_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.update_project, - uuid.uuid4().hex, - dict()) - - def test_delete_project_returns_not_found(self): - self.assertRaises(exception.ProjectNotFound, - self.resource_api.delete_project, - uuid.uuid4().hex) - - def test_create_update_delete_unicode_project(self): - unicode_project_name = u'name \u540d\u5b57' - project = unit.new_project_ref( - name=unicode_project_name, - domain_id=CONF.identity.default_domain_id) - project = self.resource_api.create_project(project['id'], project) - self.resource_api.update_project(project['id'], project) - self.resource_api.delete_project(project['id']) - - def test_create_project_with_no_enabled_field(self): - ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id) - del ref['enabled'] - self.resource_api.create_project(ref['id'], ref) - - project = self.resource_api.get_project(ref['id']) - self.assertIs(project['enabled'], True) - - def test_create_project_long_name_fails(self): - project = unit.new_project_ref( - name='a' * 65, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_blank_name_fails(self): - project = unit.new_project_ref( - name='', domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_invalid_name_fails(self): - project = unit.new_project_ref( - name=None, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - project = unit.new_project_ref( - name=123, domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_update_project_blank_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = '' - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_long_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = 'a' * 65 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_invalid_name_fails(self): - project = unit.new_project_ref( - name='fake1', domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project['name'] = None - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - project['name'] = 123 - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_update_project_invalid_enabled_type_string(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - # Strings are not valid boolean values - project['enabled'] = "false" - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - project['id'], - project) - - def test_create_project_invalid_enabled_type_string(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - # invalid string value - enabled="true") - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], - project) - - def test_create_project_invalid_domain_id(self): - project = unit.new_project_ref(domain_id=uuid.uuid4().hex) - self.assertRaises(exception.DomainNotFound, - self.resource_api.create_project, - project['id'], - project) - - def test_list_domains(self): - domain1 = unit.new_domain_ref() - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - self.resource_api.create_domain(domain2['id'], domain2) - domains = self.resource_api.list_domains() - self.assertEqual(3, len(domains)) - domain_ids = [] - for domain in domains: - domain_ids.append(domain.get('id')) - self.assertIn(CONF.identity.default_domain_id, domain_ids) - self.assertIn(domain1['id'], domain_ids) - self.assertIn(domain2['id'], domain_ids) - - def test_list_projects(self): - project_refs = self.resource_api.list_projects() - project_count = len(default_fixtures.TENANTS) + self.domain_count - self.assertEqual(project_count, len(project_refs)) - for project in default_fixtures.TENANTS: - self.assertIn(project, project_refs) - - def test_list_projects_with_multiple_filters(self): - # Create a project - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project = self.resource_api.create_project(project['id'], project) - - # Build driver hints with the project's name and inexistent description - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', uuid.uuid4().hex) - - # Retrieve projects based on hints and check an empty list is returned - projects = self.resource_api.list_projects(hints) - self.assertEqual([], projects) - - # Build correct driver hints - hints = driver_hints.Hints() - hints.add_filter('name', project['name']) - hints.add_filter('description', project['description']) - - # Retrieve projects based on hints - projects = self.resource_api.list_projects(hints) - - # Check that the returned list contains only the first project - self.assertEqual(1, len(projects)) - self.assertEqual(project, projects[0]) - - def test_list_projects_for_domain(self): - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - CONF.identity.default_domain_id)]) - # Only the projects from the default fixtures are expected, since - # filtering by domain does not include any project that acts as a - # domain. - self.assertThat( - project_ids, matchers.HasLength(len(default_fixtures.TENANTS))) - self.assertIn(self.tenant_bar['id'], project_ids) - self.assertIn(self.tenant_baz['id'], project_ids) - self.assertIn(self.tenant_mtu['id'], project_ids) - self.assertIn(self.tenant_service['id'], project_ids) - - @unit.skip_if_no_multiple_domains_support - def test_list_projects_acting_as_domain(self): - initial_domains = self.resource_api.list_domains() - - # Creating 5 projects that act as domains - new_projects_acting_as_domains = [] - for i in range(5): - project = unit.new_project_ref(is_domain=True) - project = self.resource_api.create_project(project['id'], project) - new_projects_acting_as_domains.append(project) - - # Creating a few regular project to ensure it doesn't mess with the - # ones that act as domains - self._create_projects_hierarchy(hierarchy_size=2) - - projects = self.resource_api.list_projects_acting_as_domain() - expected_number_projects = ( - len(initial_domains) + len(new_projects_acting_as_domains)) - self.assertEqual(expected_number_projects, len(projects)) - for project in new_projects_acting_as_domains: - self.assertIn(project, projects) - for domain in initial_domains: - self.assertIn(domain['id'], [p['id'] for p in projects]) - - @unit.skip_if_no_multiple_domains_support - def test_list_projects_for_alternate_domain(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - project2 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project2['id'], project2) - project_ids = ([x['id'] for x in - self.resource_api.list_projects_in_domain( - domain1['id'])]) - self.assertEqual(2, len(project_ids)) - self.assertIn(project1['id'], project_ids) - self.assertIn(project2['id'], project_ids) - - def _create_projects_hierarchy(self, hierarchy_size=2, - domain_id=None, - is_domain=False, - parent_project_id=None): - """Creates a project hierarchy with specified size. - - :param hierarchy_size: the desired hierarchy size, default is 2 - - a project with one child. - :param domain_id: domain where the projects hierarchy will be created. - :param is_domain: if the hierarchy will have the is_domain flag active - or not. - :param parent_project_id: if the intention is to create a - sub-hierarchy, sets the sub-hierarchy root. Defaults to creating - a new hierarchy, i.e. a new root project. - - :returns projects: a list of the projects in the created hierarchy. - - """ - if domain_id is None: - domain_id = CONF.identity.default_domain_id - if parent_project_id: - project = unit.new_project_ref(parent_id=parent_project_id, - domain_id=domain_id, - is_domain=is_domain) - else: - project = unit.new_project_ref(domain_id=domain_id, - is_domain=is_domain) - project_id = project['id'] - project = self.resource_api.create_project(project_id, project) - - projects = [project] - for i in range(1, hierarchy_size): - new_project = unit.new_project_ref(parent_id=project_id, - domain_id=domain_id) - - self.resource_api.create_project(new_project['id'], new_project) - projects.append(new_project) - project_id = new_project['id'] - - return projects - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_with_project_api(self): - project = unit.new_project_ref(is_domain=True) - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - self.resource_api.get_domain(ref['id']) - - @unit.skip_if_no_multiple_domains_support - def test_project_as_a_domain_uniqueness_constraints(self): - """Tests project uniqueness for those acting as domains. - - If it is a project acting as a domain, we can't have two or more with - the same name. - - """ - # Create two projects acting as a domain - project = unit.new_project_ref(is_domain=True) - project = self.resource_api.create_project(project['id'], project) - project2 = unit.new_project_ref(is_domain=True) - project2 = self.resource_api.create_project(project2['id'], project2) - - # All projects acting as domains have a null domain_id, so should not - # be able to create another with the same name but a different - # project ID. - new_project = project.copy() - new_project['id'] = uuid.uuid4().hex - - self.assertRaises(exception.Conflict, - self.resource_api.create_project, - new_project['id'], - new_project) - - # We also should not be able to update one to have a name clash - project2['name'] = project['name'] - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project2['id'], - project2) - - # But updating it to a unique name is OK - project2['name'] = uuid.uuid4().hex - self.resource_api.update_project(project2['id'], project2) - - # Finally, it should be OK to create a project with same name as one of - # these acting as a domain, as long as it is a regular project - project3 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, name=project2['name']) - self.resource_api.create_project(project3['id'], project3) - # In fact, it should be OK to create such a project in the domain which - # has the matching name. - # TODO(henry-nash): Once we fully support projects acting as a domain, - # add a test here to create a sub-project with a name that matches its - # project acting as a domain - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for sub projects acting as domains support') - def test_is_domain_sub_project_has_parent_domain_id(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=True) - self.resource_api.create_project(project['id'], project) - - sub_project = unit.new_project_ref(domain_id=project['id'], - parent_id=project['id'], - is_domain=True) - - ref = self.resource_api.create_project(sub_project['id'], sub_project) - self.assertTrue(ref['is_domain']) - self.assertEqual(project['id'], ref['parent_id']) - self.assertEqual(project['id'], ref['domain_id']) - - @unit.skip_if_no_multiple_domains_support - def test_delete_domain_with_project_api(self): - project = unit.new_project_ref(domain_id=None, - is_domain=True) - self.resource_api.create_project(project['id'], project) - - # Check that a corresponding domain was created - self.resource_api.get_domain(project['id']) - - # Try to delete the enabled project that acts as a domain - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - project['id']) - - # Disable the project - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - # Successfully delete the project - self.resource_api.delete_project(project['id']) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - project['id']) - - @unit.skip_if_no_multiple_domains_support - def test_create_subproject_acting_as_domain_fails(self): - root_project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(root_project['id'], root_project) - - sub_project = unit.new_project_ref(is_domain=True, - parent_id=root_project['id']) - - # Creation of sub projects acting as domains is not allowed yet - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - sub_project['id'], sub_project) - - @unit.skip_if_no_multiple_domains_support - def test_create_domain_under_regular_project_hierarchy_fails(self): - # Projects acting as domains can't have a regular project as parent - projects_hierarchy = self._create_projects_hierarchy() - parent = projects_hierarchy[1] - project = unit.new_project_ref(domain_id=parent['id'], - parent_id=parent['id'], - is_domain=True) - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], project) - - @unit.skip_if_no_multiple_domains_support - @test_utils.wip('waiting for sub projects acting as domains support') - def test_create_project_under_domain_hierarchy(self): - projects_hierarchy = self._create_projects_hierarchy(is_domain=True) - parent = projects_hierarchy[1] - project = unit.new_project_ref(domain_id=parent['id'], - parent_id=parent['id'], - is_domain=False) - - ref = self.resource_api.create_project(project['id'], project) - self.assertFalse(ref['is_domain']) - self.assertEqual(parent['id'], ref['parent_id']) - self.assertEqual(parent['id'], ref['domain_id']) - - def test_create_project_without_is_domain_flag(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['is_domain'] - ref = self.resource_api.create_project(project['id'], project) - # The is_domain flag should be False by default - self.assertFalse(ref['is_domain']) - - @unit.skip_if_no_multiple_domains_support - def test_create_project_passing_is_domain_flag_true(self): - project = unit.new_project_ref(is_domain=True) - - ref = self.resource_api.create_project(project['id'], project) - self.assertTrue(ref['is_domain']) - - def test_create_project_passing_is_domain_flag_false(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, is_domain=False) - - ref = self.resource_api.create_project(project['id'], project) - self.assertIs(False, ref['is_domain']) - - @test_utils.wip('waiting for support for parent_id to imply domain_id') - def test_create_project_with_parent_id_and_without_domain_id(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now create a child by just naming the parent_id - sub_project = unit.new_project_ref(parent_id=project['id']) - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The domain_id should be set to the parent domain_id - self.assertEqual(project['domain_id'], ref['domain_id']) - - def test_create_project_with_domain_id_and_without_parent_id(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now create a child by just naming the domain_id - sub_project = unit.new_project_ref(domain_id=project['id']) - ref = self.resource_api.create_project(sub_project['id'], sub_project) - - # The parent_id and domain_id should be set to the id of the project - # acting as a domain - self.assertEqual(project['id'], ref['parent_id']) - self.assertEqual(project['id'], ref['domain_id']) - - def test_create_project_with_domain_id_mismatch_to_parent_domain(self): - # First create a domain - project = unit.new_project_ref(is_domain=True) - self.resource_api.create_project(project['id'], project) - # Now try to create a child with the above as its parent, but - # specifying a different domain. - sub_project = unit.new_project_ref( - parent_id=project['id'], domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - sub_project['id'], sub_project) - - def test_check_leaf_projects(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.assertFalse(self.resource_api.is_leaf_project( - root_project['id'])) - self.assertTrue(self.resource_api.is_leaf_project( - leaf_project['id'])) - - # Delete leaf_project - self.resource_api.delete_project(leaf_project['id']) - - # Now, root_project should be leaf - self.assertTrue(self.resource_api.is_leaf_project( - root_project['id'])) - - def test_list_projects_in_subtree(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project2['id']) - self.resource_api.create_project(project4['id'], project4) - - subtree = self.resource_api.list_projects_in_subtree(project1['id']) - self.assertEqual(3, len(subtree)) - self.assertIn(project2, subtree) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project2['id']) - self.assertEqual(2, len(subtree)) - self.assertIn(project3, subtree) - self.assertIn(project4, subtree) - - subtree = self.resource_api.list_projects_in_subtree(project3['id']) - self.assertEqual(0, len(subtree)) - - def test_list_projects_in_subtree_with_circular_reference(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project1 = self.resource_api.create_project(project1['id'], project1) - - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project1['id']) - self.resource_api.create_project(project2['id'], project2) - - project1['parent_id'] = project2['id'] # Adds cyclic reference - - # NOTE(dstanek): The manager does not allow parent_id to be updated. - # Instead will directly use the driver to create the cyclic - # reference. - self.resource_api.driver.update_project(project1['id'], project1) - - subtree = self.resource_api.list_projects_in_subtree(project1['id']) - - # NOTE(dstanek): If a cyclic reference is detected the code bails - # and returns None instead of falling into the infinite - # recursion trap. - self.assertIsNone(subtree) - - def test_list_projects_in_subtree_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_projects_in_subtree, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_projects_in_subtree, - uuid.uuid4().hex) - - def test_list_project_parents(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - project4 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project2['id']) - self.resource_api.create_project(project4['id'], project4) - - parents1 = self.resource_api.list_project_parents(project3['id']) - self.assertEqual(3, len(parents1)) - self.assertIn(project1, parents1) - self.assertIn(project2, parents1) - - parents2 = self.resource_api.list_project_parents(project4['id']) - self.assertEqual(parents1, parents2) - - parents = self.resource_api.list_project_parents(project1['id']) - # It has the default domain as parent - self.assertEqual(1, len(parents)) - - def test_update_project_enabled_cascade(self): - """Test update_project_cascade - - Ensures the enabled attribute is correctly updated across - a simple 3-level projects hierarchy. - """ - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - parent = projects_hierarchy[0] - - # Disable in parent project disables the whole subtree - parent['enabled'] = False - # Store the ref from backend in another variable so we don't bother - # to remove other attributes that were not originally provided and - # were set in the manager, like parent_id and domain_id. - parent_ref = self.resource_api.update_project(parent['id'], - parent, - cascade=True) - - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertEqual(2, len(subtree)) - self.assertFalse(parent_ref['enabled']) - self.assertFalse(subtree[0]['enabled']) - self.assertFalse(subtree[1]['enabled']) - - # Enable parent project enables the whole subtree - parent['enabled'] = True - parent_ref = self.resource_api.update_project(parent['id'], - parent, - cascade=True) - - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertEqual(2, len(subtree)) - self.assertTrue(parent_ref['enabled']) - self.assertTrue(subtree[0]['enabled']) - self.assertTrue(subtree[1]['enabled']) - - def test_cannot_enable_cascade_with_parent_disabled(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - grandparent = projects_hierarchy[0] - parent = projects_hierarchy[1] - - grandparent['enabled'] = False - self.resource_api.update_project(grandparent['id'], - grandparent, - cascade=True) - subtree = self.resource_api.list_projects_in_subtree(parent['id']) - self.assertFalse(subtree[0]['enabled']) - - parent['enabled'] = True - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - parent['id'], - parent, - cascade=True) - - def test_update_cascade_only_accepts_enabled(self): - # Update cascade does not accept any other attribute but 'enabled' - new_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(new_project['id'], new_project) - - new_project['name'] = 'project1' - self.assertRaises(exception.ValidationError, - self.resource_api.update_project, - new_project['id'], - new_project, - cascade=True) - - def test_list_project_parents_invalid_project_id(self): - self.assertRaises(exception.ValidationError, - self.resource_api.list_project_parents, - None) - - self.assertRaises(exception.ProjectNotFound, - self.resource_api.list_project_parents, - uuid.uuid4().hex) - - def test_create_project_doesnt_modify_passed_in_dict(self): - new_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - original_project = new_project.copy() - self.resource_api.create_project(new_project['id'], new_project) - self.assertDictEqual(original_project, new_project) - - def test_update_project_enable(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(project['enabled'], project_ref['enabled']) - - # If not present, enabled field should not be updated - del project['enabled'] - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertFalse(project_ref['enabled']) - - project['enabled'] = True - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(project['enabled'], project_ref['enabled']) - - del project['enabled'] - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertTrue(project_ref['enabled']) - - def test_create_invalid_domain_fails(self): - new_group = unit.new_group_ref(domain_id="doesnotexist") - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_group, - new_group) - new_user = unit.new_user_ref(domain_id="doesnotexist") - self.assertRaises(exception.DomainNotFound, - self.identity_api.create_user, - new_user) - - @unit.skip_if_no_multiple_domains_support - def test_project_crud(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - project = unit.new_project_ref(domain_id=domain['id']) - self.resource_api.create_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - project_ref = self.resource_api.get_project(project['id']) - self.assertDictContainsSubset(project, project_ref) - - self.resource_api.delete_project(project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_domain_delete_hierarchy(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - - # Creating a root and a leaf project inside the domain - projects_hierarchy = self._create_projects_hierarchy( - domain_id=domain['id']) - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[0] - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - # Make sure the root project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - # Make sure the leaf project no longer exists - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - def test_delete_projects_from_ids(self): - """Tests the resource backend call delete_projects_from_ids. - - Tests the normal flow of the delete_projects_from_ids backend call, - that ensures no project on the list exists after it is succesfully - called. - """ - project1_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - project2_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - projects = (project1_ref, project2_ref) - for project in projects: - self.resource_api.create_project(project['id'], project) - - # Setting up the ID's list - projects_ids = [p['id'] for p in projects] - self.resource_api.driver.delete_projects_from_ids(projects_ids) - - # Ensuring projects no longer exist at backend level - for project_id in projects_ids: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.driver.get_project, - project_id) - - # Passing an empty list is silently ignored - self.resource_api.driver.delete_projects_from_ids([]) - - def test_delete_projects_from_ids_with_no_existing_project_id(self): - """Tests delete_projects_from_ids issues warning if not found. - - Tests the resource backend call delete_projects_from_ids passing a - non existing ID in project_ids, which is logged and ignored by - the backend. - """ - project_ref = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project_ref['id'], project_ref) - - # Setting up the ID's list - projects_ids = (project_ref['id'], uuid.uuid4().hex) - with mock.patch('keystone.resource.backends.sql.LOG') as mock_log: - self.resource_api.delete_projects_from_ids(projects_ids) - self.assertTrue(mock_log.warning.called) - # The existing project was deleted. - self.assertRaises(exception.ProjectNotFound, - self.resource_api.driver.get_project, - project_ref['id']) - - # Even if we only have one project, and it does not exist, it returns - # no error. - self.resource_api.driver.delete_projects_from_ids([uuid.uuid4().hex]) - - def test_delete_project_cascade(self): - # create a hierarchy with 3 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - root_project = projects_hierarchy[0] - project1 = projects_hierarchy[1] - project2 = projects_hierarchy[2] - - # Disabling all projects before attempting to delete - for project in (project2, project1, root_project): - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - self.resource_api.delete_project(root_project['id'], cascade=True) - - for project in projects_hierarchy: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_delete_large_project_cascade(self): - """Try delete a large project with cascade true. - - Tree we will create:: - - +-p1-+ - | | - p5 p2 - | | - p6 +-p3-+ - | | - p7 p4 - """ - # create a hierarchy with 4 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=4) - p1 = projects_hierarchy[0] - # Add the left branch to the hierarchy (p5, p6) - self._create_projects_hierarchy(hierarchy_size=2, - parent_project_id=p1['id']) - # Add p7 to the hierarchy - p3_id = projects_hierarchy[2]['id'] - self._create_projects_hierarchy(hierarchy_size=1, - parent_project_id=p3_id) - # Reverse the hierarchy to disable the leaf first - prjs_hierarchy = ([p1] + self.resource_api.list_projects_in_subtree( - p1['id']))[::-1] - - # Disabling all projects before attempting to delete - for project in prjs_hierarchy: - project['enabled'] = False - self.resource_api.update_project(project['id'], project) - - self.resource_api.delete_project(p1['id'], cascade=True) - for project in prjs_hierarchy: - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project['id']) - - def test_cannot_delete_project_cascade_with_enabled_child(self): - # create a hierarchy with 3 levels - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - root_project = projects_hierarchy[0] - project1 = projects_hierarchy[1] - project2 = projects_hierarchy[2] - - project2['enabled'] = False - self.resource_api.update_project(project2['id'], project2) - - # Cannot cascade delete root_project, since project1 is enabled - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project['id'], - cascade=True) - - # Ensuring no project was deleted, not even project2 - self.resource_api.get_project(root_project['id']) - self.resource_api.get_project(project1['id']) - self.resource_api.get_project(project2['id']) - - def test_hierarchical_projects_crud(self): - # create a hierarchy with just a root project (which is a leaf as well) - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1) - root_project1 = projects_hierarchy[0] - - # create a hierarchy with one root project and one leaf project - projects_hierarchy = self._create_projects_hierarchy() - root_project2 = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # update description from leaf_project - leaf_project['description'] = 'new description' - self.resource_api.update_project(leaf_project['id'], leaf_project) - proj_ref = self.resource_api.get_project(leaf_project['id']) - self.assertDictEqual(leaf_project, proj_ref) - - # update the parent_id is not allowed - leaf_project['parent_id'] = root_project1['id'] - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - # delete root_project1 - self.resource_api.delete_project(root_project1['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project1['id']) - - # delete root_project2 is not allowed since it is not a leaf project - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project2['id']) - - def test_create_project_with_invalid_parent(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, parent_id='fake') - self.assertRaises(exception.ProjectNotFound, - self.resource_api.create_project, - project['id'], - project) - - @unit.skip_if_no_multiple_domains_support - def test_create_leaf_project_with_different_domain(self): - root_project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(root_project['id'], root_project) - - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - leaf_project = unit.new_project_ref(domain_id=domain['id'], - parent_id=root_project['id']) - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - leaf_project['id'], - leaf_project) - - def test_delete_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - self.resource_api.delete_project(leaf_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - leaf_project['id']) - - self.resource_api.delete_project(root_project['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - root_project['id']) - - def test_delete_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_project, - root_project['id']) - - def test_update_project_parent(self): - projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) - project1 = projects_hierarchy[0] - project2 = projects_hierarchy[1] - project3 = projects_hierarchy[2] - - # project2 is the parent from project3 - self.assertEqual(project3.get('parent_id'), project2['id']) - - # try to update project3 parent to parent1 - project3['parent_id'] = project1['id'] - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - project3['id'], - project3) - - def test_create_project_under_disabled_one(self): - project1 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, enabled=False) - self.resource_api.create_project(project1['id'], project1) - - project2 = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=project1['id']) - - # It's not possible to create a project under a disabled one in the - # hierarchy - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project2['id'], - project2) - - def test_disable_hierarchical_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - leaf_project = projects_hierarchy[1] - - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - - project_ref = self.resource_api.get_project(leaf_project['id']) - self.assertEqual(leaf_project['enabled'], project_ref['enabled']) - - def test_disable_hierarchical_not_leaf_project(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - - root_project['enabled'] = False - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - root_project['id'], - root_project) - - def test_enable_project_with_disabled_parent(self): - projects_hierarchy = self._create_projects_hierarchy() - root_project = projects_hierarchy[0] - leaf_project = projects_hierarchy[1] - - # Disable leaf and root - leaf_project['enabled'] = False - self.resource_api.update_project(leaf_project['id'], leaf_project) - root_project['enabled'] = False - self.resource_api.update_project(root_project['id'], root_project) - - # Try to enable the leaf project, it's not possible since it has - # a disabled parent - leaf_project['enabled'] = True - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.update_project, - leaf_project['id'], - leaf_project) - - def _get_hierarchy_depth(self, project_id): - return len(self.resource_api.list_project_parents(project_id)) + 1 - - def test_check_hierarchy_depth(self): - # Should be allowed to have a hierarchy of the max depth specified - # in the config option plus one (to allow for the additional project - # acting as a domain after an upgrade) - projects_hierarchy = self._create_projects_hierarchy( - CONF.max_project_tree_depth) - leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1] - - depth = self._get_hierarchy_depth(leaf_project['id']) - self.assertEqual(CONF.max_project_tree_depth + 1, depth) - - # Creating another project in the hierarchy shouldn't be allowed - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id, - parent_id=leaf_project['id']) - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.create_project, - project['id'], - project) - - def test_project_update_missing_attrs_with_a_value(self): - # Creating a project with no description attribute. - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['description'] - project = self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = uuid.uuid4().hex - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project, project_ref) - - def test_project_update_missing_attrs_with_a_falsey_value(self): - # Creating a project with no description attribute. - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - del project['description'] - project = self.resource_api.create_project(project['id'], project) - - # Add a description attribute. - project['description'] = '' - self.resource_api.update_project(project['id'], project) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project, project_ref) - - def test_domain_crud(self): - domain = unit.new_domain_ref() - domain_ref = self.resource_api.create_domain(domain['id'], domain) - self.assertDictEqual(domain, domain_ref) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain, domain_ref) - - domain['name'] = uuid.uuid4().hex - domain_ref = self.resource_api.update_domain(domain['id'], domain) - self.assertDictEqual(domain, domain_ref) - domain_ref = self.resource_api.get_domain(domain['id']) - self.assertDictEqual(domain, domain_ref) - - # Ensure an 'enabled' domain cannot be deleted - self.assertRaises(exception.ForbiddenNotSecurity, - self.resource_api.delete_domain, - domain_id=domain['id']) - - # Disable the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - - # Delete the domain - self.resource_api.delete_domain(domain['id']) - - # Make sure the domain no longer exists - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - @unit.skip_if_no_multiple_domains_support - def test_domain_name_case_sensitivity(self): - # create a ref with a lowercase name - domain_name = 'test_domain' - ref = unit.new_domain_ref(name=domain_name) - - lower_case_domain = self.resource_api.create_domain(ref['id'], ref) - - # assign a new ID to the ref with the same name, but in uppercase - ref['id'] = uuid.uuid4().hex - ref['name'] = domain_name.upper() - upper_case_domain = self.resource_api.create_domain(ref['id'], ref) - - # We can get each domain by name - lower_case_domain_ref = self.resource_api.get_domain_by_name( - domain_name) - self.assertDictEqual(lower_case_domain, lower_case_domain_ref) - - upper_case_domain_ref = self.resource_api.get_domain_by_name( - domain_name.upper()) - self.assertDictEqual(upper_case_domain, upper_case_domain_ref) - - def test_project_attribute_update(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(project['id'], project) - - # pick a key known to be non-existent - key = 'description' - - def assert_key_equals(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertEqual(value, project_ref[key]) - project_ref = self.resource_api.get_project(project['id']) - self.assertEqual(value, project_ref[key]) - - def assert_get_key_is(value): - project_ref = self.resource_api.update_project( - project['id'], project) - self.assertIs(project_ref.get(key), value) - project_ref = self.resource_api.get_project(project['id']) - self.assertIs(project_ref.get(key), value) - - # add an attribute that doesn't exist, set it to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set an attribute with a falsey value to null - value = None - project[key] = value - assert_get_key_is(value) - - # do it again, in case updating from this situation is handled oddly - value = None - project[key] = value - assert_get_key_is(value) - - # set a possibly-null value to a falsey value - value = '' - project[key] = value - assert_key_equals(value) - - # set a falsey value to a truthy value - value = uuid.uuid4().hex - project[key] = value - assert_key_equals(value) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_domain_rename_invalidates_get_domain_by_name_cache(self): - domain = unit.new_domain_ref() - domain_id = domain['id'] - domain_name = domain['name'] - self.resource_api.create_domain(domain_id, domain) - domain_ref = self.resource_api.get_domain_by_name(domain_name) - domain_ref['name'] = uuid.uuid4().hex - self.resource_api.update_domain(domain_id, domain_ref) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - @unit.skip_if_cache_disabled('resource') - def test_cache_layer_domain_crud(self): - domain = unit.new_domain_ref() - domain_id = domain['id'] - # Create Domain - self.resource_api.create_domain(domain_id, domain) - project_domain_ref = self.resource_api.get_project(domain_id) - domain_ref = self.resource_api.get_domain(domain_id) - updated_project_domain_ref = copy.deepcopy(project_domain_ref) - updated_project_domain_ref['name'] = uuid.uuid4().hex - updated_domain_ref = copy.deepcopy(domain_ref) - updated_domain_ref['name'] = updated_project_domain_ref['name'] - # Update domain, bypassing resource api manager - self.resource_api.driver.update_project(domain_id, - updated_project_domain_ref) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain returns the updated domain - self.assertDictContainsSubset( - updated_domain_ref, self.resource_api.get_domain(domain_id)) - # Update the domain back to original ref, using the assignment api - # manager - self.resource_api.update_domain(domain_id, domain_ref) - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Make sure domain is 'disabled', bypass resource api manager - project_domain_ref_disabled = project_domain_ref.copy() - project_domain_ref_disabled['enabled'] = False - self.resource_api.driver.update_project(domain_id, - project_domain_ref_disabled) - self.resource_api.driver.update_project(domain_id, {'enabled': False}) - # Delete domain, bypassing resource api manager - self.resource_api.driver.delete_project(domain_id) - # Verify get_domain still returns the domain - self.assertDictContainsSubset( - domain_ref, self.resource_api.get_domain(domain_id)) - # Invalidate cache - self.resource_api.get_domain.invalidate(self.resource_api, - domain_id) - # Verify get_domain now raises DomainNotFound - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, domain_id) - # Recreate Domain - self.resource_api.create_domain(domain_id, domain) - self.resource_api.get_domain(domain_id) - # Make sure domain is 'disabled', bypass resource api manager - domain['enabled'] = False - self.resource_api.driver.update_project(domain_id, domain) - self.resource_api.driver.update_project(domain_id, {'enabled': False}) - # Delete domain - self.resource_api.delete_domain(domain_id) - # verify DomainNotFound raised - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain_id) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_project_rename_invalidates_get_project_by_name_cache(self): - domain = unit.new_domain_ref() - project = unit.new_project_ref(domain_id=domain['id']) - project_id = project['id'] - project_name = project['name'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project_by_name(project_name, domain['id']) - project['name'] = uuid.uuid4().hex - self.resource_api.update_project(project_id, project) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project_by_name, - project_name, - domain['id']) - - @unit.skip_if_cache_disabled('resource') - @unit.skip_if_no_multiple_domains_support - def test_cache_layer_project_crud(self): - domain = unit.new_domain_ref() - project = unit.new_project_ref(domain_id=domain['id']) - project_id = project['id'] - self.resource_api.create_domain(domain['id'], domain) - # Create a project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - updated_project = copy.deepcopy(project) - updated_project['name'] = uuid.uuid4().hex - # Update project, bypassing resource manager - self.resource_api.driver.update_project(project_id, - updated_project) - # Verify get_project still returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify get_project now returns the new project - self.assertDictContainsSubset( - updated_project, - self.resource_api.get_project(project_id)) - # Update project using the resource_api manager back to original - self.resource_api.update_project(project['id'], project) - # Verify get_project returns the original project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Delete project bypassing resource - self.resource_api.driver.delete_project(project_id) - # Verify get_project still returns the project_ref - self.assertDictContainsSubset( - project, self.resource_api.get_project(project_id)) - # Invalidate cache - self.resource_api.get_project.invalidate(self.resource_api, - project_id) - # Verify ProjectNotFound now raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - # recreate project - self.resource_api.create_project(project_id, project) - self.resource_api.get_project(project_id) - # delete project - self.resource_api.delete_project(project_id) - # Verify ProjectNotFound is raised - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - project_id) - - @unit.skip_if_no_multiple_domains_support - def test_get_default_domain_by_name(self): - domain_name = 'default' - - domain = unit.new_domain_ref(name=domain_name) - self.resource_api.create_domain(domain['id'], domain) - - domain_ref = self.resource_api.get_domain_by_name(domain_name) - self.assertEqual(domain, domain_ref) - - def test_get_not_default_domain_by_name(self): - domain_name = 'foo' - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain_by_name, - domain_name) - - def test_project_update_and_project_get_return_same_response(self): - project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - - self.resource_api.create_project(project['id'], project) - - updated_project = {'enabled': False} - updated_project_ref = self.resource_api.update_project( - project['id'], updated_project) - - # SQL backend adds 'extra' field - updated_project_ref.pop('extra', None) - - self.assertIs(False, updated_project_ref['enabled']) - - project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(updated_project_ref, project_ref) - - -class ResourceDriverTests(object): - """Tests for the resource driver. - - Subclasses must set self.driver to the driver instance. - - """ - - def test_create_project(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.driver.create_project(project_id, project) - - def test_create_project_all_defined_properties(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - 'description': uuid.uuid4().hex, - 'enabled': True, - 'parent_id': uuid.uuid4().hex, - 'is_domain': True, - } - self.driver.create_project(project_id, project) - - def test_create_project_null_domain(self): - project_id = uuid.uuid4().hex - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': None, - } - self.driver.create_project(project_id, project) - - def test_create_project_same_name_same_domain_conflict(self): - name = uuid.uuid4().hex - domain_id = uuid.uuid4().hex - - project_id = uuid.uuid4().hex - project = { - 'name': name, - 'id': project_id, - 'domain_id': domain_id, - } - self.driver.create_project(project_id, project) - - project_id = uuid.uuid4().hex - project = { - 'name': name, - 'id': project_id, - 'domain_id': domain_id, - } - self.assertRaises(exception.Conflict, self.driver.create_project, - project_id, project) - - def test_create_project_same_id_conflict(self): - project_id = uuid.uuid4().hex - - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.driver.create_project(project_id, project) - - project = { - 'name': uuid.uuid4().hex, - 'id': project_id, - 'domain_id': uuid.uuid4().hex, - } - self.assertRaises(exception.Conflict, self.driver.create_project, - project_id, project) diff --git a/keystone-moon/keystone/tests/unit/resource/test_controllers.py b/keystone-moon/keystone/tests/unit/resource/test_controllers.py deleted file mode 100644 index b8f247c8..00000000 --- a/keystone-moon/keystone/tests/unit/resource/test_controllers.py +++ /dev/null @@ -1,57 +0,0 @@ -# Copyright 2016 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslo_config import cfg - -from keystone import exception -from keystone.resource import controllers -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - -_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}} - - -class TenantTestCaseNoDefaultDomain(unit.TestCase): - - def setUp(self): - super(TenantTestCaseNoDefaultDomain, self).setUp() - self.useFixture(database.Database()) - self.load_backends() - self.tenant_controller = controllers.Tenant() - - def test_setup(self): - # Other tests in this class assume there's no default domain, so make - # sure the setUp worked as expected. - self.assertRaises( - exception.DomainNotFound, - self.resource_api.get_domain, CONF.identity.default_domain_id) - - def test_get_all_projects(self): - # When get_all_projects is done and there's no default domain, the - # result is an empty list. - res = self.tenant_controller.get_all_projects(_ADMIN_CONTEXT) - self.assertEqual([], res['tenants']) - - def test_create_project(self): - # When a project is created using the v2 controller and there's no - # default domain, it doesn't fail with can't find domain (a default - # domain is created) - tenant = {'name': uuid.uuid4().hex} - self.tenant_controller.create_project(_ADMIN_CONTEXT, tenant) - # If the above doesn't fail then this is successful. diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py deleted file mode 100644 index 2eb87e4c..00000000 --- a/keystone-moon/keystone/tests/unit/resource/test_core.py +++ /dev/null @@ -1,692 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -import mock -from testtools import matchers - -from oslo_config import cfg -from oslotest import mockpatch - -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - - -class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase): - - def setUp(self): - super(TestResourceManagerNoFixtures, self).setUp() - self.useFixture(database.Database(self.sql_driver_version_overrides)) - self.load_backends() - - def test_ensure_default_domain_exists(self): - # When there's no default domain, ensure_default_domain_exists creates - # it. - - # First make sure there's no default domain. - self.assertRaises( - exception.DomainNotFound, - self.resource_api.get_domain, CONF.identity.default_domain_id) - - self.resource_api.ensure_default_domain_exists() - default_domain = self.resource_api.get_domain( - CONF.identity.default_domain_id) - - expected_domain = { - 'id': CONF.identity.default_domain_id, - 'name': 'Default', - 'enabled': True, - 'description': 'Domain created automatically to support V2.0 ' - 'operations.', - } - self.assertEqual(expected_domain, default_domain) - - def test_ensure_default_domain_exists_already_exists(self): - # When there's already a default domain, ensure_default_domain_exists - # doesn't do anything. - - name = uuid.uuid4().hex - description = uuid.uuid4().hex - domain_attrs = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'description': description, - } - self.resource_api.create_domain(CONF.identity.default_domain_id, - domain_attrs) - - self.resource_api.ensure_default_domain_exists() - - default_domain = self.resource_api.get_domain( - CONF.identity.default_domain_id) - - expected_domain = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'enabled': True, - 'description': description, - } - - self.assertEqual(expected_domain, default_domain) - - def test_ensure_default_domain_exists_fails(self): - # When there's an unexpected exception creating domain it's passed on. - - self.useFixture(mockpatch.PatchObject( - self.resource_api, 'create_domain', - side_effect=exception.UnexpectedError)) - - self.assertRaises(exception.UnexpectedError, - self.resource_api.ensure_default_domain_exists) - - def test_update_project_name_conflict(self): - name = uuid.uuid4().hex - description = uuid.uuid4().hex - domain_attrs = { - 'id': CONF.identity.default_domain_id, - 'name': name, - 'description': description, - } - domain = self.resource_api.create_domain( - CONF.identity.default_domain_id, domain_attrs) - project1 = unit.new_project_ref(domain_id=domain['id'], - name=uuid.uuid4().hex) - self.resource_api.create_project(project1['id'], project1) - project2 = unit.new_project_ref(domain_id=domain['id'], - name=uuid.uuid4().hex) - project = self.resource_api.create_project(project2['id'], project2) - - self.assertRaises(exception.Conflict, - self.resource_api.update_project, - project['id'], {'name': project1['name']}) - - -class DomainConfigDriverTests(object): - - def _domain_config_crud(self, sensitive): - domain = uuid.uuid4().hex - group = uuid.uuid4().hex - option = uuid.uuid4().hex - value = uuid.uuid4().hex - self.driver.create_config_option( - domain, group, option, value, sensitive) - res = self.driver.get_config_option( - domain, group, option, sensitive) - config = {'group': group, 'option': option, 'value': value} - self.assertEqual(config, res) - - value = uuid.uuid4().hex - self.driver.update_config_option( - domain, group, option, value, sensitive) - res = self.driver.get_config_option( - domain, group, option, sensitive) - config = {'group': group, 'option': option, 'value': value} - self.assertEqual(config, res) - - self.driver.delete_config_options( - domain, group, option, sensitive) - self.assertRaises(exception.DomainConfigNotFound, - self.driver.get_config_option, - domain, group, option, sensitive) - # ...and silent if we try to delete it again - self.driver.delete_config_options( - domain, group, option, sensitive) - - def test_whitelisted_domain_config_crud(self): - self._domain_config_crud(sensitive=False) - - def test_sensitive_domain_config_crud(self): - self._domain_config_crud(sensitive=True) - - def _list_domain_config(self, sensitive): - """Test listing by combination of domain, group & option.""" - config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - # Put config2 in the same group as config1 - config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': 100} - domain = uuid.uuid4().hex - - for config in [config1, config2, config3]: - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive) - - # Try listing all items from a domain - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(3)) - for res_entry in res: - self.assertIn(res_entry, [config1, config2, config3]) - - # Try listing by domain and group - res = self.driver.list_config_options( - domain, group=config1['group'], sensitive=sensitive) - self.assertThat(res, matchers.HasLength(2)) - for res_entry in res: - self.assertIn(res_entry, [config1, config2]) - - # Try listing by domain, group and option - res = self.driver.list_config_options( - domain, group=config2['group'], - option=config2['option'], sensitive=sensitive) - self.assertThat(res, matchers.HasLength(1)) - self.assertEqual(config2, res[0]) - - def test_list_whitelisted_domain_config_crud(self): - self._list_domain_config(False) - - def test_list_sensitive_domain_config_crud(self): - self._list_domain_config(True) - - def _delete_domain_configs(self, sensitive): - """Test deleting by combination of domain, group & option.""" - config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - # Put config2 and config3 in the same group as config1 - config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config3 = {'group': config1['group'], 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - domain = uuid.uuid4().hex - - for config in [config1, config2, config3, config4]: - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive) - - # Try deleting by domain, group and option - res = self.driver.delete_config_options( - domain, group=config2['group'], - option=config2['option'], sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(3)) - for res_entry in res: - self.assertIn(res_entry, [config1, config3, config4]) - - # Try deleting by domain and group - res = self.driver.delete_config_options( - domain, group=config4['group'], sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(2)) - for res_entry in res: - self.assertIn(res_entry, [config1, config3]) - - # Try deleting all items from a domain - res = self.driver.delete_config_options( - domain, sensitive=sensitive) - res = self.driver.list_config_options( - domain, sensitive=sensitive) - self.assertThat(res, matchers.HasLength(0)) - - def test_delete_whitelisted_domain_configs(self): - self._delete_domain_configs(False) - - def test_delete_sensitive_domain_configs(self): - self._delete_domain_configs(True) - - def _create_domain_config_twice(self, sensitive): - """Test conflict error thrown if create the same option twice.""" - config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, - 'value': uuid.uuid4().hex} - domain = uuid.uuid4().hex - - self.driver.create_config_option( - domain, config['group'], config['option'], - config['value'], sensitive=sensitive) - self.assertRaises(exception.Conflict, - self.driver.create_config_option, - domain, config['group'], config['option'], - config['value'], sensitive=sensitive) - - def test_create_whitelisted_domain_config_twice(self): - self._create_domain_config_twice(False) - - def test_create_sensitive_domain_config_twice(self): - self._create_domain_config_twice(True) - - -class DomainConfigTests(object): - - def setUp(self): - self.domain = unit.new_domain_ref() - self.resource_api.create_domain(self.domain['id'], self.domain) - self.addCleanup(self.clean_up_domain) - - def clean_up_domain(self): - # NOTE(henry-nash): Deleting the domain will also delete any domain - # configs for this domain. - self.domain['enabled'] = False - self.resource_api.update_domain(self.domain['id'], self.domain) - self.resource_api.delete_domain(self.domain['id']) - del self.domain - - def test_create_domain_config_including_sensitive_option(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # password is sensitive, so check that the whitelisted portion and - # the sensitive piece have been stored in the appropriate locations. - res = self.domain_config_api.get_config(self.domain['id']) - config_whitelisted = copy.deepcopy(config) - config_whitelisted['ldap'].pop('password') - self.assertEqual(config_whitelisted, res) - res = self.domain_config_api.driver.get_config_option( - self.domain['id'], 'ldap', 'password', sensitive=True) - self.assertEqual(config['ldap']['password'], res['value']) - - # Finally, use the non-public API to get back the whole config - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - def test_get_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - res = self.domain_config_api.get_config(self.domain['id'], - group='identity') - config_partial = copy.deepcopy(config) - config_partial.pop('ldap') - self.assertEqual(config_partial, res) - res = self.domain_config_api.get_config( - self.domain['id'], group='ldap', option='user_tree_dn') - self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res) - # ...but we should fail to get a sensitive option - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='ldap', option='password') - - def test_delete_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - self.domain_config_api.delete_config( - self.domain['id'], group='identity') - config_partial = copy.deepcopy(config) - config_partial.pop('identity') - config_partial['ldap'].pop('password') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(config_partial, res) - - self.domain_config_api.delete_config( - self.domain['id'], group='ldap', option='url') - config_partial = copy.deepcopy(config_partial) - config_partial['ldap'].pop('url') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(config_partial, res) - - def test_get_options_not_in_domain_config(self): - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id']) - config = {'ldap': {'url': uuid.uuid4().hex}} - - self.domain_config_api.create_config(self.domain['id'], config) - - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='identity') - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.get_config, self.domain['id'], - group='ldap', option='user_tree_dn') - - def test_get_sensitive_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual({}, res) - self.domain_config_api.create_config(self.domain['id'], config) - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - def test_update_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # Try updating a group - new_config = {'ldap': {'url': uuid.uuid4().hex, - 'user_filter': uuid.uuid4().hex}} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap') - expected_config = copy.deepcopy(config) - expected_config['ldap']['url'] = new_config['ldap']['url'] - expected_config['ldap']['user_filter'] = ( - new_config['ldap']['user_filter']) - expected_full_config = copy.deepcopy(expected_config) - expected_config['ldap'].pop('password') - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_config, res) - # The sensitive option should still exist - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(expected_full_config, res) - - # Try updating a single whitelisted option - self.domain_config_api.delete_config(self.domain['id']) - self.domain_config_api.create_config(self.domain['id'], config) - new_config = {'url': uuid.uuid4().hex} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap', option='url') - - # Make sure whitelisted and full config is updated - expected_whitelisted_config = copy.deepcopy(config) - expected_whitelisted_config['ldap']['url'] = new_config['url'] - expected_full_config = copy.deepcopy(expected_whitelisted_config) - expected_whitelisted_config['ldap'].pop('password') - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(expected_full_config, res) - - # Try updating a single sensitive option - self.domain_config_api.delete_config(self.domain['id']) - self.domain_config_api.create_config(self.domain['id'], config) - new_config = {'password': uuid.uuid4().hex} - res = self.domain_config_api.update_config( - self.domain['id'], new_config, group='ldap', option='password') - # The whitelisted config should not have changed... - expected_whitelisted_config = copy.deepcopy(config) - expected_full_config = copy.deepcopy(config) - expected_whitelisted_config['ldap'].pop('password') - self.assertEqual(expected_whitelisted_config, res) - res = self.domain_config_api.get_config(self.domain['id']) - self.assertEqual(expected_whitelisted_config, res) - expected_full_config['ldap']['password'] = new_config['password'] - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - # ...but the sensitive piece should have. - self.assertEqual(expected_full_config, res) - - def test_update_invalid_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - # An extra group, when specifying one group should fail - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='ldap') - # An extra option, when specifying one option should fail - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config['ldap'], - group='ldap', option='url') - - # Now try the right number of groups/options, but just not - # ones that are in the config provided - config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='identity') - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config['ldap'], group='ldap', - option='url') - - # Now some valid groups/options, but just not ones that are in the - # existing config - config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}} - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.update_config, - self.domain['id'], config_wrong_group, - group='identity') - config_wrong_option = {'url': uuid.uuid4().hex} - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.update_config, - self.domain['id'], config_wrong_option, - group='ldap', option='url') - - # And finally just some bad groups/options - bad_group = uuid.uuid4().hex - config = {bad_group: {'user': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group=bad_group, - option='user') - bad_option = uuid.uuid4().hex - config = {'ldap': {bad_option: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.update_config, - self.domain['id'], config, group='ldap', - option=bad_option) - - def test_create_invalid_domain_config(self): - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], {}) - config = {uuid.uuid4().hex: uuid.uuid4().hex} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - # Try an option that IS in the standard conf, but neither whitelisted - # or marked as sensitive - config = {'identity': {'user_tree_dn': uuid.uuid4().hex}} - self.assertRaises(exception.InvalidDomainConfig, - self.domain_config_api.create_config, - self.domain['id'], config) - - def test_delete_invalid_partial_domain_config(self): - config = {'ldap': {'url': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - # Try deleting a group not in the config - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.delete_config, - self.domain['id'], group='identity') - # Try deleting an option not in the config - self.assertRaises(exception.DomainConfigNotFound, - self.domain_config_api.delete_config, - self.domain['id'], - group='ldap', option='user_tree_dn') - - def test_sensitive_substitution_in_domain_config(self): - # Create a config that contains a whitelisted option that requires - # substitution of a sensitive option. - config = {'ldap': {'url': 'my_url/%(password)s', - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - - # Read back the config with the internal method and ensure that the - # substitution has taken place. - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - expected_url = ( - config['ldap']['url'] % {'password': config['ldap']['password']}) - self.assertEqual(expected_url, res['ldap']['url']) - - def test_invalid_sensitive_substitution_in_domain_config(self): - """Check that invalid substitutions raise warnings.""" - mock_log = mock.Mock() - - invalid_option_config = { - 'ldap': {'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - - for invalid_option in ['my_url/%(passssword)s', - 'my_url/%(password', - 'my_url/%(password)', - 'my_url/%(password)d']: - invalid_option_config['ldap']['url'] = invalid_option - self.domain_config_api.create_config( - self.domain['id'], invalid_option_config) - - with mock.patch('keystone.resource.core.LOG', mock_log): - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - mock_log.warning.assert_any_call(mock.ANY) - self.assertEqual( - invalid_option_config['ldap']['url'], res['ldap']['url']) - - def test_escaped_sequence_in_domain_config(self): - """Check that escaped '%(' doesn't get interpreted.""" - mock_log = mock.Mock() - - escaped_option_config = { - 'ldap': {'url': 'my_url/%%(password)s', - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - - self.domain_config_api.create_config( - self.domain['id'], escaped_option_config) - - with mock.patch('keystone.resource.core.LOG', mock_log): - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertFalse(mock_log.warn.called) - # The escaping '%' should have been removed - self.assertEqual('my_url/%(password)s', res['ldap']['url']) - - @unit.skip_if_cache_disabled('domain_config') - def test_cache_layer_get_sensitive_config(self): - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}, - 'identity': {'driver': uuid.uuid4().hex}} - self.domain_config_api.create_config(self.domain['id'], config) - # cache the result - res = self.domain_config_api.get_config_with_sensitive_info( - self.domain['id']) - self.assertEqual(config, res) - - # delete, bypassing domain config manager api - self.domain_config_api.delete_config_options(self.domain['id']) - self.domain_config_api.delete_config_options(self.domain['id'], - sensitive=True) - - self.assertDictEqual( - res, self.domain_config_api.get_config_with_sensitive_info( - self.domain['id'])) - self.domain_config_api.get_config_with_sensitive_info.invalidate( - self.domain_config_api, self.domain['id']) - self.assertDictEqual( - {}, - self.domain_config_api.get_config_with_sensitive_info( - self.domain['id'])) - - def test_delete_domain_deletes_configs(self): - """Test domain deletion clears the domain configs.""" - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - config = {'ldap': {'url': uuid.uuid4().hex, - 'user_tree_dn': uuid.uuid4().hex, - 'password': uuid.uuid4().hex}} - self.domain_config_api.create_config(domain['id'], config) - - # Now delete the domain - domain['enabled'] = False - self.resource_api.update_domain(domain['id'], domain) - self.resource_api.delete_domain(domain['id']) - - # Check domain configs have also been deleted - self.assertRaises( - exception.DomainConfigNotFound, - self.domain_config_api.get_config, - domain['id']) - - # The get_config_with_sensitive_info does not throw an exception if - # the config is empty, it just returns an empty dict - self.assertDictEqual( - {}, - self.domain_config_api.get_config_with_sensitive_info( - domain['id'])) - - def test_config_registration(self): - type = uuid.uuid4().hex - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.domain_config_api.release_registration( - self.domain['id'], type=type) - - # Make sure that once someone has it, nobody else can get it. - # This includes the domain who already has it. - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.assertFalse( - self.domain_config_api.obtain_registration( - self.domain['id'], type)) - - # Make sure we can read who does have it - self.assertEqual( - self.domain['id'], - self.domain_config_api.read_registration(type)) - - # Make sure releasing it is silent if the domain specified doesn't - # have the registration - domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} - self.resource_api.create_domain(domain2['id'], domain2) - self.domain_config_api.release_registration( - domain2['id'], type=type) - - # If nobody has the type registered, then trying to read it should - # raise ConfigRegistrationNotFound - self.domain_config_api.release_registration( - self.domain['id'], type=type) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type) - - # Finally check multiple registrations are cleared if you free the - # registration without specifying the type - type2 = uuid.uuid4().hex - self.domain_config_api.obtain_registration( - self.domain['id'], type) - self.domain_config_api.obtain_registration( - self.domain['id'], type2) - self.domain_config_api.release_registration(self.domain['id']) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type) - self.assertRaises(exception.ConfigRegistrationNotFound, - self.domain_config_api.read_registration, - type2) |