diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_ldap.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_backend_ldap.py | 572 |
1 files changed, 375 insertions, 197 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py index 10119808..94fb82e7 100644 --- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py +++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py @@ -20,27 +20,92 @@ import uuid import ldap import mock from oslo_config import cfg +import pkg_resources +from six.moves import range from testtools import matchers from keystone.common import cache from keystone.common import ldap as common_ldap from keystone.common.ldap import core as common_ldap_core -from keystone.common import sql from keystone import exception from keystone import identity from keystone.identity.mapping_backends import mapping as map from keystone import resource from keystone.tests import unit as tests from keystone.tests.unit import default_fixtures -from keystone.tests.unit import fakeldap from keystone.tests.unit import identity_mapping as mapping_sql from keystone.tests.unit.ksfixtures import database +from keystone.tests.unit.ksfixtures import ldapdb from keystone.tests.unit import test_backend CONF = cfg.CONF +def _assert_backends(testcase, **kwargs): + + def _get_backend_cls(testcase, subsystem): + observed_backend = getattr(testcase, subsystem + '_api').driver + return observed_backend.__class__ + + def _get_domain_specific_backend_cls(manager, domain): + observed_backend = manager.domain_configs.get_domain_driver(domain) + return observed_backend.__class__ + + def _get_entrypoint_cls(subsystem, name): + entrypoint = entrypoint_map['keystone.' + subsystem][name] + return entrypoint.resolve() + + def _load_domain_specific_configs(manager): + if (not manager.domain_configs.configured and + CONF.identity.domain_specific_drivers_enabled): + manager.domain_configs.setup_domain_drivers( + manager.driver, manager.resource_api) + + def _assert_equal(expected_cls, observed_cls, subsystem, + domain=None): + msg = ('subsystem %(subsystem)s expected %(expected_cls)r, ' + 'but observed %(observed_cls)r') + if domain: + subsystem = '%s[domain=%s]' % (subsystem, domain) + assert expected_cls == observed_cls, msg % { + 'expected_cls': expected_cls, + 'observed_cls': observed_cls, + 'subsystem': subsystem, + } + + env = pkg_resources.Environment() + keystone_dist = env['keystone'][0] + entrypoint_map = pkg_resources.get_entry_map(keystone_dist) + + for subsystem, entrypoint_name in kwargs.items(): + if isinstance(entrypoint_name, str): + observed_cls = _get_backend_cls(testcase, subsystem) + expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name) + _assert_equal(expected_cls, observed_cls, subsystem) + + elif isinstance(entrypoint_name, dict): + manager = getattr(testcase, subsystem + '_api') + _load_domain_specific_configs(manager) + + for domain, entrypoint_name in entrypoint_name.items(): + if domain is None: + observed_cls = _get_backend_cls(testcase, subsystem) + expected_cls = _get_entrypoint_cls( + subsystem, entrypoint_name) + _assert_equal(expected_cls, observed_cls, subsystem) + continue + + observed_cls = _get_domain_specific_backend_cls( + manager, domain) + expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name) + _assert_equal(expected_cls, observed_cls, subsystem, domain) + + else: + raise ValueError('%r is not an expected value for entrypoint name' + % entrypoint_name) + + def create_group_container(identity_api): # Create the groups base entry (ou=Groups,cn=example,cn=com) group_api = identity_api.driver.group @@ -54,35 +119,22 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def setUp(self): super(BaseLDAPIdentity, self).setUp() - self.clear_database() + self.ldapdb = self.useFixture(ldapdb.LDAPDatabase()) - common_ldap.register_handler('fake://', fakeldap.FakeLdap) self.load_backends() self.load_fixtures(default_fixtures) - self.addCleanup(common_ldap_core._HANDLERS.clear) - def _get_domain_fixture(self): """Domains in LDAP are read-only, so just return the static one.""" return self.resource_api.get_domain(CONF.identity.default_domain_id) - def clear_database(self): - for shelf in fakeldap.FakeShelves: - fakeldap.FakeShelves[shelf].clear() - - def reload_backends(self, domain_id): - # Only one backend unless we are using separate domain backends - self.load_backends() - def get_config(self, domain_id): # Only one conf structure unless we are using separate domain backends return CONF def config_overrides(self): super(BaseLDAPIdentity, self).config_overrides() - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') + self.config_fixture.config(group='identity', driver='ldap') def config_files(self): config_files = super(BaseLDAPIdentity, self).config_files() @@ -127,11 +179,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests): user['id']) def test_configurable_forbidden_user_actions(self): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_allow_create = False - conf.ldap.user_allow_update = False - conf.ldap.user_allow_delete = False - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.allow_create = False + driver.user.allow_update = False + driver.user.allow_delete = False user = {'name': u'fäké1', 'password': u'fäképass1', @@ -152,9 +204,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.user_foo['id']) def test_configurable_forbidden_create_existing_user(self): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_allow_create = False - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.allow_create = False self.assertRaises(exception.ForbiddenAction, self.identity_api.create_user, @@ -165,9 +217,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.user_foo.pop('password') self.assertDictEqual(user_ref, self.user_foo) - conf = self.get_config(user_ref['domain_id']) - conf.ldap.user_filter = '(CN=DOES_NOT_MATCH)' - self.reload_backends(user_ref['domain_id']) + driver = self.identity_api._select_identity_driver( + user_ref['domain_id']) + driver.user.ldap_filter = '(CN=DOES_NOT_MATCH)' # invalidate the cache if the result is cached. self.identity_api.get_user.invalidate(self.identity_api, self.user_foo['id']) @@ -468,9 +520,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests): after_assignments = len(self.assignment_api.list_role_assignments()) self.assertEqual(existing_assignments + 2, after_assignments) + def test_list_role_assignments_filtered_by_role(self): + # Domain roles are not supported by the LDAP Assignment backend + self.assertRaises( + exception.NotImplemented, + super(BaseLDAPIdentity, self). + test_list_role_assignments_filtered_by_role) + def test_list_role_assignments_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -495,7 +554,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_user_ids_for_project_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -569,7 +628,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_group_members_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -686,11 +745,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_create_user_none_mapping(self): # When create a user where an attribute maps to None, the entry is # created without that attribute and it doesn't fail with a TypeError. - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_attribute_ignore = ['enabled', 'email', - 'tenants', 'tenantId'] - self.reload_backends(CONF.identity.default_domain_id) - + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.attribute_ignore = ['enabled', 'email', + 'tenants', 'tenantId'] user = {'name': u'fäké1', 'password': u'fäképass1', 'domain_id': CONF.identity.default_domain_id, @@ -723,10 +781,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests): # Ensure that an attribute that maps to None that is not explicitly # ignored in configuration is implicitly ignored without triggering # an error. - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_attribute_ignore = ['enabled', 'email', - 'tenants', 'tenantId'] - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.attribute_ignore = ['enabled', 'email', + 'tenants', 'tenantId'] user = {'name': u'fäké1', 'password': u'fäképass1', @@ -930,6 +988,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): # credentials) that require a database. self.useFixture(database.Database()) super(LDAPIdentity, self).setUp() + _assert_backends(self, + assignment='ldap', + identity='ldap', + resource='ldap') def load_fixtures(self, fixtures): # Override super impl since need to create group container. @@ -937,7 +999,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): super(LDAPIdentity, self).load_fixtures(fixtures) def test_configurable_allowed_project_actions(self): - tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True} + domain = self._get_domain_fixture() + tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True, + 'domain_id': domain['id']} self.resource_api.create_project(u'fäké1', tenant) tenant_ref = self.resource_api.get_project(u'fäké1') self.assertEqual(u'fäké1', tenant_ref['id']) @@ -990,7 +1054,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): project_allow_update=False, project_allow_delete=False) self.load_backends() - tenant = {'id': u'fäké1', 'name': u'fäké1'} + domain = self._get_domain_fixture() + tenant = {'id': u'fäké1', 'name': u'fäké1', 'domain_id': domain['id']} self.assertRaises(exception.ForbiddenAction, self.resource_api.create_project, u'fäké1', @@ -1029,7 +1094,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): def test_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) dumb_id = common_ldap.BaseLdap._dn_to_id(CONF.ldap.dumb_member) @@ -1042,7 +1107,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): group='ldap', project_name_attribute='ou', project_desc_attribute='description', project_enabled_attribute='enabled') - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) # NOTE(morganfainberg): CONF.ldap.project_name_attribute, @@ -1087,7 +1152,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): self.config_fixture.config( group='ldap', project_attribute_ignore=['name', 'description', 'enabled']) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) # NOTE(morganfainberg): CONF.ldap.project_attribute_ignore will not be @@ -1107,7 +1172,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): def test_user_enable_attribute_mask(self): self.config_fixture.config(group='ldap', user_enabled_mask=2, user_enabled_default='512') - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -1155,7 +1220,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): def test_user_enabled_invert(self): self.config_fixture.config(group='ldap', user_enabled_invert=True, user_enabled_default=False) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -1426,6 +1491,26 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): new_user = [u for u in res if u['id'] == user['id']][0] self.assertThat(new_user['description'], matchers.Equals(description)) + def test_user_with_missing_id(self): + # create a user that doesn't have the id attribute + ldap_ = self.identity_api.driver.user.get_connection() + # `sn` is used for the attribute in the DN because it's allowed by + # the entry's objectclasses so that this test could conceivably run in + # the live tests. + ldap_id_field = 'sn' + ldap_id_value = uuid.uuid4().hex + dn = '%s=%s,ou=Users,cn=example,cn=com' % (ldap_id_field, + ldap_id_value) + modlist = [('objectClass', ['person', 'inetOrgPerson']), + (ldap_id_field, [ldap_id_value]), + ('mail', ['email@example.com']), + ('userPassword', [uuid.uuid4().hex])] + ldap_.add_s(dn, modlist) + + # make sure the user doesn't break other users + users = self.identity_api.driver.user.get_all() + self.assertThat(users, matchers.HasLength(len(default_fixtures.USERS))) + @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get') def test_user_mixed_case_attribute(self, mock_ldap_get): # Mock the search results to return attribute names @@ -1531,7 +1616,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): 'domain_id': CONF.identity.default_domain_id, 'description': uuid.uuid4().hex, 'enabled': True, - 'parent_id': None} + 'parent_id': None, + 'is_domain': False} self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) @@ -1609,7 +1695,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): 'description': '', 'domain_id': domain['id'], 'enabled': True, - 'parent_id': None} + 'parent_id': None, + 'is_domain': False} self.resource_api.create_project(project1['id'], project1) # Creating project2 under project1. LDAP will not allow @@ -1619,7 +1706,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): 'description': '', 'domain_id': domain['id'], 'enabled': True, - 'parent_id': project1['id']} + 'parent_id': project1['id'], + 'is_domain': False} self.assertRaises(exception.InvalidParentProject, self.resource_api.create_project, @@ -1633,6 +1721,58 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): # Returning projects to be used across the tests return [project1, project2] + def _assert_create_is_domain_project_not_allowed(self): + """Tests that we can't create more than one project acting as domain. + + This method will be used at any test that require the creation of a + project that act as a domain. LDAP does not support multiple domains + and the only domain it has (default) is immutable. + """ + domain = self._get_domain_fixture() + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'description': '', + 'domain_id': domain['id'], + 'enabled': True, + 'parent_id': None, + 'is_domain': True} + + self.assertRaises(exception.ValidationError, + self.resource_api.create_project, + project['id'], project) + + def test_update_is_domain_field(self): + domain = self._get_domain_fixture() + project = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'description': '', + 'domain_id': domain['id'], + 'enabled': True, + 'parent_id': None, + 'is_domain': False} + self.resource_api.create_project(project['id'], project) + + # Try to update the is_domain field to True + project['is_domain'] = True + self.assertRaises(exception.ValidationError, + self.resource_api.update_project, + project['id'], project) + + def test_delete_is_domain_project(self): + self._assert_create_is_domain_project_not_allowed() + + def test_create_domain_under_regular_project_hierarchy_fails(self): + self._assert_create_hierarchy_not_allowed() + + def test_create_not_is_domain_project_under_is_domain_hierarchy(self): + self._assert_create_hierarchy_not_allowed() + + def test_create_is_domain_project(self): + self._assert_create_is_domain_project_not_allowed() + + def test_create_project_with_parent_id_and_without_domain_id(self): + self._assert_create_hierarchy_not_allowed() + def test_check_leaf_projects(self): projects = self._assert_create_hierarchy_not_allowed() for project in projects: @@ -1642,13 +1782,17 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): projects = self._assert_create_hierarchy_not_allowed() for project in projects: subtree_list = self.resource_api.list_projects_in_subtree( - project) + project['id']) self.assertEqual(0, len(subtree_list)) + def test_list_projects_in_subtree_with_circular_reference(self): + self._assert_create_hierarchy_not_allowed() + def test_list_project_parents(self): projects = self._assert_create_hierarchy_not_allowed() for project in projects: - parents_list = self.resource_api.list_project_parents(project) + parents_list = self.resource_api.list_project_parents( + project['id']) self.assertEqual(0, len(parents_list)) def test_hierarchical_projects_crud(self): @@ -1826,9 +1970,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): self.assertEqual(set(expected_group_ids), group_ids) def test_user_id_attribute_in_create(self): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_id_attribute = 'mail' - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.id_attr = 'mail' user = {'name': u'fäké1', 'password': u'fäképass1', @@ -1840,9 +1984,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): self.assertEqual(user_ref['id'], user_ref['email']) def test_user_id_attribute_map(self): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_id_attribute = 'mail' - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.id_attr = 'mail' user_ref = self.identity_api.get_user(self.user_foo['email']) # the user_id_attribute map should be honored, which means @@ -1851,9 +1995,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get') def test_get_id_from_dn_for_multivalued_attribute_id(self, mock_ldap_get): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_id_attribute = 'mail' - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.id_attr = 'mail' # make 'email' multivalued so we can test the error condition email1 = uuid.uuid4().hex @@ -1888,10 +2032,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get') def test_user_id_not_in_dn(self, mock_ldap_get): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_id_attribute = 'uid' - conf.ldap.user_name_attribute = 'cn' - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.id_attr = 'uid' + driver.user.attribute_mapping['name'] = 'cn' mock_ldap_get.return_value = ( 'foo=bar,dc=example,dc=com', @@ -1908,10 +2052,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get') def test_user_name_in_dn(self, mock_ldap_get): - conf = self.get_config(CONF.identity.default_domain_id) - conf.ldap.user_id_attribute = 'sAMAccountName' - conf.ldap.user_name_attribute = 'cn' - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.id_attr = 'SAMAccountName' + driver.user.attribute_mapping['name'] = 'cn' mock_ldap_get.return_value = ( 'cn=Foo Bar,dc=example,dc=com', @@ -1929,12 +2073,16 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase): class LDAPIdentityEnabledEmulation(LDAPIdentity): def setUp(self): super(LDAPIdentityEnabledEmulation, self).setUp() - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) for obj in [self.tenant_bar, self.tenant_baz, self.user_foo, self.user_two, self.user_badguy]: obj.setdefault('enabled', True) + _assert_backends(self, + assignment='ldap', + identity='ldap', + resource='ldap') def load_fixtures(self, fixtures): # Override super impl since need to create group container. @@ -1961,7 +2109,8 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): 'name': uuid.uuid4().hex, 'domain_id': CONF.identity.default_domain_id, 'description': uuid.uuid4().hex, - 'parent_id': None} + 'parent_id': None, + 'is_domain': False} self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) @@ -2007,9 +2156,9 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): user['id']) def test_user_auth_emulated(self): - self.config_fixture.config(group='ldap', - user_enabled_emulation_dn='cn=test,dc=test') - self.reload_backends(CONF.identity.default_domain_id) + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) + driver.user.enabled_emulation_dn = 'cn=test,dc=test' self.identity_api.authenticate( context={}, user_id=self.user_foo['id'], @@ -2022,7 +2171,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): def test_user_enabled_invert(self): self.config_fixture.config(group='ldap', user_enabled_invert=True, user_enabled_default=False) - self.clear_database() + self.ldapdb.clear() self.load_backends() self.load_fixtures(default_fixtures) @@ -2110,32 +2259,26 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides, return config_files def setUp(self): - self.useFixture(database.Database()) + sqldb = self.useFixture(database.Database()) super(LdapIdentitySqlAssignment, self).setUp() - self.clear_database() + self.ldapdb.clear() self.load_backends() cache.configure_cache_region(cache.REGION) - self.engine = sql.get_engine() - self.addCleanup(sql.cleanup) - - sql.ModelBase.metadata.create_all(bind=self.engine) - self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) + sqldb.recreate() self.load_fixtures(default_fixtures) # defaulted by the data load self.user_foo['enabled'] = True + _assert_backends(self, + assignment='sql', + identity='ldap', + resource='sql') def config_overrides(self): super(LdapIdentitySqlAssignment, self).config_overrides() - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') - self.config_fixture.config( - group='resource', - driver='keystone.resource.backends.sql.Resource') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='resource', driver='sql') + self.config_fixture.config(group='assignment', driver='sql') def test_domain_crud(self): pass @@ -2214,6 +2357,11 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides, self.skipTest("Doesn't apply since LDAP configuration is ignored for " "SQL assignment backend.") + def test_list_role_assignments_filtered_by_role(self): + # Domain roles are supported by the SQL Assignment backend + base = super(BaseLDAPIdentity, self) + base.test_list_role_assignments_filtered_by_role() + class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment): """Class to test mapping of default LDAP backend. @@ -2390,16 +2538,11 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides, """ def setUp(self): - self.useFixture(database.Database()) + sqldb = self.useFixture(database.Database()) super(MultiLDAPandSQLIdentity, self).setUp() self.load_backends() - - self.engine = sql.get_engine() - self.addCleanup(sql.cleanup) - - sql.ModelBase.metadata.create_all(bind=self.engine) - self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) + sqldb.recreate() self.domain_count = 5 self.domain_specific_count = 3 @@ -2410,23 +2553,29 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides, # for separate backends per domain. self.enable_multi_domain() - self.clear_database() + self.ldapdb.clear() self.load_fixtures(default_fixtures) self.create_users_across_domains() + self.assert_backends() + + def assert_backends(self): + _assert_backends(self, + assignment='sql', + identity={ + None: 'sql', + self.domains['domain_default']['id']: 'ldap', + self.domains['domain1']['id']: 'ldap', + self.domains['domain2']['id']: 'ldap', + }, + resource='sql') def config_overrides(self): super(MultiLDAPandSQLIdentity, self).config_overrides() # Make sure identity and assignment are actually SQL drivers, # BaseLDAPIdentity sets these options to use LDAP. - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.sql.Identity') - self.config_fixture.config( - group='resource', - driver='keystone.resource.backends.sql.Resource') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + self.config_fixture.config(group='identity', driver='sql') + self.config_fixture.config(group='resource', driver='sql') + self.config_fixture.config(group='assignment', driver='sql') def _setup_initial_users(self): # Create some identity entities BEFORE we switch to multi-backend, so @@ -2453,11 +2602,6 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides, self.config_fixture.config(group='identity_mapping', backward_compatible_ids=False) - def reload_backends(self, domain_id): - # Just reload the driver for this domain - which will pickup - # any updated cfg - self.identity_api.domain_configs.reload_domain_driver(domain_id) - def get_config(self, domain_id): # Get the config for this domain, will return CONF # if no specific config defined for this domain @@ -2619,7 +2763,8 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides, 'domain_id': domain['id'], 'description': uuid.uuid4().hex, 'parent_id': None, - 'enabled': True} + 'enabled': True, + 'is_domain': False} self.resource_api.create_domain(domain['id'], domain) self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) @@ -2653,6 +2798,11 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides, self.skipTest("Doesn't apply since LDAP configuration is ignored for " "SQL assignment backend.") + def test_list_role_assignments_filtered_by_role(self): + # Domain roles are supported by the SQL Assignment backend + base = super(BaseLDAPIdentity, self) + base.test_list_role_assignments_filtered_by_role() + class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): """Class to test the use of domain configs stored in the database. @@ -2662,6 +2812,18 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): database. """ + + def assert_backends(self): + _assert_backends(self, + assignment='sql', + identity={ + None: 'sql', + self.domains['domain_default']['id']: 'ldap', + self.domains['domain1']['id']: 'ldap', + self.domains['domain2']['id']: 'ldap', + }, + resource='sql') + def enable_multi_domain(self): # The values below are the same as in the domain_configs_multi_ldap # cdirectory of test config_files. @@ -2670,14 +2832,14 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, - 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'} + 'identity': {'driver': 'ldap'} } domain1_config = { 'ldap': {'url': 'fake://memory1', 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, - 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'} + 'identity': {'driver': 'ldap'} } domain2_config = { 'ldap': {'url': 'fake://memory', @@ -2686,7 +2848,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): 'suffix': 'cn=myroot,cn=com', 'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org', 'user_tree_dn': 'ou=Users,dc=myroot,dc=org'}, - 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'} + 'identity': {'driver': 'ldap'} } self.domain_config_api.create_config(CONF.identity.default_domain_id, @@ -2725,6 +2887,48 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): CONF.identity.default_domain_id)) self.assertEqual(CONF.ldap.url, default_config.ldap.url) + def test_reloading_domain_config(self): + """Ensure domain drivers are reloaded on a config modification.""" + + domain_cfgs = self.identity_api.domain_configs + + # Create a new config for the default domain, hence overwriting the + # current settings. + new_config = { + 'ldap': {'url': uuid.uuid4().hex}, + 'identity': {'driver': 'ldap'}} + self.domain_config_api.create_config( + CONF.identity.default_domain_id, new_config) + default_config = ( + domain_cfgs.get_domain_conf(CONF.identity.default_domain_id)) + self.assertEqual(new_config['ldap']['url'], default_config.ldap.url) + + # Ensure updating is also honored + updated_config = {'url': uuid.uuid4().hex} + self.domain_config_api.update_config( + CONF.identity.default_domain_id, updated_config, + group='ldap', option='url') + default_config = ( + domain_cfgs.get_domain_conf(CONF.identity.default_domain_id)) + self.assertEqual(updated_config['url'], default_config.ldap.url) + + # ...and finally ensure delete causes the driver to get the standard + # config again. + self.domain_config_api.delete_config(CONF.identity.default_domain_id) + default_config = ( + domain_cfgs.get_domain_conf(CONF.identity.default_domain_id)) + self.assertEqual(CONF.ldap.url, default_config.ldap.url) + + def test_setting_sql_driver_raises_exception(self): + """Ensure setting of domain specific sql driver is prevented.""" + + new_config = {'identity': {'driver': 'sql'}} + self.domain_config_api.create_config( + CONF.identity.default_domain_id, new_config) + self.assertRaises(exception.InvalidDomainConfig, + self.identity_api.domain_configs.get_domain_conf, + CONF.identity.default_domain_id) + class DomainSpecificLDAPandSQLIdentity( BaseLDAPIdentity, tests.SQLDriverOverrides, tests.TestCase, @@ -2740,11 +2944,11 @@ class DomainSpecificLDAPandSQLIdentity( """ def setUp(self): - self.useFixture(database.Database()) + sqldb = self.useFixture(database.Database()) super(DomainSpecificLDAPandSQLIdentity, self).setUp() - self.initial_setup() + self.initial_setup(sqldb) - def initial_setup(self): + def initial_setup(self, sqldb): # We aren't setting up any initial data ahead of switching to # domain-specific operation, so make the switch straight away. self.config_fixture.config( @@ -2755,37 +2959,33 @@ class DomainSpecificLDAPandSQLIdentity( backward_compatible_ids=False) self.load_backends() - - self.engine = sql.get_engine() - self.addCleanup(sql.cleanup) - - sql.ModelBase.metadata.create_all(bind=self.engine) - self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) + sqldb.recreate() self.domain_count = 2 self.domain_specific_count = 2 self.setup_initial_domains() self.users = {} - self.clear_database() + self.ldapdb.clear() self.load_fixtures(default_fixtures) self.create_users_across_domains() + _assert_backends( + self, + assignment='sql', + identity={ + None: 'ldap', + 'default': 'ldap', + self.domains['domain1']['id']: 'sql', + }, + resource='sql') + def config_overrides(self): super(DomainSpecificLDAPandSQLIdentity, self).config_overrides() # Make sure resource & assignment are actually SQL drivers, # BaseLDAPIdentity causes this option to use LDAP. - self.config_fixture.config( - group='resource', - driver='keystone.resource.backends.sql.Resource') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') - - def reload_backends(self, domain_id): - # Just reload the driver for this domain - which will pickup - # any updated cfg - self.identity_api.domain_configs.reload_domain_driver(domain_id) + self.config_fixture.config(group='resource', driver='sql') + self.config_fixture.config(group='assignment', driver='sql') def get_config(self, domain_id): # Get the config for this domain, will return CONF @@ -2889,6 +3089,11 @@ class DomainSpecificLDAPandSQLIdentity( self.skipTest("Doesn't apply since LDAP configuration is ignored for " "SQL assignment backend.") + def test_list_role_assignments_filtered_by_role(self): + # Domain roles are supported by the SQL Assignment backend + base = super(BaseLDAPIdentity, self) + base.test_list_role_assignments_filtered_by_role() + class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): """Class to test simplest use of domain-specific SQL driver. @@ -2902,7 +3107,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): - A separate SQL backend for domain1 """ - def initial_setup(self): + def initial_setup(self, sqldb): # We aren't setting up any initial data ahead of switching to # domain-specific operation, so make the switch straight away. self.config_fixture.config( @@ -2916,12 +3121,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): backward_compatible_ids=True) self.load_backends() - - self.engine = sql.get_engine() - self.addCleanup(sql.cleanup) - - sql.ModelBase.metadata.create_all(bind=self.engine) - self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) + sqldb.recreate() self.domain_count = 2 self.domain_specific_count = 1 @@ -2931,17 +3131,16 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): self.load_fixtures(default_fixtures) self.create_users_across_domains() + _assert_backends(self, + assignment='sql', + identity='ldap', + resource='sql') + def config_overrides(self): super(DomainSpecificSQLIdentity, self).config_overrides() - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') - self.config_fixture.config( - group='resource', - driver='keystone.resource.backends.sql.Resource') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='resource', driver='sql') + self.config_fixture.config(group='assignment', driver='sql') def get_config(self, domain_id): if domain_id == CONF.identity.default_domain_id: @@ -2949,36 +3148,20 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): else: return self.identity_api.domain_configs.get_domain_conf(domain_id) - def reload_backends(self, domain_id): - if domain_id == CONF.identity.default_domain_id: - self.load_backends() - else: - # Just reload the driver for this domain - which will pickup - # any updated cfg - self.identity_api.domain_configs.reload_domain_driver(domain_id) - def test_default_sql_plus_sql_specific_driver_fails(self): # First confirm that if ldap is default driver, domain1 can be # loaded as sql - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='assignment', driver='sql') self.load_backends() # Make any identity call to initiate the lazy loading of configs self.identity_api.list_users( domain_scope=CONF.identity.default_domain_id) self.assertIsNotNone(self.get_config(self.domains['domain1']['id'])) - # Now re-initialize, but with sql as the default identity driver - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.sql.Identity') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + # Now re-initialize, but with sql as the identity driver + self.config_fixture.config(group='identity', driver='sql') + self.config_fixture.config(group='assignment', driver='sql') self.load_backends() # Make any identity call to initiate the lazy loading of configs, which # should fail since we would now have two sql drivers. @@ -2987,12 +3170,8 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): domain_scope=CONF.identity.default_domain_id) def test_multiple_sql_specific_drivers_fails(self): - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') - self.config_fixture.config( - group='assignment', - driver='keystone.assignment.backends.sql.Assignment') + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='assignment', driver='sql') self.load_backends() # Ensure default, domain1 and domain2 exist self.domain_count = 3 @@ -3019,31 +3198,30 @@ class LdapFilterTests(test_backend.FilterTests, tests.TestCase): def setUp(self): super(LdapFilterTests, self).setUp() - self.useFixture(database.Database()) - self.clear_database() + sqldb = self.useFixture(database.Database()) + self.useFixture(ldapdb.LDAPDatabase()) - common_ldap.register_handler('fake://', fakeldap.FakeLdap) self.load_backends() self.load_fixtures(default_fixtures) - - self.engine = sql.get_engine() - self.addCleanup(sql.cleanup) - sql.ModelBase.metadata.create_all(bind=self.engine) - - self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine) - self.addCleanup(common_ldap_core._HANDLERS.clear) + sqldb.recreate() + _assert_backends(self, assignment='ldap', identity='ldap') def config_overrides(self): super(LdapFilterTests, self).config_overrides() - self.config_fixture.config( - group='identity', - driver='keystone.identity.backends.ldap.Identity') + self.config_fixture.config(group='identity', driver='ldap') def config_files(self): config_files = super(LdapFilterTests, self).config_files() config_files.append(tests.dirs.tests_conf('backend_ldap.conf')) return config_files - def clear_database(self): - for shelf in fakeldap.FakeShelves: - fakeldap.FakeShelves[shelf].clear() + def test_list_users_in_group_filtered(self): + # The LDAP identity driver currently does not support filtering on the + # listing users for a given group, so will fail this test. + try: + super(LdapFilterTests, self).test_list_users_in_group_filtered() + except matchers.MismatchError: + return + # We shouldn't get here...if we do, it means someone has implemented + # filtering, so we can remove this test override. + self.assertTrue(False) |