aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/resource
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/resource')
-rw-r--r--keystone-moon/keystone/tests/unit/resource/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/backends/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/backends/test_sql.py24
-rw-r--r--keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py53
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_backends.py1669
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_controllers.py57
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_core.py692
8 files changed, 2495 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/resource/__init__.py b/keystone-moon/keystone/tests/unit/resource/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/__init__.py
diff --git a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py
diff --git a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py
new file mode 100644
index 00000000..79ad3df2
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py
@@ -0,0 +1,24 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystone.resource.backends import sql
+from keystone.tests import unit
+from keystone.tests.unit.ksfixtures import database
+from keystone.tests.unit.resource import test_backends
+
+
+class TestSqlResourceDriver(unit.BaseTestCase,
+ test_backends.ResourceDriverTests):
+ def setUp(self):
+ super(TestSqlResourceDriver, self).setUp()
+ self.useFixture(database.Database())
+ self.driver = sql.Resource()
diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py
new file mode 100644
index 00000000..e69de29b
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py
diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py
new file mode 100644
index 00000000..b4c5f262
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py
@@ -0,0 +1,53 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from keystone.common import sql
+from keystone.resource.config_backends import sql as config_sql
+from keystone.tests import unit
+from keystone.tests.unit.backend import core_sql
+from keystone.tests.unit.ksfixtures import database
+from keystone.tests.unit.resource import test_core
+
+
+class SqlDomainConfigModels(core_sql.BaseBackendSqlModels):
+
+ def test_whitelisted_model(self):
+ cols = (('domain_id', sql.String, 64),
+ ('group', sql.String, 255),
+ ('option', sql.String, 255),
+ ('value', sql.JsonBlob, None))
+ self.assertExpectedSchema('whitelisted_config', cols)
+
+ def test_sensitive_model(self):
+ cols = (('domain_id', sql.String, 64),
+ ('group', sql.String, 255),
+ ('option', sql.String, 255),
+ ('value', sql.JsonBlob, None))
+ self.assertExpectedSchema('sensitive_config', cols)
+
+
+class SqlDomainConfigDriver(unit.BaseTestCase,
+ test_core.DomainConfigDriverTests):
+ def setUp(self):
+ super(SqlDomainConfigDriver, self).setUp()
+ self.useFixture(database.Database())
+ self.driver = config_sql.DomainConfig()
+
+
+class SqlDomainConfig(core_sql.BaseBackendSqlTests,
+ test_core.DomainConfigTests):
+ def setUp(self):
+ super(SqlDomainConfig, self).setUp()
+ # test_core.DomainConfigTests is effectively a mixin class, so make
+ # sure we call its setup
+ test_core.DomainConfigTests.setUp(self)
diff --git a/keystone-moon/keystone/tests/unit/resource/test_backends.py b/keystone-moon/keystone/tests/unit/resource/test_backends.py
new file mode 100644
index 00000000..eed4c6ba
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/test_backends.py
@@ -0,0 +1,1669 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import uuid
+
+import mock
+from oslo_config import cfg
+from six.moves import range
+from testtools import matchers
+
+from keystone.common import driver_hints
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit import default_fixtures
+from keystone.tests.unit import utils as test_utils
+
+
+CONF = cfg.CONF
+
+
+class ResourceTests(object):
+
+ domain_count = len(default_fixtures.DOMAINS)
+
+ def test_get_project(self):
+ tenant_ref = self.resource_api.get_project(self.tenant_bar['id'])
+ self.assertDictEqual(self.tenant_bar, tenant_ref)
+
+ def test_get_project_returns_not_found(self):
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ uuid.uuid4().hex)
+
+ def test_get_project_by_name(self):
+ tenant_ref = self.resource_api.get_project_by_name(
+ self.tenant_bar['name'],
+ CONF.identity.default_domain_id)
+ self.assertDictEqual(self.tenant_bar, tenant_ref)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_get_project_by_name_for_project_acting_as_a_domain(self):
+ """Tests get_project_by_name works when the domain_id is None."""
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, is_domain=False)
+ project = self.resource_api.create_project(project['id'], project)
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project_by_name,
+ project['name'],
+ None)
+
+ # Test that querying with domain_id as None will find the project
+ # acting as a domain, even if it's name is the same as the regular
+ # project above.
+ project2 = unit.new_project_ref(is_domain=True,
+ name=project['name'])
+ project2 = self.resource_api.create_project(project2['id'], project2)
+
+ project_ref = self.resource_api.get_project_by_name(
+ project2['name'], None)
+
+ self.assertEqual(project2, project_ref)
+
+ def test_get_project_by_name_returns_not_found(self):
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project_by_name,
+ uuid.uuid4().hex,
+ CONF.identity.default_domain_id)
+
+ def test_create_duplicate_project_id_fails(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project_id = project['id']
+ self.resource_api.create_project(project_id, project)
+ project['name'] = 'fake2'
+ self.assertRaises(exception.Conflict,
+ self.resource_api.create_project,
+ project_id,
+ project)
+
+ def test_create_duplicate_project_name_fails(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project_id = project['id']
+ self.resource_api.create_project(project_id, project)
+ project['id'] = 'fake2'
+ self.assertRaises(exception.Conflict,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_create_duplicate_project_name_in_different_domains(self):
+ new_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(new_domain['id'], new_domain)
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project2 = unit.new_project_ref(name=project1['name'],
+ domain_id=new_domain['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ self.resource_api.create_project(project2['id'], project2)
+
+ def test_move_project_between_domains(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ project = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project['id'], project)
+ project['domain_id'] = domain2['id']
+ # Update the project asserting that a deprecation warning is emitted
+ with mock.patch(
+ 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
+ self.resource_api.update_project(project['id'], project)
+ self.assertTrue(mock_dep.called)
+
+ updated_project_ref = self.resource_api.get_project(project['id'])
+ self.assertEqual(domain2['id'], updated_project_ref['domain_id'])
+
+ def test_move_project_between_domains_with_clashing_names_fails(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ # First, create a project in domain1
+ project1 = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ # Now create a project in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ project2 = unit.new_project_ref(name=project1['name'],
+ domain_id=domain2['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ # Now try and move project1 into the 2nd domain - which should
+ # fail since the names clash
+ project1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.resource_api.update_project,
+ project1['id'],
+ project1)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_move_project_with_children_between_domains_fails(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ project = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project['id'], project)
+ child_project = unit.new_project_ref(domain_id=domain1['id'],
+ parent_id=project['id'])
+ self.resource_api.create_project(child_project['id'], child_project)
+ project['domain_id'] = domain2['id']
+
+ # Update is not allowed, since updating the whole subtree would be
+ # necessary
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_move_project_not_root_between_domains_fails(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ project = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project['id'], project)
+ child_project = unit.new_project_ref(domain_id=domain1['id'],
+ parent_id=project['id'])
+ self.resource_api.create_project(child_project['id'], child_project)
+ child_project['domain_id'] = domain2['id']
+
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ child_project['id'],
+ child_project)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_move_root_project_between_domains_succeeds(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ root_project = unit.new_project_ref(domain_id=domain1['id'])
+ root_project = self.resource_api.create_project(root_project['id'],
+ root_project)
+
+ root_project['domain_id'] = domain2['id']
+ self.resource_api.update_project(root_project['id'], root_project)
+ project_from_db = self.resource_api.get_project(root_project['id'])
+
+ self.assertEqual(domain2['id'], project_from_db['domain_id'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_update_domain_id_project_is_domain_fails(self):
+ other_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(other_domain['id'], other_domain)
+ project = unit.new_project_ref(is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+ project['domain_id'] = other_domain['id']
+
+ # Update of domain_id of projects acting as domains is not allowed
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ def test_rename_duplicate_project_name_fails(self):
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project2 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project1['id'], project1)
+ self.resource_api.create_project(project2['id'], project2)
+ project2['name'] = project1['name']
+ self.assertRaises(exception.Error,
+ self.resource_api.update_project,
+ project2['id'],
+ project2)
+
+ def test_update_project_id_does_nothing(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project_id = project['id']
+ self.resource_api.create_project(project['id'], project)
+ project['id'] = 'fake2'
+ self.resource_api.update_project(project_id, project)
+ project_ref = self.resource_api.get_project(project_id)
+ self.assertEqual(project_id, project_ref['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ 'fake2')
+
+ def test_delete_domain_with_user_group_project_links(self):
+ # TODO(chungg):add test case once expected behaviour defined
+ pass
+
+ def test_update_project_returns_not_found(self):
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.update_project,
+ uuid.uuid4().hex,
+ dict())
+
+ def test_delete_project_returns_not_found(self):
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.delete_project,
+ uuid.uuid4().hex)
+
+ def test_create_update_delete_unicode_project(self):
+ unicode_project_name = u'name \u540d\u5b57'
+ project = unit.new_project_ref(
+ name=unicode_project_name,
+ domain_id=CONF.identity.default_domain_id)
+ project = self.resource_api.create_project(project['id'], project)
+ self.resource_api.update_project(project['id'], project)
+ self.resource_api.delete_project(project['id'])
+
+ def test_create_project_with_no_enabled_field(self):
+ ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
+ del ref['enabled']
+ self.resource_api.create_project(ref['id'], ref)
+
+ project = self.resource_api.get_project(ref['id'])
+ self.assertIs(project['enabled'], True)
+
+ def test_create_project_long_name_fails(self):
+ project = unit.new_project_ref(
+ name='a' * 65, domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_create_project_blank_name_fails(self):
+ project = unit.new_project_ref(
+ name='', domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_create_project_invalid_name_fails(self):
+ project = unit.new_project_ref(
+ name=None, domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+ project = unit.new_project_ref(
+ name=123, domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_update_project_blank_name_fails(self):
+ project = unit.new_project_ref(
+ name='fake1', domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+ project['name'] = ''
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ def test_update_project_long_name_fails(self):
+ project = unit.new_project_ref(
+ name='fake1', domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+ project['name'] = 'a' * 65
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ def test_update_project_invalid_name_fails(self):
+ project = unit.new_project_ref(
+ name='fake1', domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+ project['name'] = None
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ project['name'] = 123
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ def test_update_project_invalid_enabled_type_string(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertTrue(project_ref['enabled'])
+
+ # Strings are not valid boolean values
+ project['enabled'] = "false"
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'],
+ project)
+
+ def test_create_project_invalid_enabled_type_string(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ # invalid string value
+ enabled="true")
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_create_project_invalid_domain_id(self):
+ project = unit.new_project_ref(domain_id=uuid.uuid4().hex)
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_list_domains(self):
+ domain1 = unit.new_domain_ref()
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ self.resource_api.create_domain(domain2['id'], domain2)
+ domains = self.resource_api.list_domains()
+ self.assertEqual(3, len(domains))
+ domain_ids = []
+ for domain in domains:
+ domain_ids.append(domain.get('id'))
+ self.assertIn(CONF.identity.default_domain_id, domain_ids)
+ self.assertIn(domain1['id'], domain_ids)
+ self.assertIn(domain2['id'], domain_ids)
+
+ def test_list_projects(self):
+ project_refs = self.resource_api.list_projects()
+ project_count = len(default_fixtures.TENANTS) + self.domain_count
+ self.assertEqual(project_count, len(project_refs))
+ for project in default_fixtures.TENANTS:
+ self.assertIn(project, project_refs)
+
+ def test_list_projects_with_multiple_filters(self):
+ # Create a project
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project = self.resource_api.create_project(project['id'], project)
+
+ # Build driver hints with the project's name and inexistent description
+ hints = driver_hints.Hints()
+ hints.add_filter('name', project['name'])
+ hints.add_filter('description', uuid.uuid4().hex)
+
+ # Retrieve projects based on hints and check an empty list is returned
+ projects = self.resource_api.list_projects(hints)
+ self.assertEqual([], projects)
+
+ # Build correct driver hints
+ hints = driver_hints.Hints()
+ hints.add_filter('name', project['name'])
+ hints.add_filter('description', project['description'])
+
+ # Retrieve projects based on hints
+ projects = self.resource_api.list_projects(hints)
+
+ # Check that the returned list contains only the first project
+ self.assertEqual(1, len(projects))
+ self.assertEqual(project, projects[0])
+
+ def test_list_projects_for_domain(self):
+ project_ids = ([x['id'] for x in
+ self.resource_api.list_projects_in_domain(
+ CONF.identity.default_domain_id)])
+ # Only the projects from the default fixtures are expected, since
+ # filtering by domain does not include any project that acts as a
+ # domain.
+ self.assertThat(
+ project_ids, matchers.HasLength(len(default_fixtures.TENANTS)))
+ self.assertIn(self.tenant_bar['id'], project_ids)
+ self.assertIn(self.tenant_baz['id'], project_ids)
+ self.assertIn(self.tenant_mtu['id'], project_ids)
+ self.assertIn(self.tenant_service['id'], project_ids)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_list_projects_acting_as_domain(self):
+ initial_domains = self.resource_api.list_domains()
+
+ # Creating 5 projects that act as domains
+ new_projects_acting_as_domains = []
+ for i in range(5):
+ project = unit.new_project_ref(is_domain=True)
+ project = self.resource_api.create_project(project['id'], project)
+ new_projects_acting_as_domains.append(project)
+
+ # Creating a few regular project to ensure it doesn't mess with the
+ # ones that act as domains
+ self._create_projects_hierarchy(hierarchy_size=2)
+
+ projects = self.resource_api.list_projects_acting_as_domain()
+ expected_number_projects = (
+ len(initial_domains) + len(new_projects_acting_as_domains))
+ self.assertEqual(expected_number_projects, len(projects))
+ for project in new_projects_acting_as_domains:
+ self.assertIn(project, projects)
+ for domain in initial_domains:
+ self.assertIn(domain['id'], [p['id'] for p in projects])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_list_projects_for_alternate_domain(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ project1 = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ project_ids = ([x['id'] for x in
+ self.resource_api.list_projects_in_domain(
+ domain1['id'])])
+ self.assertEqual(2, len(project_ids))
+ self.assertIn(project1['id'], project_ids)
+ self.assertIn(project2['id'], project_ids)
+
+ def _create_projects_hierarchy(self, hierarchy_size=2,
+ domain_id=None,
+ is_domain=False,
+ parent_project_id=None):
+ """Creates a project hierarchy with specified size.
+
+ :param hierarchy_size: the desired hierarchy size, default is 2 -
+ a project with one child.
+ :param domain_id: domain where the projects hierarchy will be created.
+ :param is_domain: if the hierarchy will have the is_domain flag active
+ or not.
+ :param parent_project_id: if the intention is to create a
+ sub-hierarchy, sets the sub-hierarchy root. Defaults to creating
+ a new hierarchy, i.e. a new root project.
+
+ :returns projects: a list of the projects in the created hierarchy.
+
+ """
+ if domain_id is None:
+ domain_id = CONF.identity.default_domain_id
+ if parent_project_id:
+ project = unit.new_project_ref(parent_id=parent_project_id,
+ domain_id=domain_id,
+ is_domain=is_domain)
+ else:
+ project = unit.new_project_ref(domain_id=domain_id,
+ is_domain=is_domain)
+ project_id = project['id']
+ project = self.resource_api.create_project(project_id, project)
+
+ projects = [project]
+ for i in range(1, hierarchy_size):
+ new_project = unit.new_project_ref(parent_id=project_id,
+ domain_id=domain_id)
+
+ self.resource_api.create_project(new_project['id'], new_project)
+ projects.append(new_project)
+ project_id = new_project['id']
+
+ return projects
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_create_domain_with_project_api(self):
+ project = unit.new_project_ref(is_domain=True)
+ ref = self.resource_api.create_project(project['id'], project)
+ self.assertTrue(ref['is_domain'])
+ self.resource_api.get_domain(ref['id'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_project_as_a_domain_uniqueness_constraints(self):
+ """Tests project uniqueness for those acting as domains.
+
+ If it is a project acting as a domain, we can't have two or more with
+ the same name.
+
+ """
+ # Create two projects acting as a domain
+ project = unit.new_project_ref(is_domain=True)
+ project = self.resource_api.create_project(project['id'], project)
+ project2 = unit.new_project_ref(is_domain=True)
+ project2 = self.resource_api.create_project(project2['id'], project2)
+
+ # All projects acting as domains have a null domain_id, so should not
+ # be able to create another with the same name but a different
+ # project ID.
+ new_project = project.copy()
+ new_project['id'] = uuid.uuid4().hex
+
+ self.assertRaises(exception.Conflict,
+ self.resource_api.create_project,
+ new_project['id'],
+ new_project)
+
+ # We also should not be able to update one to have a name clash
+ project2['name'] = project['name']
+ self.assertRaises(exception.Conflict,
+ self.resource_api.update_project,
+ project2['id'],
+ project2)
+
+ # But updating it to a unique name is OK
+ project2['name'] = uuid.uuid4().hex
+ self.resource_api.update_project(project2['id'], project2)
+
+ # Finally, it should be OK to create a project with same name as one of
+ # these acting as a domain, as long as it is a regular project
+ project3 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, name=project2['name'])
+ self.resource_api.create_project(project3['id'], project3)
+ # In fact, it should be OK to create such a project in the domain which
+ # has the matching name.
+ # TODO(henry-nash): Once we fully support projects acting as a domain,
+ # add a test here to create a sub-project with a name that matches its
+ # project acting as a domain
+
+ @unit.skip_if_no_multiple_domains_support
+ @test_utils.wip('waiting for sub projects acting as domains support')
+ def test_is_domain_sub_project_has_parent_domain_id(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+
+ sub_project = unit.new_project_ref(domain_id=project['id'],
+ parent_id=project['id'],
+ is_domain=True)
+
+ ref = self.resource_api.create_project(sub_project['id'], sub_project)
+ self.assertTrue(ref['is_domain'])
+ self.assertEqual(project['id'], ref['parent_id'])
+ self.assertEqual(project['id'], ref['domain_id'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_delete_domain_with_project_api(self):
+ project = unit.new_project_ref(domain_id=None,
+ is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+
+ # Check that a corresponding domain was created
+ self.resource_api.get_domain(project['id'])
+
+ # Try to delete the enabled project that acts as a domain
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.delete_project,
+ project['id'])
+
+ # Disable the project
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+
+ # Successfully delete the project
+ self.resource_api.delete_project(project['id'])
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project['id'])
+
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ project['id'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_create_subproject_acting_as_domain_fails(self):
+ root_project = unit.new_project_ref(is_domain=True)
+ self.resource_api.create_project(root_project['id'], root_project)
+
+ sub_project = unit.new_project_ref(is_domain=True,
+ parent_id=root_project['id'])
+
+ # Creation of sub projects acting as domains is not allowed yet
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ sub_project['id'], sub_project)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_create_domain_under_regular_project_hierarchy_fails(self):
+ # Projects acting as domains can't have a regular project as parent
+ projects_hierarchy = self._create_projects_hierarchy()
+ parent = projects_hierarchy[1]
+ project = unit.new_project_ref(domain_id=parent['id'],
+ parent_id=parent['id'],
+ is_domain=True)
+
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'], project)
+
+ @unit.skip_if_no_multiple_domains_support
+ @test_utils.wip('waiting for sub projects acting as domains support')
+ def test_create_project_under_domain_hierarchy(self):
+ projects_hierarchy = self._create_projects_hierarchy(is_domain=True)
+ parent = projects_hierarchy[1]
+ project = unit.new_project_ref(domain_id=parent['id'],
+ parent_id=parent['id'],
+ is_domain=False)
+
+ ref = self.resource_api.create_project(project['id'], project)
+ self.assertFalse(ref['is_domain'])
+ self.assertEqual(parent['id'], ref['parent_id'])
+ self.assertEqual(parent['id'], ref['domain_id'])
+
+ def test_create_project_without_is_domain_flag(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ del project['is_domain']
+ ref = self.resource_api.create_project(project['id'], project)
+ # The is_domain flag should be False by default
+ self.assertFalse(ref['is_domain'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_create_project_passing_is_domain_flag_true(self):
+ project = unit.new_project_ref(is_domain=True)
+
+ ref = self.resource_api.create_project(project['id'], project)
+ self.assertTrue(ref['is_domain'])
+
+ def test_create_project_passing_is_domain_flag_false(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, is_domain=False)
+
+ ref = self.resource_api.create_project(project['id'], project)
+ self.assertIs(False, ref['is_domain'])
+
+ @test_utils.wip('waiting for support for parent_id to imply domain_id')
+ def test_create_project_with_parent_id_and_without_domain_id(self):
+ # First create a domain
+ project = unit.new_project_ref(is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+ # Now create a child by just naming the parent_id
+ sub_project = unit.new_project_ref(parent_id=project['id'])
+ ref = self.resource_api.create_project(sub_project['id'], sub_project)
+
+ # The domain_id should be set to the parent domain_id
+ self.assertEqual(project['domain_id'], ref['domain_id'])
+
+ def test_create_project_with_domain_id_and_without_parent_id(self):
+ # First create a domain
+ project = unit.new_project_ref(is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+ # Now create a child by just naming the domain_id
+ sub_project = unit.new_project_ref(domain_id=project['id'])
+ ref = self.resource_api.create_project(sub_project['id'], sub_project)
+
+ # The parent_id and domain_id should be set to the id of the project
+ # acting as a domain
+ self.assertEqual(project['id'], ref['parent_id'])
+ self.assertEqual(project['id'], ref['domain_id'])
+
+ def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
+ # First create a domain
+ project = unit.new_project_ref(is_domain=True)
+ self.resource_api.create_project(project['id'], project)
+ # Now try to create a child with the above as its parent, but
+ # specifying a different domain.
+ sub_project = unit.new_project_ref(
+ parent_id=project['id'], domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ sub_project['id'], sub_project)
+
+ def test_check_leaf_projects(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project = projects_hierarchy[0]
+ leaf_project = projects_hierarchy[1]
+
+ self.assertFalse(self.resource_api.is_leaf_project(
+ root_project['id']))
+ self.assertTrue(self.resource_api.is_leaf_project(
+ leaf_project['id']))
+
+ # Delete leaf_project
+ self.resource_api.delete_project(leaf_project['id'])
+
+ # Now, root_project should be leaf
+ self.assertTrue(self.resource_api.is_leaf_project(
+ root_project['id']))
+
+ def test_list_projects_in_subtree(self):
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ project1 = projects_hierarchy[0]
+ project2 = projects_hierarchy[1]
+ project3 = projects_hierarchy[2]
+ project4 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ parent_id=project2['id'])
+ self.resource_api.create_project(project4['id'], project4)
+
+ subtree = self.resource_api.list_projects_in_subtree(project1['id'])
+ self.assertEqual(3, len(subtree))
+ self.assertIn(project2, subtree)
+ self.assertIn(project3, subtree)
+ self.assertIn(project4, subtree)
+
+ subtree = self.resource_api.list_projects_in_subtree(project2['id'])
+ self.assertEqual(2, len(subtree))
+ self.assertIn(project3, subtree)
+ self.assertIn(project4, subtree)
+
+ subtree = self.resource_api.list_projects_in_subtree(project3['id'])
+ self.assertEqual(0, len(subtree))
+
+ def test_list_projects_in_subtree_with_circular_reference(self):
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project1 = self.resource_api.create_project(project1['id'], project1)
+
+ project2 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ parent_id=project1['id'])
+ self.resource_api.create_project(project2['id'], project2)
+
+ project1['parent_id'] = project2['id'] # Adds cyclic reference
+
+ # NOTE(dstanek): The manager does not allow parent_id to be updated.
+ # Instead will directly use the driver to create the cyclic
+ # reference.
+ self.resource_api.driver.update_project(project1['id'], project1)
+
+ subtree = self.resource_api.list_projects_in_subtree(project1['id'])
+
+ # NOTE(dstanek): If a cyclic reference is detected the code bails
+ # and returns None instead of falling into the infinite
+ # recursion trap.
+ self.assertIsNone(subtree)
+
+ def test_list_projects_in_subtree_invalid_project_id(self):
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.list_projects_in_subtree,
+ None)
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.list_projects_in_subtree,
+ uuid.uuid4().hex)
+
+ def test_list_project_parents(self):
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ project1 = projects_hierarchy[0]
+ project2 = projects_hierarchy[1]
+ project3 = projects_hierarchy[2]
+ project4 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ parent_id=project2['id'])
+ self.resource_api.create_project(project4['id'], project4)
+
+ parents1 = self.resource_api.list_project_parents(project3['id'])
+ self.assertEqual(3, len(parents1))
+ self.assertIn(project1, parents1)
+ self.assertIn(project2, parents1)
+
+ parents2 = self.resource_api.list_project_parents(project4['id'])
+ self.assertEqual(parents1, parents2)
+
+ parents = self.resource_api.list_project_parents(project1['id'])
+ # It has the default domain as parent
+ self.assertEqual(1, len(parents))
+
+ def test_update_project_enabled_cascade(self):
+ """Test update_project_cascade
+
+ Ensures the enabled attribute is correctly updated across
+ a simple 3-level projects hierarchy.
+ """
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ parent = projects_hierarchy[0]
+
+ # Disable in parent project disables the whole subtree
+ parent['enabled'] = False
+ # Store the ref from backend in another variable so we don't bother
+ # to remove other attributes that were not originally provided and
+ # were set in the manager, like parent_id and domain_id.
+ parent_ref = self.resource_api.update_project(parent['id'],
+ parent,
+ cascade=True)
+
+ subtree = self.resource_api.list_projects_in_subtree(parent['id'])
+ self.assertEqual(2, len(subtree))
+ self.assertFalse(parent_ref['enabled'])
+ self.assertFalse(subtree[0]['enabled'])
+ self.assertFalse(subtree[1]['enabled'])
+
+ # Enable parent project enables the whole subtree
+ parent['enabled'] = True
+ parent_ref = self.resource_api.update_project(parent['id'],
+ parent,
+ cascade=True)
+
+ subtree = self.resource_api.list_projects_in_subtree(parent['id'])
+ self.assertEqual(2, len(subtree))
+ self.assertTrue(parent_ref['enabled'])
+ self.assertTrue(subtree[0]['enabled'])
+ self.assertTrue(subtree[1]['enabled'])
+
+ def test_cannot_enable_cascade_with_parent_disabled(self):
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ grandparent = projects_hierarchy[0]
+ parent = projects_hierarchy[1]
+
+ grandparent['enabled'] = False
+ self.resource_api.update_project(grandparent['id'],
+ grandparent,
+ cascade=True)
+ subtree = self.resource_api.list_projects_in_subtree(parent['id'])
+ self.assertFalse(subtree[0]['enabled'])
+
+ parent['enabled'] = True
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.update_project,
+ parent['id'],
+ parent,
+ cascade=True)
+
+ def test_update_cascade_only_accepts_enabled(self):
+ # Update cascade does not accept any other attribute but 'enabled'
+ new_project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(new_project['id'], new_project)
+
+ new_project['name'] = 'project1'
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ new_project['id'],
+ new_project,
+ cascade=True)
+
+ def test_list_project_parents_invalid_project_id(self):
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.list_project_parents,
+ None)
+
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.list_project_parents,
+ uuid.uuid4().hex)
+
+ def test_create_project_doesnt_modify_passed_in_dict(self):
+ new_project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ original_project = new_project.copy()
+ self.resource_api.create_project(new_project['id'], new_project)
+ self.assertDictEqual(original_project, new_project)
+
+ def test_update_project_enable(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertTrue(project_ref['enabled'])
+
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertEqual(project['enabled'], project_ref['enabled'])
+
+ # If not present, enabled field should not be updated
+ del project['enabled']
+ self.resource_api.update_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertFalse(project_ref['enabled'])
+
+ project['enabled'] = True
+ self.resource_api.update_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertEqual(project['enabled'], project_ref['enabled'])
+
+ del project['enabled']
+ self.resource_api.update_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertTrue(project_ref['enabled'])
+
+ def test_create_invalid_domain_fails(self):
+ new_group = unit.new_group_ref(domain_id="doesnotexist")
+ self.assertRaises(exception.DomainNotFound,
+ self.identity_api.create_group,
+ new_group)
+ new_user = unit.new_user_ref(domain_id="doesnotexist")
+ self.assertRaises(exception.DomainNotFound,
+ self.identity_api.create_user,
+ new_user)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_project_crud(self):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ project = unit.new_project_ref(domain_id=domain['id'])
+ self.resource_api.create_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertDictContainsSubset(project, project_ref)
+
+ project['name'] = uuid.uuid4().hex
+ self.resource_api.update_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertDictContainsSubset(project, project_ref)
+
+ self.resource_api.delete_project(project['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project['id'])
+
+ def test_domain_delete_hierarchy(self):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+
+ # Creating a root and a leaf project inside the domain
+ projects_hierarchy = self._create_projects_hierarchy(
+ domain_id=domain['id'])
+ root_project = projects_hierarchy[0]
+ leaf_project = projects_hierarchy[0]
+
+ # Disable the domain
+ domain['enabled'] = False
+ self.resource_api.update_domain(domain['id'], domain)
+
+ # Delete the domain
+ self.resource_api.delete_domain(domain['id'])
+
+ # Make sure the domain no longer exists
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ domain['id'])
+
+ # Make sure the root project no longer exists
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ root_project['id'])
+
+ # Make sure the leaf project no longer exists
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ leaf_project['id'])
+
+ def test_delete_projects_from_ids(self):
+ """Tests the resource backend call delete_projects_from_ids.
+
+ Tests the normal flow of the delete_projects_from_ids backend call,
+ that ensures no project on the list exists after it is succesfully
+ called.
+ """
+ project1_ref = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ project2_ref = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ projects = (project1_ref, project2_ref)
+ for project in projects:
+ self.resource_api.create_project(project['id'], project)
+
+ # Setting up the ID's list
+ projects_ids = [p['id'] for p in projects]
+ self.resource_api.driver.delete_projects_from_ids(projects_ids)
+
+ # Ensuring projects no longer exist at backend level
+ for project_id in projects_ids:
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.driver.get_project,
+ project_id)
+
+ # Passing an empty list is silently ignored
+ self.resource_api.driver.delete_projects_from_ids([])
+
+ def test_delete_projects_from_ids_with_no_existing_project_id(self):
+ """Tests delete_projects_from_ids issues warning if not found.
+
+ Tests the resource backend call delete_projects_from_ids passing a
+ non existing ID in project_ids, which is logged and ignored by
+ the backend.
+ """
+ project_ref = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project_ref['id'], project_ref)
+
+ # Setting up the ID's list
+ projects_ids = (project_ref['id'], uuid.uuid4().hex)
+ with mock.patch('keystone.resource.backends.sql.LOG') as mock_log:
+ self.resource_api.delete_projects_from_ids(projects_ids)
+ self.assertTrue(mock_log.warning.called)
+ # The existing project was deleted.
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.driver.get_project,
+ project_ref['id'])
+
+ # Even if we only have one project, and it does not exist, it returns
+ # no error.
+ self.resource_api.driver.delete_projects_from_ids([uuid.uuid4().hex])
+
+ def test_delete_project_cascade(self):
+ # create a hierarchy with 3 levels
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ root_project = projects_hierarchy[0]
+ project1 = projects_hierarchy[1]
+ project2 = projects_hierarchy[2]
+
+ # Disabling all projects before attempting to delete
+ for project in (project2, project1, root_project):
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+
+ self.resource_api.delete_project(root_project['id'], cascade=True)
+
+ for project in projects_hierarchy:
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project['id'])
+
+ def test_delete_large_project_cascade(self):
+ """Try delete a large project with cascade true.
+
+ Tree we will create::
+
+ +-p1-+
+ | |
+ p5 p2
+ | |
+ p6 +-p3-+
+ | |
+ p7 p4
+ """
+ # create a hierarchy with 4 levels
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=4)
+ p1 = projects_hierarchy[0]
+ # Add the left branch to the hierarchy (p5, p6)
+ self._create_projects_hierarchy(hierarchy_size=2,
+ parent_project_id=p1['id'])
+ # Add p7 to the hierarchy
+ p3_id = projects_hierarchy[2]['id']
+ self._create_projects_hierarchy(hierarchy_size=1,
+ parent_project_id=p3_id)
+ # Reverse the hierarchy to disable the leaf first
+ prjs_hierarchy = ([p1] + self.resource_api.list_projects_in_subtree(
+ p1['id']))[::-1]
+
+ # Disabling all projects before attempting to delete
+ for project in prjs_hierarchy:
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
+
+ self.resource_api.delete_project(p1['id'], cascade=True)
+ for project in prjs_hierarchy:
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project['id'])
+
+ def test_cannot_delete_project_cascade_with_enabled_child(self):
+ # create a hierarchy with 3 levels
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ root_project = projects_hierarchy[0]
+ project1 = projects_hierarchy[1]
+ project2 = projects_hierarchy[2]
+
+ project2['enabled'] = False
+ self.resource_api.update_project(project2['id'], project2)
+
+ # Cannot cascade delete root_project, since project1 is enabled
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.delete_project,
+ root_project['id'],
+ cascade=True)
+
+ # Ensuring no project was deleted, not even project2
+ self.resource_api.get_project(root_project['id'])
+ self.resource_api.get_project(project1['id'])
+ self.resource_api.get_project(project2['id'])
+
+ def test_hierarchical_projects_crud(self):
+ # create a hierarchy with just a root project (which is a leaf as well)
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1)
+ root_project1 = projects_hierarchy[0]
+
+ # create a hierarchy with one root project and one leaf project
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project2 = projects_hierarchy[0]
+ leaf_project = projects_hierarchy[1]
+
+ # update description from leaf_project
+ leaf_project['description'] = 'new description'
+ self.resource_api.update_project(leaf_project['id'], leaf_project)
+ proj_ref = self.resource_api.get_project(leaf_project['id'])
+ self.assertDictEqual(leaf_project, proj_ref)
+
+ # update the parent_id is not allowed
+ leaf_project['parent_id'] = root_project1['id']
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.update_project,
+ leaf_project['id'],
+ leaf_project)
+
+ # delete root_project1
+ self.resource_api.delete_project(root_project1['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ root_project1['id'])
+
+ # delete root_project2 is not allowed since it is not a leaf project
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.delete_project,
+ root_project2['id'])
+
+ def test_create_project_with_invalid_parent(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, parent_id='fake')
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_create_leaf_project_with_different_domain(self):
+ root_project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(root_project['id'], root_project)
+
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ leaf_project = unit.new_project_ref(domain_id=domain['id'],
+ parent_id=root_project['id'])
+
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ leaf_project['id'],
+ leaf_project)
+
+ def test_delete_hierarchical_leaf_project(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project = projects_hierarchy[0]
+ leaf_project = projects_hierarchy[1]
+
+ self.resource_api.delete_project(leaf_project['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ leaf_project['id'])
+
+ self.resource_api.delete_project(root_project['id'])
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ root_project['id'])
+
+ def test_delete_hierarchical_not_leaf_project(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project = projects_hierarchy[0]
+
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.delete_project,
+ root_project['id'])
+
+ def test_update_project_parent(self):
+ projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
+ project1 = projects_hierarchy[0]
+ project2 = projects_hierarchy[1]
+ project3 = projects_hierarchy[2]
+
+ # project2 is the parent from project3
+ self.assertEqual(project3.get('parent_id'), project2['id'])
+
+ # try to update project3 parent to parent1
+ project3['parent_id'] = project1['id']
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.update_project,
+ project3['id'],
+ project3)
+
+ def test_create_project_under_disabled_one(self):
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id, enabled=False)
+ self.resource_api.create_project(project1['id'], project1)
+
+ project2 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ parent_id=project1['id'])
+
+ # It's not possible to create a project under a disabled one in the
+ # hierarchy
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project2['id'],
+ project2)
+
+ def test_disable_hierarchical_leaf_project(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ leaf_project = projects_hierarchy[1]
+
+ leaf_project['enabled'] = False
+ self.resource_api.update_project(leaf_project['id'], leaf_project)
+
+ project_ref = self.resource_api.get_project(leaf_project['id'])
+ self.assertEqual(leaf_project['enabled'], project_ref['enabled'])
+
+ def test_disable_hierarchical_not_leaf_project(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project = projects_hierarchy[0]
+
+ root_project['enabled'] = False
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.update_project,
+ root_project['id'],
+ root_project)
+
+ def test_enable_project_with_disabled_parent(self):
+ projects_hierarchy = self._create_projects_hierarchy()
+ root_project = projects_hierarchy[0]
+ leaf_project = projects_hierarchy[1]
+
+ # Disable leaf and root
+ leaf_project['enabled'] = False
+ self.resource_api.update_project(leaf_project['id'], leaf_project)
+ root_project['enabled'] = False
+ self.resource_api.update_project(root_project['id'], root_project)
+
+ # Try to enable the leaf project, it's not possible since it has
+ # a disabled parent
+ leaf_project['enabled'] = True
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.update_project,
+ leaf_project['id'],
+ leaf_project)
+
+ def _get_hierarchy_depth(self, project_id):
+ return len(self.resource_api.list_project_parents(project_id)) + 1
+
+ def test_check_hierarchy_depth(self):
+ # Should be allowed to have a hierarchy of the max depth specified
+ # in the config option plus one (to allow for the additional project
+ # acting as a domain after an upgrade)
+ projects_hierarchy = self._create_projects_hierarchy(
+ CONF.max_project_tree_depth)
+ leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1]
+
+ depth = self._get_hierarchy_depth(leaf_project['id'])
+ self.assertEqual(CONF.max_project_tree_depth + 1, depth)
+
+ # Creating another project in the hierarchy shouldn't be allowed
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id,
+ parent_id=leaf_project['id'])
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.create_project,
+ project['id'],
+ project)
+
+ def test_project_update_missing_attrs_with_a_value(self):
+ # Creating a project with no description attribute.
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ del project['description']
+ project = self.resource_api.create_project(project['id'], project)
+
+ # Add a description attribute.
+ project['description'] = uuid.uuid4().hex
+ self.resource_api.update_project(project['id'], project)
+
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertDictEqual(project, project_ref)
+
+ def test_project_update_missing_attrs_with_a_falsey_value(self):
+ # Creating a project with no description attribute.
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ del project['description']
+ project = self.resource_api.create_project(project['id'], project)
+
+ # Add a description attribute.
+ project['description'] = ''
+ self.resource_api.update_project(project['id'], project)
+
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertDictEqual(project, project_ref)
+
+ def test_domain_crud(self):
+ domain = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain['id'], domain)
+ self.assertDictEqual(domain, domain_ref)
+ domain_ref = self.resource_api.get_domain(domain['id'])
+ self.assertDictEqual(domain, domain_ref)
+
+ domain['name'] = uuid.uuid4().hex
+ domain_ref = self.resource_api.update_domain(domain['id'], domain)
+ self.assertDictEqual(domain, domain_ref)
+ domain_ref = self.resource_api.get_domain(domain['id'])
+ self.assertDictEqual(domain, domain_ref)
+
+ # Ensure an 'enabled' domain cannot be deleted
+ self.assertRaises(exception.ForbiddenNotSecurity,
+ self.resource_api.delete_domain,
+ domain_id=domain['id'])
+
+ # Disable the domain
+ domain['enabled'] = False
+ self.resource_api.update_domain(domain['id'], domain)
+
+ # Delete the domain
+ self.resource_api.delete_domain(domain['id'])
+
+ # Make sure the domain no longer exists
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ domain['id'])
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_domain_name_case_sensitivity(self):
+ # create a ref with a lowercase name
+ domain_name = 'test_domain'
+ ref = unit.new_domain_ref(name=domain_name)
+
+ lower_case_domain = self.resource_api.create_domain(ref['id'], ref)
+
+ # assign a new ID to the ref with the same name, but in uppercase
+ ref['id'] = uuid.uuid4().hex
+ ref['name'] = domain_name.upper()
+ upper_case_domain = self.resource_api.create_domain(ref['id'], ref)
+
+ # We can get each domain by name
+ lower_case_domain_ref = self.resource_api.get_domain_by_name(
+ domain_name)
+ self.assertDictEqual(lower_case_domain, lower_case_domain_ref)
+
+ upper_case_domain_ref = self.resource_api.get_domain_by_name(
+ domain_name.upper())
+ self.assertDictEqual(upper_case_domain, upper_case_domain_ref)
+
+ def test_project_attribute_update(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(project['id'], project)
+
+ # pick a key known to be non-existent
+ key = 'description'
+
+ def assert_key_equals(value):
+ project_ref = self.resource_api.update_project(
+ project['id'], project)
+ self.assertEqual(value, project_ref[key])
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertEqual(value, project_ref[key])
+
+ def assert_get_key_is(value):
+ project_ref = self.resource_api.update_project(
+ project['id'], project)
+ self.assertIs(project_ref.get(key), value)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertIs(project_ref.get(key), value)
+
+ # add an attribute that doesn't exist, set it to a falsey value
+ value = ''
+ project[key] = value
+ assert_key_equals(value)
+
+ # set an attribute with a falsey value to null
+ value = None
+ project[key] = value
+ assert_get_key_is(value)
+
+ # do it again, in case updating from this situation is handled oddly
+ value = None
+ project[key] = value
+ assert_get_key_is(value)
+
+ # set a possibly-null value to a falsey value
+ value = ''
+ project[key] = value
+ assert_key_equals(value)
+
+ # set a falsey value to a truthy value
+ value = uuid.uuid4().hex
+ project[key] = value
+ assert_key_equals(value)
+
+ @unit.skip_if_cache_disabled('resource')
+ @unit.skip_if_no_multiple_domains_support
+ def test_domain_rename_invalidates_get_domain_by_name_cache(self):
+ domain = unit.new_domain_ref()
+ domain_id = domain['id']
+ domain_name = domain['name']
+ self.resource_api.create_domain(domain_id, domain)
+ domain_ref = self.resource_api.get_domain_by_name(domain_name)
+ domain_ref['name'] = uuid.uuid4().hex
+ self.resource_api.update_domain(domain_id, domain_ref)
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain_by_name,
+ domain_name)
+
+ @unit.skip_if_cache_disabled('resource')
+ def test_cache_layer_domain_crud(self):
+ domain = unit.new_domain_ref()
+ domain_id = domain['id']
+ # Create Domain
+ self.resource_api.create_domain(domain_id, domain)
+ project_domain_ref = self.resource_api.get_project(domain_id)
+ domain_ref = self.resource_api.get_domain(domain_id)
+ updated_project_domain_ref = copy.deepcopy(project_domain_ref)
+ updated_project_domain_ref['name'] = uuid.uuid4().hex
+ updated_domain_ref = copy.deepcopy(domain_ref)
+ updated_domain_ref['name'] = updated_project_domain_ref['name']
+ # Update domain, bypassing resource api manager
+ self.resource_api.driver.update_project(domain_id,
+ updated_project_domain_ref)
+ # Verify get_domain still returns the domain
+ self.assertDictContainsSubset(
+ domain_ref, self.resource_api.get_domain(domain_id))
+ # Invalidate cache
+ self.resource_api.get_domain.invalidate(self.resource_api,
+ domain_id)
+ # Verify get_domain returns the updated domain
+ self.assertDictContainsSubset(
+ updated_domain_ref, self.resource_api.get_domain(domain_id))
+ # Update the domain back to original ref, using the assignment api
+ # manager
+ self.resource_api.update_domain(domain_id, domain_ref)
+ self.assertDictContainsSubset(
+ domain_ref, self.resource_api.get_domain(domain_id))
+ # Make sure domain is 'disabled', bypass resource api manager
+ project_domain_ref_disabled = project_domain_ref.copy()
+ project_domain_ref_disabled['enabled'] = False
+ self.resource_api.driver.update_project(domain_id,
+ project_domain_ref_disabled)
+ self.resource_api.driver.update_project(domain_id, {'enabled': False})
+ # Delete domain, bypassing resource api manager
+ self.resource_api.driver.delete_project(domain_id)
+ # Verify get_domain still returns the domain
+ self.assertDictContainsSubset(
+ domain_ref, self.resource_api.get_domain(domain_id))
+ # Invalidate cache
+ self.resource_api.get_domain.invalidate(self.resource_api,
+ domain_id)
+ # Verify get_domain now raises DomainNotFound
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain, domain_id)
+ # Recreate Domain
+ self.resource_api.create_domain(domain_id, domain)
+ self.resource_api.get_domain(domain_id)
+ # Make sure domain is 'disabled', bypass resource api manager
+ domain['enabled'] = False
+ self.resource_api.driver.update_project(domain_id, domain)
+ self.resource_api.driver.update_project(domain_id, {'enabled': False})
+ # Delete domain
+ self.resource_api.delete_domain(domain_id)
+ # verify DomainNotFound raised
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain,
+ domain_id)
+
+ @unit.skip_if_cache_disabled('resource')
+ @unit.skip_if_no_multiple_domains_support
+ def test_project_rename_invalidates_get_project_by_name_cache(self):
+ domain = unit.new_domain_ref()
+ project = unit.new_project_ref(domain_id=domain['id'])
+ project_id = project['id']
+ project_name = project['name']
+ self.resource_api.create_domain(domain['id'], domain)
+ # Create a project
+ self.resource_api.create_project(project_id, project)
+ self.resource_api.get_project_by_name(project_name, domain['id'])
+ project['name'] = uuid.uuid4().hex
+ self.resource_api.update_project(project_id, project)
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project_by_name,
+ project_name,
+ domain['id'])
+
+ @unit.skip_if_cache_disabled('resource')
+ @unit.skip_if_no_multiple_domains_support
+ def test_cache_layer_project_crud(self):
+ domain = unit.new_domain_ref()
+ project = unit.new_project_ref(domain_id=domain['id'])
+ project_id = project['id']
+ self.resource_api.create_domain(domain['id'], domain)
+ # Create a project
+ self.resource_api.create_project(project_id, project)
+ self.resource_api.get_project(project_id)
+ updated_project = copy.deepcopy(project)
+ updated_project['name'] = uuid.uuid4().hex
+ # Update project, bypassing resource manager
+ self.resource_api.driver.update_project(project_id,
+ updated_project)
+ # Verify get_project still returns the original project_ref
+ self.assertDictContainsSubset(
+ project, self.resource_api.get_project(project_id))
+ # Invalidate cache
+ self.resource_api.get_project.invalidate(self.resource_api,
+ project_id)
+ # Verify get_project now returns the new project
+ self.assertDictContainsSubset(
+ updated_project,
+ self.resource_api.get_project(project_id))
+ # Update project using the resource_api manager back to original
+ self.resource_api.update_project(project['id'], project)
+ # Verify get_project returns the original project_ref
+ self.assertDictContainsSubset(
+ project, self.resource_api.get_project(project_id))
+ # Delete project bypassing resource
+ self.resource_api.driver.delete_project(project_id)
+ # Verify get_project still returns the project_ref
+ self.assertDictContainsSubset(
+ project, self.resource_api.get_project(project_id))
+ # Invalidate cache
+ self.resource_api.get_project.invalidate(self.resource_api,
+ project_id)
+ # Verify ProjectNotFound now raised
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project_id)
+ # recreate project
+ self.resource_api.create_project(project_id, project)
+ self.resource_api.get_project(project_id)
+ # delete project
+ self.resource_api.delete_project(project_id)
+ # Verify ProjectNotFound is raised
+ self.assertRaises(exception.ProjectNotFound,
+ self.resource_api.get_project,
+ project_id)
+
+ @unit.skip_if_no_multiple_domains_support
+ def test_get_default_domain_by_name(self):
+ domain_name = 'default'
+
+ domain = unit.new_domain_ref(name=domain_name)
+ self.resource_api.create_domain(domain['id'], domain)
+
+ domain_ref = self.resource_api.get_domain_by_name(domain_name)
+ self.assertEqual(domain, domain_ref)
+
+ def test_get_not_default_domain_by_name(self):
+ domain_name = 'foo'
+ self.assertRaises(exception.DomainNotFound,
+ self.resource_api.get_domain_by_name,
+ domain_name)
+
+ def test_project_update_and_project_get_return_same_response(self):
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+
+ self.resource_api.create_project(project['id'], project)
+
+ updated_project = {'enabled': False}
+ updated_project_ref = self.resource_api.update_project(
+ project['id'], updated_project)
+
+ # SQL backend adds 'extra' field
+ updated_project_ref.pop('extra', None)
+
+ self.assertIs(False, updated_project_ref['enabled'])
+
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertDictEqual(updated_project_ref, project_ref)
+
+
+class ResourceDriverTests(object):
+ """Tests for the resource driver.
+
+ Subclasses must set self.driver to the driver instance.
+
+ """
+
+ def test_create_project(self):
+ project_id = uuid.uuid4().hex
+ project = {
+ 'name': uuid.uuid4().hex,
+ 'id': project_id,
+ 'domain_id': uuid.uuid4().hex,
+ }
+ self.driver.create_project(project_id, project)
+
+ def test_create_project_all_defined_properties(self):
+ project_id = uuid.uuid4().hex
+ project = {
+ 'name': uuid.uuid4().hex,
+ 'id': project_id,
+ 'domain_id': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'parent_id': uuid.uuid4().hex,
+ 'is_domain': True,
+ }
+ self.driver.create_project(project_id, project)
+
+ def test_create_project_null_domain(self):
+ project_id = uuid.uuid4().hex
+ project = {
+ 'name': uuid.uuid4().hex,
+ 'id': project_id,
+ 'domain_id': None,
+ }
+ self.driver.create_project(project_id, project)
+
+ def test_create_project_same_name_same_domain_conflict(self):
+ name = uuid.uuid4().hex
+ domain_id = uuid.uuid4().hex
+
+ project_id = uuid.uuid4().hex
+ project = {
+ 'name': name,
+ 'id': project_id,
+ 'domain_id': domain_id,
+ }
+ self.driver.create_project(project_id, project)
+
+ project_id = uuid.uuid4().hex
+ project = {
+ 'name': name,
+ 'id': project_id,
+ 'domain_id': domain_id,
+ }
+ self.assertRaises(exception.Conflict, self.driver.create_project,
+ project_id, project)
+
+ def test_create_project_same_id_conflict(self):
+ project_id = uuid.uuid4().hex
+
+ project = {
+ 'name': uuid.uuid4().hex,
+ 'id': project_id,
+ 'domain_id': uuid.uuid4().hex,
+ }
+ self.driver.create_project(project_id, project)
+
+ project = {
+ 'name': uuid.uuid4().hex,
+ 'id': project_id,
+ 'domain_id': uuid.uuid4().hex,
+ }
+ self.assertRaises(exception.Conflict, self.driver.create_project,
+ project_id, project)
diff --git a/keystone-moon/keystone/tests/unit/resource/test_controllers.py b/keystone-moon/keystone/tests/unit/resource/test_controllers.py
new file mode 100644
index 00000000..b8f247c8
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/test_controllers.py
@@ -0,0 +1,57 @@
+# Copyright 2016 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from oslo_config import cfg
+
+from keystone import exception
+from keystone.resource import controllers
+from keystone.tests import unit
+from keystone.tests.unit.ksfixtures import database
+
+
+CONF = cfg.CONF
+
+_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}}
+
+
+class TenantTestCaseNoDefaultDomain(unit.TestCase):
+
+ def setUp(self):
+ super(TenantTestCaseNoDefaultDomain, self).setUp()
+ self.useFixture(database.Database())
+ self.load_backends()
+ self.tenant_controller = controllers.Tenant()
+
+ def test_setup(self):
+ # Other tests in this class assume there's no default domain, so make
+ # sure the setUp worked as expected.
+ self.assertRaises(
+ exception.DomainNotFound,
+ self.resource_api.get_domain, CONF.identity.default_domain_id)
+
+ def test_get_all_projects(self):
+ # When get_all_projects is done and there's no default domain, the
+ # result is an empty list.
+ res = self.tenant_controller.get_all_projects(_ADMIN_CONTEXT)
+ self.assertEqual([], res['tenants'])
+
+ def test_create_project(self):
+ # When a project is created using the v2 controller and there's no
+ # default domain, it doesn't fail with can't find domain (a default
+ # domain is created)
+ tenant = {'name': uuid.uuid4().hex}
+ self.tenant_controller.create_project(_ADMIN_CONTEXT, tenant)
+ # If the above doesn't fail then this is successful.
diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py
new file mode 100644
index 00000000..2eb87e4c
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/resource/test_core.py
@@ -0,0 +1,692 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import uuid
+
+import mock
+from testtools import matchers
+
+from oslo_config import cfg
+from oslotest import mockpatch
+
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit.ksfixtures import database
+
+
+CONF = cfg.CONF
+
+
+class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase):
+
+ def setUp(self):
+ super(TestResourceManagerNoFixtures, self).setUp()
+ self.useFixture(database.Database(self.sql_driver_version_overrides))
+ self.load_backends()
+
+ def test_ensure_default_domain_exists(self):
+ # When there's no default domain, ensure_default_domain_exists creates
+ # it.
+
+ # First make sure there's no default domain.
+ self.assertRaises(
+ exception.DomainNotFound,
+ self.resource_api.get_domain, CONF.identity.default_domain_id)
+
+ self.resource_api.ensure_default_domain_exists()
+ default_domain = self.resource_api.get_domain(
+ CONF.identity.default_domain_id)
+
+ expected_domain = {
+ 'id': CONF.identity.default_domain_id,
+ 'name': 'Default',
+ 'enabled': True,
+ 'description': 'Domain created automatically to support V2.0 '
+ 'operations.',
+ }
+ self.assertEqual(expected_domain, default_domain)
+
+ def test_ensure_default_domain_exists_already_exists(self):
+ # When there's already a default domain, ensure_default_domain_exists
+ # doesn't do anything.
+
+ name = uuid.uuid4().hex
+ description = uuid.uuid4().hex
+ domain_attrs = {
+ 'id': CONF.identity.default_domain_id,
+ 'name': name,
+ 'description': description,
+ }
+ self.resource_api.create_domain(CONF.identity.default_domain_id,
+ domain_attrs)
+
+ self.resource_api.ensure_default_domain_exists()
+
+ default_domain = self.resource_api.get_domain(
+ CONF.identity.default_domain_id)
+
+ expected_domain = {
+ 'id': CONF.identity.default_domain_id,
+ 'name': name,
+ 'enabled': True,
+ 'description': description,
+ }
+
+ self.assertEqual(expected_domain, default_domain)
+
+ def test_ensure_default_domain_exists_fails(self):
+ # When there's an unexpected exception creating domain it's passed on.
+
+ self.useFixture(mockpatch.PatchObject(
+ self.resource_api, 'create_domain',
+ side_effect=exception.UnexpectedError))
+
+ self.assertRaises(exception.UnexpectedError,
+ self.resource_api.ensure_default_domain_exists)
+
+ def test_update_project_name_conflict(self):
+ name = uuid.uuid4().hex
+ description = uuid.uuid4().hex
+ domain_attrs = {
+ 'id': CONF.identity.default_domain_id,
+ 'name': name,
+ 'description': description,
+ }
+ domain = self.resource_api.create_domain(
+ CONF.identity.default_domain_id, domain_attrs)
+ project1 = unit.new_project_ref(domain_id=domain['id'],
+ name=uuid.uuid4().hex)
+ self.resource_api.create_project(project1['id'], project1)
+ project2 = unit.new_project_ref(domain_id=domain['id'],
+ name=uuid.uuid4().hex)
+ project = self.resource_api.create_project(project2['id'], project2)
+
+ self.assertRaises(exception.Conflict,
+ self.resource_api.update_project,
+ project['id'], {'name': project1['name']})
+
+
+class DomainConfigDriverTests(object):
+
+ def _domain_config_crud(self, sensitive):
+ domain = uuid.uuid4().hex
+ group = uuid.uuid4().hex
+ option = uuid.uuid4().hex
+ value = uuid.uuid4().hex
+ self.driver.create_config_option(
+ domain, group, option, value, sensitive)
+ res = self.driver.get_config_option(
+ domain, group, option, sensitive)
+ config = {'group': group, 'option': option, 'value': value}
+ self.assertEqual(config, res)
+
+ value = uuid.uuid4().hex
+ self.driver.update_config_option(
+ domain, group, option, value, sensitive)
+ res = self.driver.get_config_option(
+ domain, group, option, sensitive)
+ config = {'group': group, 'option': option, 'value': value}
+ self.assertEqual(config, res)
+
+ self.driver.delete_config_options(
+ domain, group, option, sensitive)
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.driver.get_config_option,
+ domain, group, option, sensitive)
+ # ...and silent if we try to delete it again
+ self.driver.delete_config_options(
+ domain, group, option, sensitive)
+
+ def test_whitelisted_domain_config_crud(self):
+ self._domain_config_crud(sensitive=False)
+
+ def test_sensitive_domain_config_crud(self):
+ self._domain_config_crud(sensitive=True)
+
+ def _list_domain_config(self, sensitive):
+ """Test listing by combination of domain, group & option."""
+ config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ # Put config2 in the same group as config1
+ config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
+ 'value': 100}
+ domain = uuid.uuid4().hex
+
+ for config in [config1, config2, config3]:
+ self.driver.create_config_option(
+ domain, config['group'], config['option'],
+ config['value'], sensitive)
+
+ # Try listing all items from a domain
+ res = self.driver.list_config_options(
+ domain, sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(3))
+ for res_entry in res:
+ self.assertIn(res_entry, [config1, config2, config3])
+
+ # Try listing by domain and group
+ res = self.driver.list_config_options(
+ domain, group=config1['group'], sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(2))
+ for res_entry in res:
+ self.assertIn(res_entry, [config1, config2])
+
+ # Try listing by domain, group and option
+ res = self.driver.list_config_options(
+ domain, group=config2['group'],
+ option=config2['option'], sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(1))
+ self.assertEqual(config2, res[0])
+
+ def test_list_whitelisted_domain_config_crud(self):
+ self._list_domain_config(False)
+
+ def test_list_sensitive_domain_config_crud(self):
+ self._list_domain_config(True)
+
+ def _delete_domain_configs(self, sensitive):
+ """Test deleting by combination of domain, group & option."""
+ config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ # Put config2 and config3 in the same group as config1
+ config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ config3 = {'group': config1['group'], 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ domain = uuid.uuid4().hex
+
+ for config in [config1, config2, config3, config4]:
+ self.driver.create_config_option(
+ domain, config['group'], config['option'],
+ config['value'], sensitive)
+
+ # Try deleting by domain, group and option
+ res = self.driver.delete_config_options(
+ domain, group=config2['group'],
+ option=config2['option'], sensitive=sensitive)
+ res = self.driver.list_config_options(
+ domain, sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(3))
+ for res_entry in res:
+ self.assertIn(res_entry, [config1, config3, config4])
+
+ # Try deleting by domain and group
+ res = self.driver.delete_config_options(
+ domain, group=config4['group'], sensitive=sensitive)
+ res = self.driver.list_config_options(
+ domain, sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(2))
+ for res_entry in res:
+ self.assertIn(res_entry, [config1, config3])
+
+ # Try deleting all items from a domain
+ res = self.driver.delete_config_options(
+ domain, sensitive=sensitive)
+ res = self.driver.list_config_options(
+ domain, sensitive=sensitive)
+ self.assertThat(res, matchers.HasLength(0))
+
+ def test_delete_whitelisted_domain_configs(self):
+ self._delete_domain_configs(False)
+
+ def test_delete_sensitive_domain_configs(self):
+ self._delete_domain_configs(True)
+
+ def _create_domain_config_twice(self, sensitive):
+ """Test conflict error thrown if create the same option twice."""
+ config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
+ 'value': uuid.uuid4().hex}
+ domain = uuid.uuid4().hex
+
+ self.driver.create_config_option(
+ domain, config['group'], config['option'],
+ config['value'], sensitive=sensitive)
+ self.assertRaises(exception.Conflict,
+ self.driver.create_config_option,
+ domain, config['group'], config['option'],
+ config['value'], sensitive=sensitive)
+
+ def test_create_whitelisted_domain_config_twice(self):
+ self._create_domain_config_twice(False)
+
+ def test_create_sensitive_domain_config_twice(self):
+ self._create_domain_config_twice(True)
+
+
+class DomainConfigTests(object):
+
+ def setUp(self):
+ self.domain = unit.new_domain_ref()
+ self.resource_api.create_domain(self.domain['id'], self.domain)
+ self.addCleanup(self.clean_up_domain)
+
+ def clean_up_domain(self):
+ # NOTE(henry-nash): Deleting the domain will also delete any domain
+ # configs for this domain.
+ self.domain['enabled'] = False
+ self.resource_api.update_domain(self.domain['id'], self.domain)
+ self.resource_api.delete_domain(self.domain['id'])
+ del self.domain
+
+ def test_create_domain_config_including_sensitive_option(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ # password is sensitive, so check that the whitelisted portion and
+ # the sensitive piece have been stored in the appropriate locations.
+ res = self.domain_config_api.get_config(self.domain['id'])
+ config_whitelisted = copy.deepcopy(config)
+ config_whitelisted['ldap'].pop('password')
+ self.assertEqual(config_whitelisted, res)
+ res = self.domain_config_api.driver.get_config_option(
+ self.domain['id'], 'ldap', 'password', sensitive=True)
+ self.assertEqual(config['ldap']['password'], res['value'])
+
+ # Finally, use the non-public API to get back the whole config
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual(config, res)
+
+ def test_get_partial_domain_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ res = self.domain_config_api.get_config(self.domain['id'],
+ group='identity')
+ config_partial = copy.deepcopy(config)
+ config_partial.pop('ldap')
+ self.assertEqual(config_partial, res)
+ res = self.domain_config_api.get_config(
+ self.domain['id'], group='ldap', option='user_tree_dn')
+ self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res)
+ # ...but we should fail to get a sensitive option
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.get_config, self.domain['id'],
+ group='ldap', option='password')
+
+ def test_delete_partial_domain_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ self.domain_config_api.delete_config(
+ self.domain['id'], group='identity')
+ config_partial = copy.deepcopy(config)
+ config_partial.pop('identity')
+ config_partial['ldap'].pop('password')
+ res = self.domain_config_api.get_config(self.domain['id'])
+ self.assertEqual(config_partial, res)
+
+ self.domain_config_api.delete_config(
+ self.domain['id'], group='ldap', option='url')
+ config_partial = copy.deepcopy(config_partial)
+ config_partial['ldap'].pop('url')
+ res = self.domain_config_api.get_config(self.domain['id'])
+ self.assertEqual(config_partial, res)
+
+ def test_get_options_not_in_domain_config(self):
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.get_config, self.domain['id'])
+ config = {'ldap': {'url': uuid.uuid4().hex}}
+
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.get_config, self.domain['id'],
+ group='identity')
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.get_config, self.domain['id'],
+ group='ldap', option='user_tree_dn')
+
+ def test_get_sensitive_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual({}, res)
+ self.domain_config_api.create_config(self.domain['id'], config)
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual(config, res)
+
+ def test_update_partial_domain_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ # Try updating a group
+ new_config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_filter': uuid.uuid4().hex}}
+ res = self.domain_config_api.update_config(
+ self.domain['id'], new_config, group='ldap')
+ expected_config = copy.deepcopy(config)
+ expected_config['ldap']['url'] = new_config['ldap']['url']
+ expected_config['ldap']['user_filter'] = (
+ new_config['ldap']['user_filter'])
+ expected_full_config = copy.deepcopy(expected_config)
+ expected_config['ldap'].pop('password')
+ res = self.domain_config_api.get_config(self.domain['id'])
+ self.assertEqual(expected_config, res)
+ # The sensitive option should still exist
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual(expected_full_config, res)
+
+ # Try updating a single whitelisted option
+ self.domain_config_api.delete_config(self.domain['id'])
+ self.domain_config_api.create_config(self.domain['id'], config)
+ new_config = {'url': uuid.uuid4().hex}
+ res = self.domain_config_api.update_config(
+ self.domain['id'], new_config, group='ldap', option='url')
+
+ # Make sure whitelisted and full config is updated
+ expected_whitelisted_config = copy.deepcopy(config)
+ expected_whitelisted_config['ldap']['url'] = new_config['url']
+ expected_full_config = copy.deepcopy(expected_whitelisted_config)
+ expected_whitelisted_config['ldap'].pop('password')
+ self.assertEqual(expected_whitelisted_config, res)
+ res = self.domain_config_api.get_config(self.domain['id'])
+ self.assertEqual(expected_whitelisted_config, res)
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual(expected_full_config, res)
+
+ # Try updating a single sensitive option
+ self.domain_config_api.delete_config(self.domain['id'])
+ self.domain_config_api.create_config(self.domain['id'], config)
+ new_config = {'password': uuid.uuid4().hex}
+ res = self.domain_config_api.update_config(
+ self.domain['id'], new_config, group='ldap', option='password')
+ # The whitelisted config should not have changed...
+ expected_whitelisted_config = copy.deepcopy(config)
+ expected_full_config = copy.deepcopy(config)
+ expected_whitelisted_config['ldap'].pop('password')
+ self.assertEqual(expected_whitelisted_config, res)
+ res = self.domain_config_api.get_config(self.domain['id'])
+ self.assertEqual(expected_whitelisted_config, res)
+ expected_full_config['ldap']['password'] = new_config['password']
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ # ...but the sensitive piece should have.
+ self.assertEqual(expected_full_config, res)
+
+ def test_update_invalid_partial_domain_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ # An extra group, when specifying one group should fail
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config, group='ldap')
+ # An extra option, when specifying one option should fail
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config['ldap'],
+ group='ldap', option='url')
+
+ # Now try the right number of groups/options, but just not
+ # ones that are in the config provided
+ config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config, group='identity')
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config['ldap'], group='ldap',
+ option='url')
+
+ # Now some valid groups/options, but just not ones that are in the
+ # existing config
+ config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+ config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}}
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.update_config,
+ self.domain['id'], config_wrong_group,
+ group='identity')
+ config_wrong_option = {'url': uuid.uuid4().hex}
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.update_config,
+ self.domain['id'], config_wrong_option,
+ group='ldap', option='url')
+
+ # And finally just some bad groups/options
+ bad_group = uuid.uuid4().hex
+ config = {bad_group: {'user': uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config, group=bad_group,
+ option='user')
+ bad_option = uuid.uuid4().hex
+ config = {'ldap': {bad_option: uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.update_config,
+ self.domain['id'], config, group='ldap',
+ option=bad_option)
+
+ def test_create_invalid_domain_config(self):
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.create_config,
+ self.domain['id'], {})
+ config = {uuid.uuid4().hex: uuid.uuid4().hex}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.create_config,
+ self.domain['id'], config)
+ config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.create_config,
+ self.domain['id'], config)
+ config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.create_config,
+ self.domain['id'], config)
+ # Try an option that IS in the standard conf, but neither whitelisted
+ # or marked as sensitive
+ config = {'identity': {'user_tree_dn': uuid.uuid4().hex}}
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.domain_config_api.create_config,
+ self.domain['id'], config)
+
+ def test_delete_invalid_partial_domain_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+ # Try deleting a group not in the config
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.delete_config,
+ self.domain['id'], group='identity')
+ # Try deleting an option not in the config
+ self.assertRaises(exception.DomainConfigNotFound,
+ self.domain_config_api.delete_config,
+ self.domain['id'],
+ group='ldap', option='user_tree_dn')
+
+ def test_sensitive_substitution_in_domain_config(self):
+ # Create a config that contains a whitelisted option that requires
+ # substitution of a sensitive option.
+ config = {'ldap': {'url': 'my_url/%(password)s',
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+
+ # Read back the config with the internal method and ensure that the
+ # substitution has taken place.
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ expected_url = (
+ config['ldap']['url'] % {'password': config['ldap']['password']})
+ self.assertEqual(expected_url, res['ldap']['url'])
+
+ def test_invalid_sensitive_substitution_in_domain_config(self):
+ """Check that invalid substitutions raise warnings."""
+ mock_log = mock.Mock()
+
+ invalid_option_config = {
+ 'ldap': {'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+
+ for invalid_option in ['my_url/%(passssword)s',
+ 'my_url/%(password',
+ 'my_url/%(password)',
+ 'my_url/%(password)d']:
+ invalid_option_config['ldap']['url'] = invalid_option
+ self.domain_config_api.create_config(
+ self.domain['id'], invalid_option_config)
+
+ with mock.patch('keystone.resource.core.LOG', mock_log):
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ mock_log.warning.assert_any_call(mock.ANY)
+ self.assertEqual(
+ invalid_option_config['ldap']['url'], res['ldap']['url'])
+
+ def test_escaped_sequence_in_domain_config(self):
+ """Check that escaped '%(' doesn't get interpreted."""
+ mock_log = mock.Mock()
+
+ escaped_option_config = {
+ 'ldap': {'url': 'my_url/%%(password)s',
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+
+ self.domain_config_api.create_config(
+ self.domain['id'], escaped_option_config)
+
+ with mock.patch('keystone.resource.core.LOG', mock_log):
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertFalse(mock_log.warn.called)
+ # The escaping '%' should have been removed
+ self.assertEqual('my_url/%(password)s', res['ldap']['url'])
+
+ @unit.skip_if_cache_disabled('domain_config')
+ def test_cache_layer_get_sensitive_config(self):
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex},
+ 'identity': {'driver': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(self.domain['id'], config)
+ # cache the result
+ res = self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id'])
+ self.assertEqual(config, res)
+
+ # delete, bypassing domain config manager api
+ self.domain_config_api.delete_config_options(self.domain['id'])
+ self.domain_config_api.delete_config_options(self.domain['id'],
+ sensitive=True)
+
+ self.assertDictEqual(
+ res, self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id']))
+ self.domain_config_api.get_config_with_sensitive_info.invalidate(
+ self.domain_config_api, self.domain['id'])
+ self.assertDictEqual(
+ {},
+ self.domain_config_api.get_config_with_sensitive_info(
+ self.domain['id']))
+
+ def test_delete_domain_deletes_configs(self):
+ """Test domain deletion clears the domain configs."""
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ config = {'ldap': {'url': uuid.uuid4().hex,
+ 'user_tree_dn': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex}}
+ self.domain_config_api.create_config(domain['id'], config)
+
+ # Now delete the domain
+ domain['enabled'] = False
+ self.resource_api.update_domain(domain['id'], domain)
+ self.resource_api.delete_domain(domain['id'])
+
+ # Check domain configs have also been deleted
+ self.assertRaises(
+ exception.DomainConfigNotFound,
+ self.domain_config_api.get_config,
+ domain['id'])
+
+ # The get_config_with_sensitive_info does not throw an exception if
+ # the config is empty, it just returns an empty dict
+ self.assertDictEqual(
+ {},
+ self.domain_config_api.get_config_with_sensitive_info(
+ domain['id']))
+
+ def test_config_registration(self):
+ type = uuid.uuid4().hex
+ self.domain_config_api.obtain_registration(
+ self.domain['id'], type)
+ self.domain_config_api.release_registration(
+ self.domain['id'], type=type)
+
+ # Make sure that once someone has it, nobody else can get it.
+ # This includes the domain who already has it.
+ self.domain_config_api.obtain_registration(
+ self.domain['id'], type)
+ self.assertFalse(
+ self.domain_config_api.obtain_registration(
+ self.domain['id'], type))
+
+ # Make sure we can read who does have it
+ self.assertEqual(
+ self.domain['id'],
+ self.domain_config_api.read_registration(type))
+
+ # Make sure releasing it is silent if the domain specified doesn't
+ # have the registration
+ domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.resource_api.create_domain(domain2['id'], domain2)
+ self.domain_config_api.release_registration(
+ domain2['id'], type=type)
+
+ # If nobody has the type registered, then trying to read it should
+ # raise ConfigRegistrationNotFound
+ self.domain_config_api.release_registration(
+ self.domain['id'], type=type)
+ self.assertRaises(exception.ConfigRegistrationNotFound,
+ self.domain_config_api.read_registration,
+ type)
+
+ # Finally check multiple registrations are cleared if you free the
+ # registration without specifying the type
+ type2 = uuid.uuid4().hex
+ self.domain_config_api.obtain_registration(
+ self.domain['id'], type)
+ self.domain_config_api.obtain_registration(
+ self.domain['id'], type2)
+ self.domain_config_api.release_registration(self.domain['id'])
+ self.assertRaises(exception.ConfigRegistrationNotFound,
+ self.domain_config_api.read_registration,
+ type)
+ self.assertRaises(exception.ConfigRegistrationNotFound,
+ self.domain_config_api.read_registration,
+ type2)