aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/resource
diff options
context:
space:
mode:
authorRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
committerRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
commit920a49cfa055733d575282973e23558c33087a4a (patch)
treed371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/resource
parentef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff)
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/resource')
-rw-r--r--keystone-moon/keystone/tests/unit/resource/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/backends/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/backends/test_sql.py24
-rw-r--r--keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py53
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_backends.py1669
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_controllers.py57
-rw-r--r--keystone-moon/keystone/tests/unit/resource/test_core.py692
8 files changed, 0 insertions, 2495 deletions
diff --git a/keystone-moon/keystone/tests/unit/resource/__init__.py b/keystone-moon/keystone/tests/unit/resource/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/resource/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/backends/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/resource/backends/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py
deleted file mode 100644
index 79ad3df2..00000000
--- a/keystone-moon/keystone/tests/unit/resource/backends/test_sql.py
+++ /dev/null
@@ -1,24 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from keystone.resource.backends import sql
-from keystone.tests import unit
-from keystone.tests.unit.ksfixtures import database
-from keystone.tests.unit.resource import test_backends
-
-
-class TestSqlResourceDriver(unit.BaseTestCase,
- test_backends.ResourceDriverTests):
- def setUp(self):
- super(TestSqlResourceDriver, self).setUp()
- self.useFixture(database.Database())
- self.driver = sql.Resource()
diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py b/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/resource/config_backends/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py b/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py
deleted file mode 100644
index b4c5f262..00000000
--- a/keystone-moon/keystone/tests/unit/resource/config_backends/test_sql.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-from keystone.common import sql
-from keystone.resource.config_backends import sql as config_sql
-from keystone.tests import unit
-from keystone.tests.unit.backend import core_sql
-from keystone.tests.unit.ksfixtures import database
-from keystone.tests.unit.resource import test_core
-
-
-class SqlDomainConfigModels(core_sql.BaseBackendSqlModels):
-
- def test_whitelisted_model(self):
- cols = (('domain_id', sql.String, 64),
- ('group', sql.String, 255),
- ('option', sql.String, 255),
- ('value', sql.JsonBlob, None))
- self.assertExpectedSchema('whitelisted_config', cols)
-
- def test_sensitive_model(self):
- cols = (('domain_id', sql.String, 64),
- ('group', sql.String, 255),
- ('option', sql.String, 255),
- ('value', sql.JsonBlob, None))
- self.assertExpectedSchema('sensitive_config', cols)
-
-
-class SqlDomainConfigDriver(unit.BaseTestCase,
- test_core.DomainConfigDriverTests):
- def setUp(self):
- super(SqlDomainConfigDriver, self).setUp()
- self.useFixture(database.Database())
- self.driver = config_sql.DomainConfig()
-
-
-class SqlDomainConfig(core_sql.BaseBackendSqlTests,
- test_core.DomainConfigTests):
- def setUp(self):
- super(SqlDomainConfig, self).setUp()
- # test_core.DomainConfigTests is effectively a mixin class, so make
- # sure we call its setup
- test_core.DomainConfigTests.setUp(self)
diff --git a/keystone-moon/keystone/tests/unit/resource/test_backends.py b/keystone-moon/keystone/tests/unit/resource/test_backends.py
deleted file mode 100644
index eed4c6ba..00000000
--- a/keystone-moon/keystone/tests/unit/resource/test_backends.py
+++ /dev/null
@@ -1,1669 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import uuid
-
-import mock
-from oslo_config import cfg
-from six.moves import range
-from testtools import matchers
-
-from keystone.common import driver_hints
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import utils as test_utils
-
-
-CONF = cfg.CONF
-
-
-class ResourceTests(object):
-
- domain_count = len(default_fixtures.DOMAINS)
-
- def test_get_project(self):
- tenant_ref = self.resource_api.get_project(self.tenant_bar['id'])
- self.assertDictEqual(self.tenant_bar, tenant_ref)
-
- def test_get_project_returns_not_found(self):
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- uuid.uuid4().hex)
-
- def test_get_project_by_name(self):
- tenant_ref = self.resource_api.get_project_by_name(
- self.tenant_bar['name'],
- CONF.identity.default_domain_id)
- self.assertDictEqual(self.tenant_bar, tenant_ref)
-
- @unit.skip_if_no_multiple_domains_support
- def test_get_project_by_name_for_project_acting_as_a_domain(self):
- """Tests get_project_by_name works when the domain_id is None."""
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, is_domain=False)
- project = self.resource_api.create_project(project['id'], project)
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project_by_name,
- project['name'],
- None)
-
- # Test that querying with domain_id as None will find the project
- # acting as a domain, even if it's name is the same as the regular
- # project above.
- project2 = unit.new_project_ref(is_domain=True,
- name=project['name'])
- project2 = self.resource_api.create_project(project2['id'], project2)
-
- project_ref = self.resource_api.get_project_by_name(
- project2['name'], None)
-
- self.assertEqual(project2, project_ref)
-
- def test_get_project_by_name_returns_not_found(self):
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project_by_name,
- uuid.uuid4().hex,
- CONF.identity.default_domain_id)
-
- def test_create_duplicate_project_id_fails(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project_id = project['id']
- self.resource_api.create_project(project_id, project)
- project['name'] = 'fake2'
- self.assertRaises(exception.Conflict,
- self.resource_api.create_project,
- project_id,
- project)
-
- def test_create_duplicate_project_name_fails(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project_id = project['id']
- self.resource_api.create_project(project_id, project)
- project['id'] = 'fake2'
- self.assertRaises(exception.Conflict,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_create_duplicate_project_name_in_different_domains(self):
- new_domain = unit.new_domain_ref()
- self.resource_api.create_domain(new_domain['id'], new_domain)
- project1 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project2 = unit.new_project_ref(name=project1['name'],
- domain_id=new_domain['id'])
- self.resource_api.create_project(project1['id'], project1)
- self.resource_api.create_project(project2['id'], project2)
-
- def test_move_project_between_domains(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- project = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project['id'], project)
- project['domain_id'] = domain2['id']
- # Update the project asserting that a deprecation warning is emitted
- with mock.patch(
- 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
- self.resource_api.update_project(project['id'], project)
- self.assertTrue(mock_dep.called)
-
- updated_project_ref = self.resource_api.get_project(project['id'])
- self.assertEqual(domain2['id'], updated_project_ref['domain_id'])
-
- def test_move_project_between_domains_with_clashing_names_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- # First, create a project in domain1
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- # Now create a project in domain2 with a potentially clashing
- # name - which should work since we have domain separation
- project2 = unit.new_project_ref(name=project1['name'],
- domain_id=domain2['id'])
- self.resource_api.create_project(project2['id'], project2)
- # Now try and move project1 into the 2nd domain - which should
- # fail since the names clash
- project1['domain_id'] = domain2['id']
- self.assertRaises(exception.Conflict,
- self.resource_api.update_project,
- project1['id'],
- project1)
-
- @unit.skip_if_no_multiple_domains_support
- def test_move_project_with_children_between_domains_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- project = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project['id'], project)
- child_project = unit.new_project_ref(domain_id=domain1['id'],
- parent_id=project['id'])
- self.resource_api.create_project(child_project['id'], child_project)
- project['domain_id'] = domain2['id']
-
- # Update is not allowed, since updating the whole subtree would be
- # necessary
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- @unit.skip_if_no_multiple_domains_support
- def test_move_project_not_root_between_domains_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- project = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project['id'], project)
- child_project = unit.new_project_ref(domain_id=domain1['id'],
- parent_id=project['id'])
- self.resource_api.create_project(child_project['id'], child_project)
- child_project['domain_id'] = domain2['id']
-
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- child_project['id'],
- child_project)
-
- @unit.skip_if_no_multiple_domains_support
- def test_move_root_project_between_domains_succeeds(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- root_project = unit.new_project_ref(domain_id=domain1['id'])
- root_project = self.resource_api.create_project(root_project['id'],
- root_project)
-
- root_project['domain_id'] = domain2['id']
- self.resource_api.update_project(root_project['id'], root_project)
- project_from_db = self.resource_api.get_project(root_project['id'])
-
- self.assertEqual(domain2['id'], project_from_db['domain_id'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_update_domain_id_project_is_domain_fails(self):
- other_domain = unit.new_domain_ref()
- self.resource_api.create_domain(other_domain['id'], other_domain)
- project = unit.new_project_ref(is_domain=True)
- self.resource_api.create_project(project['id'], project)
- project['domain_id'] = other_domain['id']
-
- # Update of domain_id of projects acting as domains is not allowed
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- def test_rename_duplicate_project_name_fails(self):
- project1 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project2 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project1['id'], project1)
- self.resource_api.create_project(project2['id'], project2)
- project2['name'] = project1['name']
- self.assertRaises(exception.Error,
- self.resource_api.update_project,
- project2['id'],
- project2)
-
- def test_update_project_id_does_nothing(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project_id = project['id']
- self.resource_api.create_project(project['id'], project)
- project['id'] = 'fake2'
- self.resource_api.update_project(project_id, project)
- project_ref = self.resource_api.get_project(project_id)
- self.assertEqual(project_id, project_ref['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- 'fake2')
-
- def test_delete_domain_with_user_group_project_links(self):
- # TODO(chungg):add test case once expected behaviour defined
- pass
-
- def test_update_project_returns_not_found(self):
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.update_project,
- uuid.uuid4().hex,
- dict())
-
- def test_delete_project_returns_not_found(self):
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.delete_project,
- uuid.uuid4().hex)
-
- def test_create_update_delete_unicode_project(self):
- unicode_project_name = u'name \u540d\u5b57'
- project = unit.new_project_ref(
- name=unicode_project_name,
- domain_id=CONF.identity.default_domain_id)
- project = self.resource_api.create_project(project['id'], project)
- self.resource_api.update_project(project['id'], project)
- self.resource_api.delete_project(project['id'])
-
- def test_create_project_with_no_enabled_field(self):
- ref = unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
- del ref['enabled']
- self.resource_api.create_project(ref['id'], ref)
-
- project = self.resource_api.get_project(ref['id'])
- self.assertIs(project['enabled'], True)
-
- def test_create_project_long_name_fails(self):
- project = unit.new_project_ref(
- name='a' * 65, domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_create_project_blank_name_fails(self):
- project = unit.new_project_ref(
- name='', domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_create_project_invalid_name_fails(self):
- project = unit.new_project_ref(
- name=None, domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'],
- project)
- project = unit.new_project_ref(
- name=123, domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_update_project_blank_name_fails(self):
- project = unit.new_project_ref(
- name='fake1', domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
- project['name'] = ''
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- def test_update_project_long_name_fails(self):
- project = unit.new_project_ref(
- name='fake1', domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
- project['name'] = 'a' * 65
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- def test_update_project_invalid_name_fails(self):
- project = unit.new_project_ref(
- name='fake1', domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
- project['name'] = None
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- project['name'] = 123
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- def test_update_project_invalid_enabled_type_string(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertTrue(project_ref['enabled'])
-
- # Strings are not valid boolean values
- project['enabled'] = "false"
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- project['id'],
- project)
-
- def test_create_project_invalid_enabled_type_string(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- # invalid string value
- enabled="true")
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_create_project_invalid_domain_id(self):
- project = unit.new_project_ref(domain_id=uuid.uuid4().hex)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_list_domains(self):
- domain1 = unit.new_domain_ref()
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- self.resource_api.create_domain(domain2['id'], domain2)
- domains = self.resource_api.list_domains()
- self.assertEqual(3, len(domains))
- domain_ids = []
- for domain in domains:
- domain_ids.append(domain.get('id'))
- self.assertIn(CONF.identity.default_domain_id, domain_ids)
- self.assertIn(domain1['id'], domain_ids)
- self.assertIn(domain2['id'], domain_ids)
-
- def test_list_projects(self):
- project_refs = self.resource_api.list_projects()
- project_count = len(default_fixtures.TENANTS) + self.domain_count
- self.assertEqual(project_count, len(project_refs))
- for project in default_fixtures.TENANTS:
- self.assertIn(project, project_refs)
-
- def test_list_projects_with_multiple_filters(self):
- # Create a project
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project = self.resource_api.create_project(project['id'], project)
-
- # Build driver hints with the project's name and inexistent description
- hints = driver_hints.Hints()
- hints.add_filter('name', project['name'])
- hints.add_filter('description', uuid.uuid4().hex)
-
- # Retrieve projects based on hints and check an empty list is returned
- projects = self.resource_api.list_projects(hints)
- self.assertEqual([], projects)
-
- # Build correct driver hints
- hints = driver_hints.Hints()
- hints.add_filter('name', project['name'])
- hints.add_filter('description', project['description'])
-
- # Retrieve projects based on hints
- projects = self.resource_api.list_projects(hints)
-
- # Check that the returned list contains only the first project
- self.assertEqual(1, len(projects))
- self.assertEqual(project, projects[0])
-
- def test_list_projects_for_domain(self):
- project_ids = ([x['id'] for x in
- self.resource_api.list_projects_in_domain(
- CONF.identity.default_domain_id)])
- # Only the projects from the default fixtures are expected, since
- # filtering by domain does not include any project that acts as a
- # domain.
- self.assertThat(
- project_ids, matchers.HasLength(len(default_fixtures.TENANTS)))
- self.assertIn(self.tenant_bar['id'], project_ids)
- self.assertIn(self.tenant_baz['id'], project_ids)
- self.assertIn(self.tenant_mtu['id'], project_ids)
- self.assertIn(self.tenant_service['id'], project_ids)
-
- @unit.skip_if_no_multiple_domains_support
- def test_list_projects_acting_as_domain(self):
- initial_domains = self.resource_api.list_domains()
-
- # Creating 5 projects that act as domains
- new_projects_acting_as_domains = []
- for i in range(5):
- project = unit.new_project_ref(is_domain=True)
- project = self.resource_api.create_project(project['id'], project)
- new_projects_acting_as_domains.append(project)
-
- # Creating a few regular project to ensure it doesn't mess with the
- # ones that act as domains
- self._create_projects_hierarchy(hierarchy_size=2)
-
- projects = self.resource_api.list_projects_acting_as_domain()
- expected_number_projects = (
- len(initial_domains) + len(new_projects_acting_as_domains))
- self.assertEqual(expected_number_projects, len(projects))
- for project in new_projects_acting_as_domains:
- self.assertIn(project, projects)
- for domain in initial_domains:
- self.assertIn(domain['id'], [p['id'] for p in projects])
-
- @unit.skip_if_no_multiple_domains_support
- def test_list_projects_for_alternate_domain(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- project2 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project2['id'], project2)
- project_ids = ([x['id'] for x in
- self.resource_api.list_projects_in_domain(
- domain1['id'])])
- self.assertEqual(2, len(project_ids))
- self.assertIn(project1['id'], project_ids)
- self.assertIn(project2['id'], project_ids)
-
- def _create_projects_hierarchy(self, hierarchy_size=2,
- domain_id=None,
- is_domain=False,
- parent_project_id=None):
- """Creates a project hierarchy with specified size.
-
- :param hierarchy_size: the desired hierarchy size, default is 2 -
- a project with one child.
- :param domain_id: domain where the projects hierarchy will be created.
- :param is_domain: if the hierarchy will have the is_domain flag active
- or not.
- :param parent_project_id: if the intention is to create a
- sub-hierarchy, sets the sub-hierarchy root. Defaults to creating
- a new hierarchy, i.e. a new root project.
-
- :returns projects: a list of the projects in the created hierarchy.
-
- """
- if domain_id is None:
- domain_id = CONF.identity.default_domain_id
- if parent_project_id:
- project = unit.new_project_ref(parent_id=parent_project_id,
- domain_id=domain_id,
- is_domain=is_domain)
- else:
- project = unit.new_project_ref(domain_id=domain_id,
- is_domain=is_domain)
- project_id = project['id']
- project = self.resource_api.create_project(project_id, project)
-
- projects = [project]
- for i in range(1, hierarchy_size):
- new_project = unit.new_project_ref(parent_id=project_id,
- domain_id=domain_id)
-
- self.resource_api.create_project(new_project['id'], new_project)
- projects.append(new_project)
- project_id = new_project['id']
-
- return projects
-
- @unit.skip_if_no_multiple_domains_support
- def test_create_domain_with_project_api(self):
- project = unit.new_project_ref(is_domain=True)
- ref = self.resource_api.create_project(project['id'], project)
- self.assertTrue(ref['is_domain'])
- self.resource_api.get_domain(ref['id'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_project_as_a_domain_uniqueness_constraints(self):
- """Tests project uniqueness for those acting as domains.
-
- If it is a project acting as a domain, we can't have two or more with
- the same name.
-
- """
- # Create two projects acting as a domain
- project = unit.new_project_ref(is_domain=True)
- project = self.resource_api.create_project(project['id'], project)
- project2 = unit.new_project_ref(is_domain=True)
- project2 = self.resource_api.create_project(project2['id'], project2)
-
- # All projects acting as domains have a null domain_id, so should not
- # be able to create another with the same name but a different
- # project ID.
- new_project = project.copy()
- new_project['id'] = uuid.uuid4().hex
-
- self.assertRaises(exception.Conflict,
- self.resource_api.create_project,
- new_project['id'],
- new_project)
-
- # We also should not be able to update one to have a name clash
- project2['name'] = project['name']
- self.assertRaises(exception.Conflict,
- self.resource_api.update_project,
- project2['id'],
- project2)
-
- # But updating it to a unique name is OK
- project2['name'] = uuid.uuid4().hex
- self.resource_api.update_project(project2['id'], project2)
-
- # Finally, it should be OK to create a project with same name as one of
- # these acting as a domain, as long as it is a regular project
- project3 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, name=project2['name'])
- self.resource_api.create_project(project3['id'], project3)
- # In fact, it should be OK to create such a project in the domain which
- # has the matching name.
- # TODO(henry-nash): Once we fully support projects acting as a domain,
- # add a test here to create a sub-project with a name that matches its
- # project acting as a domain
-
- @unit.skip_if_no_multiple_domains_support
- @test_utils.wip('waiting for sub projects acting as domains support')
- def test_is_domain_sub_project_has_parent_domain_id(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, is_domain=True)
- self.resource_api.create_project(project['id'], project)
-
- sub_project = unit.new_project_ref(domain_id=project['id'],
- parent_id=project['id'],
- is_domain=True)
-
- ref = self.resource_api.create_project(sub_project['id'], sub_project)
- self.assertTrue(ref['is_domain'])
- self.assertEqual(project['id'], ref['parent_id'])
- self.assertEqual(project['id'], ref['domain_id'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_delete_domain_with_project_api(self):
- project = unit.new_project_ref(domain_id=None,
- is_domain=True)
- self.resource_api.create_project(project['id'], project)
-
- # Check that a corresponding domain was created
- self.resource_api.get_domain(project['id'])
-
- # Try to delete the enabled project that acts as a domain
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.delete_project,
- project['id'])
-
- # Disable the project
- project['enabled'] = False
- self.resource_api.update_project(project['id'], project)
-
- # Successfully delete the project
- self.resource_api.delete_project(project['id'])
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project['id'])
-
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- project['id'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_create_subproject_acting_as_domain_fails(self):
- root_project = unit.new_project_ref(is_domain=True)
- self.resource_api.create_project(root_project['id'], root_project)
-
- sub_project = unit.new_project_ref(is_domain=True,
- parent_id=root_project['id'])
-
- # Creation of sub projects acting as domains is not allowed yet
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- sub_project['id'], sub_project)
-
- @unit.skip_if_no_multiple_domains_support
- def test_create_domain_under_regular_project_hierarchy_fails(self):
- # Projects acting as domains can't have a regular project as parent
- projects_hierarchy = self._create_projects_hierarchy()
- parent = projects_hierarchy[1]
- project = unit.new_project_ref(domain_id=parent['id'],
- parent_id=parent['id'],
- is_domain=True)
-
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'], project)
-
- @unit.skip_if_no_multiple_domains_support
- @test_utils.wip('waiting for sub projects acting as domains support')
- def test_create_project_under_domain_hierarchy(self):
- projects_hierarchy = self._create_projects_hierarchy(is_domain=True)
- parent = projects_hierarchy[1]
- project = unit.new_project_ref(domain_id=parent['id'],
- parent_id=parent['id'],
- is_domain=False)
-
- ref = self.resource_api.create_project(project['id'], project)
- self.assertFalse(ref['is_domain'])
- self.assertEqual(parent['id'], ref['parent_id'])
- self.assertEqual(parent['id'], ref['domain_id'])
-
- def test_create_project_without_is_domain_flag(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- del project['is_domain']
- ref = self.resource_api.create_project(project['id'], project)
- # The is_domain flag should be False by default
- self.assertFalse(ref['is_domain'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_create_project_passing_is_domain_flag_true(self):
- project = unit.new_project_ref(is_domain=True)
-
- ref = self.resource_api.create_project(project['id'], project)
- self.assertTrue(ref['is_domain'])
-
- def test_create_project_passing_is_domain_flag_false(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, is_domain=False)
-
- ref = self.resource_api.create_project(project['id'], project)
- self.assertIs(False, ref['is_domain'])
-
- @test_utils.wip('waiting for support for parent_id to imply domain_id')
- def test_create_project_with_parent_id_and_without_domain_id(self):
- # First create a domain
- project = unit.new_project_ref(is_domain=True)
- self.resource_api.create_project(project['id'], project)
- # Now create a child by just naming the parent_id
- sub_project = unit.new_project_ref(parent_id=project['id'])
- ref = self.resource_api.create_project(sub_project['id'], sub_project)
-
- # The domain_id should be set to the parent domain_id
- self.assertEqual(project['domain_id'], ref['domain_id'])
-
- def test_create_project_with_domain_id_and_without_parent_id(self):
- # First create a domain
- project = unit.new_project_ref(is_domain=True)
- self.resource_api.create_project(project['id'], project)
- # Now create a child by just naming the domain_id
- sub_project = unit.new_project_ref(domain_id=project['id'])
- ref = self.resource_api.create_project(sub_project['id'], sub_project)
-
- # The parent_id and domain_id should be set to the id of the project
- # acting as a domain
- self.assertEqual(project['id'], ref['parent_id'])
- self.assertEqual(project['id'], ref['domain_id'])
-
- def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
- # First create a domain
- project = unit.new_project_ref(is_domain=True)
- self.resource_api.create_project(project['id'], project)
- # Now try to create a child with the above as its parent, but
- # specifying a different domain.
- sub_project = unit.new_project_ref(
- parent_id=project['id'], domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- sub_project['id'], sub_project)
-
- def test_check_leaf_projects(self):
- projects_hierarchy = self._create_projects_hierarchy()
- root_project = projects_hierarchy[0]
- leaf_project = projects_hierarchy[1]
-
- self.assertFalse(self.resource_api.is_leaf_project(
- root_project['id']))
- self.assertTrue(self.resource_api.is_leaf_project(
- leaf_project['id']))
-
- # Delete leaf_project
- self.resource_api.delete_project(leaf_project['id'])
-
- # Now, root_project should be leaf
- self.assertTrue(self.resource_api.is_leaf_project(
- root_project['id']))
-
- def test_list_projects_in_subtree(self):
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- project1 = projects_hierarchy[0]
- project2 = projects_hierarchy[1]
- project3 = projects_hierarchy[2]
- project4 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- parent_id=project2['id'])
- self.resource_api.create_project(project4['id'], project4)
-
- subtree = self.resource_api.list_projects_in_subtree(project1['id'])
- self.assertEqual(3, len(subtree))
- self.assertIn(project2, subtree)
- self.assertIn(project3, subtree)
- self.assertIn(project4, subtree)
-
- subtree = self.resource_api.list_projects_in_subtree(project2['id'])
- self.assertEqual(2, len(subtree))
- self.assertIn(project3, subtree)
- self.assertIn(project4, subtree)
-
- subtree = self.resource_api.list_projects_in_subtree(project3['id'])
- self.assertEqual(0, len(subtree))
-
- def test_list_projects_in_subtree_with_circular_reference(self):
- project1 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project1 = self.resource_api.create_project(project1['id'], project1)
-
- project2 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- parent_id=project1['id'])
- self.resource_api.create_project(project2['id'], project2)
-
- project1['parent_id'] = project2['id'] # Adds cyclic reference
-
- # NOTE(dstanek): The manager does not allow parent_id to be updated.
- # Instead will directly use the driver to create the cyclic
- # reference.
- self.resource_api.driver.update_project(project1['id'], project1)
-
- subtree = self.resource_api.list_projects_in_subtree(project1['id'])
-
- # NOTE(dstanek): If a cyclic reference is detected the code bails
- # and returns None instead of falling into the infinite
- # recursion trap.
- self.assertIsNone(subtree)
-
- def test_list_projects_in_subtree_invalid_project_id(self):
- self.assertRaises(exception.ValidationError,
- self.resource_api.list_projects_in_subtree,
- None)
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.list_projects_in_subtree,
- uuid.uuid4().hex)
-
- def test_list_project_parents(self):
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- project1 = projects_hierarchy[0]
- project2 = projects_hierarchy[1]
- project3 = projects_hierarchy[2]
- project4 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- parent_id=project2['id'])
- self.resource_api.create_project(project4['id'], project4)
-
- parents1 = self.resource_api.list_project_parents(project3['id'])
- self.assertEqual(3, len(parents1))
- self.assertIn(project1, parents1)
- self.assertIn(project2, parents1)
-
- parents2 = self.resource_api.list_project_parents(project4['id'])
- self.assertEqual(parents1, parents2)
-
- parents = self.resource_api.list_project_parents(project1['id'])
- # It has the default domain as parent
- self.assertEqual(1, len(parents))
-
- def test_update_project_enabled_cascade(self):
- """Test update_project_cascade
-
- Ensures the enabled attribute is correctly updated across
- a simple 3-level projects hierarchy.
- """
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- parent = projects_hierarchy[0]
-
- # Disable in parent project disables the whole subtree
- parent['enabled'] = False
- # Store the ref from backend in another variable so we don't bother
- # to remove other attributes that were not originally provided and
- # were set in the manager, like parent_id and domain_id.
- parent_ref = self.resource_api.update_project(parent['id'],
- parent,
- cascade=True)
-
- subtree = self.resource_api.list_projects_in_subtree(parent['id'])
- self.assertEqual(2, len(subtree))
- self.assertFalse(parent_ref['enabled'])
- self.assertFalse(subtree[0]['enabled'])
- self.assertFalse(subtree[1]['enabled'])
-
- # Enable parent project enables the whole subtree
- parent['enabled'] = True
- parent_ref = self.resource_api.update_project(parent['id'],
- parent,
- cascade=True)
-
- subtree = self.resource_api.list_projects_in_subtree(parent['id'])
- self.assertEqual(2, len(subtree))
- self.assertTrue(parent_ref['enabled'])
- self.assertTrue(subtree[0]['enabled'])
- self.assertTrue(subtree[1]['enabled'])
-
- def test_cannot_enable_cascade_with_parent_disabled(self):
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- grandparent = projects_hierarchy[0]
- parent = projects_hierarchy[1]
-
- grandparent['enabled'] = False
- self.resource_api.update_project(grandparent['id'],
- grandparent,
- cascade=True)
- subtree = self.resource_api.list_projects_in_subtree(parent['id'])
- self.assertFalse(subtree[0]['enabled'])
-
- parent['enabled'] = True
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.update_project,
- parent['id'],
- parent,
- cascade=True)
-
- def test_update_cascade_only_accepts_enabled(self):
- # Update cascade does not accept any other attribute but 'enabled'
- new_project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(new_project['id'], new_project)
-
- new_project['name'] = 'project1'
- self.assertRaises(exception.ValidationError,
- self.resource_api.update_project,
- new_project['id'],
- new_project,
- cascade=True)
-
- def test_list_project_parents_invalid_project_id(self):
- self.assertRaises(exception.ValidationError,
- self.resource_api.list_project_parents,
- None)
-
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.list_project_parents,
- uuid.uuid4().hex)
-
- def test_create_project_doesnt_modify_passed_in_dict(self):
- new_project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- original_project = new_project.copy()
- self.resource_api.create_project(new_project['id'], new_project)
- self.assertDictEqual(original_project, new_project)
-
- def test_update_project_enable(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertTrue(project_ref['enabled'])
-
- project['enabled'] = False
- self.resource_api.update_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertEqual(project['enabled'], project_ref['enabled'])
-
- # If not present, enabled field should not be updated
- del project['enabled']
- self.resource_api.update_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertFalse(project_ref['enabled'])
-
- project['enabled'] = True
- self.resource_api.update_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertEqual(project['enabled'], project_ref['enabled'])
-
- del project['enabled']
- self.resource_api.update_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertTrue(project_ref['enabled'])
-
- def test_create_invalid_domain_fails(self):
- new_group = unit.new_group_ref(domain_id="doesnotexist")
- self.assertRaises(exception.DomainNotFound,
- self.identity_api.create_group,
- new_group)
- new_user = unit.new_user_ref(domain_id="doesnotexist")
- self.assertRaises(exception.DomainNotFound,
- self.identity_api.create_user,
- new_user)
-
- @unit.skip_if_no_multiple_domains_support
- def test_project_crud(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- project = unit.new_project_ref(domain_id=domain['id'])
- self.resource_api.create_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertDictContainsSubset(project, project_ref)
-
- project['name'] = uuid.uuid4().hex
- self.resource_api.update_project(project['id'], project)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertDictContainsSubset(project, project_ref)
-
- self.resource_api.delete_project(project['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project['id'])
-
- def test_domain_delete_hierarchy(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
-
- # Creating a root and a leaf project inside the domain
- projects_hierarchy = self._create_projects_hierarchy(
- domain_id=domain['id'])
- root_project = projects_hierarchy[0]
- leaf_project = projects_hierarchy[0]
-
- # Disable the domain
- domain['enabled'] = False
- self.resource_api.update_domain(domain['id'], domain)
-
- # Delete the domain
- self.resource_api.delete_domain(domain['id'])
-
- # Make sure the domain no longer exists
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
-
- # Make sure the root project no longer exists
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- root_project['id'])
-
- # Make sure the leaf project no longer exists
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- leaf_project['id'])
-
- def test_delete_projects_from_ids(self):
- """Tests the resource backend call delete_projects_from_ids.
-
- Tests the normal flow of the delete_projects_from_ids backend call,
- that ensures no project on the list exists after it is succesfully
- called.
- """
- project1_ref = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- project2_ref = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- projects = (project1_ref, project2_ref)
- for project in projects:
- self.resource_api.create_project(project['id'], project)
-
- # Setting up the ID's list
- projects_ids = [p['id'] for p in projects]
- self.resource_api.driver.delete_projects_from_ids(projects_ids)
-
- # Ensuring projects no longer exist at backend level
- for project_id in projects_ids:
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.driver.get_project,
- project_id)
-
- # Passing an empty list is silently ignored
- self.resource_api.driver.delete_projects_from_ids([])
-
- def test_delete_projects_from_ids_with_no_existing_project_id(self):
- """Tests delete_projects_from_ids issues warning if not found.
-
- Tests the resource backend call delete_projects_from_ids passing a
- non existing ID in project_ids, which is logged and ignored by
- the backend.
- """
- project_ref = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project_ref['id'], project_ref)
-
- # Setting up the ID's list
- projects_ids = (project_ref['id'], uuid.uuid4().hex)
- with mock.patch('keystone.resource.backends.sql.LOG') as mock_log:
- self.resource_api.delete_projects_from_ids(projects_ids)
- self.assertTrue(mock_log.warning.called)
- # The existing project was deleted.
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.driver.get_project,
- project_ref['id'])
-
- # Even if we only have one project, and it does not exist, it returns
- # no error.
- self.resource_api.driver.delete_projects_from_ids([uuid.uuid4().hex])
-
- def test_delete_project_cascade(self):
- # create a hierarchy with 3 levels
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- root_project = projects_hierarchy[0]
- project1 = projects_hierarchy[1]
- project2 = projects_hierarchy[2]
-
- # Disabling all projects before attempting to delete
- for project in (project2, project1, root_project):
- project['enabled'] = False
- self.resource_api.update_project(project['id'], project)
-
- self.resource_api.delete_project(root_project['id'], cascade=True)
-
- for project in projects_hierarchy:
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project['id'])
-
- def test_delete_large_project_cascade(self):
- """Try delete a large project with cascade true.
-
- Tree we will create::
-
- +-p1-+
- | |
- p5 p2
- | |
- p6 +-p3-+
- | |
- p7 p4
- """
- # create a hierarchy with 4 levels
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=4)
- p1 = projects_hierarchy[0]
- # Add the left branch to the hierarchy (p5, p6)
- self._create_projects_hierarchy(hierarchy_size=2,
- parent_project_id=p1['id'])
- # Add p7 to the hierarchy
- p3_id = projects_hierarchy[2]['id']
- self._create_projects_hierarchy(hierarchy_size=1,
- parent_project_id=p3_id)
- # Reverse the hierarchy to disable the leaf first
- prjs_hierarchy = ([p1] + self.resource_api.list_projects_in_subtree(
- p1['id']))[::-1]
-
- # Disabling all projects before attempting to delete
- for project in prjs_hierarchy:
- project['enabled'] = False
- self.resource_api.update_project(project['id'], project)
-
- self.resource_api.delete_project(p1['id'], cascade=True)
- for project in prjs_hierarchy:
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project['id'])
-
- def test_cannot_delete_project_cascade_with_enabled_child(self):
- # create a hierarchy with 3 levels
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- root_project = projects_hierarchy[0]
- project1 = projects_hierarchy[1]
- project2 = projects_hierarchy[2]
-
- project2['enabled'] = False
- self.resource_api.update_project(project2['id'], project2)
-
- # Cannot cascade delete root_project, since project1 is enabled
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.delete_project,
- root_project['id'],
- cascade=True)
-
- # Ensuring no project was deleted, not even project2
- self.resource_api.get_project(root_project['id'])
- self.resource_api.get_project(project1['id'])
- self.resource_api.get_project(project2['id'])
-
- def test_hierarchical_projects_crud(self):
- # create a hierarchy with just a root project (which is a leaf as well)
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=1)
- root_project1 = projects_hierarchy[0]
-
- # create a hierarchy with one root project and one leaf project
- projects_hierarchy = self._create_projects_hierarchy()
- root_project2 = projects_hierarchy[0]
- leaf_project = projects_hierarchy[1]
-
- # update description from leaf_project
- leaf_project['description'] = 'new description'
- self.resource_api.update_project(leaf_project['id'], leaf_project)
- proj_ref = self.resource_api.get_project(leaf_project['id'])
- self.assertDictEqual(leaf_project, proj_ref)
-
- # update the parent_id is not allowed
- leaf_project['parent_id'] = root_project1['id']
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.update_project,
- leaf_project['id'],
- leaf_project)
-
- # delete root_project1
- self.resource_api.delete_project(root_project1['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- root_project1['id'])
-
- # delete root_project2 is not allowed since it is not a leaf project
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.delete_project,
- root_project2['id'])
-
- def test_create_project_with_invalid_parent(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, parent_id='fake')
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.create_project,
- project['id'],
- project)
-
- @unit.skip_if_no_multiple_domains_support
- def test_create_leaf_project_with_different_domain(self):
- root_project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(root_project['id'], root_project)
-
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- leaf_project = unit.new_project_ref(domain_id=domain['id'],
- parent_id=root_project['id'])
-
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- leaf_project['id'],
- leaf_project)
-
- def test_delete_hierarchical_leaf_project(self):
- projects_hierarchy = self._create_projects_hierarchy()
- root_project = projects_hierarchy[0]
- leaf_project = projects_hierarchy[1]
-
- self.resource_api.delete_project(leaf_project['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- leaf_project['id'])
-
- self.resource_api.delete_project(root_project['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- root_project['id'])
-
- def test_delete_hierarchical_not_leaf_project(self):
- projects_hierarchy = self._create_projects_hierarchy()
- root_project = projects_hierarchy[0]
-
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.delete_project,
- root_project['id'])
-
- def test_update_project_parent(self):
- projects_hierarchy = self._create_projects_hierarchy(hierarchy_size=3)
- project1 = projects_hierarchy[0]
- project2 = projects_hierarchy[1]
- project3 = projects_hierarchy[2]
-
- # project2 is the parent from project3
- self.assertEqual(project3.get('parent_id'), project2['id'])
-
- # try to update project3 parent to parent1
- project3['parent_id'] = project1['id']
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.update_project,
- project3['id'],
- project3)
-
- def test_create_project_under_disabled_one(self):
- project1 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id, enabled=False)
- self.resource_api.create_project(project1['id'], project1)
-
- project2 = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- parent_id=project1['id'])
-
- # It's not possible to create a project under a disabled one in the
- # hierarchy
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project2['id'],
- project2)
-
- def test_disable_hierarchical_leaf_project(self):
- projects_hierarchy = self._create_projects_hierarchy()
- leaf_project = projects_hierarchy[1]
-
- leaf_project['enabled'] = False
- self.resource_api.update_project(leaf_project['id'], leaf_project)
-
- project_ref = self.resource_api.get_project(leaf_project['id'])
- self.assertEqual(leaf_project['enabled'], project_ref['enabled'])
-
- def test_disable_hierarchical_not_leaf_project(self):
- projects_hierarchy = self._create_projects_hierarchy()
- root_project = projects_hierarchy[0]
-
- root_project['enabled'] = False
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.update_project,
- root_project['id'],
- root_project)
-
- def test_enable_project_with_disabled_parent(self):
- projects_hierarchy = self._create_projects_hierarchy()
- root_project = projects_hierarchy[0]
- leaf_project = projects_hierarchy[1]
-
- # Disable leaf and root
- leaf_project['enabled'] = False
- self.resource_api.update_project(leaf_project['id'], leaf_project)
- root_project['enabled'] = False
- self.resource_api.update_project(root_project['id'], root_project)
-
- # Try to enable the leaf project, it's not possible since it has
- # a disabled parent
- leaf_project['enabled'] = True
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.update_project,
- leaf_project['id'],
- leaf_project)
-
- def _get_hierarchy_depth(self, project_id):
- return len(self.resource_api.list_project_parents(project_id)) + 1
-
- def test_check_hierarchy_depth(self):
- # Should be allowed to have a hierarchy of the max depth specified
- # in the config option plus one (to allow for the additional project
- # acting as a domain after an upgrade)
- projects_hierarchy = self._create_projects_hierarchy(
- CONF.max_project_tree_depth)
- leaf_project = projects_hierarchy[CONF.max_project_tree_depth - 1]
-
- depth = self._get_hierarchy_depth(leaf_project['id'])
- self.assertEqual(CONF.max_project_tree_depth + 1, depth)
-
- # Creating another project in the hierarchy shouldn't be allowed
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id,
- parent_id=leaf_project['id'])
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.create_project,
- project['id'],
- project)
-
- def test_project_update_missing_attrs_with_a_value(self):
- # Creating a project with no description attribute.
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- del project['description']
- project = self.resource_api.create_project(project['id'], project)
-
- # Add a description attribute.
- project['description'] = uuid.uuid4().hex
- self.resource_api.update_project(project['id'], project)
-
- project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project, project_ref)
-
- def test_project_update_missing_attrs_with_a_falsey_value(self):
- # Creating a project with no description attribute.
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- del project['description']
- project = self.resource_api.create_project(project['id'], project)
-
- # Add a description attribute.
- project['description'] = ''
- self.resource_api.update_project(project['id'], project)
-
- project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project, project_ref)
-
- def test_domain_crud(self):
- domain = unit.new_domain_ref()
- domain_ref = self.resource_api.create_domain(domain['id'], domain)
- self.assertDictEqual(domain, domain_ref)
- domain_ref = self.resource_api.get_domain(domain['id'])
- self.assertDictEqual(domain, domain_ref)
-
- domain['name'] = uuid.uuid4().hex
- domain_ref = self.resource_api.update_domain(domain['id'], domain)
- self.assertDictEqual(domain, domain_ref)
- domain_ref = self.resource_api.get_domain(domain['id'])
- self.assertDictEqual(domain, domain_ref)
-
- # Ensure an 'enabled' domain cannot be deleted
- self.assertRaises(exception.ForbiddenNotSecurity,
- self.resource_api.delete_domain,
- domain_id=domain['id'])
-
- # Disable the domain
- domain['enabled'] = False
- self.resource_api.update_domain(domain['id'], domain)
-
- # Delete the domain
- self.resource_api.delete_domain(domain['id'])
-
- # Make sure the domain no longer exists
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
-
- @unit.skip_if_no_multiple_domains_support
- def test_domain_name_case_sensitivity(self):
- # create a ref with a lowercase name
- domain_name = 'test_domain'
- ref = unit.new_domain_ref(name=domain_name)
-
- lower_case_domain = self.resource_api.create_domain(ref['id'], ref)
-
- # assign a new ID to the ref with the same name, but in uppercase
- ref['id'] = uuid.uuid4().hex
- ref['name'] = domain_name.upper()
- upper_case_domain = self.resource_api.create_domain(ref['id'], ref)
-
- # We can get each domain by name
- lower_case_domain_ref = self.resource_api.get_domain_by_name(
- domain_name)
- self.assertDictEqual(lower_case_domain, lower_case_domain_ref)
-
- upper_case_domain_ref = self.resource_api.get_domain_by_name(
- domain_name.upper())
- self.assertDictEqual(upper_case_domain, upper_case_domain_ref)
-
- def test_project_attribute_update(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
- self.resource_api.create_project(project['id'], project)
-
- # pick a key known to be non-existent
- key = 'description'
-
- def assert_key_equals(value):
- project_ref = self.resource_api.update_project(
- project['id'], project)
- self.assertEqual(value, project_ref[key])
- project_ref = self.resource_api.get_project(project['id'])
- self.assertEqual(value, project_ref[key])
-
- def assert_get_key_is(value):
- project_ref = self.resource_api.update_project(
- project['id'], project)
- self.assertIs(project_ref.get(key), value)
- project_ref = self.resource_api.get_project(project['id'])
- self.assertIs(project_ref.get(key), value)
-
- # add an attribute that doesn't exist, set it to a falsey value
- value = ''
- project[key] = value
- assert_key_equals(value)
-
- # set an attribute with a falsey value to null
- value = None
- project[key] = value
- assert_get_key_is(value)
-
- # do it again, in case updating from this situation is handled oddly
- value = None
- project[key] = value
- assert_get_key_is(value)
-
- # set a possibly-null value to a falsey value
- value = ''
- project[key] = value
- assert_key_equals(value)
-
- # set a falsey value to a truthy value
- value = uuid.uuid4().hex
- project[key] = value
- assert_key_equals(value)
-
- @unit.skip_if_cache_disabled('resource')
- @unit.skip_if_no_multiple_domains_support
- def test_domain_rename_invalidates_get_domain_by_name_cache(self):
- domain = unit.new_domain_ref()
- domain_id = domain['id']
- domain_name = domain['name']
- self.resource_api.create_domain(domain_id, domain)
- domain_ref = self.resource_api.get_domain_by_name(domain_name)
- domain_ref['name'] = uuid.uuid4().hex
- self.resource_api.update_domain(domain_id, domain_ref)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain_by_name,
- domain_name)
-
- @unit.skip_if_cache_disabled('resource')
- def test_cache_layer_domain_crud(self):
- domain = unit.new_domain_ref()
- domain_id = domain['id']
- # Create Domain
- self.resource_api.create_domain(domain_id, domain)
- project_domain_ref = self.resource_api.get_project(domain_id)
- domain_ref = self.resource_api.get_domain(domain_id)
- updated_project_domain_ref = copy.deepcopy(project_domain_ref)
- updated_project_domain_ref['name'] = uuid.uuid4().hex
- updated_domain_ref = copy.deepcopy(domain_ref)
- updated_domain_ref['name'] = updated_project_domain_ref['name']
- # Update domain, bypassing resource api manager
- self.resource_api.driver.update_project(domain_id,
- updated_project_domain_ref)
- # Verify get_domain still returns the domain
- self.assertDictContainsSubset(
- domain_ref, self.resource_api.get_domain(domain_id))
- # Invalidate cache
- self.resource_api.get_domain.invalidate(self.resource_api,
- domain_id)
- # Verify get_domain returns the updated domain
- self.assertDictContainsSubset(
- updated_domain_ref, self.resource_api.get_domain(domain_id))
- # Update the domain back to original ref, using the assignment api
- # manager
- self.resource_api.update_domain(domain_id, domain_ref)
- self.assertDictContainsSubset(
- domain_ref, self.resource_api.get_domain(domain_id))
- # Make sure domain is 'disabled', bypass resource api manager
- project_domain_ref_disabled = project_domain_ref.copy()
- project_domain_ref_disabled['enabled'] = False
- self.resource_api.driver.update_project(domain_id,
- project_domain_ref_disabled)
- self.resource_api.driver.update_project(domain_id, {'enabled': False})
- # Delete domain, bypassing resource api manager
- self.resource_api.driver.delete_project(domain_id)
- # Verify get_domain still returns the domain
- self.assertDictContainsSubset(
- domain_ref, self.resource_api.get_domain(domain_id))
- # Invalidate cache
- self.resource_api.get_domain.invalidate(self.resource_api,
- domain_id)
- # Verify get_domain now raises DomainNotFound
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain, domain_id)
- # Recreate Domain
- self.resource_api.create_domain(domain_id, domain)
- self.resource_api.get_domain(domain_id)
- # Make sure domain is 'disabled', bypass resource api manager
- domain['enabled'] = False
- self.resource_api.driver.update_project(domain_id, domain)
- self.resource_api.driver.update_project(domain_id, {'enabled': False})
- # Delete domain
- self.resource_api.delete_domain(domain_id)
- # verify DomainNotFound raised
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain_id)
-
- @unit.skip_if_cache_disabled('resource')
- @unit.skip_if_no_multiple_domains_support
- def test_project_rename_invalidates_get_project_by_name_cache(self):
- domain = unit.new_domain_ref()
- project = unit.new_project_ref(domain_id=domain['id'])
- project_id = project['id']
- project_name = project['name']
- self.resource_api.create_domain(domain['id'], domain)
- # Create a project
- self.resource_api.create_project(project_id, project)
- self.resource_api.get_project_by_name(project_name, domain['id'])
- project['name'] = uuid.uuid4().hex
- self.resource_api.update_project(project_id, project)
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project_by_name,
- project_name,
- domain['id'])
-
- @unit.skip_if_cache_disabled('resource')
- @unit.skip_if_no_multiple_domains_support
- def test_cache_layer_project_crud(self):
- domain = unit.new_domain_ref()
- project = unit.new_project_ref(domain_id=domain['id'])
- project_id = project['id']
- self.resource_api.create_domain(domain['id'], domain)
- # Create a project
- self.resource_api.create_project(project_id, project)
- self.resource_api.get_project(project_id)
- updated_project = copy.deepcopy(project)
- updated_project['name'] = uuid.uuid4().hex
- # Update project, bypassing resource manager
- self.resource_api.driver.update_project(project_id,
- updated_project)
- # Verify get_project still returns the original project_ref
- self.assertDictContainsSubset(
- project, self.resource_api.get_project(project_id))
- # Invalidate cache
- self.resource_api.get_project.invalidate(self.resource_api,
- project_id)
- # Verify get_project now returns the new project
- self.assertDictContainsSubset(
- updated_project,
- self.resource_api.get_project(project_id))
- # Update project using the resource_api manager back to original
- self.resource_api.update_project(project['id'], project)
- # Verify get_project returns the original project_ref
- self.assertDictContainsSubset(
- project, self.resource_api.get_project(project_id))
- # Delete project bypassing resource
- self.resource_api.driver.delete_project(project_id)
- # Verify get_project still returns the project_ref
- self.assertDictContainsSubset(
- project, self.resource_api.get_project(project_id))
- # Invalidate cache
- self.resource_api.get_project.invalidate(self.resource_api,
- project_id)
- # Verify ProjectNotFound now raised
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project_id)
- # recreate project
- self.resource_api.create_project(project_id, project)
- self.resource_api.get_project(project_id)
- # delete project
- self.resource_api.delete_project(project_id)
- # Verify ProjectNotFound is raised
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- project_id)
-
- @unit.skip_if_no_multiple_domains_support
- def test_get_default_domain_by_name(self):
- domain_name = 'default'
-
- domain = unit.new_domain_ref(name=domain_name)
- self.resource_api.create_domain(domain['id'], domain)
-
- domain_ref = self.resource_api.get_domain_by_name(domain_name)
- self.assertEqual(domain, domain_ref)
-
- def test_get_not_default_domain_by_name(self):
- domain_name = 'foo'
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain_by_name,
- domain_name)
-
- def test_project_update_and_project_get_return_same_response(self):
- project = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
-
- self.resource_api.create_project(project['id'], project)
-
- updated_project = {'enabled': False}
- updated_project_ref = self.resource_api.update_project(
- project['id'], updated_project)
-
- # SQL backend adds 'extra' field
- updated_project_ref.pop('extra', None)
-
- self.assertIs(False, updated_project_ref['enabled'])
-
- project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(updated_project_ref, project_ref)
-
-
-class ResourceDriverTests(object):
- """Tests for the resource driver.
-
- Subclasses must set self.driver to the driver instance.
-
- """
-
- def test_create_project(self):
- project_id = uuid.uuid4().hex
- project = {
- 'name': uuid.uuid4().hex,
- 'id': project_id,
- 'domain_id': uuid.uuid4().hex,
- }
- self.driver.create_project(project_id, project)
-
- def test_create_project_all_defined_properties(self):
- project_id = uuid.uuid4().hex
- project = {
- 'name': uuid.uuid4().hex,
- 'id': project_id,
- 'domain_id': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex,
- 'enabled': True,
- 'parent_id': uuid.uuid4().hex,
- 'is_domain': True,
- }
- self.driver.create_project(project_id, project)
-
- def test_create_project_null_domain(self):
- project_id = uuid.uuid4().hex
- project = {
- 'name': uuid.uuid4().hex,
- 'id': project_id,
- 'domain_id': None,
- }
- self.driver.create_project(project_id, project)
-
- def test_create_project_same_name_same_domain_conflict(self):
- name = uuid.uuid4().hex
- domain_id = uuid.uuid4().hex
-
- project_id = uuid.uuid4().hex
- project = {
- 'name': name,
- 'id': project_id,
- 'domain_id': domain_id,
- }
- self.driver.create_project(project_id, project)
-
- project_id = uuid.uuid4().hex
- project = {
- 'name': name,
- 'id': project_id,
- 'domain_id': domain_id,
- }
- self.assertRaises(exception.Conflict, self.driver.create_project,
- project_id, project)
-
- def test_create_project_same_id_conflict(self):
- project_id = uuid.uuid4().hex
-
- project = {
- 'name': uuid.uuid4().hex,
- 'id': project_id,
- 'domain_id': uuid.uuid4().hex,
- }
- self.driver.create_project(project_id, project)
-
- project = {
- 'name': uuid.uuid4().hex,
- 'id': project_id,
- 'domain_id': uuid.uuid4().hex,
- }
- self.assertRaises(exception.Conflict, self.driver.create_project,
- project_id, project)
diff --git a/keystone-moon/keystone/tests/unit/resource/test_controllers.py b/keystone-moon/keystone/tests/unit/resource/test_controllers.py
deleted file mode 100644
index b8f247c8..00000000
--- a/keystone-moon/keystone/tests/unit/resource/test_controllers.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Copyright 2016 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-from oslo_config import cfg
-
-from keystone import exception
-from keystone.resource import controllers
-from keystone.tests import unit
-from keystone.tests.unit.ksfixtures import database
-
-
-CONF = cfg.CONF
-
-_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}}
-
-
-class TenantTestCaseNoDefaultDomain(unit.TestCase):
-
- def setUp(self):
- super(TenantTestCaseNoDefaultDomain, self).setUp()
- self.useFixture(database.Database())
- self.load_backends()
- self.tenant_controller = controllers.Tenant()
-
- def test_setup(self):
- # Other tests in this class assume there's no default domain, so make
- # sure the setUp worked as expected.
- self.assertRaises(
- exception.DomainNotFound,
- self.resource_api.get_domain, CONF.identity.default_domain_id)
-
- def test_get_all_projects(self):
- # When get_all_projects is done and there's no default domain, the
- # result is an empty list.
- res = self.tenant_controller.get_all_projects(_ADMIN_CONTEXT)
- self.assertEqual([], res['tenants'])
-
- def test_create_project(self):
- # When a project is created using the v2 controller and there's no
- # default domain, it doesn't fail with can't find domain (a default
- # domain is created)
- tenant = {'name': uuid.uuid4().hex}
- self.tenant_controller.create_project(_ADMIN_CONTEXT, tenant)
- # If the above doesn't fail then this is successful.
diff --git a/keystone-moon/keystone/tests/unit/resource/test_core.py b/keystone-moon/keystone/tests/unit/resource/test_core.py
deleted file mode 100644
index 2eb87e4c..00000000
--- a/keystone-moon/keystone/tests/unit/resource/test_core.py
+++ /dev/null
@@ -1,692 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import uuid
-
-import mock
-from testtools import matchers
-
-from oslo_config import cfg
-from oslotest import mockpatch
-
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit.ksfixtures import database
-
-
-CONF = cfg.CONF
-
-
-class TestResourceManagerNoFixtures(unit.SQLDriverOverrides, unit.TestCase):
-
- def setUp(self):
- super(TestResourceManagerNoFixtures, self).setUp()
- self.useFixture(database.Database(self.sql_driver_version_overrides))
- self.load_backends()
-
- def test_ensure_default_domain_exists(self):
- # When there's no default domain, ensure_default_domain_exists creates
- # it.
-
- # First make sure there's no default domain.
- self.assertRaises(
- exception.DomainNotFound,
- self.resource_api.get_domain, CONF.identity.default_domain_id)
-
- self.resource_api.ensure_default_domain_exists()
- default_domain = self.resource_api.get_domain(
- CONF.identity.default_domain_id)
-
- expected_domain = {
- 'id': CONF.identity.default_domain_id,
- 'name': 'Default',
- 'enabled': True,
- 'description': 'Domain created automatically to support V2.0 '
- 'operations.',
- }
- self.assertEqual(expected_domain, default_domain)
-
- def test_ensure_default_domain_exists_already_exists(self):
- # When there's already a default domain, ensure_default_domain_exists
- # doesn't do anything.
-
- name = uuid.uuid4().hex
- description = uuid.uuid4().hex
- domain_attrs = {
- 'id': CONF.identity.default_domain_id,
- 'name': name,
- 'description': description,
- }
- self.resource_api.create_domain(CONF.identity.default_domain_id,
- domain_attrs)
-
- self.resource_api.ensure_default_domain_exists()
-
- default_domain = self.resource_api.get_domain(
- CONF.identity.default_domain_id)
-
- expected_domain = {
- 'id': CONF.identity.default_domain_id,
- 'name': name,
- 'enabled': True,
- 'description': description,
- }
-
- self.assertEqual(expected_domain, default_domain)
-
- def test_ensure_default_domain_exists_fails(self):
- # When there's an unexpected exception creating domain it's passed on.
-
- self.useFixture(mockpatch.PatchObject(
- self.resource_api, 'create_domain',
- side_effect=exception.UnexpectedError))
-
- self.assertRaises(exception.UnexpectedError,
- self.resource_api.ensure_default_domain_exists)
-
- def test_update_project_name_conflict(self):
- name = uuid.uuid4().hex
- description = uuid.uuid4().hex
- domain_attrs = {
- 'id': CONF.identity.default_domain_id,
- 'name': name,
- 'description': description,
- }
- domain = self.resource_api.create_domain(
- CONF.identity.default_domain_id, domain_attrs)
- project1 = unit.new_project_ref(domain_id=domain['id'],
- name=uuid.uuid4().hex)
- self.resource_api.create_project(project1['id'], project1)
- project2 = unit.new_project_ref(domain_id=domain['id'],
- name=uuid.uuid4().hex)
- project = self.resource_api.create_project(project2['id'], project2)
-
- self.assertRaises(exception.Conflict,
- self.resource_api.update_project,
- project['id'], {'name': project1['name']})
-
-
-class DomainConfigDriverTests(object):
-
- def _domain_config_crud(self, sensitive):
- domain = uuid.uuid4().hex
- group = uuid.uuid4().hex
- option = uuid.uuid4().hex
- value = uuid.uuid4().hex
- self.driver.create_config_option(
- domain, group, option, value, sensitive)
- res = self.driver.get_config_option(
- domain, group, option, sensitive)
- config = {'group': group, 'option': option, 'value': value}
- self.assertEqual(config, res)
-
- value = uuid.uuid4().hex
- self.driver.update_config_option(
- domain, group, option, value, sensitive)
- res = self.driver.get_config_option(
- domain, group, option, sensitive)
- config = {'group': group, 'option': option, 'value': value}
- self.assertEqual(config, res)
-
- self.driver.delete_config_options(
- domain, group, option, sensitive)
- self.assertRaises(exception.DomainConfigNotFound,
- self.driver.get_config_option,
- domain, group, option, sensitive)
- # ...and silent if we try to delete it again
- self.driver.delete_config_options(
- domain, group, option, sensitive)
-
- def test_whitelisted_domain_config_crud(self):
- self._domain_config_crud(sensitive=False)
-
- def test_sensitive_domain_config_crud(self):
- self._domain_config_crud(sensitive=True)
-
- def _list_domain_config(self, sensitive):
- """Test listing by combination of domain, group & option."""
- config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- # Put config2 in the same group as config1
- config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- config3 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
- 'value': 100}
- domain = uuid.uuid4().hex
-
- for config in [config1, config2, config3]:
- self.driver.create_config_option(
- domain, config['group'], config['option'],
- config['value'], sensitive)
-
- # Try listing all items from a domain
- res = self.driver.list_config_options(
- domain, sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(3))
- for res_entry in res:
- self.assertIn(res_entry, [config1, config2, config3])
-
- # Try listing by domain and group
- res = self.driver.list_config_options(
- domain, group=config1['group'], sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(2))
- for res_entry in res:
- self.assertIn(res_entry, [config1, config2])
-
- # Try listing by domain, group and option
- res = self.driver.list_config_options(
- domain, group=config2['group'],
- option=config2['option'], sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(1))
- self.assertEqual(config2, res[0])
-
- def test_list_whitelisted_domain_config_crud(self):
- self._list_domain_config(False)
-
- def test_list_sensitive_domain_config_crud(self):
- self._list_domain_config(True)
-
- def _delete_domain_configs(self, sensitive):
- """Test deleting by combination of domain, group & option."""
- config1 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- # Put config2 and config3 in the same group as config1
- config2 = {'group': config1['group'], 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- config3 = {'group': config1['group'], 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- config4 = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- domain = uuid.uuid4().hex
-
- for config in [config1, config2, config3, config4]:
- self.driver.create_config_option(
- domain, config['group'], config['option'],
- config['value'], sensitive)
-
- # Try deleting by domain, group and option
- res = self.driver.delete_config_options(
- domain, group=config2['group'],
- option=config2['option'], sensitive=sensitive)
- res = self.driver.list_config_options(
- domain, sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(3))
- for res_entry in res:
- self.assertIn(res_entry, [config1, config3, config4])
-
- # Try deleting by domain and group
- res = self.driver.delete_config_options(
- domain, group=config4['group'], sensitive=sensitive)
- res = self.driver.list_config_options(
- domain, sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(2))
- for res_entry in res:
- self.assertIn(res_entry, [config1, config3])
-
- # Try deleting all items from a domain
- res = self.driver.delete_config_options(
- domain, sensitive=sensitive)
- res = self.driver.list_config_options(
- domain, sensitive=sensitive)
- self.assertThat(res, matchers.HasLength(0))
-
- def test_delete_whitelisted_domain_configs(self):
- self._delete_domain_configs(False)
-
- def test_delete_sensitive_domain_configs(self):
- self._delete_domain_configs(True)
-
- def _create_domain_config_twice(self, sensitive):
- """Test conflict error thrown if create the same option twice."""
- config = {'group': uuid.uuid4().hex, 'option': uuid.uuid4().hex,
- 'value': uuid.uuid4().hex}
- domain = uuid.uuid4().hex
-
- self.driver.create_config_option(
- domain, config['group'], config['option'],
- config['value'], sensitive=sensitive)
- self.assertRaises(exception.Conflict,
- self.driver.create_config_option,
- domain, config['group'], config['option'],
- config['value'], sensitive=sensitive)
-
- def test_create_whitelisted_domain_config_twice(self):
- self._create_domain_config_twice(False)
-
- def test_create_sensitive_domain_config_twice(self):
- self._create_domain_config_twice(True)
-
-
-class DomainConfigTests(object):
-
- def setUp(self):
- self.domain = unit.new_domain_ref()
- self.resource_api.create_domain(self.domain['id'], self.domain)
- self.addCleanup(self.clean_up_domain)
-
- def clean_up_domain(self):
- # NOTE(henry-nash): Deleting the domain will also delete any domain
- # configs for this domain.
- self.domain['enabled'] = False
- self.resource_api.update_domain(self.domain['id'], self.domain)
- self.resource_api.delete_domain(self.domain['id'])
- del self.domain
-
- def test_create_domain_config_including_sensitive_option(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
-
- # password is sensitive, so check that the whitelisted portion and
- # the sensitive piece have been stored in the appropriate locations.
- res = self.domain_config_api.get_config(self.domain['id'])
- config_whitelisted = copy.deepcopy(config)
- config_whitelisted['ldap'].pop('password')
- self.assertEqual(config_whitelisted, res)
- res = self.domain_config_api.driver.get_config_option(
- self.domain['id'], 'ldap', 'password', sensitive=True)
- self.assertEqual(config['ldap']['password'], res['value'])
-
- # Finally, use the non-public API to get back the whole config
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual(config, res)
-
- def test_get_partial_domain_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
-
- res = self.domain_config_api.get_config(self.domain['id'],
- group='identity')
- config_partial = copy.deepcopy(config)
- config_partial.pop('ldap')
- self.assertEqual(config_partial, res)
- res = self.domain_config_api.get_config(
- self.domain['id'], group='ldap', option='user_tree_dn')
- self.assertEqual({'user_tree_dn': config['ldap']['user_tree_dn']}, res)
- # ...but we should fail to get a sensitive option
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.get_config, self.domain['id'],
- group='ldap', option='password')
-
- def test_delete_partial_domain_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
-
- self.domain_config_api.delete_config(
- self.domain['id'], group='identity')
- config_partial = copy.deepcopy(config)
- config_partial.pop('identity')
- config_partial['ldap'].pop('password')
- res = self.domain_config_api.get_config(self.domain['id'])
- self.assertEqual(config_partial, res)
-
- self.domain_config_api.delete_config(
- self.domain['id'], group='ldap', option='url')
- config_partial = copy.deepcopy(config_partial)
- config_partial['ldap'].pop('url')
- res = self.domain_config_api.get_config(self.domain['id'])
- self.assertEqual(config_partial, res)
-
- def test_get_options_not_in_domain_config(self):
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.get_config, self.domain['id'])
- config = {'ldap': {'url': uuid.uuid4().hex}}
-
- self.domain_config_api.create_config(self.domain['id'], config)
-
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.get_config, self.domain['id'],
- group='identity')
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.get_config, self.domain['id'],
- group='ldap', option='user_tree_dn')
-
- def test_get_sensitive_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual({}, res)
- self.domain_config_api.create_config(self.domain['id'], config)
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual(config, res)
-
- def test_update_partial_domain_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
-
- # Try updating a group
- new_config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_filter': uuid.uuid4().hex}}
- res = self.domain_config_api.update_config(
- self.domain['id'], new_config, group='ldap')
- expected_config = copy.deepcopy(config)
- expected_config['ldap']['url'] = new_config['ldap']['url']
- expected_config['ldap']['user_filter'] = (
- new_config['ldap']['user_filter'])
- expected_full_config = copy.deepcopy(expected_config)
- expected_config['ldap'].pop('password')
- res = self.domain_config_api.get_config(self.domain['id'])
- self.assertEqual(expected_config, res)
- # The sensitive option should still exist
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual(expected_full_config, res)
-
- # Try updating a single whitelisted option
- self.domain_config_api.delete_config(self.domain['id'])
- self.domain_config_api.create_config(self.domain['id'], config)
- new_config = {'url': uuid.uuid4().hex}
- res = self.domain_config_api.update_config(
- self.domain['id'], new_config, group='ldap', option='url')
-
- # Make sure whitelisted and full config is updated
- expected_whitelisted_config = copy.deepcopy(config)
- expected_whitelisted_config['ldap']['url'] = new_config['url']
- expected_full_config = copy.deepcopy(expected_whitelisted_config)
- expected_whitelisted_config['ldap'].pop('password')
- self.assertEqual(expected_whitelisted_config, res)
- res = self.domain_config_api.get_config(self.domain['id'])
- self.assertEqual(expected_whitelisted_config, res)
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual(expected_full_config, res)
-
- # Try updating a single sensitive option
- self.domain_config_api.delete_config(self.domain['id'])
- self.domain_config_api.create_config(self.domain['id'], config)
- new_config = {'password': uuid.uuid4().hex}
- res = self.domain_config_api.update_config(
- self.domain['id'], new_config, group='ldap', option='password')
- # The whitelisted config should not have changed...
- expected_whitelisted_config = copy.deepcopy(config)
- expected_full_config = copy.deepcopy(config)
- expected_whitelisted_config['ldap'].pop('password')
- self.assertEqual(expected_whitelisted_config, res)
- res = self.domain_config_api.get_config(self.domain['id'])
- self.assertEqual(expected_whitelisted_config, res)
- expected_full_config['ldap']['password'] = new_config['password']
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- # ...but the sensitive piece should have.
- self.assertEqual(expected_full_config, res)
-
- def test_update_invalid_partial_domain_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- # An extra group, when specifying one group should fail
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config, group='ldap')
- # An extra option, when specifying one option should fail
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config['ldap'],
- group='ldap', option='url')
-
- # Now try the right number of groups/options, but just not
- # ones that are in the config provided
- config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config, group='identity')
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config['ldap'], group='ldap',
- option='url')
-
- # Now some valid groups/options, but just not ones that are in the
- # existing config
- config = {'ldap': {'user_tree_dn': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
- config_wrong_group = {'identity': {'driver': uuid.uuid4().hex}}
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.update_config,
- self.domain['id'], config_wrong_group,
- group='identity')
- config_wrong_option = {'url': uuid.uuid4().hex}
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.update_config,
- self.domain['id'], config_wrong_option,
- group='ldap', option='url')
-
- # And finally just some bad groups/options
- bad_group = uuid.uuid4().hex
- config = {bad_group: {'user': uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config, group=bad_group,
- option='user')
- bad_option = uuid.uuid4().hex
- config = {'ldap': {bad_option: uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.update_config,
- self.domain['id'], config, group='ldap',
- option=bad_option)
-
- def test_create_invalid_domain_config(self):
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.create_config,
- self.domain['id'], {})
- config = {uuid.uuid4().hex: uuid.uuid4().hex}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.create_config,
- self.domain['id'], config)
- config = {uuid.uuid4().hex: {uuid.uuid4().hex: uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.create_config,
- self.domain['id'], config)
- config = {'ldap': {uuid.uuid4().hex: uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.create_config,
- self.domain['id'], config)
- # Try an option that IS in the standard conf, but neither whitelisted
- # or marked as sensitive
- config = {'identity': {'user_tree_dn': uuid.uuid4().hex}}
- self.assertRaises(exception.InvalidDomainConfig,
- self.domain_config_api.create_config,
- self.domain['id'], config)
-
- def test_delete_invalid_partial_domain_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
- # Try deleting a group not in the config
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.delete_config,
- self.domain['id'], group='identity')
- # Try deleting an option not in the config
- self.assertRaises(exception.DomainConfigNotFound,
- self.domain_config_api.delete_config,
- self.domain['id'],
- group='ldap', option='user_tree_dn')
-
- def test_sensitive_substitution_in_domain_config(self):
- # Create a config that contains a whitelisted option that requires
- # substitution of a sensitive option.
- config = {'ldap': {'url': 'my_url/%(password)s',
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
-
- # Read back the config with the internal method and ensure that the
- # substitution has taken place.
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- expected_url = (
- config['ldap']['url'] % {'password': config['ldap']['password']})
- self.assertEqual(expected_url, res['ldap']['url'])
-
- def test_invalid_sensitive_substitution_in_domain_config(self):
- """Check that invalid substitutions raise warnings."""
- mock_log = mock.Mock()
-
- invalid_option_config = {
- 'ldap': {'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
-
- for invalid_option in ['my_url/%(passssword)s',
- 'my_url/%(password',
- 'my_url/%(password)',
- 'my_url/%(password)d']:
- invalid_option_config['ldap']['url'] = invalid_option
- self.domain_config_api.create_config(
- self.domain['id'], invalid_option_config)
-
- with mock.patch('keystone.resource.core.LOG', mock_log):
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- mock_log.warning.assert_any_call(mock.ANY)
- self.assertEqual(
- invalid_option_config['ldap']['url'], res['ldap']['url'])
-
- def test_escaped_sequence_in_domain_config(self):
- """Check that escaped '%(' doesn't get interpreted."""
- mock_log = mock.Mock()
-
- escaped_option_config = {
- 'ldap': {'url': 'my_url/%%(password)s',
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
-
- self.domain_config_api.create_config(
- self.domain['id'], escaped_option_config)
-
- with mock.patch('keystone.resource.core.LOG', mock_log):
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertFalse(mock_log.warn.called)
- # The escaping '%' should have been removed
- self.assertEqual('my_url/%(password)s', res['ldap']['url'])
-
- @unit.skip_if_cache_disabled('domain_config')
- def test_cache_layer_get_sensitive_config(self):
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex},
- 'identity': {'driver': uuid.uuid4().hex}}
- self.domain_config_api.create_config(self.domain['id'], config)
- # cache the result
- res = self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id'])
- self.assertEqual(config, res)
-
- # delete, bypassing domain config manager api
- self.domain_config_api.delete_config_options(self.domain['id'])
- self.domain_config_api.delete_config_options(self.domain['id'],
- sensitive=True)
-
- self.assertDictEqual(
- res, self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id']))
- self.domain_config_api.get_config_with_sensitive_info.invalidate(
- self.domain_config_api, self.domain['id'])
- self.assertDictEqual(
- {},
- self.domain_config_api.get_config_with_sensitive_info(
- self.domain['id']))
-
- def test_delete_domain_deletes_configs(self):
- """Test domain deletion clears the domain configs."""
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- config = {'ldap': {'url': uuid.uuid4().hex,
- 'user_tree_dn': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex}}
- self.domain_config_api.create_config(domain['id'], config)
-
- # Now delete the domain
- domain['enabled'] = False
- self.resource_api.update_domain(domain['id'], domain)
- self.resource_api.delete_domain(domain['id'])
-
- # Check domain configs have also been deleted
- self.assertRaises(
- exception.DomainConfigNotFound,
- self.domain_config_api.get_config,
- domain['id'])
-
- # The get_config_with_sensitive_info does not throw an exception if
- # the config is empty, it just returns an empty dict
- self.assertDictEqual(
- {},
- self.domain_config_api.get_config_with_sensitive_info(
- domain['id']))
-
- def test_config_registration(self):
- type = uuid.uuid4().hex
- self.domain_config_api.obtain_registration(
- self.domain['id'], type)
- self.domain_config_api.release_registration(
- self.domain['id'], type=type)
-
- # Make sure that once someone has it, nobody else can get it.
- # This includes the domain who already has it.
- self.domain_config_api.obtain_registration(
- self.domain['id'], type)
- self.assertFalse(
- self.domain_config_api.obtain_registration(
- self.domain['id'], type))
-
- # Make sure we can read who does have it
- self.assertEqual(
- self.domain['id'],
- self.domain_config_api.read_registration(type))
-
- # Make sure releasing it is silent if the domain specified doesn't
- # have the registration
- domain2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
- self.resource_api.create_domain(domain2['id'], domain2)
- self.domain_config_api.release_registration(
- domain2['id'], type=type)
-
- # If nobody has the type registered, then trying to read it should
- # raise ConfigRegistrationNotFound
- self.domain_config_api.release_registration(
- self.domain['id'], type=type)
- self.assertRaises(exception.ConfigRegistrationNotFound,
- self.domain_config_api.read_registration,
- type)
-
- # Finally check multiple registrations are cleared if you free the
- # registration without specifying the type
- type2 = uuid.uuid4().hex
- self.domain_config_api.obtain_registration(
- self.domain['id'], type)
- self.domain_config_api.obtain_registration(
- self.domain['id'], type2)
- self.domain_config_api.release_registration(self.domain['id'])
- self.assertRaises(exception.ConfigRegistrationNotFound,
- self.domain_config_api.read_registration,
- type)
- self.assertRaises(exception.ConfigRegistrationNotFound,
- self.domain_config_api.read_registration,
- type2)