summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_backend_ldap.py
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-09-01 16:03:26 +0200
committerasteroide <thomas.duval@orange.com>2015-09-01 16:04:53 +0200
commit92fd2dbfb672d7b2b1cdfd5dd5cf89f7716b3e12 (patch)
tree7ba22297042019e7363fa1d4ad26d1c32c5908c6 /keystone-moon/keystone/tests/unit/test_backend_ldap.py
parent26e753254f3e43399cc76e62892908b7742415e8 (diff)
Update Keystone code from official Github repository with branch Master on 09/01/2015.
Change-Id: I0ff6099e6e2580f87f502002a998bbfe12673498
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_ldap.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_backend_ldap.py572
1 files changed, 375 insertions, 197 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
index 10119808..94fb82e7 100644
--- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py
+++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
@@ -20,27 +20,92 @@ import uuid
import ldap
import mock
from oslo_config import cfg
+import pkg_resources
+from six.moves import range
from testtools import matchers
from keystone.common import cache
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
-from keystone.common import sql
from keystone import exception
from keystone import identity
from keystone.identity.mapping_backends import mapping as map
from keystone import resource
from keystone.tests import unit as tests
from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import fakeldap
from keystone.tests.unit import identity_mapping as mapping_sql
from keystone.tests.unit.ksfixtures import database
+from keystone.tests.unit.ksfixtures import ldapdb
from keystone.tests.unit import test_backend
CONF = cfg.CONF
+def _assert_backends(testcase, **kwargs):
+
+ def _get_backend_cls(testcase, subsystem):
+ observed_backend = getattr(testcase, subsystem + '_api').driver
+ return observed_backend.__class__
+
+ def _get_domain_specific_backend_cls(manager, domain):
+ observed_backend = manager.domain_configs.get_domain_driver(domain)
+ return observed_backend.__class__
+
+ def _get_entrypoint_cls(subsystem, name):
+ entrypoint = entrypoint_map['keystone.' + subsystem][name]
+ return entrypoint.resolve()
+
+ def _load_domain_specific_configs(manager):
+ if (not manager.domain_configs.configured and
+ CONF.identity.domain_specific_drivers_enabled):
+ manager.domain_configs.setup_domain_drivers(
+ manager.driver, manager.resource_api)
+
+ def _assert_equal(expected_cls, observed_cls, subsystem,
+ domain=None):
+ msg = ('subsystem %(subsystem)s expected %(expected_cls)r, '
+ 'but observed %(observed_cls)r')
+ if domain:
+ subsystem = '%s[domain=%s]' % (subsystem, domain)
+ assert expected_cls == observed_cls, msg % {
+ 'expected_cls': expected_cls,
+ 'observed_cls': observed_cls,
+ 'subsystem': subsystem,
+ }
+
+ env = pkg_resources.Environment()
+ keystone_dist = env['keystone'][0]
+ entrypoint_map = pkg_resources.get_entry_map(keystone_dist)
+
+ for subsystem, entrypoint_name in kwargs.items():
+ if isinstance(entrypoint_name, str):
+ observed_cls = _get_backend_cls(testcase, subsystem)
+ expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name)
+ _assert_equal(expected_cls, observed_cls, subsystem)
+
+ elif isinstance(entrypoint_name, dict):
+ manager = getattr(testcase, subsystem + '_api')
+ _load_domain_specific_configs(manager)
+
+ for domain, entrypoint_name in entrypoint_name.items():
+ if domain is None:
+ observed_cls = _get_backend_cls(testcase, subsystem)
+ expected_cls = _get_entrypoint_cls(
+ subsystem, entrypoint_name)
+ _assert_equal(expected_cls, observed_cls, subsystem)
+ continue
+
+ observed_cls = _get_domain_specific_backend_cls(
+ manager, domain)
+ expected_cls = _get_entrypoint_cls(subsystem, entrypoint_name)
+ _assert_equal(expected_cls, observed_cls, subsystem, domain)
+
+ else:
+ raise ValueError('%r is not an expected value for entrypoint name'
+ % entrypoint_name)
+
+
def create_group_container(identity_api):
# Create the groups base entry (ou=Groups,cn=example,cn=com)
group_api = identity_api.driver.group
@@ -54,35 +119,22 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def setUp(self):
super(BaseLDAPIdentity, self).setUp()
- self.clear_database()
+ self.ldapdb = self.useFixture(ldapdb.LDAPDatabase())
- common_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.load_backends()
self.load_fixtures(default_fixtures)
- self.addCleanup(common_ldap_core._HANDLERS.clear)
-
def _get_domain_fixture(self):
"""Domains in LDAP are read-only, so just return the static one."""
return self.resource_api.get_domain(CONF.identity.default_domain_id)
- def clear_database(self):
- for shelf in fakeldap.FakeShelves:
- fakeldap.FakeShelves[shelf].clear()
-
- def reload_backends(self, domain_id):
- # Only one backend unless we are using separate domain backends
- self.load_backends()
-
def get_config(self, domain_id):
# Only one conf structure unless we are using separate domain backends
return CONF
def config_overrides(self):
super(BaseLDAPIdentity, self).config_overrides()
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
+ self.config_fixture.config(group='identity', driver='ldap')
def config_files(self):
config_files = super(BaseLDAPIdentity, self).config_files()
@@ -127,11 +179,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
user['id'])
def test_configurable_forbidden_user_actions(self):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_allow_create = False
- conf.ldap.user_allow_update = False
- conf.ldap.user_allow_delete = False
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.allow_create = False
+ driver.user.allow_update = False
+ driver.user.allow_delete = False
user = {'name': u'fäké1',
'password': u'fäképass1',
@@ -152,9 +204,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.user_foo['id'])
def test_configurable_forbidden_create_existing_user(self):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_allow_create = False
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.allow_create = False
self.assertRaises(exception.ForbiddenAction,
self.identity_api.create_user,
@@ -165,9 +217,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.user_foo.pop('password')
self.assertDictEqual(user_ref, self.user_foo)
- conf = self.get_config(user_ref['domain_id'])
- conf.ldap.user_filter = '(CN=DOES_NOT_MATCH)'
- self.reload_backends(user_ref['domain_id'])
+ driver = self.identity_api._select_identity_driver(
+ user_ref['domain_id'])
+ driver.user.ldap_filter = '(CN=DOES_NOT_MATCH)'
# invalidate the cache if the result is cached.
self.identity_api.get_user.invalidate(self.identity_api,
self.user_foo['id'])
@@ -468,9 +520,16 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
after_assignments = len(self.assignment_api.list_role_assignments())
self.assertEqual(existing_assignments + 2, after_assignments)
+ def test_list_role_assignments_filtered_by_role(self):
+ # Domain roles are not supported by the LDAP Assignment backend
+ self.assertRaises(
+ exception.NotImplemented,
+ super(BaseLDAPIdentity, self).
+ test_list_role_assignments_filtered_by_role)
+
def test_list_role_assignments_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -495,7 +554,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_user_ids_for_project_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -569,7 +628,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_group_members_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -686,11 +745,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_create_user_none_mapping(self):
# When create a user where an attribute maps to None, the entry is
# created without that attribute and it doesn't fail with a TypeError.
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_attribute_ignore = ['enabled', 'email',
- 'tenants', 'tenantId']
- self.reload_backends(CONF.identity.default_domain_id)
-
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.attribute_ignore = ['enabled', 'email',
+ 'tenants', 'tenantId']
user = {'name': u'fäké1',
'password': u'fäképass1',
'domain_id': CONF.identity.default_domain_id,
@@ -723,10 +781,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# Ensure that an attribute that maps to None that is not explicitly
# ignored in configuration is implicitly ignored without triggering
# an error.
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_attribute_ignore = ['enabled', 'email',
- 'tenants', 'tenantId']
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.attribute_ignore = ['enabled', 'email',
+ 'tenants', 'tenantId']
user = {'name': u'fäké1',
'password': u'fäképass1',
@@ -930,6 +988,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
# credentials) that require a database.
self.useFixture(database.Database())
super(LDAPIdentity, self).setUp()
+ _assert_backends(self,
+ assignment='ldap',
+ identity='ldap',
+ resource='ldap')
def load_fixtures(self, fixtures):
# Override super impl since need to create group container.
@@ -937,7 +999,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
super(LDAPIdentity, self).load_fixtures(fixtures)
def test_configurable_allowed_project_actions(self):
- tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True}
+ domain = self._get_domain_fixture()
+ tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True,
+ 'domain_id': domain['id']}
self.resource_api.create_project(u'fäké1', tenant)
tenant_ref = self.resource_api.get_project(u'fäké1')
self.assertEqual(u'fäké1', tenant_ref['id'])
@@ -990,7 +1054,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
project_allow_update=False, project_allow_delete=False)
self.load_backends()
- tenant = {'id': u'fäké1', 'name': u'fäké1'}
+ domain = self._get_domain_fixture()
+ tenant = {'id': u'fäké1', 'name': u'fäké1', 'domain_id': domain['id']}
self.assertRaises(exception.ForbiddenAction,
self.resource_api.create_project,
u'fäké1',
@@ -1029,7 +1094,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
def test_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
dumb_id = common_ldap.BaseLdap._dn_to_id(CONF.ldap.dumb_member)
@@ -1042,7 +1107,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
group='ldap', project_name_attribute='ou',
project_desc_attribute='description',
project_enabled_attribute='enabled')
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
# NOTE(morganfainberg): CONF.ldap.project_name_attribute,
@@ -1087,7 +1152,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
self.config_fixture.config(
group='ldap',
project_attribute_ignore=['name', 'description', 'enabled'])
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
# NOTE(morganfainberg): CONF.ldap.project_attribute_ignore will not be
@@ -1107,7 +1172,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
def test_user_enable_attribute_mask(self):
self.config_fixture.config(group='ldap', user_enabled_mask=2,
user_enabled_default='512')
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -1155,7 +1220,7 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
def test_user_enabled_invert(self):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default=False)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -1426,6 +1491,26 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
new_user = [u for u in res if u['id'] == user['id']][0]
self.assertThat(new_user['description'], matchers.Equals(description))
+ def test_user_with_missing_id(self):
+ # create a user that doesn't have the id attribute
+ ldap_ = self.identity_api.driver.user.get_connection()
+ # `sn` is used for the attribute in the DN because it's allowed by
+ # the entry's objectclasses so that this test could conceivably run in
+ # the live tests.
+ ldap_id_field = 'sn'
+ ldap_id_value = uuid.uuid4().hex
+ dn = '%s=%s,ou=Users,cn=example,cn=com' % (ldap_id_field,
+ ldap_id_value)
+ modlist = [('objectClass', ['person', 'inetOrgPerson']),
+ (ldap_id_field, [ldap_id_value]),
+ ('mail', ['email@example.com']),
+ ('userPassword', [uuid.uuid4().hex])]
+ ldap_.add_s(dn, modlist)
+
+ # make sure the user doesn't break other users
+ users = self.identity_api.driver.user.get_all()
+ self.assertThat(users, matchers.HasLength(len(default_fixtures.USERS)))
+
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
def test_user_mixed_case_attribute(self, mock_ldap_get):
# Mock the search results to return attribute names
@@ -1531,7 +1616,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
'domain_id': CONF.identity.default_domain_id,
'description': uuid.uuid4().hex,
'enabled': True,
- 'parent_id': None}
+ 'parent_id': None,
+ 'is_domain': False}
self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
@@ -1609,7 +1695,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
'description': '',
'domain_id': domain['id'],
'enabled': True,
- 'parent_id': None}
+ 'parent_id': None,
+ 'is_domain': False}
self.resource_api.create_project(project1['id'], project1)
# Creating project2 under project1. LDAP will not allow
@@ -1619,7 +1706,8 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
'description': '',
'domain_id': domain['id'],
'enabled': True,
- 'parent_id': project1['id']}
+ 'parent_id': project1['id'],
+ 'is_domain': False}
self.assertRaises(exception.InvalidParentProject,
self.resource_api.create_project,
@@ -1633,6 +1721,58 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
# Returning projects to be used across the tests
return [project1, project2]
+ def _assert_create_is_domain_project_not_allowed(self):
+ """Tests that we can't create more than one project acting as domain.
+
+ This method will be used at any test that require the creation of a
+ project that act as a domain. LDAP does not support multiple domains
+ and the only domain it has (default) is immutable.
+ """
+ domain = self._get_domain_fixture()
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': '',
+ 'domain_id': domain['id'],
+ 'enabled': True,
+ 'parent_id': None,
+ 'is_domain': True}
+
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.create_project,
+ project['id'], project)
+
+ def test_update_is_domain_field(self):
+ domain = self._get_domain_fixture()
+ project = {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'description': '',
+ 'domain_id': domain['id'],
+ 'enabled': True,
+ 'parent_id': None,
+ 'is_domain': False}
+ self.resource_api.create_project(project['id'], project)
+
+ # Try to update the is_domain field to True
+ project['is_domain'] = True
+ self.assertRaises(exception.ValidationError,
+ self.resource_api.update_project,
+ project['id'], project)
+
+ def test_delete_is_domain_project(self):
+ self._assert_create_is_domain_project_not_allowed()
+
+ def test_create_domain_under_regular_project_hierarchy_fails(self):
+ self._assert_create_hierarchy_not_allowed()
+
+ def test_create_not_is_domain_project_under_is_domain_hierarchy(self):
+ self._assert_create_hierarchy_not_allowed()
+
+ def test_create_is_domain_project(self):
+ self._assert_create_is_domain_project_not_allowed()
+
+ def test_create_project_with_parent_id_and_without_domain_id(self):
+ self._assert_create_hierarchy_not_allowed()
+
def test_check_leaf_projects(self):
projects = self._assert_create_hierarchy_not_allowed()
for project in projects:
@@ -1642,13 +1782,17 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
projects = self._assert_create_hierarchy_not_allowed()
for project in projects:
subtree_list = self.resource_api.list_projects_in_subtree(
- project)
+ project['id'])
self.assertEqual(0, len(subtree_list))
+ def test_list_projects_in_subtree_with_circular_reference(self):
+ self._assert_create_hierarchy_not_allowed()
+
def test_list_project_parents(self):
projects = self._assert_create_hierarchy_not_allowed()
for project in projects:
- parents_list = self.resource_api.list_project_parents(project)
+ parents_list = self.resource_api.list_project_parents(
+ project['id'])
self.assertEqual(0, len(parents_list))
def test_hierarchical_projects_crud(self):
@@ -1826,9 +1970,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
self.assertEqual(set(expected_group_ids), group_ids)
def test_user_id_attribute_in_create(self):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_id_attribute = 'mail'
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.id_attr = 'mail'
user = {'name': u'fäké1',
'password': u'fäképass1',
@@ -1840,9 +1984,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
self.assertEqual(user_ref['id'], user_ref['email'])
def test_user_id_attribute_map(self):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_id_attribute = 'mail'
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.id_attr = 'mail'
user_ref = self.identity_api.get_user(self.user_foo['email'])
# the user_id_attribute map should be honored, which means
@@ -1851,9 +1995,9 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
def test_get_id_from_dn_for_multivalued_attribute_id(self, mock_ldap_get):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_id_attribute = 'mail'
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.id_attr = 'mail'
# make 'email' multivalued so we can test the error condition
email1 = uuid.uuid4().hex
@@ -1888,10 +2032,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
def test_user_id_not_in_dn(self, mock_ldap_get):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_id_attribute = 'uid'
- conf.ldap.user_name_attribute = 'cn'
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.id_attr = 'uid'
+ driver.user.attribute_mapping['name'] = 'cn'
mock_ldap_get.return_value = (
'foo=bar,dc=example,dc=com',
@@ -1908,10 +2052,10 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
@mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get')
def test_user_name_in_dn(self, mock_ldap_get):
- conf = self.get_config(CONF.identity.default_domain_id)
- conf.ldap.user_id_attribute = 'sAMAccountName'
- conf.ldap.user_name_attribute = 'cn'
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.id_attr = 'SAMAccountName'
+ driver.user.attribute_mapping['name'] = 'cn'
mock_ldap_get.return_value = (
'cn=Foo Bar,dc=example,dc=com',
@@ -1929,12 +2073,16 @@ class LDAPIdentity(BaseLDAPIdentity, tests.TestCase):
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
super(LDAPIdentityEnabledEmulation, self).setUp()
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
for obj in [self.tenant_bar, self.tenant_baz, self.user_foo,
self.user_two, self.user_badguy]:
obj.setdefault('enabled', True)
+ _assert_backends(self,
+ assignment='ldap',
+ identity='ldap',
+ resource='ldap')
def load_fixtures(self, fixtures):
# Override super impl since need to create group container.
@@ -1961,7 +2109,8 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'description': uuid.uuid4().hex,
- 'parent_id': None}
+ 'parent_id': None,
+ 'is_domain': False}
self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
@@ -2007,9 +2156,9 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
user['id'])
def test_user_auth_emulated(self):
- self.config_fixture.config(group='ldap',
- user_enabled_emulation_dn='cn=test,dc=test')
- self.reload_backends(CONF.identity.default_domain_id)
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
+ driver.user.enabled_emulation_dn = 'cn=test,dc=test'
self.identity_api.authenticate(
context={},
user_id=self.user_foo['id'],
@@ -2022,7 +2171,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
def test_user_enabled_invert(self):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default=False)
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
self.load_fixtures(default_fixtures)
@@ -2110,32 +2259,26 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides,
return config_files
def setUp(self):
- self.useFixture(database.Database())
+ sqldb = self.useFixture(database.Database())
super(LdapIdentitySqlAssignment, self).setUp()
- self.clear_database()
+ self.ldapdb.clear()
self.load_backends()
cache.configure_cache_region(cache.REGION)
- self.engine = sql.get_engine()
- self.addCleanup(sql.cleanup)
-
- sql.ModelBase.metadata.create_all(bind=self.engine)
- self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
+ sqldb.recreate()
self.load_fixtures(default_fixtures)
# defaulted by the data load
self.user_foo['enabled'] = True
+ _assert_backends(self,
+ assignment='sql',
+ identity='ldap',
+ resource='sql')
def config_overrides(self):
super(LdapIdentitySqlAssignment, self).config_overrides()
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
- self.config_fixture.config(
- group='resource',
- driver='keystone.resource.backends.sql.Resource')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='resource', driver='sql')
+ self.config_fixture.config(group='assignment', driver='sql')
def test_domain_crud(self):
pass
@@ -2214,6 +2357,11 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.skipTest("Doesn't apply since LDAP configuration is ignored for "
"SQL assignment backend.")
+ def test_list_role_assignments_filtered_by_role(self):
+ # Domain roles are supported by the SQL Assignment backend
+ base = super(BaseLDAPIdentity, self)
+ base.test_list_role_assignments_filtered_by_role()
+
class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
"""Class to test mapping of default LDAP backend.
@@ -2390,16 +2538,11 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
"""
def setUp(self):
- self.useFixture(database.Database())
+ sqldb = self.useFixture(database.Database())
super(MultiLDAPandSQLIdentity, self).setUp()
self.load_backends()
-
- self.engine = sql.get_engine()
- self.addCleanup(sql.cleanup)
-
- sql.ModelBase.metadata.create_all(bind=self.engine)
- self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
+ sqldb.recreate()
self.domain_count = 5
self.domain_specific_count = 3
@@ -2410,23 +2553,29 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
# for separate backends per domain.
self.enable_multi_domain()
- self.clear_database()
+ self.ldapdb.clear()
self.load_fixtures(default_fixtures)
self.create_users_across_domains()
+ self.assert_backends()
+
+ def assert_backends(self):
+ _assert_backends(self,
+ assignment='sql',
+ identity={
+ None: 'sql',
+ self.domains['domain_default']['id']: 'ldap',
+ self.domains['domain1']['id']: 'ldap',
+ self.domains['domain2']['id']: 'ldap',
+ },
+ resource='sql')
def config_overrides(self):
super(MultiLDAPandSQLIdentity, self).config_overrides()
# Make sure identity and assignment are actually SQL drivers,
# BaseLDAPIdentity sets these options to use LDAP.
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.sql.Identity')
- self.config_fixture.config(
- group='resource',
- driver='keystone.resource.backends.sql.Resource')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ self.config_fixture.config(group='identity', driver='sql')
+ self.config_fixture.config(group='resource', driver='sql')
+ self.config_fixture.config(group='assignment', driver='sql')
def _setup_initial_users(self):
# Create some identity entities BEFORE we switch to multi-backend, so
@@ -2453,11 +2602,6 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
- def reload_backends(self, domain_id):
- # Just reload the driver for this domain - which will pickup
- # any updated cfg
- self.identity_api.domain_configs.reload_domain_driver(domain_id)
-
def get_config(self, domain_id):
# Get the config for this domain, will return CONF
# if no specific config defined for this domain
@@ -2619,7 +2763,8 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
'domain_id': domain['id'],
'description': uuid.uuid4().hex,
'parent_id': None,
- 'enabled': True}
+ 'enabled': True,
+ 'is_domain': False}
self.resource_api.create_domain(domain['id'], domain)
self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
@@ -2653,6 +2798,11 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.skipTest("Doesn't apply since LDAP configuration is ignored for "
"SQL assignment backend.")
+ def test_list_role_assignments_filtered_by_role(self):
+ # Domain roles are supported by the SQL Assignment backend
+ base = super(BaseLDAPIdentity, self)
+ base.test_list_role_assignments_filtered_by_role()
+
class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
"""Class to test the use of domain configs stored in the database.
@@ -2662,6 +2812,18 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
database.
"""
+
+ def assert_backends(self):
+ _assert_backends(self,
+ assignment='sql',
+ identity={
+ None: 'sql',
+ self.domains['domain_default']['id']: 'ldap',
+ self.domains['domain1']['id']: 'ldap',
+ self.domains['domain2']['id']: 'ldap',
+ },
+ resource='sql')
+
def enable_multi_domain(self):
# The values below are the same as in the domain_configs_multi_ldap
# cdirectory of test config_files.
@@ -2670,14 +2832,14 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
- 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'}
+ 'identity': {'driver': 'ldap'}
}
domain1_config = {
'ldap': {'url': 'fake://memory1',
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
- 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'}
+ 'identity': {'driver': 'ldap'}
}
domain2_config = {
'ldap': {'url': 'fake://memory',
@@ -2686,7 +2848,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
'suffix': 'cn=myroot,cn=com',
'group_tree_dn': 'ou=UserGroups,dc=myroot,dc=org',
'user_tree_dn': 'ou=Users,dc=myroot,dc=org'},
- 'identity': {'driver': 'keystone.identity.backends.ldap.Identity'}
+ 'identity': {'driver': 'ldap'}
}
self.domain_config_api.create_config(CONF.identity.default_domain_id,
@@ -2725,6 +2887,48 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
CONF.identity.default_domain_id))
self.assertEqual(CONF.ldap.url, default_config.ldap.url)
+ def test_reloading_domain_config(self):
+ """Ensure domain drivers are reloaded on a config modification."""
+
+ domain_cfgs = self.identity_api.domain_configs
+
+ # Create a new config for the default domain, hence overwriting the
+ # current settings.
+ new_config = {
+ 'ldap': {'url': uuid.uuid4().hex},
+ 'identity': {'driver': 'ldap'}}
+ self.domain_config_api.create_config(
+ CONF.identity.default_domain_id, new_config)
+ default_config = (
+ domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
+ self.assertEqual(new_config['ldap']['url'], default_config.ldap.url)
+
+ # Ensure updating is also honored
+ updated_config = {'url': uuid.uuid4().hex}
+ self.domain_config_api.update_config(
+ CONF.identity.default_domain_id, updated_config,
+ group='ldap', option='url')
+ default_config = (
+ domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
+ self.assertEqual(updated_config['url'], default_config.ldap.url)
+
+ # ...and finally ensure delete causes the driver to get the standard
+ # config again.
+ self.domain_config_api.delete_config(CONF.identity.default_domain_id)
+ default_config = (
+ domain_cfgs.get_domain_conf(CONF.identity.default_domain_id))
+ self.assertEqual(CONF.ldap.url, default_config.ldap.url)
+
+ def test_setting_sql_driver_raises_exception(self):
+ """Ensure setting of domain specific sql driver is prevented."""
+
+ new_config = {'identity': {'driver': 'sql'}}
+ self.domain_config_api.create_config(
+ CONF.identity.default_domain_id, new_config)
+ self.assertRaises(exception.InvalidDomainConfig,
+ self.identity_api.domain_configs.get_domain_conf,
+ CONF.identity.default_domain_id)
+
class DomainSpecificLDAPandSQLIdentity(
BaseLDAPIdentity, tests.SQLDriverOverrides, tests.TestCase,
@@ -2740,11 +2944,11 @@ class DomainSpecificLDAPandSQLIdentity(
"""
def setUp(self):
- self.useFixture(database.Database())
+ sqldb = self.useFixture(database.Database())
super(DomainSpecificLDAPandSQLIdentity, self).setUp()
- self.initial_setup()
+ self.initial_setup(sqldb)
- def initial_setup(self):
+ def initial_setup(self, sqldb):
# We aren't setting up any initial data ahead of switching to
# domain-specific operation, so make the switch straight away.
self.config_fixture.config(
@@ -2755,37 +2959,33 @@ class DomainSpecificLDAPandSQLIdentity(
backward_compatible_ids=False)
self.load_backends()
-
- self.engine = sql.get_engine()
- self.addCleanup(sql.cleanup)
-
- sql.ModelBase.metadata.create_all(bind=self.engine)
- self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
+ sqldb.recreate()
self.domain_count = 2
self.domain_specific_count = 2
self.setup_initial_domains()
self.users = {}
- self.clear_database()
+ self.ldapdb.clear()
self.load_fixtures(default_fixtures)
self.create_users_across_domains()
+ _assert_backends(
+ self,
+ assignment='sql',
+ identity={
+ None: 'ldap',
+ 'default': 'ldap',
+ self.domains['domain1']['id']: 'sql',
+ },
+ resource='sql')
+
def config_overrides(self):
super(DomainSpecificLDAPandSQLIdentity, self).config_overrides()
# Make sure resource & assignment are actually SQL drivers,
# BaseLDAPIdentity causes this option to use LDAP.
- self.config_fixture.config(
- group='resource',
- driver='keystone.resource.backends.sql.Resource')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
-
- def reload_backends(self, domain_id):
- # Just reload the driver for this domain - which will pickup
- # any updated cfg
- self.identity_api.domain_configs.reload_domain_driver(domain_id)
+ self.config_fixture.config(group='resource', driver='sql')
+ self.config_fixture.config(group='assignment', driver='sql')
def get_config(self, domain_id):
# Get the config for this domain, will return CONF
@@ -2889,6 +3089,11 @@ class DomainSpecificLDAPandSQLIdentity(
self.skipTest("Doesn't apply since LDAP configuration is ignored for "
"SQL assignment backend.")
+ def test_list_role_assignments_filtered_by_role(self):
+ # Domain roles are supported by the SQL Assignment backend
+ base = super(BaseLDAPIdentity, self)
+ base.test_list_role_assignments_filtered_by_role()
+
class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
"""Class to test simplest use of domain-specific SQL driver.
@@ -2902,7 +3107,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
- A separate SQL backend for domain1
"""
- def initial_setup(self):
+ def initial_setup(self, sqldb):
# We aren't setting up any initial data ahead of switching to
# domain-specific operation, so make the switch straight away.
self.config_fixture.config(
@@ -2916,12 +3121,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
backward_compatible_ids=True)
self.load_backends()
-
- self.engine = sql.get_engine()
- self.addCleanup(sql.cleanup)
-
- sql.ModelBase.metadata.create_all(bind=self.engine)
- self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
+ sqldb.recreate()
self.domain_count = 2
self.domain_specific_count = 1
@@ -2931,17 +3131,16 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
self.load_fixtures(default_fixtures)
self.create_users_across_domains()
+ _assert_backends(self,
+ assignment='sql',
+ identity='ldap',
+ resource='sql')
+
def config_overrides(self):
super(DomainSpecificSQLIdentity, self).config_overrides()
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
- self.config_fixture.config(
- group='resource',
- driver='keystone.resource.backends.sql.Resource')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='resource', driver='sql')
+ self.config_fixture.config(group='assignment', driver='sql')
def get_config(self, domain_id):
if domain_id == CONF.identity.default_domain_id:
@@ -2949,36 +3148,20 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
else:
return self.identity_api.domain_configs.get_domain_conf(domain_id)
- def reload_backends(self, domain_id):
- if domain_id == CONF.identity.default_domain_id:
- self.load_backends()
- else:
- # Just reload the driver for this domain - which will pickup
- # any updated cfg
- self.identity_api.domain_configs.reload_domain_driver(domain_id)
-
def test_default_sql_plus_sql_specific_driver_fails(self):
# First confirm that if ldap is default driver, domain1 can be
# loaded as sql
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='assignment', driver='sql')
self.load_backends()
# Make any identity call to initiate the lazy loading of configs
self.identity_api.list_users(
domain_scope=CONF.identity.default_domain_id)
self.assertIsNotNone(self.get_config(self.domains['domain1']['id']))
- # Now re-initialize, but with sql as the default identity driver
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.sql.Identity')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ # Now re-initialize, but with sql as the identity driver
+ self.config_fixture.config(group='identity', driver='sql')
+ self.config_fixture.config(group='assignment', driver='sql')
self.load_backends()
# Make any identity call to initiate the lazy loading of configs, which
# should fail since we would now have two sql drivers.
@@ -2987,12 +3170,8 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
domain_scope=CONF.identity.default_domain_id)
def test_multiple_sql_specific_drivers_fails(self):
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
- self.config_fixture.config(
- group='assignment',
- driver='keystone.assignment.backends.sql.Assignment')
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='assignment', driver='sql')
self.load_backends()
# Ensure default, domain1 and domain2 exist
self.domain_count = 3
@@ -3019,31 +3198,30 @@ class LdapFilterTests(test_backend.FilterTests, tests.TestCase):
def setUp(self):
super(LdapFilterTests, self).setUp()
- self.useFixture(database.Database())
- self.clear_database()
+ sqldb = self.useFixture(database.Database())
+ self.useFixture(ldapdb.LDAPDatabase())
- common_ldap.register_handler('fake://', fakeldap.FakeLdap)
self.load_backends()
self.load_fixtures(default_fixtures)
-
- self.engine = sql.get_engine()
- self.addCleanup(sql.cleanup)
- sql.ModelBase.metadata.create_all(bind=self.engine)
-
- self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
- self.addCleanup(common_ldap_core._HANDLERS.clear)
+ sqldb.recreate()
+ _assert_backends(self, assignment='ldap', identity='ldap')
def config_overrides(self):
super(LdapFilterTests, self).config_overrides()
- self.config_fixture.config(
- group='identity',
- driver='keystone.identity.backends.ldap.Identity')
+ self.config_fixture.config(group='identity', driver='ldap')
def config_files(self):
config_files = super(LdapFilterTests, self).config_files()
config_files.append(tests.dirs.tests_conf('backend_ldap.conf'))
return config_files
- def clear_database(self):
- for shelf in fakeldap.FakeShelves:
- fakeldap.FakeShelves[shelf].clear()
+ def test_list_users_in_group_filtered(self):
+ # The LDAP identity driver currently does not support filtering on the
+ # listing users for a given group, so will fail this test.
+ try:
+ super(LdapFilterTests, self).test_list_users_in_group_filtered()
+ except matchers.MismatchError:
+ return
+ # We shouldn't get here...if we do, it means someone has implemented
+ # filtering, so we can remove this test override.
+ self.assertTrue(False)