diff options
author | Ruan HE <ruan.he@orange.com> | 2016-06-09 08:12:34 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@172.30.200.206> | 2016-06-09 08:12:34 +0000 |
commit | 4bc079a2664f9a407e332291f34d174625a9d5ea (patch) | |
tree | 7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_backend_ldap.py | |
parent | 2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff) | |
parent | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff) |
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_backend_ldap.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_backend_ldap.py | 1285 |
1 files changed, 608 insertions, 677 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py index d96ec376..cf618633 100644 --- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py +++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py @@ -20,11 +20,15 @@ import uuid import ldap import mock from oslo_config import cfg +from oslo_log import versionutils +from oslotest import mockpatch import pkg_resources +from six.moves import http_client from six.moves import range from testtools import matchers from keystone.common import cache +from keystone.common import driver_hints from keystone.common import ldap as common_ldap from keystone.common.ldap import core as common_ldap_core from keystone import exception @@ -32,11 +36,14 @@ from keystone import identity from keystone.identity.mapping_backends import mapping as map from keystone import resource from keystone.tests import unit +from keystone.tests.unit.assignment import test_backends as assignment_tests from keystone.tests.unit import default_fixtures +from keystone.tests.unit.identity import test_backends as identity_tests from keystone.tests.unit import identity_mapping as mapping_sql from keystone.tests.unit.ksfixtures import database from keystone.tests.unit.ksfixtures import ldapdb -from keystone.tests.unit import test_backend +from keystone.tests.unit.resource import test_backends as resource_tests +from keystone.tests.unit.utils import wip CONF = cfg.CONF @@ -115,7 +122,9 @@ def create_group_container(identity_api): ('ou', ['Groups'])]) -class BaseLDAPIdentity(test_backend.IdentityTests): +class BaseLDAPIdentity(identity_tests.IdentityTests, + assignment_tests.AssignmentTests, + resource_tests.ResourceTests): def setUp(self): super(BaseLDAPIdentity, self).setUp() @@ -123,6 +132,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.load_backends() self.load_fixtures(default_fixtures) + self.config_fixture.config(group='os_inherit', enabled=False) def _get_domain_fixture(self): """Domains in LDAP are read-only, so just return the static one.""" @@ -141,6 +151,13 @@ class BaseLDAPIdentity(test_backend.IdentityTests): config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) return config_files + def new_user_ref(self, domain_id, project_id=None, **kwargs): + ref = unit.new_user_ref(domain_id=domain_id, project_id=project_id, + **kwargs) + if 'id' not in kwargs: + del ref['id'] + return ref + def get_user_enabled_vals(self, user): user_dn = ( self.identity_api.driver.user._id_to_dn_string(user['id'])) @@ -156,17 +173,13 @@ class BaseLDAPIdentity(test_backend.IdentityTests): return None def test_build_tree(self): - """Regression test for building the tree names - """ + """Regression test for building the tree names.""" user_api = identity.backends.ldap.UserApi(CONF) self.assertTrue(user_api) self.assertEqual("ou=Users,%s" % CONF.ldap.suffix, user_api.tree_dn) def test_configurable_allowed_user_actions(self): - user = {'name': u'fäké1', - 'password': u'fäképass1', - 'domain_id': CONF.identity.default_domain_id, - 'tenants': ['bar']} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) self.identity_api.get_user(user['id']) @@ -185,10 +198,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): driver.user.allow_update = False driver.user.allow_delete = False - user = {'name': u'fäké1', - 'password': u'fäképass1', - 'domain_id': CONF.identity.default_domain_id, - 'tenants': ['bar']} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) self.assertRaises(exception.ForbiddenAction, self.identity_api.create_user, user) @@ -215,7 +225,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_user_filter(self): user_ref = self.identity_api.get_user(self.user_foo['id']) self.user_foo.pop('password') - self.assertDictEqual(user_ref, self.user_foo) + self.assertDictEqual(self.user_foo, user_ref) driver = self.identity_api._select_identity_driver( user_ref['domain_id']) @@ -227,6 +237,20 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.identity_api.get_user, self.user_foo['id']) + def test_list_users_by_name_and_with_filter(self): + # confirm that the user is not exposed when it does not match the + # filter setting in conf even if it is requested by name in user list + hints = driver_hints.Hints() + hints.add_filter('name', self.user_foo['name']) + domain_id = self.user_foo['domain_id'] + driver = self.identity_api._select_identity_driver(domain_id) + driver.user.ldap_filter = ('(|(cn=%s)(cn=%s))' % + (self.user_sna['id'], self.user_two['id'])) + users = self.identity_api.list_users( + domain_scope=self._set_domain_scope(domain_id), + hints=hints) + self.assertEqual(0, len(users)) + def test_remove_role_grant_from_user_and_project(self): self.assignment_api.create_grant(user_id=self.user_foo['id'], project_id=self.tenant_baz['id'], @@ -234,7 +258,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): roles_ref = self.assignment_api.list_grants( user_id=self.user_foo['id'], project_id=self.tenant_baz['id']) - self.assertDictEqual(roles_ref[0], self.role_member) + self.assertDictEqual(self.role_member, roles_ref[0]) self.assignment_api.delete_grant(user_id=self.user_foo['id'], project_id=self.tenant_baz['id'], @@ -251,11 +275,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_get_and_remove_role_grant_by_group_and_project(self): new_domain = self._get_domain_fixture() - new_group = {'domain_id': new_domain['id'], - 'name': uuid.uuid4().hex} + new_group = unit.new_group_ref(domain_id=new_domain['id']) new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'enabled': True, - 'domain_id': new_domain['id']} + new_user = self.new_user_ref(domain_id=new_domain['id']) new_user = self.identity_api.create_user(new_user) self.identity_api.add_user_to_group(new_user['id'], new_group['id']) @@ -273,7 +295,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): group_id=new_group['id'], project_id=self.tenant_bar['id']) self.assertNotEmpty(roles_ref) - self.assertDictEqual(roles_ref[0], self.role_member) + self.assertDictEqual(self.role_member, roles_ref[0]) self.assignment_api.delete_grant(group_id=new_group['id'], project_id=self.tenant_bar['id'], @@ -289,7 +311,44 @@ class BaseLDAPIdentity(test_backend.IdentityTests): role_id='member') def test_get_and_remove_role_grant_by_group_and_domain(self): - self.skipTest('N/A: LDAP does not support multiple domains') + # TODO(henry-nash): We should really rewrite the tests in + # unit.resource.test_backends to be more flexible as to where the + # domains are sourced from, so that we would not need to override such + # tests here. This is raised as bug 1373865. + new_domain = self._get_domain_fixture() + new_group = unit.new_group_ref(domain_id=new_domain['id'],) + new_group = self.identity_api.create_group(new_group) + new_user = self.new_user_ref(domain_id=new_domain['id']) + new_user = self.identity_api.create_user(new_user) + self.identity_api.add_user_to_group(new_user['id'], + new_group['id']) + + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + + self.assignment_api.create_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertDictEqual(self.role_member, roles_ref[0]) + + self.assignment_api.delete_grant(group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') + roles_ref = self.assignment_api.list_grants( + group_id=new_group['id'], + domain_id=new_domain['id']) + self.assertEqual(0, len(roles_ref)) + self.assertRaises(exception.NotFound, + self.assignment_api.delete_grant, + group_id=new_group['id'], + domain_id=new_domain['id'], + role_id='member') def test_get_role_assignment_by_domain_not_found(self): self.skipTest('N/A: LDAP does not support multiple domains') @@ -327,10 +386,12 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_delete_group_with_user_project_domain_links(self): self.skipTest('N/A: LDAP does not support multiple domains') + def test_list_role_assignment_containing_names(self): + self.skipTest('N/A: LDAP does not support multiple domains') + def test_list_projects_for_user(self): domain = self._get_domain_fixture() - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} + user1 = self.new_user_ref(domain_id=domain['id']) user1 = self.identity_api.create_user(user1) user_projects = self.assignment_api.list_projects_for_user(user1['id']) self.assertThat(user_projects, matchers.HasLength(0)) @@ -347,11 +408,10 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertThat(user_projects, matchers.HasLength(2)) # Now, check number of projects through groups - user2 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} + user2 = self.new_user_ref(domain_id=domain['id']) user2 = self.identity_api.create_user(user2) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) self.identity_api.add_user_to_group(user2['id'], group1['id']) @@ -377,12 +437,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_projects_for_user_and_groups(self): domain = self._get_domain_fixture() # Create user1 - user1 = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'domain_id': domain['id'], 'enabled': True} + user1 = self.new_user_ref(domain_id=domain['id']) user1 = self.identity_api.create_user(user1) # Create new group for user1 - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) # Add user1 to group1 @@ -412,20 +471,17 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_projects_for_user_with_grants(self): domain = self._get_domain_fixture() - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': domain['id']} + new_user = self.new_user_ref(domain_id=domain['id']) new_user = self.identity_api.create_user(new_user) - group1 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group1 = unit.new_group_ref(domain_id=domain['id']) group1 = self.identity_api.create_group(group1) - group2 = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group2 = unit.new_group_ref(domain_id=domain['id']) group2 = self.identity_api.create_group(group2) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} + project1 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project1['id'], project1) - project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain['id']} + project2 = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project2['id'], project2) self.identity_api.add_user_to_group(new_user['id'], @@ -496,14 +552,11 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_role_assignments_unfiltered(self): new_domain = self._get_domain_fixture() - new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} + new_user = self.new_user_ref(domain_id=new_domain['id']) new_user = self.identity_api.create_user(new_user) - new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} + new_group = unit.new_group_ref(domain_id=new_domain['id']) new_group = self.identity_api.create_group(new_group) - new_project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': new_domain['id']} + new_project = unit.new_project_ref(domain_id=new_domain['id']) self.resource_api.create_project(new_project['id'], new_project) # First check how many role grant already exist @@ -520,13 +573,6 @@ class BaseLDAPIdentity(test_backend.IdentityTests): after_assignments = len(self.assignment_api.list_role_assignments()) self.assertEqual(existing_assignments + 2, after_assignments) - def test_list_role_assignments_filtered_by_role(self): - # Domain roles are not supported by the LDAP Assignment backend - self.assertRaises( - exception.NotImplemented, - super(BaseLDAPIdentity, self). - test_list_role_assignments_filtered_by_role) - def test_list_role_assignments_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) self.ldapdb.clear() @@ -534,12 +580,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.load_fixtures(default_fixtures) new_domain = self._get_domain_fixture() - new_user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} + new_user = self.new_user_ref(domain_id=new_domain['id']) new_user = self.identity_api.create_user(new_user) - new_project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': new_domain['id']} + new_project = unit.new_project_ref(domain_id=new_domain['id']) self.resource_api.create_project(new_project['id'], new_project) self.assignment_api.create_grant(user_id=new_user['id'], project_id=new_project['id'], @@ -558,8 +601,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.load_backends() self.load_fixtures(default_fixtures) - user = {'name': uuid.uuid4().hex, 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': test_backend.DEFAULT_DOMAIN_ID} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) self.assignment_api.add_user_to_project(self.tenant_baz['id'], @@ -582,10 +624,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests): are returned. """ - # Create a group - group = dict(name=uuid.uuid4().hex, - domain_id=CONF.identity.default_domain_id) + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group_id = self.identity_api.create_group(group)['id'] # Create a couple of users and add them to the group. @@ -617,10 +657,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_list_group_members_when_no_members(self): # List group members when there is no member in the group. # No exception should be raised. - group = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex} + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group = self.identity_api.create_group(group) # If this doesn't raise, then the test is successful. @@ -633,8 +670,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.load_fixtures(default_fixtures) # Create a group - group = dict(name=uuid.uuid4().hex, - domain_id=CONF.identity.default_domain_id) + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group_id = self.identity_api.create_group(group)['id'] # Create a user @@ -651,30 +687,23 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertNotIn(dumb_id, user_ids) def test_list_domains(self): + # We have more domains here than the parent class, check for the + # correct number of domains for the multildap backend configs + domain1 = unit.new_domain_ref() + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + self.resource_api.create_domain(domain2['id'], domain2) domains = self.resource_api.list_domains() - self.assertEqual( - [resource.calc_default_domain()], - domains) - - def test_list_domains_non_default_domain_id(self): - # If change the default_domain_id, the ID of the default domain - # returned by list_domains changes is the new default_domain_id. - - new_domain_id = uuid.uuid4().hex - self.config_fixture.config(group='identity', - default_domain_id=new_domain_id) - - domains = self.resource_api.list_domains() - - self.assertEqual(new_domain_id, domains[0]['id']) + self.assertEqual(7, len(domains)) + domain_ids = [] + for domain in domains: + domain_ids.append(domain.get('id')) + self.assertIn(CONF.identity.default_domain_id, domain_ids) + self.assertIn(domain1['id'], domain_ids) + self.assertIn(domain2['id'], domain_ids) def test_authenticate_requires_simple_bind(self): - user = { - 'name': 'NO_META', - 'domain_id': test_backend.DEFAULT_DOMAIN_ID, - 'password': 'no_meta2', - 'enabled': True, - } + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) self.assignment_api.add_user_to_project(self.tenant_baz['id'], user['id']) @@ -689,34 +718,54 @@ class BaseLDAPIdentity(test_backend.IdentityTests): user_id=user['id'], password=None) - # (spzala)The group and domain crud tests below override the standard ones - # in test_backend.py so that we can exclude the update name test, since we - # do not yet support the update of either group or domain names with LDAP. - # In the tests below, the update is demonstrated by updating description. - # Refer to bug 1136403 for more detail. - def test_group_crud(self): - group = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex, - 'description': uuid.uuid4().hex} + # The group and domain CRUD tests below override the standard ones in + # unit.identity.test_backends.py so that we can exclude the update name + # test, since we do not (and will not) support the update of either group + # or domain names with LDAP. In the tests below, the update is tested by + # updating description. + @mock.patch.object(versionutils, 'report_deprecated_feature') + def test_group_crud(self, mock_deprecator): + # NOTE(stevemar): As of the Mitaka release, we now check for calls that + # the LDAP write functionality has been deprecated. + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group = self.identity_api.create_group(group) + args, _kwargs = mock_deprecator.call_args + self.assertIn("create_group for the LDAP identity backend", args[1]) + group_ref = self.identity_api.get_group(group['id']) - self.assertDictEqual(group_ref, group) + self.assertDictEqual(group, group_ref) group['description'] = uuid.uuid4().hex self.identity_api.update_group(group['id'], group) + args, _kwargs = mock_deprecator.call_args + self.assertIn("update_group for the LDAP identity backend", args[1]) + group_ref = self.identity_api.get_group(group['id']) - self.assertDictEqual(group_ref, group) + self.assertDictEqual(group, group_ref) self.identity_api.delete_group(group['id']) + args, _kwargs = mock_deprecator.call_args + self.assertIn("delete_group for the LDAP identity backend", args[1]) self.assertRaises(exception.GroupNotFound, self.identity_api.get_group, group['id']) + @mock.patch.object(versionutils, 'report_deprecated_feature') + def test_add_remove_user_group_deprecated(self, mock_deprecator): + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group = self.identity_api.create_group(group) + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = self.identity_api.create_user(user) + self.identity_api.add_user_to_group(user['id'], group['id']) + args, _kwargs = mock_deprecator.call_args + self.assertIn("add_user_to_group for the LDAP identity", args[1]) + + self.identity_api.remove_user_from_group(user['id'], group['id']) + args, _kwargs = mock_deprecator.call_args + self.assertIn("remove_user_from_group for the LDAP identity", args[1]) + @unit.skip_if_cache_disabled('identity') def test_cache_layer_group_crud(self): - group = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex} + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group = self.identity_api.create_group(group) # cache the result group_ref = self.identity_api.get_group(group['id']) @@ -731,9 +780,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertRaises(exception.GroupNotFound, self.identity_api.get_group, group['id']) - group = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex} + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) group = self.identity_api.create_group(group) # cache the result self.identity_api.get_group(group['id']) @@ -749,11 +796,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests): CONF.identity.default_domain_id) driver.user.attribute_ignore = ['enabled', 'email', 'tenants', 'tenantId'] - user = {'name': u'fäké1', - 'password': u'fäképass1', - 'domain_id': CONF.identity.default_domain_id, - 'default_project_id': 'maps_to_none', - } + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id, + project_id='maps_to_none') # If this doesn't raise, then the test is successful. user = self.identity_api.create_user(user) @@ -765,9 +809,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests): boolean_strings = ['TRUE', 'FALSE', 'true', 'false', 'True', 'False', 'TrUe' 'FaLse'] for name in boolean_strings: - user = { - 'name': name, - 'domain_id': CONF.identity.default_domain_id} + user = self.new_user_ref(name=name, + domain_id=CONF.identity.default_domain_id) user_ref = self.identity_api.create_user(user) user_info = self.identity_api.get_user(user_ref['id']) self.assertEqual(name, user_info['name']) @@ -786,10 +829,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): driver.user.attribute_ignore = ['enabled', 'email', 'tenants', 'tenantId'] - user = {'name': u'fäké1', - 'password': u'fäképass1', - 'domain_id': CONF.identity.default_domain_id, - } + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user_ref = self.identity_api.create_user(user) @@ -818,19 +858,14 @@ class BaseLDAPIdentity(test_backend.IdentityTests): def test_user_id_comma(self): """Even if the user has a , in their ID, groups can be listed.""" - # Create a user with a , in their ID # NOTE(blk-u): the DN for this user is hard-coded in fakeldap! # Since we want to fake up this special ID, we'll squirt this # direct into the driver and bypass the manager layer. user_id = u'Doe, John' - user = { - 'id': user_id, - 'name': self.getUniqueString(), - 'password': self.getUniqueString(), - 'domain_id': CONF.identity.default_domain_id, - } + user = self.new_user_ref(id=user_id, + domain_id=CONF.identity.default_domain_id) user = self.identity_api.driver.create_user(user_id, user) # Now we'll use the manager to discover it, which will create a @@ -843,13 +878,8 @@ class BaseLDAPIdentity(test_backend.IdentityTests): break # Create a group - group_id = uuid.uuid4().hex - group = { - 'id': group_id, - 'name': self.getUniqueString(prefix='tuidc'), - 'description': self.getUniqueString(), - 'domain_id': CONF.identity.default_domain_id, - } + group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + group_id = group['id'] group = self.identity_api.driver.create_group(group_id, group) # Now we'll use the manager to discover it, which will create a # Public ID for it. @@ -870,21 +900,15 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertThat(ref_list, matchers.Equals([group])) def test_user_id_comma_grants(self): - """Even if the user has a , in their ID, can get user and group grants. - """ - + """List user and group grants, even with a comma in the user's ID.""" # Create a user with a , in their ID # NOTE(blk-u): the DN for this user is hard-coded in fakeldap! # Since we want to fake up this special ID, we'll squirt this # direct into the driver and bypass the manager layer user_id = u'Doe, John' - user = { - 'id': user_id, - 'name': self.getUniqueString(), - 'password': self.getUniqueString(), - 'domain_id': CONF.identity.default_domain_id, - } + user = self.new_user_ref(id=user_id, + domain_id=CONF.identity.default_domain_id) self.identity_api.driver.create_user(user_id, user) # Now we'll use the manager to discover it, which will create a @@ -943,8 +967,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): # There's no group fixture so create a group. new_domain = self._get_domain_fixture() - new_group = {'domain_id': new_domain['id'], - 'name': uuid.uuid4().hex} + new_group = unit.new_group_ref(domain_id=new_domain['id']) new_group = self.identity_api.create_group(new_group) # Attempt to disable the group. @@ -959,39 +982,55 @@ class BaseLDAPIdentity(test_backend.IdentityTests): self.assertNotIn('enabled', group_info) def test_project_enabled_ignored_disable_error(self): - # When the server is configured so that the enabled attribute is - # ignored for projects, projects cannot be disabled. - - self.config_fixture.config(group='ldap', - project_attribute_ignore=['enabled']) - - # Need to re-load backends for the config change to take effect. - self.load_backends() - - # Attempt to disable the project. - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - self.tenant_baz['id'], {'enabled': False}) - - project_info = self.resource_api.get_project(self.tenant_baz['id']) - - # Unlike other entities, if 'enabled' is ignored then 'enabled' is - # returned as part of the ref. - self.assertIs(True, project_info['enabled']) + self.skipTest('Resource LDAP has been removed') def test_list_role_assignment_by_domain(self): """Multiple domain assignments are not supported.""" self.assertRaises( - (exception.Forbidden, exception.DomainNotFound), + (exception.Forbidden, exception.DomainNotFound, + exception.ValidationError), super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain) def test_list_role_assignment_by_user_with_domain_group_roles(self): """Multiple domain assignments are not supported.""" self.assertRaises( - (exception.Forbidden, exception.DomainNotFound), + (exception.Forbidden, exception.DomainNotFound, + exception.ValidationError), super(BaseLDAPIdentity, self). test_list_role_assignment_by_user_with_domain_group_roles) + def test_domain_crud(self): + self.skipTest('Resource LDAP has been removed') + + def test_list_role_assignment_using_sourced_groups_with_domains(self): + """Multiple domain assignments are not supported.""" + self.assertRaises( + (exception.Forbidden, exception.ValidationError, + exception.DomainNotFound), + super(BaseLDAPIdentity, self). + test_list_role_assignment_using_sourced_groups_with_domains) + + def test_create_project_with_domain_id_and_without_parent_id(self): + """Multiple domains are not supported.""" + self.assertRaises( + exception.ValidationError, + super(BaseLDAPIdentity, self). + test_create_project_with_domain_id_and_without_parent_id) + + def test_create_project_with_domain_id_mismatch_to_parent_domain(self): + """Multiple domains are not supported.""" + self.assertRaises( + exception.ValidationError, + super(BaseLDAPIdentity, self). + test_create_project_with_domain_id_mismatch_to_parent_domain) + + def test_remove_foreign_assignments_when_deleting_a_domain(self): + """Multiple domains are not supported.""" + self.assertRaises( + (exception.ValidationError, exception.DomainNotFound), + super(BaseLDAPIdentity, + self).test_remove_foreign_assignments_when_deleting_a_domain) + class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): @@ -1002,46 +1041,46 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.useFixture(database.Database()) super(LDAPIdentity, self).setUp() _assert_backends(self, - assignment='ldap', + assignment='sql', identity='ldap', - resource='ldap') + resource='sql') def load_fixtures(self, fixtures): # Override super impl since need to create group container. create_group_container(self.identity_api) super(LDAPIdentity, self).load_fixtures(fixtures) + def test_list_domains(self): + domains = self.resource_api.list_domains() + self.assertEqual([resource.calc_default_domain()], domains) + def test_configurable_allowed_project_actions(self): domain = self._get_domain_fixture() - tenant = {'id': u'fäké1', 'name': u'fäké1', 'enabled': True, - 'domain_id': domain['id']} - self.resource_api.create_project(u'fäké1', tenant) - tenant_ref = self.resource_api.get_project(u'fäké1') - self.assertEqual(u'fäké1', tenant_ref['id']) + project = unit.new_project_ref(domain_id=domain['id']) + project = self.resource_api.create_project(project['id'], project) + project_ref = self.resource_api.get_project(project['id']) + self.assertEqual(project['id'], project_ref['id']) - tenant['enabled'] = False - self.resource_api.update_project(u'fäké1', tenant) + project['enabled'] = False + self.resource_api.update_project(project['id'], project) - self.resource_api.delete_project(u'fäké1') + self.resource_api.delete_project(project['id']) self.assertRaises(exception.ProjectNotFound, self.resource_api.get_project, - u'fäké1') + project['id']) def test_configurable_subtree_delete(self): self.config_fixture.config(group='ldap', allow_subtree_delete=True) self.load_backends() - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id} + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(project1['id'], project1) - role1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role1 = unit.new_role_ref() self.role_api.create_role(role1['id'], role1) - user1 = {'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'password': uuid.uuid4().hex, - 'enabled': True} + user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user1 = self.identity_api.create_user(user1) self.assignment_api.add_role_to_user_and_project( @@ -1062,48 +1101,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.assertEqual(0, len(list)) def test_configurable_forbidden_project_actions(self): - self.config_fixture.config( - group='ldap', project_allow_create=False, - project_allow_update=False, project_allow_delete=False) - self.load_backends() - - domain = self._get_domain_fixture() - tenant = {'id': u'fäké1', 'name': u'fäké1', 'domain_id': domain['id']} - self.assertRaises(exception.ForbiddenAction, - self.resource_api.create_project, - u'fäké1', - tenant) - - self.tenant_bar['enabled'] = False - self.assertRaises(exception.ForbiddenAction, - self.resource_api.update_project, - self.tenant_bar['id'], - self.tenant_bar) - self.assertRaises(exception.ForbiddenAction, - self.resource_api.delete_project, - self.tenant_bar['id']) + self.skipTest('Resource LDAP has been removed') def test_project_filter(self): - tenant_ref = self.resource_api.get_project(self.tenant_bar['id']) - self.assertDictEqual(tenant_ref, self.tenant_bar) - - self.config_fixture.config(group='ldap', - project_filter='(CN=DOES_NOT_MATCH)') - self.load_backends() - # NOTE(morganfainberg): CONF.ldap.project_filter will not be - # dynamically changed at runtime. This invalidate is a work-around for - # the expectation that it is safe to change config values in tests that - # could affect what the drivers would return up to the manager. This - # solves this assumption when working with aggressive (on-create) - # cache population. - self.role_api.get_role.invalidate(self.role_api, - self.role_member['id']) - self.role_api.get_role(self.role_member['id']) - self.resource_api.get_project.invalidate(self.resource_api, - self.tenant_bar['id']) - self.assertRaises(exception.ProjectNotFound, - self.resource_api.get_project, - self.tenant_bar['id']) + self.skipTest('Resource LDAP has been removed') def test_dumb_member(self): self.config_fixture.config(group='ldap', use_dumb_member=True) @@ -1116,71 +1117,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): dumb_id) def test_project_attribute_mapping(self): - self.config_fixture.config( - group='ldap', project_name_attribute='ou', - project_desc_attribute='description', - project_enabled_attribute='enabled') - self.ldapdb.clear() - self.load_backends() - self.load_fixtures(default_fixtures) - # NOTE(morganfainberg): CONF.ldap.project_name_attribute, - # CONF.ldap.project_desc_attribute, and - # CONF.ldap.project_enabled_attribute will not be - # dynamically changed at runtime. This invalidate is a work-around for - # the expectation that it is safe to change config values in tests that - # could affect what the drivers would return up to the manager. This - # solves this assumption when working with aggressive (on-create) - # cache population. - self.resource_api.get_project.invalidate(self.resource_api, - self.tenant_baz['id']) - tenant_ref = self.resource_api.get_project(self.tenant_baz['id']) - self.assertEqual(self.tenant_baz['id'], tenant_ref['id']) - self.assertEqual(self.tenant_baz['name'], tenant_ref['name']) - self.assertEqual( - self.tenant_baz['description'], - tenant_ref['description']) - self.assertEqual(self.tenant_baz['enabled'], tenant_ref['enabled']) - - self.config_fixture.config(group='ldap', - project_name_attribute='description', - project_desc_attribute='ou') - self.load_backends() - # NOTE(morganfainberg): CONF.ldap.project_name_attribute, - # CONF.ldap.project_desc_attribute, and - # CONF.ldap.project_enabled_attribute will not be - # dynamically changed at runtime. This invalidate is a work-around for - # the expectation that it is safe to change config values in tests that - # could affect what the drivers would return up to the manager. This - # solves this assumption when working with aggressive (on-create) - # cache population. - self.resource_api.get_project.invalidate(self.resource_api, - self.tenant_baz['id']) - tenant_ref = self.resource_api.get_project(self.tenant_baz['id']) - self.assertEqual(self.tenant_baz['id'], tenant_ref['id']) - self.assertEqual(self.tenant_baz['description'], tenant_ref['name']) - self.assertEqual(self.tenant_baz['name'], tenant_ref['description']) - self.assertEqual(self.tenant_baz['enabled'], tenant_ref['enabled']) + self.skipTest('Resource LDAP has been removed') def test_project_attribute_ignore(self): - self.config_fixture.config( - group='ldap', - project_attribute_ignore=['name', 'description', 'enabled']) - self.ldapdb.clear() - self.load_backends() - self.load_fixtures(default_fixtures) - # NOTE(morganfainberg): CONF.ldap.project_attribute_ignore will not be - # dynamically changed at runtime. This invalidate is a work-around for - # the expectation that it is safe to change configs values in tests - # that could affect what the drivers would return up to the manager. - # This solves this assumption when working with aggressive (on-create) - # cache population. - self.resource_api.get_project.invalidate(self.resource_api, - self.tenant_baz['id']) - tenant_ref = self.resource_api.get_project(self.tenant_baz['id']) - self.assertEqual(self.tenant_baz['id'], tenant_ref['id']) - self.assertNotIn('name', tenant_ref) - self.assertNotIn('description', tenant_ref) - self.assertNotIn('enabled', tenant_ref) + self.skipTest('Resource LDAP has been removed') def test_user_enable_attribute_mask(self): self.config_fixture.config(group='ldap', user_enabled_mask=2, @@ -1189,8 +1129,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.load_backends() self.load_fixtures(default_fixtures) - user = {'name': u'fäké1', 'enabled': True, - 'domain_id': CONF.identity.default_domain_id} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user_ref = self.identity_api.create_user(user) @@ -1237,14 +1176,12 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.load_backends() self.load_fixtures(default_fixtures) - user1 = {'name': u'fäké1', 'enabled': True, - 'domain_id': CONF.identity.default_domain_id} + user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) - user2 = {'name': u'fäké2', 'enabled': False, - 'domain_id': CONF.identity.default_domain_id} + user2 = self.new_user_ref(enabled=False, + domain_id=CONF.identity.default_domain_id) - user3 = {'name': u'fäké3', - 'domain_id': CONF.identity.default_domain_id} + user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) # Ensure that the LDAP attribute is False for a newly created # enabled user. @@ -1473,15 +1410,28 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): group='ldap', user_additional_attribute_mapping=['description:name']) self.load_backends() - user = { - 'name': 'EXTRA_ATTRIBUTES', - 'password': 'extra', - 'domain_id': CONF.identity.default_domain_id - } + user = self.new_user_ref(name='EXTRA_ATTRIBUTES', + password='extra', + domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) dn, attrs = self.identity_api.driver.user._ldap_get(user['id']) self.assertThat([user['name']], matchers.Equals(attrs['description'])) + def test_user_description_attribute_mapping(self): + self.config_fixture.config( + group='ldap', + user_description_attribute='displayName') + self.load_backends() + + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id, + displayName=uuid.uuid4().hex) + description = user['displayName'] + user = self.identity_api.create_user(user) + res = self.identity_api.driver.user.get_all() + + new_user = [u for u in res if u['id'] == user['id']][0] + self.assertThat(new_user['description'], matchers.Equals(description)) + def test_user_extra_attribute_mapping_description_is_returned(self): # Given a mapping like description:description, the description is # returned. @@ -1491,13 +1441,9 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): user_additional_attribute_mapping=['description:description']) self.load_backends() - description = uuid.uuid4().hex - user = { - 'name': uuid.uuid4().hex, - 'description': description, - 'password': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id - } + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id, + description=uuid.uuid4().hex) + description = user['description'] user = self.identity_api.create_user(user) res = self.identity_api.driver.user.get_all() @@ -1551,52 +1497,17 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): 'fake': 'invalid', 'invalid2': ''} self.assertDictEqual(expected_dict, mapping) -# TODO(henry-nash): These need to be removed when the full LDAP implementation -# is submitted - see Bugs 1092187, 1101287, 1101276, 1101289 - - def test_domain_crud(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True, 'description': uuid.uuid4().hex} - self.assertRaises(exception.Forbidden, + def test_create_domain(self): + domain = unit.new_domain_ref() + self.assertRaises(exception.ValidationError, self.resource_api.create_domain, domain['id'], domain) - self.assertRaises(exception.Conflict, - self.resource_api.create_domain, - CONF.identity.default_domain_id, - domain) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - - domain['description'] = uuid.uuid4().hex - self.assertRaises(exception.DomainNotFound, - self.resource_api.update_domain, - domain['id'], - domain) - self.assertRaises(exception.Forbidden, - self.resource_api.update_domain, - CONF.identity.default_domain_id, - domain) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) - self.assertRaises(exception.DomainNotFound, - self.resource_api.delete_domain, - domain['id']) - self.assertRaises(exception.Forbidden, - self.resource_api.delete_domain, - CONF.identity.default_domain_id) - self.assertRaises(exception.DomainNotFound, - self.resource_api.get_domain, - domain['id']) @unit.skip_if_no_multiple_domains_support def test_create_domain_case_sensitivity(self): # domains are read-only, so case sensitivity isn't an issue - ref = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} + ref = unit.new_domain_ref() self.assertRaises(exception.Forbidden, self.resource_api.create_domain, ref['id'], @@ -1624,22 +1535,18 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): # NOTE(topol): LDAP implementation does not currently support the # updating of a project name so this method override # provides a different update test - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'description': uuid.uuid4().hex, - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project['id'], project) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + + project = self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) + self.assertDictEqual(project, project_ref) project['description'] = uuid.uuid4().hex self.resource_api.update_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) + self.assertDictEqual(project, project_ref) self.resource_api.delete_project(project['id']) self.assertRaises(exception.ProjectNotFound, @@ -1651,12 +1558,11 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): # NOTE(morganfainberg): LDAP implementation does not currently support # updating project names. This method override provides a different # update test. - project = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'description': uuid.uuid4().hex} + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) project_id = project['id'] # Create a project - self.resource_api.create_project(project_id, project) + project = self.resource_api.create_project(project_id, project) self.resource_api.get_project(project_id) updated_project = copy.deepcopy(project) updated_project['description'] = uuid.uuid4().hex @@ -1700,70 +1606,10 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.resource_api.get_project, project_id) - def _assert_create_hierarchy_not_allowed(self): - domain = self._get_domain_fixture() - - project1 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': domain['id'], - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project1['id'], project1) - - # Creating project2 under project1. LDAP will not allow - # the creation of a project with parent_id being set - project2 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': domain['id'], - 'enabled': True, - 'parent_id': project1['id'], - 'is_domain': False} - - self.assertRaises(exception.InvalidParentProject, - self.resource_api.create_project, - project2['id'], - project2) - - # Now, we'll create project 2 with no parent - project2['parent_id'] = None - self.resource_api.create_project(project2['id'], project2) - - # Returning projects to be used across the tests - return [project1, project2] - - def _assert_create_is_domain_project_not_allowed(self): - """Tests that we can't create more than one project acting as domain. - - This method will be used at any test that require the creation of a - project that act as a domain. LDAP does not support multiple domains - and the only domain it has (default) is immutable. - """ - domain = self._get_domain_fixture() - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': domain['id'], - 'enabled': True, - 'parent_id': None, - 'is_domain': True} - - self.assertRaises(exception.ValidationError, - self.resource_api.create_project, - project['id'], project) - def test_update_is_domain_field(self): domain = self._get_domain_fixture() - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'description': '', - 'domain_id': domain['id'], - 'enabled': True, - 'parent_id': None, - 'is_domain': False} - self.resource_api.create_project(project['id'], project) + project = unit.new_project_ref(domain_id=domain['id']) + project = self.resource_api.create_project(project['id'], project) # Try to update the is_domain field to True project['is_domain'] = True @@ -1772,97 +1618,87 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): project['id'], project) def test_delete_is_domain_project(self): - self._assert_create_is_domain_project_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_create_domain_under_regular_project_hierarchy_fails(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_create_not_is_domain_project_under_is_domain_hierarchy(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') - def test_create_is_domain_project(self): - self._assert_create_is_domain_project_not_allowed() + def test_create_project_passing_is_domain_flag_true(self): + self.skipTest('Resource LDAP has been removed') def test_create_project_with_parent_id_and_without_domain_id(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_check_leaf_projects(self): - projects = self._assert_create_hierarchy_not_allowed() - for project in projects: - self.assertTrue(self.resource_api.is_leaf_project(project)) + self.skipTest('Resource LDAP has been removed') def test_list_projects_in_subtree(self): - projects = self._assert_create_hierarchy_not_allowed() - for project in projects: - subtree_list = self.resource_api.list_projects_in_subtree( - project['id']) - self.assertEqual(0, len(subtree_list)) + self.skipTest('Resource LDAP has been removed') def test_list_projects_in_subtree_with_circular_reference(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_list_project_parents(self): - projects = self._assert_create_hierarchy_not_allowed() - for project in projects: - parents_list = self.resource_api.list_project_parents( - project['id']) - self.assertEqual(0, len(parents_list)) + self.skipTest('Resource LDAP has been removed') + + def test_update_project_enabled_cascade(self): + self.skipTest('Resource LDAP has been removed') + + def test_cannot_enable_cascade_with_parent_disabled(self): + self.skipTest('Resource LDAP has been removed') def test_hierarchical_projects_crud(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_create_project_under_disabled_one(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_create_project_with_invalid_parent(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_create_leaf_project_with_invalid_domain(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_update_project_parent(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_enable_project_with_disabled_parent(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_disable_hierarchical_leaf_project(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_disable_hierarchical_not_leaf_project(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_delete_hierarchical_leaf_project(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_delete_hierarchical_not_leaf_project(self): - self._assert_create_hierarchy_not_allowed() + self.skipTest('Resource LDAP has been removed') def test_check_hierarchy_depth(self): - projects = self._assert_create_hierarchy_not_allowed() - for project in projects: - depth = self._get_hierarchy_depth(project['id']) - self.assertEqual(1, depth) + self.skipTest('Resource LDAP has been removed') def test_multi_role_grant_by_user_group_on_project_domain(self): # This is a partial implementation of the standard test that - # is defined in test_backend.py. It omits both domain and - # group grants. since neither of these are yet supported by - # the ldap backend. + # is defined in unit.assignment.test_backends.py. It omits + # both domain and group grants. since neither of these are + # yet supported by the ldap backend. role_list = [] for _ in range(2): - role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) - user1 = {'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'password': uuid.uuid4().hex, - 'enabled': True} + user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user1 = self.identity_api.create_user(user1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id} + project1 = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(project1['id'], project1) self.assignment_api.add_role_to_user_and_project( @@ -1947,7 +1783,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): expected_group_ids = [] numgroups = 3 for _ in range(numgroups): - group = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group = unit.new_group_ref(domain_id=domain['id']) group = self.identity_api.create_group(group) expected_group_ids.append(group['id']) # Fetch the test groups and ensure that they don't contain a dn. @@ -1960,16 +1796,14 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): def test_list_groups_for_user_no_dn(self): # Create a test user. - user = {'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'password': uuid.uuid4().hex, 'enabled': True} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) # Create some test groups and add the test user as a member. domain = self._get_domain_fixture() expected_group_ids = [] numgroups = 3 for _ in range(numgroups): - group = {'name': uuid.uuid4().hex, 'domain_id': domain['id']} + group = unit.new_group_ref(domain_id=domain['id']) group = self.identity_api.create_group(group) expected_group_ids.append(group['id']) self.identity_api.add_user_to_group(user['id'], group['id']) @@ -1987,9 +1821,7 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): CONF.identity.default_domain_id) driver.user.id_attr = 'mail' - user = {'name': u'fäké1', - 'password': u'fäképass1', - 'domain_id': CONF.identity.default_domain_id} + user = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user) user_ref = self.identity_api.get_user(user['id']) # 'email' attribute should've created because it is also being used @@ -2083,6 +1915,35 @@ class LDAPIdentity(BaseLDAPIdentity, unit.TestCase): self.assertEqual('Foo Bar', user_ref['name']) +class LDAPLimitTests(unit.TestCase, identity_tests.LimitTests): + def setUp(self): + super(LDAPLimitTests, self).setUp() + + self.useFixture(ldapdb.LDAPDatabase()) + self.useFixture(database.Database(self.sql_driver_version_overrides)) + self.load_backends() + self.load_fixtures(default_fixtures) + identity_tests.LimitTests.setUp(self) + _assert_backends(self, + assignment='sql', + identity='ldap', + resource='sql') + + def config_overrides(self): + super(LDAPLimitTests, self).config_overrides() + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='identity', + list_limit=len(default_fixtures.USERS) - 1) + + def config_files(self): + config_files = super(LDAPLimitTests, self).config_files() + config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) + return config_files + + def test_list_projects_filtered_and_limited(self): + self.skipTest("ldap for storing projects is deprecated") + + class LDAPIdentityEnabledEmulation(LDAPIdentity): def setUp(self): super(LDAPIdentityEnabledEmulation, self).setUp() @@ -2092,10 +1953,7 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): for obj in [self.tenant_bar, self.tenant_baz, self.user_foo, self.user_two, self.user_badguy]: obj.setdefault('enabled', True) - _assert_backends(self, - assignment='ldap', - identity='ldap', - resource='ldap') + _assert_backends(self, identity='ldap') def load_fixtures(self, fixtures): # Override super impl since need to create group container. @@ -2110,60 +1968,62 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): def config_overrides(self): super(LDAPIdentityEnabledEmulation, self).config_overrides() self.config_fixture.config(group='ldap', - user_enabled_emulation=True, - project_enabled_emulation=True) + user_enabled_emulation=True) def test_project_crud(self): # NOTE(topol): LDAPIdentityEnabledEmulation will create an # enabled key in the project dictionary so this # method override handles this side-effect - project = { - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'description': uuid.uuid4().hex, - 'parent_id': None, - 'is_domain': False} - - self.resource_api.create_project(project['id'], project) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + + project = self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) # self.resource_api.create_project adds an enabled # key with a value of True when LDAPIdentityEnabledEmulation # is used so we now add this expected key to the project dictionary project['enabled'] = True - self.assertDictEqual(project_ref, project) + self.assertDictEqual(project, project_ref) project['description'] = uuid.uuid4().hex self.resource_api.update_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) + self.assertDictEqual(project, project_ref) self.resource_api.delete_project(project['id']) self.assertRaises(exception.ProjectNotFound, self.resource_api.get_project, project['id']) - def test_user_crud(self): - user_dict = { - 'domain_id': CONF.identity.default_domain_id, - 'name': uuid.uuid4().hex, - 'password': uuid.uuid4().hex} + @mock.patch.object(versionutils, 'report_deprecated_feature') + def test_user_crud(self, mock_deprecator): + # NOTE(stevemar): As of the Mitaka release, we now check for calls that + # the LDAP write functionality has been deprecated. + user_dict = self.new_user_ref( + domain_id=CONF.identity.default_domain_id) user = self.identity_api.create_user(user_dict) - user_dict['enabled'] = True - user_ref = self.identity_api.get_user(user['id']) + args, _kwargs = mock_deprecator.call_args + self.assertIn("create_user for the LDAP identity backend", args[1]) + del user_dict['password'] + user_ref = self.identity_api.get_user(user['id']) user_ref_dict = {x: user_ref[x] for x in user_ref} self.assertDictContainsSubset(user_dict, user_ref_dict) user_dict['password'] = uuid.uuid4().hex - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) + self.identity_api.update_user(user['id'], user_dict) + args, _kwargs = mock_deprecator.call_args + self.assertIn("update_user for the LDAP identity backend", args[1]) + del user_dict['password'] + user_ref = self.identity_api.get_user(user['id']) user_ref_dict = {x: user_ref[x] for x in user_ref} self.assertDictContainsSubset(user_dict, user_ref_dict) self.identity_api.delete_user(user['id']) + args, _kwargs = mock_deprecator.call_args + self.assertIn("delete_user for the LDAP identity backend", args[1]) self.assertRaises(exception.UserNotFound, self.identity_api.get_user, user['id']) @@ -2192,8 +2052,8 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): self.load_fixtures(default_fixtures) # Create a user and ensure they are enabled. - user1 = {'name': u'fäké1', 'enabled': True, - 'domain_id': CONF.identity.default_domain_id} + user1 = unit.new_user_ref(enabled=True, + domain_id=CONF.identity.default_domain_id) user_ref = self.identity_api.create_user(user1) self.assertIs(True, user_ref['enabled']) @@ -2208,14 +2068,12 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): self.load_backends() self.load_fixtures(default_fixtures) - user1 = {'name': u'fäké1', 'enabled': True, - 'domain_id': CONF.identity.default_domain_id} + user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) - user2 = {'name': u'fäké2', 'enabled': False, - 'domain_id': CONF.identity.default_domain_id} + user2 = self.new_user_ref(enabled=False, + domain_id=CONF.identity.default_domain_id) - user3 = {'name': u'fäké3', - 'domain_id': CONF.identity.default_domain_id} + user3 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) # Ensure that the enabled LDAP attribute is not set for a # newly created enabled user. @@ -2282,121 +2140,103 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): user_ref = user_api.get('123456789') self.assertIs(False, user_ref['enabled']) + def test_escape_member_dn(self): + # The enabled member DN is properly escaped when querying for enabled + # user. -class LdapIdentitySqlAssignment(BaseLDAPIdentity, unit.SQLDriverOverrides, - unit.TestCase): + object_id = uuid.uuid4().hex + driver = self.identity_api._select_identity_driver( + CONF.identity.default_domain_id) - def config_files(self): - config_files = super(LdapIdentitySqlAssignment, self).config_files() - config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf')) - return config_files + # driver.user is the EnabledEmuMixIn implementation used for this test. + mixin_impl = driver.user - def setUp(self): - sqldb = self.useFixture(database.Database()) - super(LdapIdentitySqlAssignment, self).setUp() - self.ldapdb.clear() - self.load_backends() - cache.configure_cache_region(cache.REGION) + # ) is a special char in a filter and must be escaped. + sample_dn = 'cn=foo)bar' + # LDAP requires ) is escaped by being replaced with "\29" + sample_dn_filter_esc = r'cn=foo\29bar' - sqldb.recreate() - self.load_fixtures(default_fixtures) - # defaulted by the data load - self.user_foo['enabled'] = True - _assert_backends(self, - assignment='sql', - identity='ldap', - resource='sql') + # Override the tree_dn, it's used to build the enabled member filter + mixin_impl.tree_dn = sample_dn - def config_overrides(self): - super(LdapIdentitySqlAssignment, self).config_overrides() - self.config_fixture.config(group='identity', driver='ldap') - self.config_fixture.config(group='resource', driver='sql') - self.config_fixture.config(group='assignment', driver='sql') + # The filter that _get_enabled is going to build contains the + # tree_dn, which better be escaped in this case. + exp_filter = '(%s=%s=%s,%s)' % ( + mixin_impl.member_attribute, mixin_impl.id_attr, object_id, + sample_dn_filter_esc) - def test_domain_crud(self): - pass + with mixin_impl.get_connection() as conn: + m = self.useFixture(mockpatch.PatchObject(conn, 'search_s')).mock + mixin_impl._get_enabled(object_id, conn) + # The 3rd argument is the DN. + self.assertEqual(exp_filter, m.call_args[0][2]) - def test_list_domains(self): - domains = self.resource_api.list_domains() - self.assertEqual([resource.calc_default_domain()], domains) - def test_list_domains_non_default_domain_id(self): - # If change the default_domain_id, the ID of the default domain - # returned by list_domains doesn't change because the SQL identity - # backend reads it from the database, which doesn't get updated by - # config change. +class LDAPPosixGroupsTest(unit.TestCase): - orig_default_domain_id = CONF.identity.default_domain_id + def setUp(self): - new_domain_id = uuid.uuid4().hex - self.config_fixture.config(group='identity', - default_domain_id=new_domain_id) + super(LDAPPosixGroupsTest, self).setUp() - domains = self.resource_api.list_domains() + self.useFixture(ldapdb.LDAPDatabase()) + self.useFixture(database.Database()) - self.assertEqual(orig_default_domain_id, domains[0]['id']) + self.load_backends() + self.load_fixtures(default_fixtures) - def test_create_domain(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - self.assertRaises(exception.Forbidden, - self.resource_api.create_domain, - domain['id'], - domain) + _assert_backends(self, identity='ldap') - def test_get_and_remove_role_grant_by_group_and_domain(self): - # TODO(henry-nash): We should really rewrite the tests in test_backend - # to be more flexible as to where the domains are sourced from, so - # that we would not need to override such tests here. This is raised - # as bug 1373865. - new_domain = self._get_domain_fixture() - new_group = {'domain_id': new_domain['id'], 'name': uuid.uuid4().hex} - new_group = self.identity_api.create_group(new_group) - new_user = {'name': 'new_user', 'password': uuid.uuid4().hex, - 'enabled': True, 'domain_id': new_domain['id']} - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) + def load_fixtures(self, fixtures): + # Override super impl since need to create group container. + create_group_container(self.identity_api) + super(LDAPPosixGroupsTest, self).load_fixtures(fixtures) - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) + def config_overrides(self): + super(LDAPPosixGroupsTest, self).config_overrides() + self.config_fixture.config(group='identity', driver='ldap') + self.config_fixture.config(group='ldap', group_members_are_ids=True, + group_member_attribute='memberUID') - self.assignment_api.create_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') + def config_files(self): + config_files = super(LDAPPosixGroupsTest, self).config_files() + config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) + return config_files - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertDictEqual(roles_ref[0], self.role_member) + def _get_domain_fixture(self): + """Domains in LDAP are read-only, so just return the static one.""" + return self.resource_api.get_domain(CONF.identity.default_domain_id) - self.assignment_api.delete_grant(group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') - roles_ref = self.assignment_api.list_grants( - group_id=new_group['id'], - domain_id=new_domain['id']) - self.assertEqual(0, len(roles_ref)) - self.assertRaises(exception.NotFound, - self.assignment_api.delete_grant, - group_id=new_group['id'], - domain_id=new_domain['id'], - role_id='member') + def test_posix_member_id(self): + domain = self._get_domain_fixture() + new_group = unit.new_group_ref(domain_id=domain['id']) + new_group = self.identity_api.create_group(new_group) + # Make sure we get an empty list back on a new group, not an error. + user_refs = self.identity_api.list_users_in_group(new_group['id']) + self.assertEqual([], user_refs) + # Make sure we get the correct users back once they have been added + # to the group. + new_user = unit.new_user_ref(domain_id=domain['id']) + new_user = self.identity_api.create_user(new_user) - def test_project_enabled_ignored_disable_error(self): - # Override - self.skipTest("Doesn't apply since LDAP configuration is ignored for " - "SQL assignment backend.") + # NOTE(amakarov): Create the group directly using LDAP operations + # rather than going through the manager. + group_api = self.identity_api.driver.group + group_ref = group_api.get(new_group['id']) + mod = (ldap.MOD_ADD, group_api.member_attribute, new_user['id']) + conn = group_api.get_connection() + conn.modify_s(group_ref['dn'], [mod]) - def test_list_role_assignments_filtered_by_role(self): - # Domain roles are supported by the SQL Assignment backend - base = super(BaseLDAPIdentity, self) - base.test_list_role_assignments_filtered_by_role() + # Testing the case "the group contains a user" + user_refs = self.identity_api.list_users_in_group(new_group['id']) + self.assertIn(new_user['id'], (x['id'] for x in user_refs)) + # Testing the case "the user is a member of a group" + group_refs = self.identity_api.list_groups_for_user(new_user['id']) + self.assertIn(new_group['id'], (x['id'] for x in group_refs)) -class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment): + +class LdapIdentityWithMapping( + BaseLDAPIdentity, unit.SQLDriverOverrides, unit.TestCase): """Class to test mapping of default LDAP backend. The default configuration is not to enable mapping when using a single @@ -2405,8 +2245,28 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment): Setting backward_compatible_ids to False will enable this mapping. """ + + def config_files(self): + config_files = super(LdapIdentityWithMapping, self).config_files() + config_files.append(unit.dirs.tests_conf('backend_ldap_sql.conf')) + return config_files + + def setUp(self): + sqldb = self.useFixture(database.Database()) + super(LdapIdentityWithMapping, self).setUp() + self.ldapdb.clear() + self.load_backends() + cache.configure_cache() + + sqldb.recreate() + self.load_fixtures(default_fixtures) + # defaulted by the data load + self.user_foo['enabled'] = True + _assert_backends(self, identity='ldap') + def config_overrides(self): - super(LdapIdentitySqlAssignmentWithMapping, self).config_overrides() + super(LdapIdentityWithMapping, self).config_overrides() + self.config_fixture.config(group='identity', driver='ldap') self.config_fixture.config(group='identity_mapping', backward_compatible_ids=False) @@ -2420,13 +2280,9 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment): """ initial_mappings = len(mapping_sql.list_id_mappings()) - user1 = {'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'password': uuid.uuid4().hex, 'enabled': True} + user1 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user1 = self.identity_api.create_user(user1) - user2 = {'name': uuid.uuid4().hex, - 'domain_id': CONF.identity.default_domain_id, - 'password': uuid.uuid4().hex, 'enabled': True} + user2 = self.new_user_ref(domain_id=CONF.identity.default_domain_id) user2 = self.identity_api.create_user(user2) mappings = mapping_sql.list_id_mappings() self.assertEqual(initial_mappings + 2, len(mappings)) @@ -2453,35 +2309,29 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment): self.skipTest('N/A: We never generate the same ID for a user and ' 'group in our mapping table') + def test_list_domains(self): + domains = self.resource_api.list_domains() + self.assertEqual([resource.calc_default_domain()], domains) + class BaseMultiLDAPandSQLIdentity(object): """Mixin class with support methods for domain-specific config testing.""" - def create_user(self, domain_id): - user = {'name': uuid.uuid4().hex, - 'domain_id': domain_id, - 'password': uuid.uuid4().hex, - 'enabled': True} - user_ref = self.identity_api.create_user(user) - # Put the password back in, since this is used later by tests to - # authenticate. - user_ref['password'] = user['password'] - return user_ref - def create_users_across_domains(self): """Create a set of users, each with a role on their own domain.""" - # We also will check that the right number of id mappings get created initial_mappings = len(mapping_sql.list_id_mappings()) - self.users['user0'] = self.create_user( + self.users['user0'] = unit.create_user( + self.identity_api, self.domains['domain_default']['id']) self.assignment_api.create_grant( user_id=self.users['user0']['id'], domain_id=self.domains['domain_default']['id'], role_id=self.role_member['id']) for x in range(1, self.domain_count): - self.users['user%s' % x] = self.create_user( + self.users['user%s' % x] = unit.create_user( + self.identity_api, self.domains['domain%s' % x]['id']) self.assignment_api.create_grant( user_id=self.users['user%s' % x]['id'], @@ -2506,13 +2356,13 @@ class BaseMultiLDAPandSQLIdentity(object): self.identity_api._get_domain_driver_and_entity_id( user['id'])) - if expected_status == 200: + if expected_status == http_client.OK: ref = driver.get_user(entity_id) ref = self.identity_api._set_domain_id_and_mapping( ref, domain_id, driver, map.EntityType.USER) user = user.copy() del user['password'] - self.assertDictEqual(ref, user) + self.assertDictEqual(user, ref) else: # TODO(henry-nash): Use AssertRaises here, although # there appears to be an issue with using driver.get_user @@ -2570,6 +2420,7 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, domain. """ + def setUp(self): sqldb = self.useFixture(database.Database()) super(MultiLDAPandSQLIdentity, self).setUp() @@ -2614,11 +2465,14 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, # Create some identity entities BEFORE we switch to multi-backend, so # we can test that these are still accessible self.users = {} - self.users['userA'] = self.create_user( + self.users['userA'] = unit.create_user( + self.identity_api, self.domains['domain_default']['id']) - self.users['userB'] = self.create_user( + self.users['userB'] = unit.create_user( + self.identity_api, self.domains['domain1']['id']) - self.users['userC'] = self.create_user( + self.users['userC'] = unit.create_user( + self.identity_api, self.domains['domain3']['id']) def enable_multi_domain(self): @@ -2631,7 +2485,8 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, """ self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True, - domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap') + domain_config_dir=unit.TESTCONF + '/domain_configs_multi_ldap', + list_limit=1000) self.config_fixture.config(group='identity_mapping', backward_compatible_ids=False) @@ -2640,14 +2495,6 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, # if no specific config defined for this domain return self.identity_api.domain_configs.get_domain_conf(domain_id) - def test_list_domains(self): - self.skipTest( - 'N/A: Not relevant for multi ldap testing') - - def test_list_domains_non_default_domain_id(self): - self.skipTest( - 'N/A: Not relevant for multi ldap testing') - def test_list_users(self): # Override the standard list users, since we have added an extra user # to the default domain, so the number of expected users is one more @@ -2664,6 +2511,36 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, self.assertNotIn('password', user_ref) self.assertEqual(expected_user_ids, user_ids) + @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all') + def test_list_limit_domain_specific_inheritance(self, ldap_get_all): + # passiging hints is important, because if it's not passed, limiting + # is considered be disabled + hints = driver_hints.Hints() + self.identity_api.list_users( + domain_scope=self.domains['domain2']['id'], + hints=hints) + # since list_limit is not specified in keystone.domain2.conf, it should + # take the default, which is 1000 + self.assertTrue(ldap_get_all.called) + args, kwargs = ldap_get_all.call_args + hints = args[0] + self.assertEqual(1000, hints.limit['limit']) + + @mock.patch.object(common_ldap_core.BaseLdap, '_ldap_get_all') + def test_list_limit_domain_specific_override(self, ldap_get_all): + # passiging hints is important, because if it's not passed, limiting + # is considered to be disabled + hints = driver_hints.Hints() + self.identity_api.list_users( + domain_scope=self.domains['domain1']['id'], + hints=hints) + # this should have the list_limit set in Keystone.domain1.conf, which + # is 101 + self.assertTrue(ldap_get_all.called) + args, kwargs = ldap_get_all.call_args + hints = args[0] + self.assertEqual(101, hints.limit['limit']) + def test_domain_segregation(self): """Test that separate configs have segregated the domain. @@ -2680,21 +2557,23 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, check_user = self.check_user check_user(self.users['user0'], - self.domains['domain_default']['id'], 200) + self.domains['domain_default']['id'], http_client.OK) for domain in [self.domains['domain1']['id'], self.domains['domain2']['id'], self.domains['domain3']['id'], self.domains['domain4']['id']]: check_user(self.users['user0'], domain, exception.UserNotFound) - check_user(self.users['user1'], self.domains['domain1']['id'], 200) + check_user(self.users['user1'], self.domains['domain1']['id'], + http_client.OK) for domain in [self.domains['domain_default']['id'], self.domains['domain2']['id'], self.domains['domain3']['id'], self.domains['domain4']['id']]: check_user(self.users['user1'], domain, exception.UserNotFound) - check_user(self.users['user2'], self.domains['domain2']['id'], 200) + check_user(self.users['user2'], self.domains['domain2']['id'], + http_client.OK) for domain in [self.domains['domain_default']['id'], self.domains['domain1']['id'], self.domains['domain3']['id'], @@ -2704,10 +2583,14 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, # domain3 and domain4 share the same backend, so you should be # able to see user3 and user4 from either. - check_user(self.users['user3'], self.domains['domain3']['id'], 200) - check_user(self.users['user3'], self.domains['domain4']['id'], 200) - check_user(self.users['user4'], self.domains['domain3']['id'], 200) - check_user(self.users['user4'], self.domains['domain4']['id'], 200) + check_user(self.users['user3'], self.domains['domain3']['id'], + http_client.OK) + check_user(self.users['user3'], self.domains['domain4']['id'], + http_client.OK) + check_user(self.users['user4'], self.domains['domain3']['id'], + http_client.OK) + check_user(self.users['user4'], self.domains['domain4']['id'], + http_client.OK) for domain in [self.domains['domain_default']['id'], self.domains['domain1']['id'], @@ -2789,19 +2672,12 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, self.assertEqual('fake://memory1', conf.ldap.url) def test_delete_domain_with_user_added(self): - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'enabled': True} - project = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - 'domain_id': domain['id'], - 'description': uuid.uuid4().hex, - 'parent_id': None, - 'enabled': True, - 'is_domain': False} + domain = unit.new_domain_ref() + project = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_domain(domain['id'], domain) - self.resource_api.create_project(project['id'], project) + project = self.resource_api.create_project(project['id'], project) project_ref = self.resource_api.get_project(project['id']) - self.assertDictEqual(project_ref, project) + self.assertDictEqual(project, project_ref) self.assignment_api.create_grant(user_id=self.user_foo['id'], project_id=project['id'], @@ -2839,13 +2715,37 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, def test_list_role_assignment_by_domain(self): # With multi LDAP this method should work, so override the override # from BaseLDAPIdentity - super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain + super(BaseLDAPIdentity, self).test_list_role_assignment_by_domain() def test_list_role_assignment_by_user_with_domain_group_roles(self): # With multi LDAP this method should work, so override the override # from BaseLDAPIdentity super(BaseLDAPIdentity, self).\ - test_list_role_assignment_by_user_with_domain_group_roles + test_list_role_assignment_by_user_with_domain_group_roles() + + def test_list_role_assignment_using_sourced_groups_with_domains(self): + # With SQL Assignment this method should work, so override the override + # from BaseLDAPIdentity + base = super(BaseLDAPIdentity, self) + base.test_list_role_assignment_using_sourced_groups_with_domains() + + def test_create_project_with_domain_id_and_without_parent_id(self): + # With multi LDAP this method should work, so override the override + # from BaseLDAPIdentity + super(BaseLDAPIdentity, self).\ + test_create_project_with_domain_id_and_without_parent_id() + + def test_create_project_with_domain_id_mismatch_to_parent_domain(self): + # With multi LDAP this method should work, so override the override + # from BaseLDAPIdentity + super(BaseLDAPIdentity, self).\ + test_create_project_with_domain_id_mismatch_to_parent_domain() + + def test_remove_foreign_assignments_when_deleting_a_domain(self): + # With multi LDAP this method should work, so override the override + # from BaseLDAPIdentity + base = super(BaseLDAPIdentity, self) + base.test_remove_foreign_assignments_when_deleting_a_domain() class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): @@ -2870,7 +2770,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def enable_multi_domain(self): # The values below are the same as in the domain_configs_multi_ldap - # cdirectory of test config_files. + # directory of test config_files. default_config = { 'ldap': {'url': 'fake://memory', 'user': 'cn=Admin', @@ -2883,7 +2783,8 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): 'user': 'cn=Admin', 'password': 'password', 'suffix': 'cn=example,cn=com'}, - 'identity': {'driver': 'ldap'} + 'identity': {'driver': 'ldap', + 'list_limit': '101'} } domain2_config = { 'ldap': {'url': 'fake://memory', @@ -2904,7 +2805,8 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): self.config_fixture.config( group='identity', domain_specific_drivers_enabled=True, - domain_configurations_from_database=True) + domain_configurations_from_database=True, + list_limit=1000) self.config_fixture.config(group='identity_mapping', backward_compatible_ids=False) @@ -2933,7 +2835,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def test_reloading_domain_config(self): """Ensure domain drivers are reloaded on a config modification.""" - domain_cfgs = self.identity_api.domain_configs # Create a new config for the default domain, hence overwriting the @@ -2965,7 +2866,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def test_setting_multiple_sql_driver_raises_exception(self): """Ensure setting multiple domain specific sql drivers is prevented.""" - new_config = {'identity': {'driver': 'sql'}} self.domain_config_api.create_config( CONF.identity.default_domain_id, new_config) @@ -2979,7 +2879,6 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def test_same_domain_gets_sql_driver(self): """Ensure we can set an SQL driver if we have had it before.""" - new_config = {'identity': {'driver': 'sql'}} self.domain_config_api.create_config( CONF.identity.default_domain_id, new_config) @@ -2997,8 +2896,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def test_delete_domain_clears_sql_registration(self): """Ensure registration is deleted when a domain is deleted.""" - - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain = unit.new_domain_ref() domain = self.resource_api.create_domain(domain['id'], domain) new_config = {'identity': {'driver': 'sql'}} self.domain_config_api.create_config(domain['id'], new_config) @@ -3025,8 +2923,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): def test_orphaned_registration_does_not_prevent_getting_sql_driver(self): """Ensure we self heal an orphaned sql registration.""" - - domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain = unit.new_domain_ref() domain = self.resource_api.create_domain(domain['id'], domain) new_config = {'identity': {'driver': 'sql'}} self.domain_config_api.create_config(domain['id'], new_config) @@ -3047,7 +2944,7 @@ class MultiLDAPandSQLIdentityDomainConfigsInSQL(MultiLDAPandSQLIdentity): # should still be able to set another domain to SQL, since we should # self heal this issue. - self.resource_api.driver.delete_domain(domain['id']) + self.resource_api.driver.delete_project(domain['id']) # Invalidate cache (so we will see the domain has gone) self.resource_api.get_domain.invalidate( self.resource_api, domain['id']) @@ -3072,6 +2969,7 @@ class DomainSpecificLDAPandSQLIdentity( Although the default driver still exists, we don't use it. """ + def setUp(self): sqldb = self.useFixture(database.Database()) super(DomainSpecificLDAPandSQLIdentity, self).setUp() @@ -3133,6 +3031,17 @@ class DomainSpecificLDAPandSQLIdentity( self.skipTest( 'N/A: Not relevant for multi ldap testing') + def test_not_delete_domain_with_enabled_subdomains(self): + self.skipTest( + 'N/A: Not relevant for multi ldap testing') + + def test_delete_domain(self): + # With this restricted multi LDAP class, tests that use multiple + # domains and identity, are still not supported + self.assertRaises( + exception.DomainNotFound, + super(BaseLDAPIdentity, self).test_delete_domain_with_project_api) + def test_list_users(self): # Override the standard list users, since we have added an extra user # to the default domain, so the number of expected users is one more @@ -3164,12 +3073,12 @@ class DomainSpecificLDAPandSQLIdentity( # driver, but won't find it via any other domain driver self.check_user(self.users['user0'], - self.domains['domain_default']['id'], 200) + self.domains['domain_default']['id'], http_client.OK) self.check_user(self.users['user0'], self.domains['domain1']['id'], exception.UserNotFound) self.check_user(self.users['user1'], - self.domains['domain1']['id'], 200) + self.domains['domain1']['id'], http_client.OK) self.check_user(self.users['user1'], self.domains['domain_default']['id'], exception.UserNotFound) @@ -3182,10 +3091,10 @@ class DomainSpecificLDAPandSQLIdentity( domain_scope=self.domains['domain1']['id']), matchers.HasLength(1)) - def test_add_role_grant_to_user_and_project_404(self): + def test_add_role_grant_to_user_and_project_returns_not_found(self): self.skipTest('Blocked by bug 1101287') - def test_get_role_grants_for_user_and_project_404(self): + def test_get_role_grants_for_user_and_project_returns_not_found(self): self.skipTest('Blocked by bug 1101287') def test_list_projects_for_user_with_grants(self): @@ -3223,6 +3132,25 @@ class DomainSpecificLDAPandSQLIdentity( base = super(BaseLDAPIdentity, self) base.test_list_role_assignments_filtered_by_role() + def test_delete_domain_with_project_api(self): + # With this restricted multi LDAP class, tests that use multiple + # domains and identity, are still not supported + self.assertRaises( + exception.DomainNotFound, + super(BaseLDAPIdentity, self).test_delete_domain_with_project_api) + + def test_create_project_with_domain_id_and_without_parent_id(self): + # With restricted multi LDAP, tests that don't use identity, but do + # required aditional domains will work + base = super(BaseLDAPIdentity, self) + base.test_create_project_with_domain_id_and_without_parent_id() + + def test_create_project_with_domain_id_mismatch_to_parent_domain(self): + # With restricted multi LDAP, tests that don't use identity, but do + # required aditional domains will work + base = super(BaseLDAPIdentity, self) + base.test_create_project_with_domain_id_mismatch_to_parent_domain() + class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): """Class to test simplest use of domain-specific SQL driver. @@ -3236,6 +3164,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): - A separate SQL backend for domain1 """ + def initial_setup(self, sqldb): # We aren't setting up any initial data ahead of switching to # domain-specific operation, so make the switch straight away. @@ -3323,7 +3252,7 @@ class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity): 'domain2') -class LdapFilterTests(test_backend.FilterTests, unit.TestCase): +class LdapFilterTests(identity_tests.FilterTests, unit.TestCase): def setUp(self): super(LdapFilterTests, self).setUp() @@ -3333,7 +3262,7 @@ class LdapFilterTests(test_backend.FilterTests, unit.TestCase): self.load_backends() self.load_fixtures(default_fixtures) sqldb.recreate() - _assert_backends(self, assignment='ldap', identity='ldap') + _assert_backends(self, identity='ldap') def config_overrides(self): super(LdapFilterTests, self).config_overrides() @@ -3344,13 +3273,15 @@ class LdapFilterTests(test_backend.FilterTests, unit.TestCase): config_files.append(unit.dirs.tests_conf('backend_ldap.conf')) return config_files - def test_list_users_in_group_filtered(self): + @wip('Not supported by LDAP identity driver') + def test_list_users_in_group_inexact_filtered(self): + # The LDAP identity driver currently does not support filtering on the + # listing users for a given group, so will fail this test. + super(LdapFilterTests, + self).test_list_users_in_group_inexact_filtered() + + @wip('Not supported by LDAP identity driver') + def test_list_users_in_group_exact_filtered(self): # The LDAP identity driver currently does not support filtering on the # listing users for a given group, so will fail this test. - try: - super(LdapFilterTests, self).test_list_users_in_group_filtered() - except matchers.MismatchError: - return - # We shouldn't get here...if we do, it means someone has implemented - # filtering, so we can remove this test override. - self.assertTrue(False) + super(LdapFilterTests, self).test_list_users_in_group_exact_filtered() |