aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_backend_ldap.py
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2016-06-09 08:12:34 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2016-06-09 08:12:34 +0000
commit4bc079a2664f9a407e332291f34d174625a9d5ea (patch)
tree7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_backend_ldap.py
parent2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff)
parent2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff)
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_ldap.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_backend_ldap.py1285
1 files changed, 608 insertions, 677 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
index d96ec376..cf618633 100644
--- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py
+++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
@@ -20,11 +20,15 @@ import uuid
import ldap
import mock
from oslo_config import cfg
+from oslo_log import versionutils
+from oslotest import mockpatch
import pkg_resources
+from six.moves import http_client
from six.moves import range
from testtools import matchers
from keystone.common import cache
+from keystone.common import driver_hints
from keystone.common import ldap as common_ldap
from keystone.common.ldap import core as common_ldap_core
from keystone import exception
@@ -32,11 +36,14 @@ from keystone import identity
from keystone.identity.mapping_backends import mapping as map
from keystone import resource
from keystone.tests import unit
+from keystone.tests.unit.assignment import test_backends as assignment_tests
from keystone.tests.unit import default_fixtures
+from keystone.tests.unit.identity import test_backends as identity_tests
from keystone.tests.unit import identity_mapping as mapping_sql
from keystone.tests.unit.ksfixtures import database
from keystone.tests.unit.ksfixtures import ldapdb
-from keystone.tests.unit import test_backend
+from keystone.tests.unit.resource import test_backends as resource_tests
+from keystone.tests.unit.utils import wip
CONF = cfg.CONF
@@ -115,7 +122,9 @@ def create_group_container(identity_api):
('ou', ['Groups'])])
-class BaseLDAPIdentity(test_backend.IdentityTests):
+class BaseLDAPIdentity(identity_tests.IdentityTests,
+ assignment_tests.AssignmentTests,
+ resource_tests.ResourceTests):
def setUp(self):
super(BaseLDAPIdentity, self).setUp()
@@ -123,6 +132,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.load_backends()
self.load_fixtures(default_fixtures)
+ self.config_fixture.config(group='os_inherit', enabled=False)
def _get_domain_fixture(self):
"""Domains in LDAP are read-only, so just return the static one."""
@@ -141,6 +151,13 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
return config_files
+ def new_user_ref(self, domain_id, project_id=None, **kwargs):
+ ref = unit.new_user_ref(domain_id=domain_id, project_id=project_id,
+ **kwargs)
+ if 'id' not in kwargs:
+ del ref['id']
+ return ref
+
def get_user_enabled_vals(self, user):
user_dn = (
self.identity_api.driver.user._id_to_dn_string(user['id']))
@@ -156,17 +173,13 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
return None
def test_build_tree(self):
- """Regression test for building the tree names
- """
+ """Regression test for building the tree names."""
user_api = identity.backends.ldap.UserApi(CONF)
self.assertTrue(user_api)
self.assertEqual("ou=Users,%s" % CONF.ldap.suffix, user_api.tree_dn)
def test_configurable_allowed_user_actions(self):
- user = {'name': u'fäké1',
- 'password': u'fäképass1',
- 'domain_id': CONF.identity.default_domain_id,
- 'tenants': ['bar']}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
self.identity_api.get_user(user['id'])
@@ -185,10 +198,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
driver.user.allow_update = False
driver.user.allow_delete = False
- user = {'name': u'fäké1',
- 'password': u'fäképass1',
- 'domain_id': CONF.identity.default_domain_id,
- 'tenants': ['bar']}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
self.assertRaises(exception.ForbiddenAction,
self.identity_api.create_user,
user)
@@ -215,7 +225,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_user_filter(self):
user_ref = self.identity_api.get_user(self.user_foo['id'])
self.user_foo.pop('password')
- self.assertDictEqual(user_ref, self.user_foo)
+ self.assertDictEqual(self.user_foo, user_ref)
driver = self.identity_api._select_identity_driver(
user_ref['domain_id'])
@@ -227,6 +237,20 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.identity_api.get_user,
self.user_foo['id'])
+ def test_list_users_by_name_and_with_filter(self):
+ # confirm that the user is not exposed when it does not match the
+ # filter setting in conf even if it is requested by name in user list
+ hints = driver_hints.Hints()
+ hints.add_filter('name', self.user_foo['name'])
+ domain_id = self.user_foo['domain_id']
+ driver = self.identity_api._select_identity_driver(domain_id)
+ driver.user.ldap_filter = ('(|(cn=%s)(cn=%s))' %
+ (self.user_sna['id'], self.user_two['id']))
+ users = self.identity_api.list_users(
+ domain_scope=self._set_domain_scope(domain_id),
+ hints=hints)
+ self.assertEqual(0, len(users))
+
def test_remove_role_grant_from_user_and_project(self):
self.assignment_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'],
@@ -234,7 +258,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
roles_ref = self.assignment_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'])
- self.assertDictEqual(roles_ref[0], self.role_member)
+ self.assertDictEqual(self.role_member, roles_ref[0])
self.assignment_api.delete_grant(user_id=self.user_foo['id'],
project_id=self.tenant_baz['id'],
@@ -251,11 +275,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_get_and_remove_role_grant_by_group_and_project(self):
new_domain = self._get_domain_fixture()
- new_group = {'domain_id': new_domain['id'],
- 'name': uuid.uuid4().hex}
+ new_group = unit.new_group_ref(domain_id=new_domain['id'])
new_group = self.identity_api.create_group(new_group)
- new_user = {'name': 'new_user', 'enabled': True,
- 'domain_id': new_domain['id']}
+ new_user = self.new_user_ref(domain_id=new_domain['id'])
new_user = self.identity_api.create_user(new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
@@ -273,7 +295,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertNotEmpty(roles_ref)
- self.assertDictEqual(roles_ref[0], self.role_member)
+ self.assertDictEqual(self.role_member, roles_ref[0])
self.assignment_api.delete_grant(group_id=new_group['id'],
project_id=self.tenant_bar['id'],
@@ -289,7 +311,44 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
role_id='member')
def test_get_and_remove_role_grant_by_group_and_domain(self):
- self.skipTest('N/A: LDAP does not support multiple domains')
+ # TODO(henry-nash): We should really rewrite the tests in
+ # unit.resource.test_backends to be more flexible as to where the
+ # domains are sourced from, so that we would not need to override such
+ # tests here. This is raised as bug 1373865.
+ new_domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=new_domain['id'],)
+ new_group = self.identity_api.create_group(new_group)
+ new_user = self.new_user_ref(domain_id=new_domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ domain_id=new_domain['id'])
+ self.assertEqual(0, len(roles_ref))
+
+ self.assignment_api.create_grant(group_id=new_group['id'],
+ domain_id=new_domain['id'],
+ role_id='member')
+
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ domain_id=new_domain['id'])
+ self.assertDictEqual(self.role_member, roles_ref[0])
+
+ self.assignment_api.delete_grant(group_id=new_group['id'],
+ domain_id=new_domain['id'],
+ role_id='member')
+ roles_ref = self.assignment_api.list_grants(
+ group_id=new_group['id'],
+ domain_id=new_domain['id'])
+ self.assertEqual(0, len(roles_ref))
+ self.assertRaises(exception.NotFound,
+ self.assignment_api.delete_grant,
+ group_id=new_group['id'],
+ domain_id=new_domain['id'],
+ role_id='member')
def test_get_role_assignment_by_domain_not_found(self):
self.skipTest('N/A: LDAP does not support multiple domains')
@@ -327,10 +386,12 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_delete_group_with_user_project_domain_links(self):
self.skipTest('N/A: LDAP does not support multiple domains')
+ def test_list_role_assignment_containing_names(self):
+ self.skipTest('N/A: LDAP does not support multiple domains')
+
def test_list_projects_for_user(self):
domain = self._get_domain_fixture()
- user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain['id'], 'enabled': True}
+ user1 = self.new_user_ref(domain_id=domain['id'])
user1 = self.identity_api.create_user(user1)
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
self.assertThat(user_projects, matchers.HasLength(0))
@@ -347,11 +408,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertThat(user_projects, matchers.HasLength(2))
# Now, check number of projects through groups
- user2 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain['id'], 'enabled': True}
+ user2 = self.new_user_ref(domain_id=domain['id'])
user2 = self.identity_api.create_user(user2)
- group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
self.identity_api.add_user_to_group(user2['id'], group1['id'])
@@ -377,12 +437,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_projects_for_user_and_groups(self):
domain = self._get_domain_fixture()
# Create user1
- user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'domain_id': domain['id'], 'enabled': True}
+ user1 = self.new_user_ref(domain_id=domain['id'])
user1 = self.identity_api.create_user(user1)
# Create new group for user1
- group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
# Add user1 to group1
@@ -412,20 +471,17 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_projects_for_user_with_grants(self):
domain = self._get_domain_fixture()
- new_user = {'name': 'new_user', 'password': uuid.uuid4().hex,
- 'enabled': True, 'domain_id': domain['id']}
+ new_user = self.new_user_ref(domain_id=domain['id'])
new_user = self.identity_api.create_user(new_user)
- group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group1 = unit.new_group_ref(domain_id=domain['id'])
group1 = self.identity_api.create_group(group1)
- group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group2 = unit.new_group_ref(domain_id=domain['id'])
group2 = self.identity_api.create_group(group2)
- project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': domain['id']}
+ project1 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project1['id'], project1)
- project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': domain['id']}
+ project2 = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project2['id'], project2)
self.identity_api.add_user_to_group(new_user['id'],
@@ -496,14 +552,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_role_assignments_unfiltered(self):
new_domain = self._get_domain_fixture()
- new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'enabled': True, 'domain_id': new_domain['id']}
+ new_user = self.new_user_ref(domain_id=new_domain['id'])
new_user = self.identity_api.create_user(new_user)
- new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex}
+ new_group = unit.new_group_ref(domain_id=new_domain['id'])
new_group = self.identity_api.create_group(new_group)
- new_project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'domain_id': new_domain['id']}
+ new_project = unit.new_project_ref(domain_id=new_domain['id'])
self.resource_api.create_project(new_project['id'], new_project)
# First check how many role grant already exist
@@ -520,13 +573,6 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
after_assignments = len(self.assignment_api.list_role_assignments())
self.assertEqual(existing_assignments + 2, after_assignments)
- def test_list_role_assignments_filtered_by_role(self):
- # Domain roles are not supported by the LDAP Assignment backend
- self.assertRaises(
- exception.NotImplemented,
- super(BaseLDAPIdentity, self).
- test_list_role_assignments_filtered_by_role)
-
def test_list_role_assignments_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
self.ldapdb.clear()
@@ -534,12 +580,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.load_fixtures(default_fixtures)
new_domain = self._get_domain_fixture()
- new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'enabled': True, 'domain_id': new_domain['id']}
+ new_user = self.new_user_ref(domain_id=new_domain['id'])
new_user = self.identity_api.create_user(new_user)
- new_project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'domain_id': new_domain['id']}
+ new_project = unit.new_project_ref(domain_id=new_domain['id'])
self.resource_api.create_project(new_project['id'], new_project)
self.assignment_api.create_grant(user_id=new_user['id'],
project_id=new_project['id'],
@@ -558,8 +601,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.load_backends()
self.load_fixtures(default_fixtures)
- user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex,
- 'enabled': True, 'domain_id': test_backend.DEFAULT_DOMAIN_ID}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
self.assignment_api.add_user_to_project(self.tenant_baz['id'],
@@ -582,10 +624,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
are returned.
"""
-
# Create a group
- group = dict(name=uuid.uuid4().hex,
- domain_id=CONF.identity.default_domain_id)
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group)['id']
# Create a couple of users and add them to the group.
@@ -617,10 +657,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_list_group_members_when_no_members(self):
# List group members when there is no member in the group.
# No exception should be raised.
- group = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex}
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
# If this doesn't raise, then the test is successful.
@@ -633,8 +670,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.load_fixtures(default_fixtures)
# Create a group
- group = dict(name=uuid.uuid4().hex,
- domain_id=CONF.identity.default_domain_id)
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group_id = self.identity_api.create_group(group)['id']
# Create a user
@@ -651,30 +687,23 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertNotIn(dumb_id, user_ids)
def test_list_domains(self):
+ # We have more domains here than the parent class, check for the
+ # correct number of domains for the multildap backend configs
+ domain1 = unit.new_domain_ref()
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ self.resource_api.create_domain(domain2['id'], domain2)
domains = self.resource_api.list_domains()
- self.assertEqual(
- [resource.calc_default_domain()],
- domains)
-
- def test_list_domains_non_default_domain_id(self):
- # If change the default_domain_id, the ID of the default domain
- # returned by list_domains changes is the new default_domain_id.
-
- new_domain_id = uuid.uuid4().hex
- self.config_fixture.config(group='identity',
- default_domain_id=new_domain_id)
-
- domains = self.resource_api.list_domains()
-
- self.assertEqual(new_domain_id, domains[0]['id'])
+ self.assertEqual(7, len(domains))
+ domain_ids = []
+ for domain in domains:
+ domain_ids.append(domain.get('id'))
+ self.assertIn(CONF.identity.default_domain_id, domain_ids)
+ self.assertIn(domain1['id'], domain_ids)
+ self.assertIn(domain2['id'], domain_ids)
def test_authenticate_requires_simple_bind(self):
- user = {
- 'name': 'NO_META',
- 'domain_id': test_backend.DEFAULT_DOMAIN_ID,
- 'password': 'no_meta2',
- 'enabled': True,
- }
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
self.assignment_api.add_user_to_project(self.tenant_baz['id'],
user['id'])
@@ -689,34 +718,54 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
user_id=user['id'],
password=None)
- # (spzala)The group and domain crud tests below override the standard ones
- # in test_backend.py so that we can exclude the update name test, since we
- # do not yet support the update of either group or domain names with LDAP.
- # In the tests below, the update is demonstrated by updating description.
- # Refer to bug 1136403 for more detail.
- def test_group_crud(self):
- group = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex,
- 'description': uuid.uuid4().hex}
+ # The group and domain CRUD tests below override the standard ones in
+ # unit.identity.test_backends.py so that we can exclude the update name
+ # test, since we do not (and will not) support the update of either group
+ # or domain names with LDAP. In the tests below, the update is tested by
+ # updating description.
+ @mock.patch.object(versionutils, 'report_deprecated_feature')
+ def test_group_crud(self, mock_deprecator):
+ # NOTE(stevemar): As of the Mitaka release, we now check for calls that
+ # the LDAP write functionality has been deprecated.
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("create_group for the LDAP identity backend", args[1])
+
group_ref = self.identity_api.get_group(group['id'])
- self.assertDictEqual(group_ref, group)
+ self.assertDictEqual(group, group_ref)
group['description'] = uuid.uuid4().hex
self.identity_api.update_group(group['id'], group)
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("update_group for the LDAP identity backend", args[1])
+
group_ref = self.identity_api.get_group(group['id'])
- self.assertDictEqual(group_ref, group)
+ self.assertDictEqual(group, group_ref)
self.identity_api.delete_group(group['id'])
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("delete_group for the LDAP identity backend", args[1])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group,
group['id'])
+ @mock.patch.object(versionutils, 'report_deprecated_feature')
+ def test_add_remove_user_group_deprecated(self, mock_deprecator):
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group = self.identity_api.create_group(group)
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ self.identity_api.add_user_to_group(user['id'], group['id'])
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("add_user_to_group for the LDAP identity", args[1])
+
+ self.identity_api.remove_user_from_group(user['id'], group['id'])
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("remove_user_from_group for the LDAP identity", args[1])
+
@unit.skip_if_cache_disabled('identity')
def test_cache_layer_group_crud(self):
- group = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex}
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
# cache the result
group_ref = self.identity_api.get_group(group['id'])
@@ -731,9 +780,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group, group['id'])
- group = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex}
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
group = self.identity_api.create_group(group)
# cache the result
self.identity_api.get_group(group['id'])
@@ -749,11 +796,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
CONF.identity.default_domain_id)
driver.user.attribute_ignore = ['enabled', 'email',
'tenants', 'tenantId']
- user = {'name': u'fäké1',
- 'password': u'fäképass1',
- 'domain_id': CONF.identity.default_domain_id,
- 'default_project_id': 'maps_to_none',
- }
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
+ project_id='maps_to_none')
# If this doesn't raise, then the test is successful.
user = self.identity_api.create_user(user)
@@ -765,9 +809,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False',
'TrUe' 'FaLse']
for name in boolean_strings:
- user = {
- 'name': name,
- 'domain_id': CONF.identity.default_domain_id}
+ user = self.new_user_ref(name=name,
+ domain_id=CONF.identity.default_domain_id)
user_ref = self.identity_api.create_user(user)
user_info = self.identity_api.get_user(user_ref['id'])
self.assertEqual(name, user_info['name'])
@@ -786,10 +829,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
driver.user.attribute_ignore = ['enabled', 'email',
'tenants', 'tenantId']
- user = {'name': u'fäké1',
- 'password': u'fäképass1',
- 'domain_id': CONF.identity.default_domain_id,
- }
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_ref = self.identity_api.create_user(user)
@@ -818,19 +858,14 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_user_id_comma(self):
"""Even if the user has a , in their ID, groups can be listed."""
-
# Create a user with a , in their ID
# NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
# Since we want to fake up this special ID, we'll squirt this
# direct into the driver and bypass the manager layer.
user_id = u'Doe, John'
- user = {
- 'id': user_id,
- 'name': self.getUniqueString(),
- 'password': self.getUniqueString(),
- 'domain_id': CONF.identity.default_domain_id,
- }
+ user = self.new_user_ref(id=user_id,
+ domain_id=CONF.identity.default_domain_id)
user = self.identity_api.driver.create_user(user_id, user)
# Now we'll use the manager to discover it, which will create a
@@ -843,13 +878,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
break
# Create a group
- group_id = uuid.uuid4().hex
- group = {
- 'id': group_id,
- 'name': self.getUniqueString(prefix='tuidc'),
- 'description': self.getUniqueString(),
- 'domain_id': CONF.identity.default_domain_id,
- }
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group_id = group['id']
group = self.identity_api.driver.create_group(group_id, group)
# Now we'll use the manager to discover it, which will create a
# Public ID for it.
@@ -870,21 +900,15 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertThat(ref_list, matchers.Equals([group]))
def test_user_id_comma_grants(self):
- """Even if the user has a , in their ID, can get user and group grants.
- """
-
+ """List user and group grants, even with a comma in the user's ID."""
# Create a user with a , in their ID
# NOTE(blk-u): the DN for this user is hard-coded in fakeldap!
# Since we want to fake up this special ID, we'll squirt this
# direct into the driver and bypass the manager layer
user_id = u'Doe, John'
- user = {
- 'id': user_id,
- 'name': self.getUniqueString(),
- 'password': self.getUniqueString(),
- 'domain_id': CONF.identity.default_domain_id,
- }
+ user = self.new_user_ref(id=user_id,
+ domain_id=CONF.identity.default_domain_id)
self.identity_api.driver.create_user(user_id, user)
# Now we'll use the manager to discover it, which will create a
@@ -943,8 +967,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# There's no group fixture so create a group.
new_domain = self._get_domain_fixture()
- new_group = {'domain_id': new_domain['id'],
- 'name': uuid.uuid4().hex}
+ new_group = unit.new_group_ref(domain_id=new_domain['id'])
new_group = self.identity_api.create_group(new_group)
# Attempt to disable the group.
@@ -959,39 +982,55 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.assertNotIn('enabled', group_info)
def test_project_enabled_ignored_disable_error(self):
- # When the server is configured so that the enabled attribute is
- # ignored for projects, projects cannot be disabled.
-
- self.config_fixture.config(group='ldap',
- project_attribute_ignore=['enabled'])
-
- # Need to re-load backends for the config change to take effect.
- self.load_backends()
-
- # Attempt to disable the project.
- self.assertRaises(exception.ForbiddenAction,
- self.resource_api.update_project,
- self.tenant_baz['id'], {'enabled': False})
-
- project_info = self.resource_api.get_project(self.tenant_baz['id'])
-
- # Unlike other entities, if 'enabled' is ignored then 'enabled' is
- # returned as part of the ref.
- self.assertIs(True, project_info['enabled'])
+ self.skipTest('Resource LDAP has been removed')
def test_list_role_assignment_by_domain(self):
"""Multiple domain assignments are not supported."""
self.assertRaises(
- (exception.Forbidden, exception.DomainNotFound),
+ (exception.Forbidden, exception.DomainNotFound,
+ exception.ValidationError),
super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain)
def test_list_role_assignment_by_user_with_domain_group_roles(self):
"""Multiple domain assignments are not supported."""
self.assertRaises(
- (exception.Forbidden, exception.DomainNotFound),
+ (exception.Forbidden, exception.DomainNotFound,
+ exception.ValidationError),
super(BaseLDAPIdentity, self).
test_list_role_assignment_by_user_with_domain_group_roles)
+ def test_domain_crud(self):
+ self.skipTest('Resource LDAP has been removed')
+
+ def test_list_role_assignment_using_sourced_groups_with_domains(self):
+ """Multiple domain assignments are not supported."""
+ self.assertRaises(
+ (exception.Forbidden, exception.ValidationError,
+ exception.DomainNotFound),
+ super(BaseLDAPIdentity, self).
+ test_list_role_assignment_using_sourced_groups_with_domains)
+
+ def test_create_project_with_domain_id_and_without_parent_id(self):
+ """Multiple domains are not supported."""
+ self.assertRaises(
+ exception.ValidationError,
+ super(BaseLDAPIdentity, self).
+ test_create_project_with_domain_id_and_without_parent_id)
+
+ def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
+ """Multiple domains are not supported."""
+ self.assertRaises(
+ exception.ValidationError,
+ super(BaseLDAPIdentity, self).
+ test_create_project_with_domain_id_mismatch_to_parent_domain)
+
+ def test_remove_foreign_assignments_when_deleting_a_domain(self):
+ """Multiple domains are not supported."""
+ self.assertRaises(
+ (exception.ValidationError, exception.DomainNotFound),
+ super(BaseLDAPIdentity,
+ self).test_remove_foreign_assignments_when_deleting_a_domain)
+
class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
@@ -1002,46 +1041,46 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.useFixture(database.Database())
super(LDAPIdentity, self).setUp()
_assert_backends(self,
- assignment='ldap',
+ assignment='sql',
identity='ldap',
- resource='ldap')
+ resource='sql')
def load_fixtures(self, fixtures):
# Override super impl since need to create group container.
create_group_container(self.identity_api)
super(LDAPIdentity, self).load_fixtures(fixtures)
+ def test_list_domains(self):
+ domains = self.resource_api.list_domains()
+ self.assertEqual([resource.calc_default_domain()], domains)
+
def test_configurable_allowed_project_actions(self):
domain = self._get_domain_fixture()
- tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True,
- 'domain_id': domain['id']}
- self.resource_api.create_project(u'fäké1', tenant)
- tenant_ref = self.resource_api.get_project(u'fäké1')
- self.assertEqual(u'fäké1', tenant_ref['id'])
+ project = unit.new_project_ref(domain_id=domain['id'])
+ project = self.resource_api.create_project(project['id'], project)
+ project_ref = self.resource_api.get_project(project['id'])
+ self.assertEqual(project['id'], project_ref['id'])
- tenant['enabled'] = False
- self.resource_api.update_project(u'fäké1', tenant)
+ project['enabled'] = False
+ self.resource_api.update_project(project['id'], project)
- self.resource_api.delete_project(u'fäké1')
+ self.resource_api.delete_project(project['id'])
self.assertRaises(exception.ProjectNotFound,
self.resource_api.get_project,
- u'fäké1')
+ project['id'])
def test_configurable_subtree_delete(self):
self.config_fixture.config(group='ldap', allow_subtree_delete=True)
self.load_backends()
- project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id}
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(project1['id'], project1)
- role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role1 = unit.new_role_ref()
self.role_api.create_role(role1['id'], role1)
- user1 = {'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'password': uuid.uuid4().hex,
- 'enabled': True}
+ user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user1 = self.identity_api.create_user(user1)
self.assignment_api.add_role_to_user_and_project(
@@ -1062,48 +1101,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertEqual(0, len(list))
def test_configurable_forbidden_project_actions(self):
- self.config_fixture.config(
- group='ldap', project_allow_create=False,
- project_allow_update=False, project_allow_delete=False)
- self.load_backends()
-
- domain = self._get_domain_fixture()
- tenant = {'id': u'fäké1', 'name': u'fäké1', 'domain_id': domain['id']}
- self.assertRaises(exception.ForbiddenAction,
- self.resource_api.create_project,
- u'fäké1',
- tenant)
-
- self.tenant_bar['enabled'] = False
- self.assertRaises(exception.ForbiddenAction,
- self.resource_api.update_project,
- self.tenant_bar['id'],
- self.tenant_bar)
- self.assertRaises(exception.ForbiddenAction,
- self.resource_api.delete_project,
- self.tenant_bar['id'])
+ self.skipTest('Resource LDAP has been removed')
def test_project_filter(self):
- tenant_ref = self.resource_api.get_project(self.tenant_bar['id'])
- self.assertDictEqual(tenant_ref, self.tenant_bar)
-
- self.config_fixture.config(group='ldap',
- project_filter='(CN=DOES_NOT_MATCH)')
- self.load_backends()
- # NOTE(morganfainberg): CONF.ldap.project_filter will not be
- # dynamically changed at runtime. This invalidate is a work-around for
- # the expectation that it is safe to change config values in tests that
- # could affect what the drivers would return up to the manager. This
- # solves this assumption when working with aggressive (on-create)
- # cache population.
- self.role_api.get_role.invalidate(self.role_api,
- self.role_member['id'])
- self.role_api.get_role(self.role_member['id'])
- self.resource_api.get_project.invalidate(self.resource_api,
- self.tenant_bar['id'])
- self.assertRaises(exception.ProjectNotFound,
- self.resource_api.get_project,
- self.tenant_bar['id'])
+ self.skipTest('Resource LDAP has been removed')
def test_dumb_member(self):
self.config_fixture.config(group='ldap', use_dumb_member=True)
@@ -1116,71 +1117,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
dumb_id)
def test_project_attribute_mapping(self):
- self.config_fixture.config(
- group='ldap', project_name_attribute='ou',
- project_desc_attribute='description',
- project_enabled_attribute='enabled')
- self.ldapdb.clear()
- self.load_backends()
- self.load_fixtures(default_fixtures)
- # NOTE(morganfainberg): CONF.ldap.project_name_attribute,
- # CONF.ldap.project_desc_attribute, and
- # CONF.ldap.project_enabled_attribute will not be
- # dynamically changed at runtime. This invalidate is a work-around for
- # the expectation that it is safe to change config values in tests that
- # could affect what the drivers would return up to the manager. This
- # solves this assumption when working with aggressive (on-create)
- # cache population.
- self.resource_api.get_project.invalidate(self.resource_api,
- self.tenant_baz['id'])
- tenant_ref = self.resource_api.get_project(self.tenant_baz['id'])
- self.assertEqual(self.tenant_baz['id'], tenant_ref['id'])
- self.assertEqual(self.tenant_baz['name'], tenant_ref['name'])
- self.assertEqual(
- self.tenant_baz['description'],
- tenant_ref['description'])
- self.assertEqual(self.tenant_baz['enabled'], tenant_ref['enabled'])
-
- self.config_fixture.config(group='ldap',
- project_name_attribute='description',
- project_desc_attribute='ou')
- self.load_backends()
- # NOTE(morganfainberg): CONF.ldap.project_name_attribute,
- # CONF.ldap.project_desc_attribute, and
- # CONF.ldap.project_enabled_attribute will not be
- # dynamically changed at runtime. This invalidate is a work-around for
- # the expectation that it is safe to change config values in tests that
- # could affect what the drivers would return up to the manager. This
- # solves this assumption when working with aggressive (on-create)
- # cache population.
- self.resource_api.get_project.invalidate(self.resource_api,
- self.tenant_baz['id'])
- tenant_ref = self.resource_api.get_project(self.tenant_baz['id'])
- self.assertEqual(self.tenant_baz['id'], tenant_ref['id'])
- self.assertEqual(self.tenant_baz['description'], tenant_ref['name'])
- self.assertEqual(self.tenant_baz['name'], tenant_ref['description'])
- self.assertEqual(self.tenant_baz['enabled'], tenant_ref['enabled'])
+ self.skipTest('Resource LDAP has been removed')
def test_project_attribute_ignore(self):
- self.config_fixture.config(
- group='ldap',
- project_attribute_ignore=['name', 'description', 'enabled'])
- self.ldapdb.clear()
- self.load_backends()
- self.load_fixtures(default_fixtures)
- # NOTE(morganfainberg): CONF.ldap.project_attribute_ignore will not be
- # dynamically changed at runtime. This invalidate is a work-around for
- # the expectation that it is safe to change configs values in tests
- # that could affect what the drivers would return up to the manager.
- # This solves this assumption when working with aggressive (on-create)
- # cache population.
- self.resource_api.get_project.invalidate(self.resource_api,
- self.tenant_baz['id'])
- tenant_ref = self.resource_api.get_project(self.tenant_baz['id'])
- self.assertEqual(self.tenant_baz['id'], tenant_ref['id'])
- self.assertNotIn('name', tenant_ref)
- self.assertNotIn('description', tenant_ref)
- self.assertNotIn('enabled', tenant_ref)
+ self.skipTest('Resource LDAP has been removed')
def test_user_enable_attribute_mask(self):
self.config_fixture.config(group='ldap', user_enabled_mask=2,
@@ -1189,8 +1129,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.load_backends()
self.load_fixtures(default_fixtures)
- user = {'name': u'fäké1', 'enabled': True,
- 'domain_id': CONF.identity.default_domain_id}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_ref = self.identity_api.create_user(user)
@@ -1237,14 +1176,12 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.load_backends()
self.load_fixtures(default_fixtures)
- user1 = {'name': u'fäké1', 'enabled': True,
- 'domain_id': CONF.identity.default_domain_id}
+ user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user2 = {'name': u'fäké2', 'enabled': False,
- 'domain_id': CONF.identity.default_domain_id}
+ user2 = self.new_user_ref(enabled=False,
+ domain_id=CONF.identity.default_domain_id)
- user3 = {'name': u'fäké3',
- 'domain_id': CONF.identity.default_domain_id}
+ user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
# Ensure that the LDAP attribute is False for a newly created
# enabled user.
@@ -1473,15 +1410,28 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
group='ldap',
user_additional_attribute_mapping=['description:name'])
self.load_backends()
- user = {
- 'name': 'EXTRA_ATTRIBUTES',
- 'password': 'extra',
- 'domain_id': CONF.identity.default_domain_id
- }
+ user = self.new_user_ref(name='EXTRA_ATTRIBUTES',
+ password='extra',
+ domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
dn, attrs = self.identity_api.driver.user._ldap_get(user['id'])
self.assertThat([user['name']], matchers.Equals(attrs['description']))
+ def test_user_description_attribute_mapping(self):
+ self.config_fixture.config(
+ group='ldap',
+ user_description_attribute='displayName')
+ self.load_backends()
+
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
+ displayName=uuid.uuid4().hex)
+ description = user['displayName']
+ user = self.identity_api.create_user(user)
+ res = self.identity_api.driver.user.get_all()
+
+ new_user = [u for u in res if u['id'] == user['id']][0]
+ self.assertThat(new_user['description'], matchers.Equals(description))
+
def test_user_extra_attribute_mapping_description_is_returned(self):
# Given a mapping like description:description, the description is
# returned.
@@ -1491,13 +1441,9 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
user_additional_attribute_mapping=['description:description'])
self.load_backends()
- description = uuid.uuid4().hex
- user = {
- 'name': uuid.uuid4().hex,
- 'description': description,
- 'password': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id
- }
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id,
+ description=uuid.uuid4().hex)
+ description = user['description']
user = self.identity_api.create_user(user)
res = self.identity_api.driver.user.get_all()
@@ -1551,52 +1497,17 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
'fake': 'invalid', 'invalid2': ''}
self.assertDictEqual(expected_dict, mapping)
-# TODO(henry-nash): These need to be removed when the full LDAP implementation
-# is submitted - see Bugs 1092187, 1101287, 1101276, 1101289
-
- def test_domain_crud(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'enabled': True, 'description': uuid.uuid4().hex}
- self.assertRaises(exception.Forbidden,
+ def test_create_domain(self):
+ domain = unit.new_domain_ref()
+ self.assertRaises(exception.ValidationError,
self.resource_api.create_domain,
domain['id'],
domain)
- self.assertRaises(exception.Conflict,
- self.resource_api.create_domain,
- CONF.identity.default_domain_id,
- domain)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
-
- domain['description'] = uuid.uuid4().hex
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.update_domain,
- domain['id'],
- domain)
- self.assertRaises(exception.Forbidden,
- self.resource_api.update_domain,
- CONF.identity.default_domain_id,
- domain)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.delete_domain,
- domain['id'])
- self.assertRaises(exception.Forbidden,
- self.resource_api.delete_domain,
- CONF.identity.default_domain_id)
- self.assertRaises(exception.DomainNotFound,
- self.resource_api.get_domain,
- domain['id'])
@unit.skip_if_no_multiple_domains_support
def test_create_domain_case_sensitivity(self):
# domains are read-only, so case sensitivity isn't an issue
- ref = {
- 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
+ ref = unit.new_domain_ref()
self.assertRaises(exception.Forbidden,
self.resource_api.create_domain,
ref['id'],
@@ -1624,22 +1535,18 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# NOTE(topol): LDAP implementation does not currently support the
# updating of a project name so this method override
# provides a different update test
- project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'description': uuid.uuid4().hex,
- 'enabled': True,
- 'parent_id': None,
- 'is_domain': False}
- self.resource_api.create_project(project['id'], project)
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+
+ project = self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project_ref, project)
+ self.assertDictEqual(project, project_ref)
project['description'] = uuid.uuid4().hex
self.resource_api.update_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project_ref, project)
+ self.assertDictEqual(project, project_ref)
self.resource_api.delete_project(project['id'])
self.assertRaises(exception.ProjectNotFound,
@@ -1651,12 +1558,11 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
# NOTE(morganfainberg): LDAP implementation does not currently support
# updating project names. This method override provides a different
# update test.
- project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'description': uuid.uuid4().hex}
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
project_id = project['id']
# Create a project
- self.resource_api.create_project(project_id, project)
+ project = self.resource_api.create_project(project_id, project)
self.resource_api.get_project(project_id)
updated_project = copy.deepcopy(project)
updated_project['description'] = uuid.uuid4().hex
@@ -1700,70 +1606,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.resource_api.get_project,
project_id)
- def _assert_create_hierarchy_not_allowed(self):
- domain = self._get_domain_fixture()
-
- project1 = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': '',
- 'domain_id': domain['id'],
- 'enabled': True,
- 'parent_id': None,
- 'is_domain': False}
- self.resource_api.create_project(project1['id'], project1)
-
- # Creating project2 under project1. LDAP will not allow
- # the creation of a project with parent_id being set
- project2 = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': '',
- 'domain_id': domain['id'],
- 'enabled': True,
- 'parent_id': project1['id'],
- 'is_domain': False}
-
- self.assertRaises(exception.InvalidParentProject,
- self.resource_api.create_project,
- project2['id'],
- project2)
-
- # Now, we'll create project 2 with no parent
- project2['parent_id'] = None
- self.resource_api.create_project(project2['id'], project2)
-
- # Returning projects to be used across the tests
- return [project1, project2]
-
- def _assert_create_is_domain_project_not_allowed(self):
- """Tests that we can't create more than one project acting as domain.
-
- This method will be used at any test that require the creation of a
- project that act as a domain. LDAP does not support multiple domains
- and the only domain it has (default) is immutable.
- """
- domain = self._get_domain_fixture()
- project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': '',
- 'domain_id': domain['id'],
- 'enabled': True,
- 'parent_id': None,
- 'is_domain': True}
-
- self.assertRaises(exception.ValidationError,
- self.resource_api.create_project,
- project['id'], project)
-
def test_update_is_domain_field(self):
domain = self._get_domain_fixture()
- project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'description': '',
- 'domain_id': domain['id'],
- 'enabled': True,
- 'parent_id': None,
- 'is_domain': False}
- self.resource_api.create_project(project['id'], project)
+ project = unit.new_project_ref(domain_id=domain['id'])
+ project = self.resource_api.create_project(project['id'], project)
# Try to update the is_domain field to True
project['is_domain'] = True
@@ -1772,97 +1618,87 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
project['id'], project)
def test_delete_is_domain_project(self):
- self._assert_create_is_domain_project_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_create_domain_under_regular_project_hierarchy_fails(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_create_not_is_domain_project_under_is_domain_hierarchy(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
- def test_create_is_domain_project(self):
- self._assert_create_is_domain_project_not_allowed()
+ def test_create_project_passing_is_domain_flag_true(self):
+ self.skipTest('Resource LDAP has been removed')
def test_create_project_with_parent_id_and_without_domain_id(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_check_leaf_projects(self):
- projects = self._assert_create_hierarchy_not_allowed()
- for project in projects:
- self.assertTrue(self.resource_api.is_leaf_project(project))
+ self.skipTest('Resource LDAP has been removed')
def test_list_projects_in_subtree(self):
- projects = self._assert_create_hierarchy_not_allowed()
- for project in projects:
- subtree_list = self.resource_api.list_projects_in_subtree(
- project['id'])
- self.assertEqual(0, len(subtree_list))
+ self.skipTest('Resource LDAP has been removed')
def test_list_projects_in_subtree_with_circular_reference(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_list_project_parents(self):
- projects = self._assert_create_hierarchy_not_allowed()
- for project in projects:
- parents_list = self.resource_api.list_project_parents(
- project['id'])
- self.assertEqual(0, len(parents_list))
+ self.skipTest('Resource LDAP has been removed')
+
+ def test_update_project_enabled_cascade(self):
+ self.skipTest('Resource LDAP has been removed')
+
+ def test_cannot_enable_cascade_with_parent_disabled(self):
+ self.skipTest('Resource LDAP has been removed')
def test_hierarchical_projects_crud(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_create_project_under_disabled_one(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_create_project_with_invalid_parent(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_create_leaf_project_with_invalid_domain(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_update_project_parent(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_enable_project_with_disabled_parent(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_disable_hierarchical_leaf_project(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_disable_hierarchical_not_leaf_project(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_delete_hierarchical_leaf_project(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_delete_hierarchical_not_leaf_project(self):
- self._assert_create_hierarchy_not_allowed()
+ self.skipTest('Resource LDAP has been removed')
def test_check_hierarchy_depth(self):
- projects = self._assert_create_hierarchy_not_allowed()
- for project in projects:
- depth = self._get_hierarchy_depth(project['id'])
- self.assertEqual(1, depth)
+ self.skipTest('Resource LDAP has been removed')
def test_multi_role_grant_by_user_group_on_project_domain(self):
# This is a partial implementation of the standard test that
- # is defined in test_backend.py. It omits both domain and
- # group grants. since neither of these are yet supported by
- # the ldap backend.
+ # is defined in unit.assignment.test_backends.py. It omits
+ # both domain and group grants. since neither of these are
+ # yet supported by the ldap backend.
role_list = []
for _ in range(2):
- role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
- user1 = {'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'password': uuid.uuid4().hex,
- 'enabled': True}
+ user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user1 = self.identity_api.create_user(user1)
- project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id}
+ project1 = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(project1['id'], project1)
self.assignment_api.add_role_to_user_and_project(
@@ -1947,7 +1783,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
expected_group_ids = []
numgroups = 3
for _ in range(numgroups):
- group = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group = unit.new_group_ref(domain_id=domain['id'])
group = self.identity_api.create_group(group)
expected_group_ids.append(group['id'])
# Fetch the test groups and ensure that they don't contain a dn.
@@ -1960,16 +1796,14 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
def test_list_groups_for_user_no_dn(self):
# Create a test user.
- user = {'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'password': uuid.uuid4().hex, 'enabled': True}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
# Create some test groups and add the test user as a member.
domain = self._get_domain_fixture()
expected_group_ids = []
numgroups = 3
for _ in range(numgroups):
- group = {'name': uuid.uuid4().hex, 'domain_id': domain['id']}
+ group = unit.new_group_ref(domain_id=domain['id'])
group = self.identity_api.create_group(group)
expected_group_ids.append(group['id'])
self.identity_api.add_user_to_group(user['id'], group['id'])
@@ -1987,9 +1821,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
CONF.identity.default_domain_id)
driver.user.id_attr = 'mail'
- user = {'name': u'fäké1',
- 'password': u'fäképass1',
- 'domain_id': CONF.identity.default_domain_id}
+ user = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user)
user_ref = self.identity_api.get_user(user['id'])
# 'email' attribute should've created because it is also being used
@@ -2083,6 +1915,35 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase):
self.assertEqual('Foo Bar', user_ref['name'])
+class LDAPLimitTests(unit.TestCase, identity_tests.LimitTests):
+ def setUp(self):
+ super(LDAPLimitTests, self).setUp()
+
+ self.useFixture(ldapdb.LDAPDatabase())
+ self.useFixture(database.Database(self.sql_driver_version_overrides))
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+ identity_tests.LimitTests.setUp(self)
+ _assert_backends(self,
+ assignment='sql',
+ identity='ldap',
+ resource='sql')
+
+ def config_overrides(self):
+ super(LDAPLimitTests, self).config_overrides()
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='identity',
+ list_limit=len(default_fixtures.USERS) - 1)
+
+ def config_files(self):
+ config_files = super(LDAPLimitTests, self).config_files()
+ config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
+ return config_files
+
+ def test_list_projects_filtered_and_limited(self):
+ self.skipTest("ldap for storing projects is deprecated")
+
+
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):
super(LDAPIdentityEnabledEmulation, self).setUp()
@@ -2092,10 +1953,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
for obj in [self.tenant_bar, self.tenant_baz, self.user_foo,
self.user_two, self.user_badguy]:
obj.setdefault('enabled', True)
- _assert_backends(self,
- assignment='ldap',
- identity='ldap',
- resource='ldap')
+ _assert_backends(self, identity='ldap')
def load_fixtures(self, fixtures):
# Override super impl since need to create group container.
@@ -2110,60 +1968,62 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
def config_overrides(self):
super(LDAPIdentityEnabledEmulation, self).config_overrides()
self.config_fixture.config(group='ldap',
- user_enabled_emulation=True,
- project_enabled_emulation=True)
+ user_enabled_emulation=True)
def test_project_crud(self):
# NOTE(topol): LDAPIdentityEnabledEmulation will create an
# enabled key in the project dictionary so this
# method override handles this side-effect
- project = {
- 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'description': uuid.uuid4().hex,
- 'parent_id': None,
- 'is_domain': False}
-
- self.resource_api.create_project(project['id'], project)
+ project = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+
+ project = self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
# self.resource_api.create_project adds an enabled
# key with a value of True when LDAPIdentityEnabledEmulation
# is used so we now add this expected key to the project dictionary
project['enabled'] = True
- self.assertDictEqual(project_ref, project)
+ self.assertDictEqual(project, project_ref)
project['description'] = uuid.uuid4().hex
self.resource_api.update_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project_ref, project)
+ self.assertDictEqual(project, project_ref)
self.resource_api.delete_project(project['id'])
self.assertRaises(exception.ProjectNotFound,
self.resource_api.get_project,
project['id'])
- def test_user_crud(self):
- user_dict = {
- 'domain_id': CONF.identity.default_domain_id,
- 'name': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex}
+ @mock.patch.object(versionutils, 'report_deprecated_feature')
+ def test_user_crud(self, mock_deprecator):
+ # NOTE(stevemar): As of the Mitaka release, we now check for calls that
+ # the LDAP write functionality has been deprecated.
+ user_dict = self.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
user = self.identity_api.create_user(user_dict)
- user_dict['enabled'] = True
- user_ref = self.identity_api.get_user(user['id'])
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("create_user for the LDAP identity backend", args[1])
+
del user_dict['password']
+ user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
user_dict['password'] = uuid.uuid4().hex
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
+ self.identity_api.update_user(user['id'], user_dict)
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("update_user for the LDAP identity backend", args[1])
+
del user_dict['password']
+ user_ref = self.identity_api.get_user(user['id'])
user_ref_dict = {x: user_ref[x] for x in user_ref}
self.assertDictContainsSubset(user_dict, user_ref_dict)
self.identity_api.delete_user(user['id'])
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("delete_user for the LDAP identity backend", args[1])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
user['id'])
@@ -2192,8 +2052,8 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.load_fixtures(default_fixtures)
# Create a user and ensure they are enabled.
- user1 = {'name': u'fäké1', 'enabled': True,
- 'domain_id': CONF.identity.default_domain_id}
+ user1 = unit.new_user_ref(enabled=True,
+ domain_id=CONF.identity.default_domain_id)
user_ref = self.identity_api.create_user(user1)
self.assertIs(True, user_ref['enabled'])
@@ -2208,14 +2068,12 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.load_backends()
self.load_fixtures(default_fixtures)
- user1 = {'name': u'fäké1', 'enabled': True,
- 'domain_id': CONF.identity.default_domain_id}
+ user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user2 = {'name': u'fäké2', 'enabled': False,
- 'domain_id': CONF.identity.default_domain_id}
+ user2 = self.new_user_ref(enabled=False,
+ domain_id=CONF.identity.default_domain_id)
- user3 = {'name': u'fäké3',
- 'domain_id': CONF.identity.default_domain_id}
+ user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
# Ensure that the enabled LDAP attribute is not set for a
# newly created enabled user.
@@ -2282,121 +2140,103 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
user_ref = user_api.get('123456789')
self.assertIs(False, user_ref['enabled'])
+ def test_escape_member_dn(self):
+ # The enabled member DN is properly escaped when querying for enabled
+ # user.
-class LdapIdentitySqlAssignment(BaseLDAPIdentity, unit.SQLDriverOverrides,
- unit.TestCase):
+ object_id = uuid.uuid4().hex
+ driver = self.identity_api._select_identity_driver(
+ CONF.identity.default_domain_id)
- def config_files(self):
- config_files = super(LdapIdentitySqlAssignment, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
- return config_files
+ # driver.user is the EnabledEmuMixIn implementation used for this test.
+ mixin_impl = driver.user
- def setUp(self):
- sqldb = self.useFixture(database.Database())
- super(LdapIdentitySqlAssignment, self).setUp()
- self.ldapdb.clear()
- self.load_backends()
- cache.configure_cache_region(cache.REGION)
+ # ) is a special char in a filter and must be escaped.
+ sample_dn = 'cn=foo)bar'
+ # LDAP requires ) is escaped by being replaced with "\29"
+ sample_dn_filter_esc = r'cn=foo\29bar'
- sqldb.recreate()
- self.load_fixtures(default_fixtures)
- # defaulted by the data load
- self.user_foo['enabled'] = True
- _assert_backends(self,
- assignment='sql',
- identity='ldap',
- resource='sql')
+ # Override the tree_dn, it's used to build the enabled member filter
+ mixin_impl.tree_dn = sample_dn
- def config_overrides(self):
- super(LdapIdentitySqlAssignment, self).config_overrides()
- self.config_fixture.config(group='identity', driver='ldap')
- self.config_fixture.config(group='resource', driver='sql')
- self.config_fixture.config(group='assignment', driver='sql')
+ # The filter that _get_enabled is going to build contains the
+ # tree_dn, which better be escaped in this case.
+ exp_filter = '(%s=%s=%s,%s)' % (
+ mixin_impl.member_attribute, mixin_impl.id_attr, object_id,
+ sample_dn_filter_esc)
- def test_domain_crud(self):
- pass
+ with mixin_impl.get_connection() as conn:
+ m = self.useFixture(mockpatch.PatchObject(conn, 'search_s')).mock
+ mixin_impl._get_enabled(object_id, conn)
+ # The 3rd argument is the DN.
+ self.assertEqual(exp_filter, m.call_args[0][2])
- def test_list_domains(self):
- domains = self.resource_api.list_domains()
- self.assertEqual([resource.calc_default_domain()], domains)
- def test_list_domains_non_default_domain_id(self):
- # If change the default_domain_id, the ID of the default domain
- # returned by list_domains doesn't change because the SQL identity
- # backend reads it from the database, which doesn't get updated by
- # config change.
+class LDAPPosixGroupsTest(unit.TestCase):
- orig_default_domain_id = CONF.identity.default_domain_id
+ def setUp(self):
- new_domain_id = uuid.uuid4().hex
- self.config_fixture.config(group='identity',
- default_domain_id=new_domain_id)
+ super(LDAPPosixGroupsTest, self).setUp()
- domains = self.resource_api.list_domains()
+ self.useFixture(ldapdb.LDAPDatabase())
+ self.useFixture(database.Database())
- self.assertEqual(orig_default_domain_id, domains[0]['id'])
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
- def test_create_domain(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'enabled': True}
- self.assertRaises(exception.Forbidden,
- self.resource_api.create_domain,
- domain['id'],
- domain)
+ _assert_backends(self, identity='ldap')
- def test_get_and_remove_role_grant_by_group_and_domain(self):
- # TODO(henry-nash): We should really rewrite the tests in test_backend
- # to be more flexible as to where the domains are sourced from, so
- # that we would not need to override such tests here. This is raised
- # as bug 1373865.
- new_domain = self._get_domain_fixture()
- new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex}
- new_group = self.identity_api.create_group(new_group)
- new_user = {'name': 'new_user', 'password': uuid.uuid4().hex,
- 'enabled': True, 'domain_id': new_domain['id']}
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
+ def load_fixtures(self, fixtures):
+ # Override super impl since need to create group container.
+ create_group_container(self.identity_api)
+ super(LDAPPosixGroupsTest, self).load_fixtures(fixtures)
- roles_ref = self.assignment_api.list_grants(
- group_id=new_group['id'],
- domain_id=new_domain['id'])
- self.assertEqual(0, len(roles_ref))
+ def config_overrides(self):
+ super(LDAPPosixGroupsTest, self).config_overrides()
+ self.config_fixture.config(group='identity', driver='ldap')
+ self.config_fixture.config(group='ldap', group_members_are_ids=True,
+ group_member_attribute='memberUID')
- self.assignment_api.create_grant(group_id=new_group['id'],
- domain_id=new_domain['id'],
- role_id='member')
+ def config_files(self):
+ config_files = super(LDAPPosixGroupsTest, self).config_files()
+ config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
+ return config_files
- roles_ref = self.assignment_api.list_grants(
- group_id=new_group['id'],
- domain_id=new_domain['id'])
- self.assertDictEqual(roles_ref[0], self.role_member)
+ def _get_domain_fixture(self):
+ """Domains in LDAP are read-only, so just return the static one."""
+ return self.resource_api.get_domain(CONF.identity.default_domain_id)
- self.assignment_api.delete_grant(group_id=new_group['id'],
- domain_id=new_domain['id'],
- role_id='member')
- roles_ref = self.assignment_api.list_grants(
- group_id=new_group['id'],
- domain_id=new_domain['id'])
- self.assertEqual(0, len(roles_ref))
- self.assertRaises(exception.NotFound,
- self.assignment_api.delete_grant,
- group_id=new_group['id'],
- domain_id=new_domain['id'],
- role_id='member')
+ def test_posix_member_id(self):
+ domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ # Make sure we get an empty list back on a new group, not an error.
+ user_refs = self.identity_api.list_users_in_group(new_group['id'])
+ self.assertEqual([], user_refs)
+ # Make sure we get the correct users back once they have been added
+ # to the group.
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
- def test_project_enabled_ignored_disable_error(self):
- # Override
- self.skipTest("Doesn't apply since LDAP configuration is ignored for "
- "SQL assignment backend.")
+ # NOTE(amakarov): Create the group directly using LDAP operations
+ # rather than going through the manager.
+ group_api = self.identity_api.driver.group
+ group_ref = group_api.get(new_group['id'])
+ mod = (ldap.MOD_ADD, group_api.member_attribute, new_user['id'])
+ conn = group_api.get_connection()
+ conn.modify_s(group_ref['dn'], [mod])
- def test_list_role_assignments_filtered_by_role(self):
- # Domain roles are supported by the SQL Assignment backend
- base = super(BaseLDAPIdentity, self)
- base.test_list_role_assignments_filtered_by_role()
+ # Testing the case "the group contains a user"
+ user_refs = self.identity_api.list_users_in_group(new_group['id'])
+ self.assertIn(new_user['id'], (x['id'] for x in user_refs))
+ # Testing the case "the user is a member of a group"
+ group_refs = self.identity_api.list_groups_for_user(new_user['id'])
+ self.assertIn(new_group['id'], (x['id'] for x in group_refs))
-class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
+
+class LdapIdentityWithMapping(
+ BaseLDAPIdentity, unit.SQLDriverOverrides, unit.TestCase):
"""Class to test mapping of default LDAP backend.
The default configuration is not to enable mapping when using a single
@@ -2405,8 +2245,28 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
Setting backward_compatible_ids to False will enable this mapping.
"""
+
+ def config_files(self):
+ config_files = super(LdapIdentityWithMapping, self).config_files()
+ config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf'))
+ return config_files
+
+ def setUp(self):
+ sqldb = self.useFixture(database.Database())
+ super(LdapIdentityWithMapping, self).setUp()
+ self.ldapdb.clear()
+ self.load_backends()
+ cache.configure_cache()
+
+ sqldb.recreate()
+ self.load_fixtures(default_fixtures)
+ # defaulted by the data load
+ self.user_foo['enabled'] = True
+ _assert_backends(self, identity='ldap')
+
def config_overrides(self):
- super(LdapIdentitySqlAssignmentWithMapping, self).config_overrides()
+ super(LdapIdentityWithMapping, self).config_overrides()
+ self.config_fixture.config(group='identity', driver='ldap')
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
@@ -2420,13 +2280,9 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
"""
initial_mappings = len(mapping_sql.list_id_mappings())
- user1 = {'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'password': uuid.uuid4().hex, 'enabled': True}
+ user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user1 = self.identity_api.create_user(user1)
- user2 = {'name': uuid.uuid4().hex,
- 'domain_id': CONF.identity.default_domain_id,
- 'password': uuid.uuid4().hex, 'enabled': True}
+ user2 = self.new_user_ref(domain_id=CONF.identity.default_domain_id)
user2 = self.identity_api.create_user(user2)
mappings = mapping_sql.list_id_mappings()
self.assertEqual(initial_mappings + 2, len(mappings))
@@ -2453,35 +2309,29 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
self.skipTest('N/A: We never generate the same ID for a user and '
'group in our mapping table')
+ def test_list_domains(self):
+ domains = self.resource_api.list_domains()
+ self.assertEqual([resource.calc_default_domain()], domains)
+
class BaseMultiLDAPandSQLIdentity(object):
"""Mixin class with support methods for domain-specific config testing."""
- def create_user(self, domain_id):
- user = {'name': uuid.uuid4().hex,
- 'domain_id': domain_id,
- 'password': uuid.uuid4().hex,
- 'enabled': True}
- user_ref = self.identity_api.create_user(user)
- # Put the password back in, since this is used later by tests to
- # authenticate.
- user_ref['password'] = user['password']
- return user_ref
-
def create_users_across_domains(self):
"""Create a set of users, each with a role on their own domain."""
-
# We also will check that the right number of id mappings get created
initial_mappings = len(mapping_sql.list_id_mappings())
- self.users['user0'] = self.create_user(
+ self.users['user0'] = unit.create_user(
+ self.identity_api,
self.domains['domain_default']['id'])
self.assignment_api.create_grant(
user_id=self.users['user0']['id'],
domain_id=self.domains['domain_default']['id'],
role_id=self.role_member['id'])
for x in range(1, self.domain_count):
- self.users['user%s' % x] = self.create_user(
+ self.users['user%s' % x] = unit.create_user(
+ self.identity_api,
self.domains['domain%s' % x]['id'])
self.assignment_api.create_grant(
user_id=self.users['user%s' % x]['id'],
@@ -2506,13 +2356,13 @@ class BaseMultiLDAPandSQLIdentity(object):
self.identity_api._get_domain_driver_and_entity_id(
user['id']))
- if expected_status == 200:
+ if expected_status == http_client.OK:
ref = driver.get_user(entity_id)
ref = self.identity_api._set_domain_id_and_mapping(
ref, domain_id, driver, map.EntityType.USER)
user = user.copy()
del user['password']
- self.assertDictEqual(ref, user)
+ self.assertDictEqual(user, ref)
else:
# TODO(henry-nash): Use AssertRaises here, although
# there appears to be an issue with using driver.get_user
@@ -2570,6 +2420,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
domain.
"""
+
def setUp(self):
sqldb = self.useFixture(database.Database())
super(MultiLDAPandSQLIdentity, self).setUp()
@@ -2614,11 +2465,14 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
# Create some identity entities BEFORE we switch to multi-backend, so
# we can test that these are still accessible
self.users = {}
- self.users['userA'] = self.create_user(
+ self.users['userA'] = unit.create_user(
+ self.identity_api,
self.domains['domain_default']['id'])
- self.users['userB'] = self.create_user(
+ self.users['userB'] = unit.create_user(
+ self.identity_api,
self.domains['domain1']['id'])
- self.users['userC'] = self.create_user(
+ self.users['userC'] = unit.create_user(
+ self.identity_api,
self.domains['domain3']['id'])
def enable_multi_domain(self):
@@ -2631,7 +2485,8 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
"""
self.config_fixture.config(
group='identity', domain_specific_drivers_enabled=True,
- domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap')
+ domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap',
+ list_limit=1000)
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
@@ -2640,14 +2495,6 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
# if no specific config defined for this domain
return self.identity_api.domain_configs.get_domain_conf(domain_id)
- def test_list_domains(self):
- self.skipTest(
- 'N/A: Not relevant for multi ldap testing')
-
- def test_list_domains_non_default_domain_id(self):
- self.skipTest(
- 'N/A: Not relevant for multi ldap testing')
-
def test_list_users(self):
# Override the standard list users, since we have added an extra user
# to the default domain, so the number of expected users is one more
@@ -2664,6 +2511,36 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids)
+ @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
+ def test_list_limit_domain_specific_inheritance(self, ldap_get_all):
+ # passiging hints is important, because if it's not passed, limiting
+ # is considered be disabled
+ hints = driver_hints.Hints()
+ self.identity_api.list_users(
+ domain_scope=self.domains['domain2']['id'],
+ hints=hints)
+ # since list_limit is not specified in keystone.domain2.conf, it should
+ # take the default, which is 1000
+ self.assertTrue(ldap_get_all.called)
+ args, kwargs = ldap_get_all.call_args
+ hints = args[0]
+ self.assertEqual(1000, hints.limit['limit'])
+
+ @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all')
+ def test_list_limit_domain_specific_override(self, ldap_get_all):
+ # passiging hints is important, because if it's not passed, limiting
+ # is considered to be disabled
+ hints = driver_hints.Hints()
+ self.identity_api.list_users(
+ domain_scope=self.domains['domain1']['id'],
+ hints=hints)
+ # this should have the list_limit set in Keystone.domain1.conf, which
+ # is 101
+ self.assertTrue(ldap_get_all.called)
+ args, kwargs = ldap_get_all.call_args
+ hints = args[0]
+ self.assertEqual(101, hints.limit['limit'])
+
def test_domain_segregation(self):
"""Test that separate configs have segregated the domain.
@@ -2680,21 +2557,23 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
check_user = self.check_user
check_user(self.users['user0'],
- self.domains['domain_default']['id'], 200)
+ self.domains['domain_default']['id'], http_client.OK)
for domain in [self.domains['domain1']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user0'], domain, exception.UserNotFound)
- check_user(self.users['user1'], self.domains['domain1']['id'], 200)
+ check_user(self.users['user1'], self.domains['domain1']['id'],
+ http_client.OK)
for domain in [self.domains['domain_default']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user1'], domain, exception.UserNotFound)
- check_user(self.users['user2'], self.domains['domain2']['id'], 200)
+ check_user(self.users['user2'], self.domains['domain2']['id'],
+ http_client.OK)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
self.domains['domain3']['id'],
@@ -2704,10 +2583,14 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
# domain3 and domain4 share the same backend, so you should be
# able to see user3 and user4 from either.
- check_user(self.users['user3'], self.domains['domain3']['id'], 200)
- check_user(self.users['user3'], self.domains['domain4']['id'], 200)
- check_user(self.users['user4'], self.domains['domain3']['id'], 200)
- check_user(self.users['user4'], self.domains['domain4']['id'], 200)
+ check_user(self.users['user3'], self.domains['domain3']['id'],
+ http_client.OK)
+ check_user(self.users['user3'], self.domains['domain4']['id'],
+ http_client.OK)
+ check_user(self.users['user4'], self.domains['domain3']['id'],
+ http_client.OK)
+ check_user(self.users['user4'], self.domains['domain4']['id'],
+ http_client.OK)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
@@ -2789,19 +2672,12 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
self.assertEqual('fake://memory1', conf.ldap.url)
def test_delete_domain_with_user_added(self):
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'enabled': True}
- project = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- 'domain_id': domain['id'],
- 'description': uuid.uuid4().hex,
- 'parent_id': None,
- 'enabled': True,
- 'is_domain': False}
+ domain = unit.new_domain_ref()
+ project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_domain(domain['id'], domain)
- self.resource_api.create_project(project['id'], project)
+ project = self.resource_api.create_project(project['id'], project)
project_ref = self.resource_api.get_project(project['id'])
- self.assertDictEqual(project_ref, project)
+ self.assertDictEqual(project, project_ref)
self.assignment_api.create_grant(user_id=self.user_foo['id'],
project_id=project['id'],
@@ -2839,13 +2715,37 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
def test_list_role_assignment_by_domain(self):
# With multi LDAP this method should work, so override the override
# from BaseLDAPIdentity
- super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain
+ super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain()
def test_list_role_assignment_by_user_with_domain_group_roles(self):
# With multi LDAP this method should work, so override the override
# from BaseLDAPIdentity
super(BaseLDAPIdentity, self).\
- test_list_role_assignment_by_user_with_domain_group_roles
+ test_list_role_assignment_by_user_with_domain_group_roles()
+
+ def test_list_role_assignment_using_sourced_groups_with_domains(self):
+ # With SQL Assignment this method should work, so override the override
+ # from BaseLDAPIdentity
+ base = super(BaseLDAPIdentity, self)
+ base.test_list_role_assignment_using_sourced_groups_with_domains()
+
+ def test_create_project_with_domain_id_and_without_parent_id(self):
+ # With multi LDAP this method should work, so override the override
+ # from BaseLDAPIdentity
+ super(BaseLDAPIdentity, self).\
+ test_create_project_with_domain_id_and_without_parent_id()
+
+ def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
+ # With multi LDAP this method should work, so override the override
+ # from BaseLDAPIdentity
+ super(BaseLDAPIdentity, self).\
+ test_create_project_with_domain_id_mismatch_to_parent_domain()
+
+ def test_remove_foreign_assignments_when_deleting_a_domain(self):
+ # With multi LDAP this method should work, so override the override
+ # from BaseLDAPIdentity
+ base = super(BaseLDAPIdentity, self)
+ base.test_remove_foreign_assignments_when_deleting_a_domain()
class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
@@ -2870,7 +2770,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def enable_multi_domain(self):
# The values below are the same as in the domain_configs_multi_ldap
- # cdirectory of test config_files.
+ # directory of test config_files.
default_config = {
'ldap': {'url': 'fake://memory',
'user': 'cn=Admin',
@@ -2883,7 +2783,8 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
'user': 'cn=Admin',
'password': 'password',
'suffix': 'cn=example,cn=com'},
- 'identity': {'driver': 'ldap'}
+ 'identity': {'driver': 'ldap',
+ 'list_limit': '101'}
}
domain2_config = {
'ldap': {'url': 'fake://memory',
@@ -2904,7 +2805,8 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
self.config_fixture.config(
group='identity', domain_specific_drivers_enabled=True,
- domain_configurations_from_database=True)
+ domain_configurations_from_database=True,
+ list_limit=1000)
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
@@ -2933,7 +2835,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def test_reloading_domain_config(self):
"""Ensure domain drivers are reloaded on a config modification."""
-
domain_cfgs = self.identity_api.domain_configs
# Create a new config for the default domain, hence overwriting the
@@ -2965,7 +2866,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def test_setting_multiple_sql_driver_raises_exception(self):
"""Ensure setting multiple domain specific sql drivers is prevented."""
-
new_config = {'identity': {'driver': 'sql'}}
self.domain_config_api.create_config(
CONF.identity.default_domain_id, new_config)
@@ -2979,7 +2879,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def test_same_domain_gets_sql_driver(self):
"""Ensure we can set an SQL driver if we have had it before."""
-
new_config = {'identity': {'driver': 'sql'}}
self.domain_config_api.create_config(
CONF.identity.default_domain_id, new_config)
@@ -2997,8 +2896,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def test_delete_domain_clears_sql_registration(self):
"""Ensure registration is deleted when a domain is deleted."""
-
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain = unit.new_domain_ref()
domain = self.resource_api.create_domain(domain['id'], domain)
new_config = {'identity': {'driver': 'sql'}}
self.domain_config_api.create_config(domain['id'], new_config)
@@ -3025,8 +2923,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
def test_orphaned_registration_does_not_prevent_getting_sql_driver(self):
"""Ensure we self heal an orphaned sql registration."""
-
- domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain = unit.new_domain_ref()
domain = self.resource_api.create_domain(domain['id'], domain)
new_config = {'identity': {'driver': 'sql'}}
self.domain_config_api.create_config(domain['id'], new_config)
@@ -3047,7 +2944,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity):
# should still be able to set another domain to SQL, since we should
# self heal this issue.
- self.resource_api.driver.delete_domain(domain['id'])
+ self.resource_api.driver.delete_project(domain['id'])
# Invalidate cache (so we will see the domain has gone)
self.resource_api.get_domain.invalidate(
self.resource_api, domain['id'])
@@ -3072,6 +2969,7 @@ class DomainSpecificLDAPandSQLIdentity(
Although the default driver still exists, we don't use it.
"""
+
def setUp(self):
sqldb = self.useFixture(database.Database())
super(DomainSpecificLDAPandSQLIdentity, self).setUp()
@@ -3133,6 +3031,17 @@ class DomainSpecificLDAPandSQLIdentity(
self.skipTest(
'N/A: Not relevant for multi ldap testing')
+ def test_not_delete_domain_with_enabled_subdomains(self):
+ self.skipTest(
+ 'N/A: Not relevant for multi ldap testing')
+
+ def test_delete_domain(self):
+ # With this restricted multi LDAP class, tests that use multiple
+ # domains and identity, are still not supported
+ self.assertRaises(
+ exception.DomainNotFound,
+ super(BaseLDAPIdentity, self).test_delete_domain_with_project_api)
+
def test_list_users(self):
# Override the standard list users, since we have added an extra user
# to the default domain, so the number of expected users is one more
@@ -3164,12 +3073,12 @@ class DomainSpecificLDAPandSQLIdentity(
# driver, but won't find it via any other domain driver
self.check_user(self.users['user0'],
- self.domains['domain_default']['id'], 200)
+ self.domains['domain_default']['id'], http_client.OK)
self.check_user(self.users['user0'],
self.domains['domain1']['id'], exception.UserNotFound)
self.check_user(self.users['user1'],
- self.domains['domain1']['id'], 200)
+ self.domains['domain1']['id'], http_client.OK)
self.check_user(self.users['user1'],
self.domains['domain_default']['id'],
exception.UserNotFound)
@@ -3182,10 +3091,10 @@ class DomainSpecificLDAPandSQLIdentity(
domain_scope=self.domains['domain1']['id']),
matchers.HasLength(1))
- def test_add_role_grant_to_user_and_project_404(self):
+ def test_add_role_grant_to_user_and_project_returns_not_found(self):
self.skipTest('Blocked by bug 1101287')
- def test_get_role_grants_for_user_and_project_404(self):
+ def test_get_role_grants_for_user_and_project_returns_not_found(self):
self.skipTest('Blocked by bug 1101287')
def test_list_projects_for_user_with_grants(self):
@@ -3223,6 +3132,25 @@ class DomainSpecificLDAPandSQLIdentity(
base = super(BaseLDAPIdentity, self)
base.test_list_role_assignments_filtered_by_role()
+ def test_delete_domain_with_project_api(self):
+ # With this restricted multi LDAP class, tests that use multiple
+ # domains and identity, are still not supported
+ self.assertRaises(
+ exception.DomainNotFound,
+ super(BaseLDAPIdentity, self).test_delete_domain_with_project_api)
+
+ def test_create_project_with_domain_id_and_without_parent_id(self):
+ # With restricted multi LDAP, tests that don't use identity, but do
+ # required aditional domains will work
+ base = super(BaseLDAPIdentity, self)
+ base.test_create_project_with_domain_id_and_without_parent_id()
+
+ def test_create_project_with_domain_id_mismatch_to_parent_domain(self):
+ # With restricted multi LDAP, tests that don't use identity, but do
+ # required aditional domains will work
+ base = super(BaseLDAPIdentity, self)
+ base.test_create_project_with_domain_id_mismatch_to_parent_domain()
+
class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
"""Class to test simplest use of domain-specific SQL driver.
@@ -3236,6 +3164,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
- A separate SQL backend for domain1
"""
+
def initial_setup(self, sqldb):
# We aren't setting up any initial data ahead of switching to
# domain-specific operation, so make the switch straight away.
@@ -3323,7 +3252,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
'domain2')
-class LdapFilterTests(test_backend.FilterTests, unit.TestCase):
+class LdapFilterTests(identity_tests.FilterTests, unit.TestCase):
def setUp(self):
super(LdapFilterTests, self).setUp()
@@ -3333,7 +3262,7 @@ class LdapFilterTests(test_backend.FilterTests, unit.TestCase):
self.load_backends()
self.load_fixtures(default_fixtures)
sqldb.recreate()
- _assert_backends(self, assignment='ldap', identity='ldap')
+ _assert_backends(self, identity='ldap')
def config_overrides(self):
super(LdapFilterTests, self).config_overrides()
@@ -3344,13 +3273,15 @@ class LdapFilterTests(test_backend.FilterTests, unit.TestCase):
config_files.append(unit.dirs.tests_conf('backend_ldap.conf'))
return config_files
- def test_list_users_in_group_filtered(self):
+ @wip('Not supported by LDAP identity driver')
+ def test_list_users_in_group_inexact_filtered(self):
+ # The LDAP identity driver currently does not support filtering on the
+ # listing users for a given group, so will fail this test.
+ super(LdapFilterTests,
+ self).test_list_users_in_group_inexact_filtered()
+
+ @wip('Not supported by LDAP identity driver')
+ def test_list_users_in_group_exact_filtered(self):
# The LDAP identity driver currently does not support filtering on the
# listing users for a given group, so will fail this test.
- try:
- super(LdapFilterTests, self).test_list_users_in_group_filtered()
- except matchers.MismatchError:
- return
- # We shouldn't get here...if we do, it means someone has implemented
- # filtering, so we can remove this test override.
- self.assertTrue(False)
+ super(LdapFilterTests, self).test_list_users_in_group_exact_filtered()