diff options
author | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
---|---|---|
committer | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
commit | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (patch) | |
tree | 8b8d94001ebe6cc34106cf813b538911a8d66d9a /keystone-moon/keystone/tests/unit/resource | |
parent | a33bdcb627102a01244630a54cb4b5066b385a6a (diff) |
Update Keystone core to Mitaka.
Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db
Diffstat (limited to 'keystone-moon/keystone/tests/unit/resource')
8 files changed, 2495 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/resource/__init__.py b/keystone-moon/keystone/tests/unit/resource/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/__init__.py diff --git a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py diff --git a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py new file mode 100644 index 00000000..79ad3df2 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py @@ -0,0 +1,24 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from keystone.resource.backends import sql +from keystone.tests import unit +from keystone.tests.unit.ksfixtures import database +from keystone.tests.unit.resource import test_backends + + +class TestSqlResourceDriver(unit.BaseTestCase, + test_backends.ResourceDriverTests): + def setUp(self): + super(TestSqlResourceDriver, self).setUp() + self.useFixture(database.Database()) + self.driver = sql.Resource() diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py new file mode 100644 index 00000000..e69de29b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py new file mode 100644 index 00000000..b4c5f262 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py @@ -0,0 +1,53 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from keystone.common import sql +from keystone.resource.config_backends import sql as config_sql +from keystone.tests import unit +from keystone.tests.unit.backend import core_sql +from keystone.tests.unit.ksfixtures import database +from keystone.tests.unit.resource import test_core + + +class SqlDomainConfigModels(core_sql.BaseBackendSqlModels): + + def test_whitelisted_model(self): + cols = (('domain_id', sql.String, 64), + ('group', sql.String, 255), + ('option', sql.String, 255), + ('value', sql.JsonBlob, None)) + self.assertExpectedSchema('whitelisted_config', cols) + + def test_sensitive_model(self): + cols = (('domain_id', sql.String, 64), + ('group', sql.String, 255), + ('option', sql.String, 255), + ('value', sql.JsonBlob, None)) + self.assertExpectedSchema('sensitive_config', cols) + + +class SqlDomainConfigDriver(unit.BaseTestCase, + test_core.DomainConfigDriverTests): + def setUp(self): + super(SqlDomainConfigDriver, self).setUp() + self.useFixture(database.Database()) + self.driver = config_sql.DomainConfig() + + +class SqlDomainConfig(core_sql.BaseBackendSqlTests, + test_core.DomainConfigTests): + def setUp(self): + super(SqlDomainConfig, self).setUp() + # test_core.DomainConfigTests is effectively a mixin class, so make + # sure we call its setup + test_core.DomainConfigTests.setUp(self) diff --git a/keystone-moon/keystone/tests/unit/resource/test_backends.py b/keystone-moon/keystone/tests/unit/resource/test_backends.py new file mode 100644 index 00000000..eed4c6ba --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/test_backends.py @@ -0,0 +1,1669 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +import mock +from oslo_config import cfg +from six.moves import range +from testtools import matchers + +from keystone.common import driver_hints +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit import default_fixtures +from keystone.tests.unit import utils as test_utils + + +CONF = cfg.CONF + + +class ResourceTests(object): + + domain_count = len(default_fixtures.DOMAINS) + + def test_get_project(self): + tenant_ref = self.resource_api.get_project(self.tenant_bar['id']) + self.assertDictEqual(self.tenant_bar, tenant_ref) + + def test_get_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + uuid.uuid4().hex) + + def test_get_project_by_name(self): + tenant_ref = self.resource_api.get_project_by_name( + self.tenant_bar['name'], + CONF.identity.default_domain_id) + self.assertDictEqual(self.tenant_bar, tenant_ref) + + @unit.skip_if_no_multiple_domains_support + def test_get_project_by_name_for_project_acting_as_a_domain(self): + """Tests get_project_by_name works when the domain_id is None.""" + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, is_domain=False) + project = self.resource_api.create_project(project['id'], project) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project_by_name, + project['name'], + None) + + # Test that querying with domain_id as None will find the project + # acting as a domain, even if it's name is the same as the regular + # project above. + project2 = unit.new_project_ref(is_domain=True, + name=project['name']) + project2 = self.resource_api.create_project(project2['id'], project2) + + project_ref = self.resource_api.get_project_by_name( + project2['name'], None) + + self.assertEqual(project2, project_ref) + + def test_get_project_by_name_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project_by_name, + uuid.uuid4().hex, + CONF.identity.default_domain_id) + + def test_create_duplicate_project_id_fails(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project_id = project['id'] + self.resource_api.create_project(project_id, project) + project['name'] = 'fake2' + self.assertRaises(exception.Conflict, + self.resource_api.create_project, + project_id, + project) + + def test_create_duplicate_project_name_fails(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project_id = project['id'] + self.resource_api.create_project(project_id, project) + project['id'] = 'fake2' + self.assertRaises(exception.Conflict, + self.resource_api.create_project, + project['id'], + project) + + def test_create_duplicate_project_name_in_different_domains(self): + new_domain = unit.new_domain_ref() + self.resource_api.create_domain(new_domain['id'], new_domain) + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project2 = unit.new_project_ref(name=project1['name'], + domain_id=new_domain['id']) + self.resource_api.create_project(project1['id'], project1) + self.resource_api.create_project(project2['id'], project2) + + def test_move_project_between_domains(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project['id'], project) + project['domain_id'] = domain2['id'] + # Update the project asserting that a deprecation warning is emitted + with mock.patch( + 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: + self.resource_api.update_project(project['id'], project) + self.assertTrue(mock_dep.called) + + updated_project_ref = self.resource_api.get_project(project['id']) + self.assertEqual(domain2['id'], updated_project_ref['domain_id']) + + def test_move_project_between_domains_with_clashing_names_fails(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + # First, create a project in domain1 + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + # Now create a project in domain2 with a potentially clashing + # name - which should work since we have domain separation + project2 = unit.new_project_ref(name=project1['name'], + domain_id=domain2['id']) + self.resource_api.create_project(project2['id'], project2) + # Now try and move project1 into the 2nd domain - which should + # fail since the names clash + project1['domain_id'] = domain2['id'] + self.assertRaises(exception.Conflict, + self.resource_api.update_project, + project1['id'], + project1) + + @unit.skip_if_no_multiple_domains_support + def test_move_project_with_children_between_domains_fails(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project['id'], project) + child_project = unit.new_project_ref(domain_id=domain1['id'], + parent_id=project['id']) + self.resource_api.create_project(child_project['id'], child_project) + project['domain_id'] = domain2['id'] + + # Update is not allowed, since updating the whole subtree would be + # necessary + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + @unit.skip_if_no_multiple_domains_support + def test_move_project_not_root_between_domains_fails(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + project = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project['id'], project) + child_project = unit.new_project_ref(domain_id=domain1['id'], + parent_id=project['id']) + self.resource_api.create_project(child_project['id'], child_project) + child_project['domain_id'] = domain2['id'] + + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + child_project['id'], + child_project) + + @unit.skip_if_no_multiple_domains_support + def test_move_root_project_between_domains_succeeds(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + root_project = unit.new_project_ref(domain_id=domain1['id']) + root_project = self.resource_api.create_project(root_project['id'], + root_project) + + root_project['domain_id'] = domain2['id'] + self.resource_api.update_project(root_project['id'], root_project) + project_from_db = self.resource_api.get_project(root_project['id']) + + self.assertEqual(domain2['id'], project_from_db['domain_id']) + + @unit.skip_if_no_multiple_domains_support + def test_update_domain_id_project_is_domain_fails(self): + other_domain = unit.new_domain_ref() + self.resource_api.create_domain(other_domain['id'], other_domain) + project = unit.new_project_ref(is_domain=True) + self.resource_api.create_project(project['id'], project) + project['domain_id'] = other_domain['id'] + + # Update of domain_id of projects acting as domains is not allowed + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + def test_rename_duplicate_project_name_fails(self): + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project2 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project1['id'], project1) + self.resource_api.create_project(project2['id'], project2) + project2['name'] = project1['name'] + self.assertRaises(exception.Error, + self.resource_api.update_project, + project2['id'], + project2) + + def test_update_project_id_does_nothing(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project_id = project['id'] + self.resource_api.create_project(project['id'], project) + project['id'] = 'fake2' + self.resource_api.update_project(project_id, project) + project_ref = self.resource_api.get_project(project_id) + self.assertEqual(project_id, project_ref['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + 'fake2') + + def test_delete_domain_with_user_group_project_links(self): + # TODO(chungg):add test case once expected behaviour defined + pass + + def test_update_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.resource_api.update_project, + uuid.uuid4().hex, + dict()) + + def test_delete_project_returns_not_found(self): + self.assertRaises(exception.ProjectNotFound, + self.resource_api.delete_project, + uuid.uuid4().hex) + + def test_create_update_delete_unicode_project(self): + unicode_project_name = u'name \u540d\u5b57' + project = unit.new_project_ref( + name=unicode_project_name, + domain_id=CONF.identity.default_domain_id) + project = self.resource_api.create_project(project['id'], project) + self.resource_api.update_project(project['id'], project) + self.resource_api.delete_project(project['id']) + + def test_create_project_with_no_enabled_field(self): + ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id) + del ref['enabled'] + self.resource_api.create_project(ref['id'], ref) + + project = self.resource_api.get_project(ref['id']) + self.assertIs(project['enabled'], True) + + def test_create_project_long_name_fails(self): + project = unit.new_project_ref( + name='a' * 65, domain_id=CONF.identity.default_domain_id) + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], + project) + + def test_create_project_blank_name_fails(self): + project = unit.new_project_ref( + name='', domain_id=CONF.identity.default_domain_id) + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], + project) + + def test_create_project_invalid_name_fails(self): + project = unit.new_project_ref( + name=None, domain_id=CONF.identity.default_domain_id) + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], + project) + project = unit.new_project_ref( + name=123, domain_id=CONF.identity.default_domain_id) + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], + project) + + def test_update_project_blank_name_fails(self): + project = unit.new_project_ref( + name='fake1', domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + project['name'] = '' + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + def test_update_project_long_name_fails(self): + project = unit.new_project_ref( + name='fake1', domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + project['name'] = 'a' * 65 + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + def test_update_project_invalid_name_fails(self): + project = unit.new_project_ref( + name='fake1', domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + project['name'] = None + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + project['name'] = 123 + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + def test_update_project_invalid_enabled_type_string(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertTrue(project_ref['enabled']) + + # Strings are not valid boolean values + project['enabled'] = "false" + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], + project) + + def test_create_project_invalid_enabled_type_string(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + # invalid string value + enabled="true") + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], + project) + + def test_create_project_invalid_domain_id(self): + project = unit.new_project_ref(domain_id=uuid.uuid4().hex) + self.assertRaises(exception.DomainNotFound, + self.resource_api.create_project, + project['id'], + project) + + def test_list_domains(self): + domain1 = unit.new_domain_ref() + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + self.resource_api.create_domain(domain2['id'], domain2) + domains = self.resource_api.list_domains() + self.assertEqual(3, len(domains)) + domain_ids = [] + for domain in domains: + domain_ids.append(domain.get('id')) + self.assertIn(CONF.identity.default_domain_id, domain_ids) + self.assertIn(domain1['id'], domain_ids) + self.assertIn(domain2['id'], domain_ids) + + def test_list_projects(self): + project_refs = self.resource_api.list_projects() + project_count = len(default_fixtures.TENANTS) + self.domain_count + self.assertEqual(project_count, len(project_refs)) + for project in default_fixtures.TENANTS: + self.assertIn(project, project_refs) + + def test_list_projects_with_multiple_filters(self): + # Create a project + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project = self.resource_api.create_project(project['id'], project) + + # Build driver hints with the project's name and inexistent description + hints = driver_hints.Hints() + hints.add_filter('name', project['name']) + hints.add_filter('description', uuid.uuid4().hex) + + # Retrieve projects based on hints and check an empty list is returned + projects = self.resource_api.list_projects(hints) + self.assertEqual([], projects) + + # Build correct driver hints + hints = driver_hints.Hints() + hints.add_filter('name', project['name']) + hints.add_filter('description', project['description']) + + # Retrieve projects based on hints + projects = self.resource_api.list_projects(hints) + + # Check that the returned list contains only the first project + self.assertEqual(1, len(projects)) + self.assertEqual(project, projects[0]) + + def test_list_projects_for_domain(self): + project_ids = ([x['id'] for x in + self.resource_api.list_projects_in_domain( + CONF.identity.default_domain_id)]) + # Only the projects from the default fixtures are expected, since + # filtering by domain does not include any project that acts as a + # domain. + self.assertThat( + project_ids, matchers.HasLength(len(default_fixtures.TENANTS))) + self.assertIn(self.tenant_bar['id'], project_ids) + self.assertIn(self.tenant_baz['id'], project_ids) + self.assertIn(self.tenant_mtu['id'], project_ids) + self.assertIn(self.tenant_service['id'], project_ids) + + @unit.skip_if_no_multiple_domains_support + def test_list_projects_acting_as_domain(self): + initial_domains = self.resource_api.list_domains() + + # Creating 5 projects that act as domains + new_projects_acting_as_domains = [] + for i in range(5): + project = unit.new_project_ref(is_domain=True) + project = self.resource_api.create_project(project['id'], project) + new_projects_acting_as_domains.append(project) + + # Creating a few regular project to ensure it doesn't mess with the + # ones that act as domains + self._create_projects_hierarchy(hierarchy_size=2) + + projects = self.resource_api.list_projects_acting_as_domain() + expected_number_projects = ( + len(initial_domains) + len(new_projects_acting_as_domains)) + self.assertEqual(expected_number_projects, len(projects)) + for project in new_projects_acting_as_domains: + self.assertIn(project, projects) + for domain in initial_domains: + self.assertIn(domain['id'], [p['id'] for p in projects]) + + @unit.skip_if_no_multiple_domains_support + def test_list_projects_for_alternate_domain(self): + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + project1 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain1['id']) + self.resource_api.create_project(project2['id'], project2) + project_ids = ([x['id'] for x in + self.resource_api.list_projects_in_domain( + domain1['id'])]) + self.assertEqual(2, len(project_ids)) + self.assertIn(project1['id'], project_ids) + self.assertIn(project2['id'], project_ids) + + def _create_projects_hierarchy(self, hierarchy_size=2, + domain_id=None, + is_domain=False, + parent_project_id=None): + """Creates a project hierarchy with specified size. + + :param hierarchy_size: the desired hierarchy size, default is 2 - + a project with one child. + :param domain_id: domain where the projects hierarchy will be created. + :param is_domain: if the hierarchy will have the is_domain flag active + or not. + :param parent_project_id: if the intention is to create a + sub-hierarchy, sets the sub-hierarchy root. Defaults to creating + a new hierarchy, i.e. a new root project. + + :returns projects: a list of the projects in the created hierarchy. + + """ + if domain_id is None: + domain_id = CONF.identity.default_domain_id + if parent_project_id: + project = unit.new_project_ref(parent_id=parent_project_id, + domain_id=domain_id, + is_domain=is_domain) + else: + project = unit.new_project_ref(domain_id=domain_id, + is_domain=is_domain) + project_id = project['id'] + project = self.resource_api.create_project(project_id, project) + + projects = [project] + for i in range(1, hierarchy_size): + new_project = unit.new_project_ref(parent_id=project_id, + domain_id=domain_id) + + self.resource_api.create_project(new_project['id'], new_project) + projects.append(new_project) + project_id = new_project['id'] + + return projects + + @unit.skip_if_no_multiple_domains_support + def test_create_domain_with_project_api(self): + project = unit.new_project_ref(is_domain=True) + ref = self.resource_api.create_project(project['id'], project) + self.assertTrue(ref['is_domain']) + self.resource_api.get_domain(ref['id']) + + @unit.skip_if_no_multiple_domains_support + def test_project_as_a_domain_uniqueness_constraints(self): + """Tests project uniqueness for those acting as domains. + + If it is a project acting as a domain, we can't have two or more with + the same name. + + """ + # Create two projects acting as a domain + project = unit.new_project_ref(is_domain=True) + project = self.resource_api.create_project(project['id'], project) + project2 = unit.new_project_ref(is_domain=True) + project2 = self.resource_api.create_project(project2['id'], project2) + + # All projects acting as domains have a null domain_id, so should not + # be able to create another with the same name but a different + # project ID. + new_project = project.copy() + new_project['id'] = uuid.uuid4().hex + + self.assertRaises(exception.Conflict, + self.resource_api.create_project, + new_project['id'], + new_project) + + # We also should not be able to update one to have a name clash + project2['name'] = project['name'] + self.assertRaises(exception.Conflict, + self.resource_api.update_project, + project2['id'], + project2) + + # But updating it to a unique name is OK + project2['name'] = uuid.uuid4().hex + self.resource_api.update_project(project2['id'], project2) + + # Finally, it should be OK to create a project with same name as one of + # these acting as a domain, as long as it is a regular project + project3 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, name=project2['name']) + self.resource_api.create_project(project3['id'], project3) + # In fact, it should be OK to create such a project in the domain which + # has the matching name. + # TODO(henry-nash): Once we fully support projects acting as a domain, + # add a test here to create a sub-project with a name that matches its + # project acting as a domain + + @unit.skip_if_no_multiple_domains_support + @test_utils.wip('waiting for sub projects acting as domains support') + def test_is_domain_sub_project_has_parent_domain_id(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, is_domain=True) + self.resource_api.create_project(project['id'], project) + + sub_project = unit.new_project_ref(domain_id=project['id'], + parent_id=project['id'], + is_domain=True) + + ref = self.resource_api.create_project(sub_project['id'], sub_project) + self.assertTrue(ref['is_domain']) + self.assertEqual(project['id'], ref['parent_id']) + self.assertEqual(project['id'], ref['domain_id']) + + @unit.skip_if_no_multiple_domains_support + def test_delete_domain_with_project_api(self): + project = unit.new_project_ref(domain_id=None, + is_domain=True) + self.resource_api.create_project(project['id'], project) + + # Check that a corresponding domain was created + self.resource_api.get_domain(project['id']) + + # Try to delete the enabled project that acts as a domain + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.delete_project, + project['id']) + + # Disable the project + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + + # Successfully delete the project + self.resource_api.delete_project(project['id']) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project['id']) + + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + project['id']) + + @unit.skip_if_no_multiple_domains_support + def test_create_subproject_acting_as_domain_fails(self): + root_project = unit.new_project_ref(is_domain=True) + self.resource_api.create_project(root_project['id'], root_project) + + sub_project = unit.new_project_ref(is_domain=True, + parent_id=root_project['id']) + + # Creation of sub projects acting as domains is not allowed yet + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + sub_project['id'], sub_project) + + @unit.skip_if_no_multiple_domains_support + def test_create_domain_under_regular_project_hierarchy_fails(self): + # Projects acting as domains can't have a regular project as parent + projects_hierarchy = self._create_projects_hierarchy() + parent = projects_hierarchy[1] + project = unit.new_project_ref(domain_id=parent['id'], + parent_id=parent['id'], + is_domain=True) + + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], project) + + @unit.skip_if_no_multiple_domains_support + @test_utils.wip('waiting for sub projects acting as domains support') + def test_create_project_under_domain_hierarchy(self): + projects_hierarchy = self._create_projects_hierarchy(is_domain=True) + parent = projects_hierarchy[1] + project = unit.new_project_ref(domain_id=parent['id'], + parent_id=parent['id'], + is_domain=False) + + ref = self.resource_api.create_project(project['id'], project) + self.assertFalse(ref['is_domain']) + self.assertEqual(parent['id'], ref['parent_id']) + self.assertEqual(parent['id'], ref['domain_id']) + + def test_create_project_without_is_domain_flag(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + del project['is_domain'] + ref = self.resource_api.create_project(project['id'], project) + # The is_domain flag should be False by default + self.assertFalse(ref['is_domain']) + + @unit.skip_if_no_multiple_domains_support + def test_create_project_passing_is_domain_flag_true(self): + project = unit.new_project_ref(is_domain=True) + + ref = self.resource_api.create_project(project['id'], project) + self.assertTrue(ref['is_domain']) + + def test_create_project_passing_is_domain_flag_false(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, is_domain=False) + + ref = self.resource_api.create_project(project['id'], project) + self.assertIs(False, ref['is_domain']) + + @test_utils.wip('waiting for support for parent_id to imply domain_id') + def test_create_project_with_parent_id_and_without_domain_id(self): + # First create a domain + project = unit.new_project_ref(is_domain=True) + self.resource_api.create_project(project['id'], project) + # Now create a child by just naming the parent_id + sub_project = unit.new_project_ref(parent_id=project['id']) + ref = self.resource_api.create_project(sub_project['id'], sub_project) + + # The domain_id should be set to the parent domain_id + self.assertEqual(project['domain_id'], ref['domain_id']) + + def test_create_project_with_domain_id_and_without_parent_id(self): + # First create a domain + project = unit.new_project_ref(is_domain=True) + self.resource_api.create_project(project['id'], project) + # Now create a child by just naming the domain_id + sub_project = unit.new_project_ref(domain_id=project['id']) + ref = self.resource_api.create_project(sub_project['id'], sub_project) + + # The parent_id and domain_id should be set to the id of the project + # acting as a domain + self.assertEqual(project['id'], ref['parent_id']) + self.assertEqual(project['id'], ref['domain_id']) + + def test_create_project_with_domain_id_mismatch_to_parent_domain(self): + # First create a domain + project = unit.new_project_ref(is_domain=True) + self.resource_api.create_project(project['id'], project) + # Now try to create a child with the above as its parent, but + # specifying a different domain. + sub_project = unit.new_project_ref( + parent_id=project['id'], domain_id=CONF.identity.default_domain_id) + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + sub_project['id'], sub_project) + + def test_check_leaf_projects(self): + projects_hierarchy = self._create_projects_hierarchy() + root_project = projects_hierarchy[0] + leaf_project = projects_hierarchy[1] + + self.assertFalse(self.resource_api.is_leaf_project( + root_project['id'])) + self.assertTrue(self.resource_api.is_leaf_project( + leaf_project['id'])) + + # Delete leaf_project + self.resource_api.delete_project(leaf_project['id']) + + # Now, root_project should be leaf + self.assertTrue(self.resource_api.is_leaf_project( + root_project['id'])) + + def test_list_projects_in_subtree(self): + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + project1 = projects_hierarchy[0] + project2 = projects_hierarchy[1] + project3 = projects_hierarchy[2] + project4 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=project2['id']) + self.resource_api.create_project(project4['id'], project4) + + subtree = self.resource_api.list_projects_in_subtree(project1['id']) + self.assertEqual(3, len(subtree)) + self.assertIn(project2, subtree) + self.assertIn(project3, subtree) + self.assertIn(project4, subtree) + + subtree = self.resource_api.list_projects_in_subtree(project2['id']) + self.assertEqual(2, len(subtree)) + self.assertIn(project3, subtree) + self.assertIn(project4, subtree) + + subtree = self.resource_api.list_projects_in_subtree(project3['id']) + self.assertEqual(0, len(subtree)) + + def test_list_projects_in_subtree_with_circular_reference(self): + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project1 = self.resource_api.create_project(project1['id'], project1) + + project2 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=project1['id']) + self.resource_api.create_project(project2['id'], project2) + + project1['parent_id'] = project2['id'] # Adds cyclic reference + + # NOTE(dstanek): The manager does not allow parent_id to be updated. + # Instead will directly use the driver to create the cyclic + # reference. + self.resource_api.driver.update_project(project1['id'], project1) + + subtree = self.resource_api.list_projects_in_subtree(project1['id']) + + # NOTE(dstanek): If a cyclic reference is detected the code bails + # and returns None instead of falling into the infinite + # recursion trap. + self.assertIsNone(subtree) + + def test_list_projects_in_subtree_invalid_project_id(self): + self.assertRaises(exception.ValidationError, + self.resource_api.list_projects_in_subtree, + None) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.list_projects_in_subtree, + uuid.uuid4().hex) + + def test_list_project_parents(self): + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + project1 = projects_hierarchy[0] + project2 = projects_hierarchy[1] + project3 = projects_hierarchy[2] + project4 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=project2['id']) + self.resource_api.create_project(project4['id'], project4) + + parents1 = self.resource_api.list_project_parents(project3['id']) + self.assertEqual(3, len(parents1)) + self.assertIn(project1, parents1) + self.assertIn(project2, parents1) + + parents2 = self.resource_api.list_project_parents(project4['id']) + self.assertEqual(parents1, parents2) + + parents = self.resource_api.list_project_parents(project1['id']) + # It has the default domain as parent + self.assertEqual(1, len(parents)) + + def test_update_project_enabled_cascade(self): + """Test update_project_cascade + + Ensures the enabled attribute is correctly updated across + a simple 3-level projects hierarchy. + """ + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + parent = projects_hierarchy[0] + + # Disable in parent project disables the whole subtree + parent['enabled'] = False + # Store the ref from backend in another variable so we don't bother + # to remove other attributes that were not originally provided and + # were set in the manager, like parent_id and domain_id. + parent_ref = self.resource_api.update_project(parent['id'], + parent, + cascade=True) + + subtree = self.resource_api.list_projects_in_subtree(parent['id']) + self.assertEqual(2, len(subtree)) + self.assertFalse(parent_ref['enabled']) + self.assertFalse(subtree[0]['enabled']) + self.assertFalse(subtree[1]['enabled']) + + # Enable parent project enables the whole subtree + parent['enabled'] = True + parent_ref = self.resource_api.update_project(parent['id'], + parent, + cascade=True) + + subtree = self.resource_api.list_projects_in_subtree(parent['id']) + self.assertEqual(2, len(subtree)) + self.assertTrue(parent_ref['enabled']) + self.assertTrue(subtree[0]['enabled']) + self.assertTrue(subtree[1]['enabled']) + + def test_cannot_enable_cascade_with_parent_disabled(self): + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + grandparent = projects_hierarchy[0] + parent = projects_hierarchy[1] + + grandparent['enabled'] = False + self.resource_api.update_project(grandparent['id'], + grandparent, + cascade=True) + subtree = self.resource_api.list_projects_in_subtree(parent['id']) + self.assertFalse(subtree[0]['enabled']) + + parent['enabled'] = True + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.update_project, + parent['id'], + parent, + cascade=True) + + def test_update_cascade_only_accepts_enabled(self): + # Update cascade does not accept any other attribute but 'enabled' + new_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(new_project['id'], new_project) + + new_project['name'] = 'project1' + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + new_project['id'], + new_project, + cascade=True) + + def test_list_project_parents_invalid_project_id(self): + self.assertRaises(exception.ValidationError, + self.resource_api.list_project_parents, + None) + + self.assertRaises(exception.ProjectNotFound, + self.resource_api.list_project_parents, + uuid.uuid4().hex) + + def test_create_project_doesnt_modify_passed_in_dict(self): + new_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + original_project = new_project.copy() + self.resource_api.create_project(new_project['id'], new_project) + self.assertDictEqual(original_project, new_project) + + def test_update_project_enable(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertTrue(project_ref['enabled']) + + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertEqual(project['enabled'], project_ref['enabled']) + + # If not present, enabled field should not be updated + del project['enabled'] + self.resource_api.update_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertFalse(project_ref['enabled']) + + project['enabled'] = True + self.resource_api.update_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertEqual(project['enabled'], project_ref['enabled']) + + del project['enabled'] + self.resource_api.update_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertTrue(project_ref['enabled']) + + def test_create_invalid_domain_fails(self): + new_group = unit.new_group_ref(domain_id="doesnotexist") + self.assertRaises(exception.DomainNotFound, + self.identity_api.create_group, + new_group) + new_user = unit.new_user_ref(domain_id="doesnotexist") + self.assertRaises(exception.DomainNotFound, + self.identity_api.create_user, + new_user) + + @unit.skip_if_no_multiple_domains_support + def test_project_crud(self): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + project = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertDictContainsSubset(project, project_ref) + + project['name'] = uuid.uuid4().hex + self.resource_api.update_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertDictContainsSubset(project, project_ref) + + self.resource_api.delete_project(project['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project['id']) + + def test_domain_delete_hierarchy(self): + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + + # Creating a root and a leaf project inside the domain + projects_hierarchy = self._create_projects_hierarchy( + domain_id=domain['id']) + root_project = projects_hierarchy[0] + leaf_project = projects_hierarchy[0] + + # Disable the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + + # Delete the domain + self.resource_api.delete_domain(domain['id']) + + # Make sure the domain no longer exists + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + domain['id']) + + # Make sure the root project no longer exists + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + root_project['id']) + + # Make sure the leaf project no longer exists + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + leaf_project['id']) + + def test_delete_projects_from_ids(self): + """Tests the resource backend call delete_projects_from_ids. + + Tests the normal flow of the delete_projects_from_ids backend call, + that ensures no project on the list exists after it is succesfully + called. + """ + project1_ref = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + project2_ref = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + projects = (project1_ref, project2_ref) + for project in projects: + self.resource_api.create_project(project['id'], project) + + # Setting up the ID's list + projects_ids = [p['id'] for p in projects] + self.resource_api.driver.delete_projects_from_ids(projects_ids) + + # Ensuring projects no longer exist at backend level + for project_id in projects_ids: + self.assertRaises(exception.ProjectNotFound, + self.resource_api.driver.get_project, + project_id) + + # Passing an empty list is silently ignored + self.resource_api.driver.delete_projects_from_ids([]) + + def test_delete_projects_from_ids_with_no_existing_project_id(self): + """Tests delete_projects_from_ids issues warning if not found. + + Tests the resource backend call delete_projects_from_ids passing a + non existing ID in project_ids, which is logged and ignored by + the backend. + """ + project_ref = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project_ref['id'], project_ref) + + # Setting up the ID's list + projects_ids = (project_ref['id'], uuid.uuid4().hex) + with mock.patch('keystone.resource.backends.sql.LOG') as mock_log: + self.resource_api.delete_projects_from_ids(projects_ids) + self.assertTrue(mock_log.warning.called) + # The existing project was deleted. + self.assertRaises(exception.ProjectNotFound, + self.resource_api.driver.get_project, + project_ref['id']) + + # Even if we only have one project, and it does not exist, it returns + # no error. + self.resource_api.driver.delete_projects_from_ids([uuid.uuid4().hex]) + + def test_delete_project_cascade(self): + # create a hierarchy with 3 levels + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + root_project = projects_hierarchy[0] + project1 = projects_hierarchy[1] + project2 = projects_hierarchy[2] + + # Disabling all projects before attempting to delete + for project in (project2, project1, root_project): + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + + self.resource_api.delete_project(root_project['id'], cascade=True) + + for project in projects_hierarchy: + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project['id']) + + def test_delete_large_project_cascade(self): + """Try delete a large project with cascade true. + + Tree we will create:: + + +-p1-+ + | | + p5 p2 + | | + p6 +-p3-+ + | | + p7 p4 + """ + # create a hierarchy with 4 levels + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=4) + p1 = projects_hierarchy[0] + # Add the left branch to the hierarchy (p5, p6) + self._create_projects_hierarchy(hierarchy_size=2, + parent_project_id=p1['id']) + # Add p7 to the hierarchy + p3_id = projects_hierarchy[2]['id'] + self._create_projects_hierarchy(hierarchy_size=1, + parent_project_id=p3_id) + # Reverse the hierarchy to disable the leaf first + prjs_hierarchy = ([p1] + self.resource_api.list_projects_in_subtree( + p1['id']))[::-1] + + # Disabling all projects before attempting to delete + for project in prjs_hierarchy: + project['enabled'] = False + self.resource_api.update_project(project['id'], project) + + self.resource_api.delete_project(p1['id'], cascade=True) + for project in prjs_hierarchy: + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project['id']) + + def test_cannot_delete_project_cascade_with_enabled_child(self): + # create a hierarchy with 3 levels + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + root_project = projects_hierarchy[0] + project1 = projects_hierarchy[1] + project2 = projects_hierarchy[2] + + project2['enabled'] = False + self.resource_api.update_project(project2['id'], project2) + + # Cannot cascade delete root_project, since project1 is enabled + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.delete_project, + root_project['id'], + cascade=True) + + # Ensuring no project was deleted, not even project2 + self.resource_api.get_project(root_project['id']) + self.resource_api.get_project(project1['id']) + self.resource_api.get_project(project2['id']) + + def test_hierarchical_projects_crud(self): + # create a hierarchy with just a root project (which is a leaf as well) + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1) + root_project1 = projects_hierarchy[0] + + # create a hierarchy with one root project and one leaf project + projects_hierarchy = self._create_projects_hierarchy() + root_project2 = projects_hierarchy[0] + leaf_project = projects_hierarchy[1] + + # update description from leaf_project + leaf_project['description'] = 'new description' + self.resource_api.update_project(leaf_project['id'], leaf_project) + proj_ref = self.resource_api.get_project(leaf_project['id']) + self.assertDictEqual(leaf_project, proj_ref) + + # update the parent_id is not allowed + leaf_project['parent_id'] = root_project1['id'] + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.update_project, + leaf_project['id'], + leaf_project) + + # delete root_project1 + self.resource_api.delete_project(root_project1['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + root_project1['id']) + + # delete root_project2 is not allowed since it is not a leaf project + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.delete_project, + root_project2['id']) + + def test_create_project_with_invalid_parent(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, parent_id='fake') + self.assertRaises(exception.ProjectNotFound, + self.resource_api.create_project, + project['id'], + project) + + @unit.skip_if_no_multiple_domains_support + def test_create_leaf_project_with_different_domain(self): + root_project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(root_project['id'], root_project) + + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + leaf_project = unit.new_project_ref(domain_id=domain['id'], + parent_id=root_project['id']) + + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + leaf_project['id'], + leaf_project) + + def test_delete_hierarchical_leaf_project(self): + projects_hierarchy = self._create_projects_hierarchy() + root_project = projects_hierarchy[0] + leaf_project = projects_hierarchy[1] + + self.resource_api.delete_project(leaf_project['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + leaf_project['id']) + + self.resource_api.delete_project(root_project['id']) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + root_project['id']) + + def test_delete_hierarchical_not_leaf_project(self): + projects_hierarchy = self._create_projects_hierarchy() + root_project = projects_hierarchy[0] + + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.delete_project, + root_project['id']) + + def test_update_project_parent(self): + projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3) + project1 = projects_hierarchy[0] + project2 = projects_hierarchy[1] + project3 = projects_hierarchy[2] + + # project2 is the parent from project3 + self.assertEqual(project3.get('parent_id'), project2['id']) + + # try to update project3 parent to parent1 + project3['parent_id'] = project1['id'] + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.update_project, + project3['id'], + project3) + + def test_create_project_under_disabled_one(self): + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, enabled=False) + self.resource_api.create_project(project1['id'], project1) + + project2 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=project1['id']) + + # It's not possible to create a project under a disabled one in the + # hierarchy + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project2['id'], + project2) + + def test_disable_hierarchical_leaf_project(self): + projects_hierarchy = self._create_projects_hierarchy() + leaf_project = projects_hierarchy[1] + + leaf_project['enabled'] = False + self.resource_api.update_project(leaf_project['id'], leaf_project) + + project_ref = self.resource_api.get_project(leaf_project['id']) + self.assertEqual(leaf_project['enabled'], project_ref['enabled']) + + def test_disable_hierarchical_not_leaf_project(self): + projects_hierarchy = self._create_projects_hierarchy() + root_project = projects_hierarchy[0] + + root_project['enabled'] = False + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.update_project, + root_project['id'], + root_project) + + def test_enable_project_with_disabled_parent(self): + projects_hierarchy = self._create_projects_hierarchy() + root_project = projects_hierarchy[0] + leaf_project = projects_hierarchy[1] + + # Disable leaf and root + leaf_project['enabled'] = False + self.resource_api.update_project(leaf_project['id'], leaf_project) + root_project['enabled'] = False + self.resource_api.update_project(root_project['id'], root_project) + + # Try to enable the leaf project, it's not possible since it has + # a disabled parent + leaf_project['enabled'] = True + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.update_project, + leaf_project['id'], + leaf_project) + + def _get_hierarchy_depth(self, project_id): + return len(self.resource_api.list_project_parents(project_id)) + 1 + + def test_check_hierarchy_depth(self): + # Should be allowed to have a hierarchy of the max depth specified + # in the config option plus one (to allow for the additional project + # acting as a domain after an upgrade) + projects_hierarchy = self._create_projects_hierarchy( + CONF.max_project_tree_depth) + leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1] + + depth = self._get_hierarchy_depth(leaf_project['id']) + self.assertEqual(CONF.max_project_tree_depth + 1, depth) + + # Creating another project in the hierarchy shouldn't be allowed + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id, + parent_id=leaf_project['id']) + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.create_project, + project['id'], + project) + + def test_project_update_missing_attrs_with_a_value(self): + # Creating a project with no description attribute. + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + del project['description'] + project = self.resource_api.create_project(project['id'], project) + + # Add a description attribute. + project['description'] = uuid.uuid4().hex + self.resource_api.update_project(project['id'], project) + + project_ref = self.resource_api.get_project(project['id']) + self.assertDictEqual(project, project_ref) + + def test_project_update_missing_attrs_with_a_falsey_value(self): + # Creating a project with no description attribute. + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + del project['description'] + project = self.resource_api.create_project(project['id'], project) + + # Add a description attribute. + project['description'] = '' + self.resource_api.update_project(project['id'], project) + + project_ref = self.resource_api.get_project(project['id']) + self.assertDictEqual(project, project_ref) + + def test_domain_crud(self): + domain = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain['id'], domain) + self.assertDictEqual(domain, domain_ref) + domain_ref = self.resource_api.get_domain(domain['id']) + self.assertDictEqual(domain, domain_ref) + + domain['name'] = uuid.uuid4().hex + domain_ref = self.resource_api.update_domain(domain['id'], domain) + self.assertDictEqual(domain, domain_ref) + domain_ref = self.resource_api.get_domain(domain['id']) + self.assertDictEqual(domain, domain_ref) + + # Ensure an 'enabled' domain cannot be deleted + self.assertRaises(exception.ForbiddenNotSecurity, + self.resource_api.delete_domain, + domain_id=domain['id']) + + # Disable the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + + # Delete the domain + self.resource_api.delete_domain(domain['id']) + + # Make sure the domain no longer exists + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + domain['id']) + + @unit.skip_if_no_multiple_domains_support + def test_domain_name_case_sensitivity(self): + # create a ref with a lowercase name + domain_name = 'test_domain' + ref = unit.new_domain_ref(name=domain_name) + + lower_case_domain = self.resource_api.create_domain(ref['id'], ref) + + # assign a new ID to the ref with the same name, but in uppercase + ref['id'] = uuid.uuid4().hex + ref['name'] = domain_name.upper() + upper_case_domain = self.resource_api.create_domain(ref['id'], ref) + + # We can get each domain by name + lower_case_domain_ref = self.resource_api.get_domain_by_name( + domain_name) + self.assertDictEqual(lower_case_domain, lower_case_domain_ref) + + upper_case_domain_ref = self.resource_api.get_domain_by_name( + domain_name.upper()) + self.assertDictEqual(upper_case_domain, upper_case_domain_ref) + + def test_project_attribute_update(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + self.resource_api.create_project(project['id'], project) + + # pick a key known to be non-existent + key = 'description' + + def assert_key_equals(value): + project_ref = self.resource_api.update_project( + project['id'], project) + self.assertEqual(value, project_ref[key]) + project_ref = self.resource_api.get_project(project['id']) + self.assertEqual(value, project_ref[key]) + + def assert_get_key_is(value): + project_ref = self.resource_api.update_project( + project['id'], project) + self.assertIs(project_ref.get(key), value) + project_ref = self.resource_api.get_project(project['id']) + self.assertIs(project_ref.get(key), value) + + # add an attribute that doesn't exist, set it to a falsey value + value = '' + project[key] = value + assert_key_equals(value) + + # set an attribute with a falsey value to null + value = None + project[key] = value + assert_get_key_is(value) + + # do it again, in case updating from this situation is handled oddly + value = None + project[key] = value + assert_get_key_is(value) + + # set a possibly-null value to a falsey value + value = '' + project[key] = value + assert_key_equals(value) + + # set a falsey value to a truthy value + value = uuid.uuid4().hex + project[key] = value + assert_key_equals(value) + + @unit.skip_if_cache_disabled('resource') + @unit.skip_if_no_multiple_domains_support + def test_domain_rename_invalidates_get_domain_by_name_cache(self): + domain = unit.new_domain_ref() + domain_id = domain['id'] + domain_name = domain['name'] + self.resource_api.create_domain(domain_id, domain) + domain_ref = self.resource_api.get_domain_by_name(domain_name) + domain_ref['name'] = uuid.uuid4().hex + self.resource_api.update_domain(domain_id, domain_ref) + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain_by_name, + domain_name) + + @unit.skip_if_cache_disabled('resource') + def test_cache_layer_domain_crud(self): + domain = unit.new_domain_ref() + domain_id = domain['id'] + # Create Domain + self.resource_api.create_domain(domain_id, domain) + project_domain_ref = self.resource_api.get_project(domain_id) + domain_ref = self.resource_api.get_domain(domain_id) + updated_project_domain_ref = copy.deepcopy(project_domain_ref) + updated_project_domain_ref['name'] = uuid.uuid4().hex + updated_domain_ref = copy.deepcopy(domain_ref) + updated_domain_ref['name'] = updated_project_domain_ref['name'] + # Update domain, bypassing resource api manager + self.resource_api.driver.update_project(domain_id, + updated_project_domain_ref) + # Verify get_domain still returns the domain + self.assertDictContainsSubset( + domain_ref, self.resource_api.get_domain(domain_id)) + # Invalidate cache + self.resource_api.get_domain.invalidate(self.resource_api, + domain_id) + # Verify get_domain returns the updated domain + self.assertDictContainsSubset( + updated_domain_ref, self.resource_api.get_domain(domain_id)) + # Update the domain back to original ref, using the assignment api + # manager + self.resource_api.update_domain(domain_id, domain_ref) + self.assertDictContainsSubset( + domain_ref, self.resource_api.get_domain(domain_id)) + # Make sure domain is 'disabled', bypass resource api manager + project_domain_ref_disabled = project_domain_ref.copy() + project_domain_ref_disabled['enabled'] = False + self.resource_api.driver.update_project(domain_id, + project_domain_ref_disabled) + self.resource_api.driver.update_project(domain_id, {'enabled': False}) + # Delete domain, bypassing resource api manager + self.resource_api.driver.delete_project(domain_id) + # Verify get_domain still returns the domain + self.assertDictContainsSubset( + domain_ref, self.resource_api.get_domain(domain_id)) + # Invalidate cache + self.resource_api.get_domain.invalidate(self.resource_api, + domain_id) + # Verify get_domain now raises DomainNotFound + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, domain_id) + # Recreate Domain + self.resource_api.create_domain(domain_id, domain) + self.resource_api.get_domain(domain_id) + # Make sure domain is 'disabled', bypass resource api manager + domain['enabled'] = False + self.resource_api.driver.update_project(domain_id, domain) + self.resource_api.driver.update_project(domain_id, {'enabled': False}) + # Delete domain + self.resource_api.delete_domain(domain_id) + # verify DomainNotFound raised + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain, + domain_id) + + @unit.skip_if_cache_disabled('resource') + @unit.skip_if_no_multiple_domains_support + def test_project_rename_invalidates_get_project_by_name_cache(self): + domain = unit.new_domain_ref() + project = unit.new_project_ref(domain_id=domain['id']) + project_id = project['id'] + project_name = project['name'] + self.resource_api.create_domain(domain['id'], domain) + # Create a project + self.resource_api.create_project(project_id, project) + self.resource_api.get_project_by_name(project_name, domain['id']) + project['name'] = uuid.uuid4().hex + self.resource_api.update_project(project_id, project) + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project_by_name, + project_name, + domain['id']) + + @unit.skip_if_cache_disabled('resource') + @unit.skip_if_no_multiple_domains_support + def test_cache_layer_project_crud(self): + domain = unit.new_domain_ref() + project = unit.new_project_ref(domain_id=domain['id']) + project_id = project['id'] + self.resource_api.create_domain(domain['id'], domain) + # Create a project + self.resource_api.create_project(project_id, project) + self.resource_api.get_project(project_id) + updated_project = copy.deepcopy(project) + updated_project['name'] = uuid.uuid4().hex + # Update project, bypassing resource manager + self.resource_api.driver.update_project(project_id, + updated_project) + # Verify get_project still returns the original project_ref + self.assertDictContainsSubset( + project, self.resource_api.get_project(project_id)) + # Invalidate cache + self.resource_api.get_project.invalidate(self.resource_api, + project_id) + # Verify get_project now returns the new project + self.assertDictContainsSubset( + updated_project, + self.resource_api.get_project(project_id)) + # Update project using the resource_api manager back to original + self.resource_api.update_project(project['id'], project) + # Verify get_project returns the original project_ref + self.assertDictContainsSubset( + project, self.resource_api.get_project(project_id)) + # Delete project bypassing resource + self.resource_api.driver.delete_project(project_id) + # Verify get_project still returns the project_ref + self.assertDictContainsSubset( + project, self.resource_api.get_project(project_id)) + # Invalidate cache + self.resource_api.get_project.invalidate(self.resource_api, + project_id) + # Verify ProjectNotFound now raised + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project_id) + # recreate project + self.resource_api.create_project(project_id, project) + self.resource_api.get_project(project_id) + # delete project + self.resource_api.delete_project(project_id) + # Verify ProjectNotFound is raised + self.assertRaises(exception.ProjectNotFound, + self.resource_api.get_project, + project_id) + + @unit.skip_if_no_multiple_domains_support + def test_get_default_domain_by_name(self): + domain_name = 'default' + + domain = unit.new_domain_ref(name=domain_name) + self.resource_api.create_domain(domain['id'], domain) + + domain_ref = self.resource_api.get_domain_by_name(domain_name) + self.assertEqual(domain, domain_ref) + + def test_get_not_default_domain_by_name(self): + domain_name = 'foo' + self.assertRaises(exception.DomainNotFound, + self.resource_api.get_domain_by_name, + domain_name) + + def test_project_update_and_project_get_return_same_response(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + + self.resource_api.create_project(project['id'], project) + + updated_project = {'enabled': False} + updated_project_ref = self.resource_api.update_project( + project['id'], updated_project) + + # SQL backend adds 'extra' field + updated_project_ref.pop('extra', None) + + self.assertIs(False, updated_project_ref['enabled']) + + project_ref = self.resource_api.get_project(project['id']) + self.assertDictEqual(updated_project_ref, project_ref) + + +class ResourceDriverTests(object): + """Tests for the resource driver. + + Subclasses must set self.driver to the driver instance. + + """ + + def test_create_project(self): + project_id = uuid.uuid4().hex + project = { + 'name': uuid.uuid4().hex, + 'id': project_id, + 'domain_id': uuid.uuid4().hex, + } + self.driver.create_project(project_id, project) + + def test_create_project_all_defined_properties(self): + project_id = uuid.uuid4().hex + project = { + 'name': uuid.uuid4().hex, + 'id': project_id, + 'domain_id': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': True, + 'parent_id': uuid.uuid4().hex, + 'is_domain': True, + } + self.driver.create_project(project_id, project) + + def test_create_project_null_domain(self): + project_id = uuid.uuid4().hex + project = { + 'name': uuid.uuid4().hex, + 'id': project_id, + 'domain_id': None, + } + self.driver.create_project(project_id, project) + + def test_create_project_same_name_same_domain_conflict(self): + name = uuid.uuid4().hex + domain_id = uuid.uuid4().hex + + project_id = uuid.uuid4().hex + project = { + 'name': name, + 'id': project_id, + 'domain_id': domain_id, + } + self.driver.create_project(project_id, project) + + project_id = uuid.uuid4().hex + project = { + 'name': name, + 'id': project_id, + 'domain_id': domain_id, + } + self.assertRaises(exception.Conflict, self.driver.create_project, + project_id, project) + + def test_create_project_same_id_conflict(self): + project_id = uuid.uuid4().hex + + project = { + 'name': uuid.uuid4().hex, + 'id': project_id, + 'domain_id': uuid.uuid4().hex, + } + self.driver.create_project(project_id, project) + + project = { + 'name': uuid.uuid4().hex, + 'id': project_id, + 'domain_id': uuid.uuid4().hex, + } + self.assertRaises(exception.Conflict, self.driver.create_project, + project_id, project) diff --git a/keystone-moon/keystone/tests/unit/resource/test_controllers.py b/keystone-moon/keystone/tests/unit/resource/test_controllers.py new file mode 100644 index 00000000..b8f247c8 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/test_controllers.py @@ -0,0 +1,57 @@ +# Copyright 2016 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo_config import cfg + +from keystone import exception +from keystone.resource import controllers +from keystone.tests import unit +from keystone.tests.unit.ksfixtures import database + + +CONF = cfg.CONF + +_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}} + + +class TenantTestCaseNoDefaultDomain(unit.TestCase): + + def setUp(self): + super(TenantTestCaseNoDefaultDomain, self).setUp() + self.useFixture(database.Database()) + self.load_backends() + self.tenant_controller = controllers.Tenant() + + def test_setup(self): + # Other tests in this class assume there's no default domain, so make + # sure the setUp worked as expected. + self.assertRaises( + exception.DomainNotFound, + self.resource_api.get_domain, CONF.identity.default_domain_id) + + def test_get_all_projects(self): + # When get_all_projects is done and there's no default domain, the + # result is an empty list. + res = self.tenant_controller.get_all_projects(_ADMIN_CONTEXT) + self.assertEqual([], res['tenants']) + + def test_create_project(self): + # When a project is created using the v2 controller and there's no + # default domain, it doesn't fail with can't find domain (a default + # domain is created) + tenant = {'name': uuid.uuid4().hex} + self.tenant_controller.create_project(_ADMIN_CONTEXT, tenant) + # If the above doesn't fail then this is successful. diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py new file mode 100644 index 00000000..2eb87e4c --- /dev/null +++ b/keystone-moon/keystone/tests/unit/resource/test_core.py @@ -0,0 +1,692 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +import mock +from testtools import matchers + +from oslo_config import cfg +from oslotest import mockpatch + +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit.ksfixtures import database + + +CONF = cfg.CONF + + +class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase): + + def setUp(self): + super(TestResourceManagerNoFixtures, self).setUp() + self.useFixture(database.Database(self.sql_driver_version_overrides)) + self.load_backends() + + def test_ensure_default_domain_exists(self): + # When there's no default domain, ensure_default_domain_exists creates + # it. + + # First make sure there's no default domain. + self.assertRaises( + exception.DomainNotFound, + self.resource_api.get_domain, CONF.identity.default_domain_id) + + self.resource_api.ensure_default_domain_exists() + default_domain = self.resource_api.get_domain( + CONF.identity.default_domain_id) + + expected_domain = { + 'id': CONF.identity.default_domain_id, + 'name': 'Default', + 'enabled': True, + 'description': 'Domain created automatically to support V2.0 ' + 'operations.', + } + self.assertEqual(expected_domain, default_domain) + + def test_ensure_default_domain_exists_already_exists(self): + # When there's already a default domain, ensure_default_domain_exists + # doesn't do anything. + + name = uuid.uuid4().hex + description = uuid.uuid4().hex + domain_attrs = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'description': description, + } + self.resource_api.create_domain(CONF.identity.default_domain_id, + domain_attrs) + + self.resource_api.ensure_default_domain_exists() + + default_domain = self.resource_api.get_domain( + CONF.identity.default_domain_id) + + expected_domain = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'enabled': True, + 'description': description, + } + + self.assertEqual(expected_domain, default_domain) + + def test_ensure_default_domain_exists_fails(self): + # When there's an unexpected exception creating domain it's passed on. + + self.useFixture(mockpatch.PatchObject( + self.resource_api, 'create_domain', + side_effect=exception.UnexpectedError)) + + self.assertRaises(exception.UnexpectedError, + self.resource_api.ensure_default_domain_exists) + + def test_update_project_name_conflict(self): + name = uuid.uuid4().hex + description = uuid.uuid4().hex + domain_attrs = { + 'id': CONF.identity.default_domain_id, + 'name': name, + 'description': description, + } + domain = self.resource_api.create_domain( + CONF.identity.default_domain_id, domain_attrs) + project1 = unit.new_project_ref(domain_id=domain['id'], + name=uuid.uuid4().hex) + self.resource_api.create_project(project1['id'], project1) + project2 = unit.new_project_ref(domain_id=domain['id'], + name=uuid.uuid4().hex) + project = self.resource_api.create_project(project2['id'], project2) + + self.assertRaises(exception.Conflict, + self.resource_api.update_project, + project['id'], {'name': project1['name']}) + + +class DomainConfigDriverTests(object): + + def _domain_config_crud(self, sensitive): + domain = uuid.uuid4().hex + group = uuid.uuid4().hex + option = uuid.uuid4().hex + value = uuid.uuid4().hex + self.driver.create_config_option( + domain, group, option, value, sensitive) + res = self.driver.get_config_option( + domain, group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + value = uuid.uuid4().hex + self.driver.update_config_option( + domain, group, option, value, sensitive) + res = self.driver.get_config_option( + domain, group, option, sensitive) + config = {'group': group, 'option': option, 'value': value} + self.assertEqual(config, res) + + self.driver.delete_config_options( + domain, group, option, sensitive) + self.assertRaises(exception.DomainConfigNotFound, + self.driver.get_config_option, + domain, group, option, sensitive) + # ...and silent if we try to delete it again + self.driver.delete_config_options( + domain, group, option, sensitive) + + def test_whitelisted_domain_config_crud(self): + self._domain_config_crud(sensitive=False) + + def test_sensitive_domain_config_crud(self): + self._domain_config_crud(sensitive=True) + + def _list_domain_config(self, sensitive): + """Test listing by combination of domain, group & option.""" + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': 100} + domain = uuid.uuid4().hex + + for config in [config1, config2, config3]: + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive) + + # Try listing all items from a domain + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2, config3]) + + # Try listing by domain and group + res = self.driver.list_config_options( + domain, group=config1['group'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config2]) + + # Try listing by domain, group and option + res = self.driver.list_config_options( + domain, group=config2['group'], + option=config2['option'], sensitive=sensitive) + self.assertThat(res, matchers.HasLength(1)) + self.assertEqual(config2, res[0]) + + def test_list_whitelisted_domain_config_crud(self): + self._list_domain_config(False) + + def test_list_sensitive_domain_config_crud(self): + self._list_domain_config(True) + + def _delete_domain_configs(self, sensitive): + """Test deleting by combination of domain, group & option.""" + config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + # Put config2 and config3 in the same group as config1 + config2 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config3 = {'group': config1['group'], 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + domain = uuid.uuid4().hex + + for config in [config1, config2, config3, config4]: + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive) + + # Try deleting by domain, group and option + res = self.driver.delete_config_options( + domain, group=config2['group'], + option=config2['option'], sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(3)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3, config4]) + + # Try deleting by domain and group + res = self.driver.delete_config_options( + domain, group=config4['group'], sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(2)) + for res_entry in res: + self.assertIn(res_entry, [config1, config3]) + + # Try deleting all items from a domain + res = self.driver.delete_config_options( + domain, sensitive=sensitive) + res = self.driver.list_config_options( + domain, sensitive=sensitive) + self.assertThat(res, matchers.HasLength(0)) + + def test_delete_whitelisted_domain_configs(self): + self._delete_domain_configs(False) + + def test_delete_sensitive_domain_configs(self): + self._delete_domain_configs(True) + + def _create_domain_config_twice(self, sensitive): + """Test conflict error thrown if create the same option twice.""" + config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex, + 'value': uuid.uuid4().hex} + domain = uuid.uuid4().hex + + self.driver.create_config_option( + domain, config['group'], config['option'], + config['value'], sensitive=sensitive) + self.assertRaises(exception.Conflict, + self.driver.create_config_option, + domain, config['group'], config['option'], + config['value'], sensitive=sensitive) + + def test_create_whitelisted_domain_config_twice(self): + self._create_domain_config_twice(False) + + def test_create_sensitive_domain_config_twice(self): + self._create_domain_config_twice(True) + + +class DomainConfigTests(object): + + def setUp(self): + self.domain = unit.new_domain_ref() + self.resource_api.create_domain(self.domain['id'], self.domain) + self.addCleanup(self.clean_up_domain) + + def clean_up_domain(self): + # NOTE(henry-nash): Deleting the domain will also delete any domain + # configs for this domain. + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + self.resource_api.delete_domain(self.domain['id']) + del self.domain + + def test_create_domain_config_including_sensitive_option(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # password is sensitive, so check that the whitelisted portion and + # the sensitive piece have been stored in the appropriate locations. + res = self.domain_config_api.get_config(self.domain['id']) + config_whitelisted = copy.deepcopy(config) + config_whitelisted['ldap'].pop('password') + self.assertEqual(config_whitelisted, res) + res = self.domain_config_api.driver.get_config_option( + self.domain['id'], 'ldap', 'password', sensitive=True) + self.assertEqual(config['ldap']['password'], res['value']) + + # Finally, use the non-public API to get back the whole config + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_get_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + res = self.domain_config_api.get_config(self.domain['id'], + group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('ldap') + self.assertEqual(config_partial, res) + res = self.domain_config_api.get_config( + self.domain['id'], group='ldap', option='user_tree_dn') + self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res) + # ...but we should fail to get a sensitive option + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='password') + + def test_delete_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + self.domain_config_api.delete_config( + self.domain['id'], group='identity') + config_partial = copy.deepcopy(config) + config_partial.pop('identity') + config_partial['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + self.domain_config_api.delete_config( + self.domain['id'], group='ldap', option='url') + config_partial = copy.deepcopy(config_partial) + config_partial['ldap'].pop('url') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(config_partial, res) + + def test_get_options_not_in_domain_config(self): + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id']) + config = {'ldap': {'url': uuid.uuid4().hex}} + + self.domain_config_api.create_config(self.domain['id'], config) + + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='identity') + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.get_config, self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_get_sensitive_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual({}, res) + self.domain_config_api.create_config(self.domain['id'], config) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + def test_update_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Try updating a group + new_config = {'ldap': {'url': uuid.uuid4().hex, + 'user_filter': uuid.uuid4().hex}} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap') + expected_config = copy.deepcopy(config) + expected_config['ldap']['url'] = new_config['ldap']['url'] + expected_config['ldap']['user_filter'] = ( + new_config['ldap']['user_filter']) + expected_full_config = copy.deepcopy(expected_config) + expected_config['ldap'].pop('password') + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_config, res) + # The sensitive option should still exist + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single whitelisted option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'url': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='url') + + # Make sure whitelisted and full config is updated + expected_whitelisted_config = copy.deepcopy(config) + expected_whitelisted_config['ldap']['url'] = new_config['url'] + expected_full_config = copy.deepcopy(expected_whitelisted_config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(expected_full_config, res) + + # Try updating a single sensitive option + self.domain_config_api.delete_config(self.domain['id']) + self.domain_config_api.create_config(self.domain['id'], config) + new_config = {'password': uuid.uuid4().hex} + res = self.domain_config_api.update_config( + self.domain['id'], new_config, group='ldap', option='password') + # The whitelisted config should not have changed... + expected_whitelisted_config = copy.deepcopy(config) + expected_full_config = copy.deepcopy(config) + expected_whitelisted_config['ldap'].pop('password') + self.assertEqual(expected_whitelisted_config, res) + res = self.domain_config_api.get_config(self.domain['id']) + self.assertEqual(expected_whitelisted_config, res) + expected_full_config['ldap']['password'] = new_config['password'] + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + # ...but the sensitive piece should have. + self.assertEqual(expected_full_config, res) + + def test_update_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + # An extra group, when specifying one group should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap') + # An extra option, when specifying one option should fail + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], + group='ldap', option='url') + + # Now try the right number of groups/options, but just not + # ones that are in the config provided + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='identity') + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config['ldap'], group='ldap', + option='url') + + # Now some valid groups/options, but just not ones that are in the + # existing config + config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_group, + group='identity') + config_wrong_option = {'url': uuid.uuid4().hex} + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.update_config, + self.domain['id'], config_wrong_option, + group='ldap', option='url') + + # And finally just some bad groups/options + bad_group = uuid.uuid4().hex + config = {bad_group: {'user': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group=bad_group, + option='user') + bad_option = uuid.uuid4().hex + config = {'ldap': {bad_option: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.update_config, + self.domain['id'], config, group='ldap', + option=bad_option) + + def test_create_invalid_domain_config(self): + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], {}) + config = {uuid.uuid4().hex: uuid.uuid4().hex} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + # Try an option that IS in the standard conf, but neither whitelisted + # or marked as sensitive + config = {'identity': {'user_tree_dn': uuid.uuid4().hex}} + self.assertRaises(exception.InvalidDomainConfig, + self.domain_config_api.create_config, + self.domain['id'], config) + + def test_delete_invalid_partial_domain_config(self): + config = {'ldap': {'url': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + # Try deleting a group not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], group='identity') + # Try deleting an option not in the config + self.assertRaises(exception.DomainConfigNotFound, + self.domain_config_api.delete_config, + self.domain['id'], + group='ldap', option='user_tree_dn') + + def test_sensitive_substitution_in_domain_config(self): + # Create a config that contains a whitelisted option that requires + # substitution of a sensitive option. + config = {'ldap': {'url': 'my_url/%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + + # Read back the config with the internal method and ensure that the + # substitution has taken place. + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + expected_url = ( + config['ldap']['url'] % {'password': config['ldap']['password']}) + self.assertEqual(expected_url, res['ldap']['url']) + + def test_invalid_sensitive_substitution_in_domain_config(self): + """Check that invalid substitutions raise warnings.""" + mock_log = mock.Mock() + + invalid_option_config = { + 'ldap': {'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + for invalid_option in ['my_url/%(passssword)s', + 'my_url/%(password', + 'my_url/%(password)', + 'my_url/%(password)d']: + invalid_option_config['ldap']['url'] = invalid_option + self.domain_config_api.create_config( + self.domain['id'], invalid_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + mock_log.warning.assert_any_call(mock.ANY) + self.assertEqual( + invalid_option_config['ldap']['url'], res['ldap']['url']) + + def test_escaped_sequence_in_domain_config(self): + """Check that escaped '%(' doesn't get interpreted.""" + mock_log = mock.Mock() + + escaped_option_config = { + 'ldap': {'url': 'my_url/%%(password)s', + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + + self.domain_config_api.create_config( + self.domain['id'], escaped_option_config) + + with mock.patch('keystone.resource.core.LOG', mock_log): + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertFalse(mock_log.warn.called) + # The escaping '%' should have been removed + self.assertEqual('my_url/%(password)s', res['ldap']['url']) + + @unit.skip_if_cache_disabled('domain_config') + def test_cache_layer_get_sensitive_config(self): + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}, + 'identity': {'driver': uuid.uuid4().hex}} + self.domain_config_api.create_config(self.domain['id'], config) + # cache the result + res = self.domain_config_api.get_config_with_sensitive_info( + self.domain['id']) + self.assertEqual(config, res) + + # delete, bypassing domain config manager api + self.domain_config_api.delete_config_options(self.domain['id']) + self.domain_config_api.delete_config_options(self.domain['id'], + sensitive=True) + + self.assertDictEqual( + res, self.domain_config_api.get_config_with_sensitive_info( + self.domain['id'])) + self.domain_config_api.get_config_with_sensitive_info.invalidate( + self.domain_config_api, self.domain['id']) + self.assertDictEqual( + {}, + self.domain_config_api.get_config_with_sensitive_info( + self.domain['id'])) + + def test_delete_domain_deletes_configs(self): + """Test domain deletion clears the domain configs.""" + domain = unit.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + config = {'ldap': {'url': uuid.uuid4().hex, + 'user_tree_dn': uuid.uuid4().hex, + 'password': uuid.uuid4().hex}} + self.domain_config_api.create_config(domain['id'], config) + + # Now delete the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + self.resource_api.delete_domain(domain['id']) + + # Check domain configs have also been deleted + self.assertRaises( + exception.DomainConfigNotFound, + self.domain_config_api.get_config, + domain['id']) + + # The get_config_with_sensitive_info does not throw an exception if + # the config is empty, it just returns an empty dict + self.assertDictEqual( + {}, + self.domain_config_api.get_config_with_sensitive_info( + domain['id'])) + + def test_config_registration(self): + type = uuid.uuid4().hex + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.domain_config_api.release_registration( + self.domain['id'], type=type) + + # Make sure that once someone has it, nobody else can get it. + # This includes the domain who already has it. + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.assertFalse( + self.domain_config_api.obtain_registration( + self.domain['id'], type)) + + # Make sure we can read who does have it + self.assertEqual( + self.domain['id'], + self.domain_config_api.read_registration(type)) + + # Make sure releasing it is silent if the domain specified doesn't + # have the registration + domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(domain2['id'], domain2) + self.domain_config_api.release_registration( + domain2['id'], type=type) + + # If nobody has the type registered, then trying to read it should + # raise ConfigRegistrationNotFound + self.domain_config_api.release_registration( + self.domain['id'], type=type) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type) + + # Finally check multiple registrations are cleared if you free the + # registration without specifying the type + type2 = uuid.uuid4().hex + self.domain_config_api.obtain_registration( + self.domain['id'], type) + self.domain_config_api.obtain_registration( + self.domain['id'], type2) + self.domain_config_api.release_registration(self.domain['id']) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type) + self.assertRaises(exception.ConfigRegistrationNotFound, + self.domain_config_api.read_registration, + type2) |