aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/identity/test_backends.py
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2016-06-09 08:12:34 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2016-06-09 08:12:34 +0000
commit4bc079a2664f9a407e332291f34d174625a9d5ea (patch)
tree7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/identity/test_backends.py
parent2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff)
parent2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff)
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/identity/test_backends.py')
-rw-r--r--keystone-moon/keystone/tests/unit/identity/test_backends.py1297
1 files changed, 1297 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/identity/test_backends.py b/keystone-moon/keystone/tests/unit/identity/test_backends.py
new file mode 100644
index 00000000..8b5c0def
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/identity/test_backends.py
@@ -0,0 +1,1297 @@
+# Copyright 2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import mock
+from oslo_config import cfg
+from six.moves import range
+from testtools import matchers
+
+from keystone.common import driver_hints
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit import default_fixtures
+from keystone.tests.unit import filtering
+
+
+CONF = cfg.CONF
+
+
+class IdentityTests(object):
+
+ def _get_domain_fixture(self):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ return domain
+
+ def _set_domain_scope(self, domain_id):
+ # We only provide a domain scope if we have multiple drivers
+ if CONF.identity.domain_specific_drivers_enabled:
+ return domain_id
+
+ def test_authenticate_bad_user(self):
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=uuid.uuid4().hex,
+ password=self.user_foo['password'])
+
+ def test_authenticate_bad_password(self):
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=self.user_foo['id'],
+ password=uuid.uuid4().hex)
+
+ def test_authenticate(self):
+ user_ref = self.identity_api.authenticate(
+ context={},
+ user_id=self.user_sna['id'],
+ password=self.user_sna['password'])
+ # NOTE(termie): the password field is left in user_sna to make
+ # it easier to authenticate in tests, but should
+ # not be returned by the api
+ self.user_sna.pop('password')
+ self.user_sna['enabled'] = True
+ self.assertDictEqual(self.user_sna, user_ref)
+
+ def test_authenticate_and_get_roles_no_metadata(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+
+ # Remove user id. It is ignored by create_user() and will break the
+ # subset test below.
+ del user['id']
+
+ new_user = self.identity_api.create_user(user)
+ self.assignment_api.add_user_to_project(self.tenant_baz['id'],
+ new_user['id'])
+ user_ref = self.identity_api.authenticate(
+ context={},
+ user_id=new_user['id'],
+ password=user['password'])
+ self.assertNotIn('password', user_ref)
+ # NOTE(termie): the password field is left in user_sna to make
+ # it easier to authenticate in tests, but should
+ # not be returned by the api
+ user.pop('password')
+ self.assertDictContainsSubset(user, user_ref)
+ role_list = self.assignment_api.get_roles_for_user_and_project(
+ new_user['id'], self.tenant_baz['id'])
+ self.assertEqual(1, len(role_list))
+ self.assertIn(CONF.member_role_id, role_list)
+
+ def test_authenticate_if_no_password_set(self):
+ id_ = uuid.uuid4().hex
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ self.identity_api.create_user(user)
+
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=id_,
+ password='password')
+
+ def test_create_unicode_user_name(self):
+ unicode_name = u'name \u540d\u5b57'
+ user = unit.new_user_ref(name=unicode_name,
+ domain_id=CONF.identity.default_domain_id)
+ ref = self.identity_api.create_user(user)
+ self.assertEqual(unicode_name, ref['name'])
+
+ def test_get_user(self):
+ user_ref = self.identity_api.get_user(self.user_foo['id'])
+ # NOTE(termie): the password field is left in user_foo to make
+ # it easier to authenticate in tests, but should
+ # not be returned by the api
+ self.user_foo.pop('password')
+ self.assertDictEqual(self.user_foo, user_ref)
+
+ @unit.skip_if_cache_disabled('identity')
+ def test_cache_layer_get_user(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ self.identity_api.create_user(user)
+ ref = self.identity_api.get_user_by_name(user['name'],
+ user['domain_id'])
+ # cache the result.
+ self.identity_api.get_user(ref['id'])
+ # delete bypassing identity api
+ domain_id, driver, entity_id = (
+ self.identity_api._get_domain_driver_and_entity_id(ref['id']))
+ driver.delete_user(entity_id)
+
+ self.assertDictEqual(ref, self.identity_api.get_user(ref['id']))
+ self.identity_api.get_user.invalidate(self.identity_api, ref['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user, ref['id'])
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ ref = self.identity_api.get_user_by_name(user['name'],
+ user['domain_id'])
+ user['description'] = uuid.uuid4().hex
+ # cache the result.
+ self.identity_api.get_user(ref['id'])
+ # update using identity api and get back updated user.
+ user_updated = self.identity_api.update_user(ref['id'], user)
+ self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
+ user_updated)
+ self.assertDictContainsSubset(
+ self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
+ user_updated)
+
+ def test_get_user_returns_not_found(self):
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ uuid.uuid4().hex)
+
+ def test_get_user_by_name(self):
+ user_ref = self.identity_api.get_user_by_name(
+ self.user_foo['name'], CONF.identity.default_domain_id)
+ # NOTE(termie): the password field is left in user_foo to make
+ # it easier to authenticate in tests, but should
+ # not be returned by the api
+ self.user_foo.pop('password')
+ self.assertDictEqual(self.user_foo, user_ref)
+
+ @unit.skip_if_cache_disabled('identity')
+ def test_cache_layer_get_user_by_name(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ self.identity_api.create_user(user)
+ ref = self.identity_api.get_user_by_name(user['name'],
+ user['domain_id'])
+ # delete bypassing the identity api.
+ domain_id, driver, entity_id = (
+ self.identity_api._get_domain_driver_and_entity_id(ref['id']))
+ driver.delete_user(entity_id)
+
+ self.assertDictEqual(ref, self.identity_api.get_user_by_name(
+ user['name'], CONF.identity.default_domain_id))
+ self.identity_api.get_user_by_name.invalidate(
+ self.identity_api, user['name'], CONF.identity.default_domain_id)
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user_by_name,
+ user['name'], CONF.identity.default_domain_id)
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ ref = self.identity_api.get_user_by_name(user['name'],
+ user['domain_id'])
+ user['description'] = uuid.uuid4().hex
+ user_updated = self.identity_api.update_user(ref['id'], user)
+ self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
+ user_updated)
+ self.assertDictContainsSubset(
+ self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
+ user_updated)
+
+ def test_get_user_by_name_returns_not_found(self):
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user_by_name,
+ uuid.uuid4().hex,
+ CONF.identity.default_domain_id)
+
+ def test_create_duplicate_user_name_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ self.assertRaises(exception.Conflict,
+ self.identity_api.create_user,
+ user)
+
+ def test_create_duplicate_user_name_in_different_domains(self):
+ new_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(new_domain['id'], new_domain)
+ user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+
+ user2 = unit.new_user_ref(name=user1['name'],
+ domain_id=new_domain['id'])
+
+ self.identity_api.create_user(user1)
+ self.identity_api.create_user(user2)
+
+ def test_move_user_between_domains(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ user = unit.new_user_ref(domain_id=domain1['id'])
+ user = self.identity_api.create_user(user)
+ user['domain_id'] = domain2['id']
+ # Update the user asserting that a deprecation warning is emitted
+ with mock.patch(
+ 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
+ self.identity_api.update_user(user['id'], user)
+ self.assertTrue(mock_dep.called)
+
+ updated_user_ref = self.identity_api.get_user(user['id'])
+ self.assertEqual(domain2['id'], updated_user_ref['domain_id'])
+
+ def test_move_user_between_domains_with_clashing_names_fails(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ # First, create a user in domain1
+ user1 = unit.new_user_ref(domain_id=domain1['id'])
+ user1 = self.identity_api.create_user(user1)
+ # Now create a user in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ user2 = unit.new_user_ref(name=user1['name'],
+ domain_id=domain2['id'])
+ user2 = self.identity_api.create_user(user2)
+ # Now try and move user1 into the 2nd domain - which should
+ # fail since the names clash
+ user1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_user,
+ user1['id'],
+ user1)
+
+ def test_rename_duplicate_user_name_fails(self):
+ user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ self.identity_api.create_user(user1)
+ user2 = self.identity_api.create_user(user2)
+ user2['name'] = user1['name']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_user,
+ user2['id'],
+ user2)
+
+ def test_update_user_id_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ original_id = user['id']
+ user['id'] = 'fake2'
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ original_id,
+ user)
+ user_ref = self.identity_api.get_user(original_id)
+ self.assertEqual(original_id, user_ref['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ 'fake2')
+
+ def test_delete_user_with_group_project_domain_links(self):
+ role1 = unit.new_role_ref()
+ self.role_api.create_role(role1['id'], role1)
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ project1 = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ user1 = unit.new_user_ref(domain_id=domain1['id'])
+ user1 = self.identity_api.create_user(user1)
+ group1 = unit.new_group_ref(domain_id=domain1['id'])
+ group1 = self.identity_api.create_group(group1)
+ self.assignment_api.create_grant(user_id=user1['id'],
+ project_id=project1['id'],
+ role_id=role1['id'])
+ self.assignment_api.create_grant(user_id=user1['id'],
+ domain_id=domain1['id'],
+ role_id=role1['id'])
+ self.identity_api.add_user_to_group(user_id=user1['id'],
+ group_id=group1['id'])
+ roles_ref = self.assignment_api.list_grants(
+ user_id=user1['id'],
+ project_id=project1['id'])
+ self.assertEqual(1, len(roles_ref))
+ roles_ref = self.assignment_api.list_grants(
+ user_id=user1['id'],
+ domain_id=domain1['id'])
+ self.assertEqual(1, len(roles_ref))
+ self.identity_api.check_user_in_group(
+ user_id=user1['id'],
+ group_id=group1['id'])
+ self.identity_api.delete_user(user1['id'])
+ self.assertRaises(exception.NotFound,
+ self.identity_api.check_user_in_group,
+ user1['id'],
+ group1['id'])
+
+ def test_delete_group_with_user_project_domain_links(self):
+ role1 = unit.new_role_ref()
+ self.role_api.create_role(role1['id'], role1)
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ project1 = unit.new_project_ref(domain_id=domain1['id'])
+ self.resource_api.create_project(project1['id'], project1)
+ user1 = unit.new_user_ref(domain_id=domain1['id'])
+ user1 = self.identity_api.create_user(user1)
+ group1 = unit.new_group_ref(domain_id=domain1['id'])
+ group1 = self.identity_api.create_group(group1)
+
+ self.assignment_api.create_grant(group_id=group1['id'],
+ project_id=project1['id'],
+ role_id=role1['id'])
+ self.assignment_api.create_grant(group_id=group1['id'],
+ domain_id=domain1['id'],
+ role_id=role1['id'])
+ self.identity_api.add_user_to_group(user_id=user1['id'],
+ group_id=group1['id'])
+ roles_ref = self.assignment_api.list_grants(
+ group_id=group1['id'],
+ project_id=project1['id'])
+ self.assertEqual(1, len(roles_ref))
+ roles_ref = self.assignment_api.list_grants(
+ group_id=group1['id'],
+ domain_id=domain1['id'])
+ self.assertEqual(1, len(roles_ref))
+ self.identity_api.check_user_in_group(
+ user_id=user1['id'],
+ group_id=group1['id'])
+ self.identity_api.delete_group(group1['id'])
+ self.identity_api.get_user(user1['id'])
+
+ def test_update_user_returns_not_found(self):
+ user_id = uuid.uuid4().hex
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.update_user,
+ user_id,
+ {'id': user_id,
+ 'domain_id': CONF.identity.default_domain_id})
+
+ def test_delete_user_returns_not_found(self):
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.delete_user,
+ uuid.uuid4().hex)
+
+ def test_create_user_long_name_fails(self):
+ user = unit.new_user_ref(name='a' * 256,
+ domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.create_user,
+ user)
+
+ def test_create_user_blank_name_fails(self):
+ user = unit.new_user_ref(name='',
+ domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.create_user,
+ user)
+
+ def test_create_user_missed_password(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ self.identity_api.get_user(user['id'])
+ # Make sure the user is not allowed to login
+ # with a password that is empty string or None
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=user['id'],
+ password='')
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=user['id'],
+ password=None)
+
+ def test_create_user_none_password(self):
+ user = unit.new_user_ref(password=None,
+ domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ self.identity_api.get_user(user['id'])
+ # Make sure the user is not allowed to login
+ # with a password that is empty string or None
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=user['id'],
+ password='')
+ self.assertRaises(AssertionError,
+ self.identity_api.authenticate,
+ context={},
+ user_id=user['id'],
+ password=None)
+
+ def test_create_user_invalid_name_fails(self):
+ user = unit.new_user_ref(name=None,
+ domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.create_user,
+ user)
+
+ user = unit.new_user_ref(name=123,
+ domain_id=CONF.identity.default_domain_id)
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.create_user,
+ user)
+
+ def test_create_user_invalid_enabled_type_string(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id,
+ # invalid string value
+ enabled='true')
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.create_user,
+ user)
+
+ def test_update_user_long_name_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ user['name'] = 'a' * 256
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ user['id'],
+ user)
+
+ def test_update_user_blank_name_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ user['name'] = ''
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ user['id'],
+ user)
+
+ def test_update_user_invalid_name_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+
+ user['name'] = None
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ user['id'],
+ user)
+
+ user['name'] = 123
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ user['id'],
+ user)
+
+ def test_list_users(self):
+ users = self.identity_api.list_users(
+ domain_scope=self._set_domain_scope(
+ CONF.identity.default_domain_id))
+ self.assertEqual(len(default_fixtures.USERS), len(users))
+ user_ids = set(user['id'] for user in users)
+ expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
+ for user in default_fixtures.USERS)
+ for user_ref in users:
+ self.assertNotIn('password', user_ref)
+ self.assertEqual(expected_user_ids, user_ids)
+
+ def test_list_groups(self):
+ group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group1 = self.identity_api.create_group(group1)
+ group2 = self.identity_api.create_group(group2)
+ groups = self.identity_api.list_groups(
+ domain_scope=self._set_domain_scope(
+ CONF.identity.default_domain_id))
+ self.assertEqual(2, len(groups))
+ group_ids = []
+ for group in groups:
+ group_ids.append(group.get('id'))
+ self.assertIn(group1['id'], group_ids)
+ self.assertIn(group2['id'], group_ids)
+
+ def test_create_user_doesnt_modify_passed_in_dict(self):
+ new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ original_user = new_user.copy()
+ self.identity_api.create_user(new_user)
+ self.assertDictEqual(original_user, new_user)
+
+ def test_update_user_enable(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertTrue(user_ref['enabled'])
+
+ user['enabled'] = False
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertEqual(user['enabled'], user_ref['enabled'])
+
+ # If not present, enabled field should not be updated
+ del user['enabled']
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertFalse(user_ref['enabled'])
+
+ user['enabled'] = True
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertEqual(user['enabled'], user_ref['enabled'])
+
+ del user['enabled']
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertTrue(user_ref['enabled'])
+
+ # Integers are valid Python's booleans. Explicitly test it.
+ user['enabled'] = 0
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertFalse(user_ref['enabled'])
+
+ # Any integers other than 0 are interpreted as True
+ user['enabled'] = -42
+ self.identity_api.update_user(user['id'], user)
+ user_ref = self.identity_api.get_user(user['id'])
+ # NOTE(breton): below, attribute `enabled` is explicitly tested to be
+ # equal True. assertTrue should not be used, because it converts
+ # the passed value to bool().
+ self.assertIs(user_ref['enabled'], True)
+
+ def test_update_user_name(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertEqual(user['name'], user_ref['name'])
+
+ changed_name = user_ref['name'] + '_changed'
+ user_ref['name'] = changed_name
+ updated_user = self.identity_api.update_user(user_ref['id'], user_ref)
+
+ # NOTE(dstanek): the SQL backend adds an 'extra' field containing a
+ # dictionary of the extra fields in addition to the
+ # fields in the object. For the details see:
+ # SqlIdentity.test_update_project_returns_extra
+ updated_user.pop('extra', None)
+
+ self.assertDictEqual(user_ref, updated_user)
+
+ user_ref = self.identity_api.get_user(user_ref['id'])
+ self.assertEqual(changed_name, user_ref['name'])
+
+ def test_update_user_enable_fails(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ user = self.identity_api.create_user(user)
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertTrue(user_ref['enabled'])
+
+ # Strings are not valid boolean values
+ user['enabled'] = 'false'
+ self.assertRaises(exception.ValidationError,
+ self.identity_api.update_user,
+ user['id'],
+ user)
+
+ def test_add_user_to_group(self):
+ domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+ groups = self.identity_api.list_groups_for_user(new_user['id'])
+
+ found = False
+ for x in groups:
+ if (x['id'] == new_group['id']):
+ found = True
+ self.assertTrue(found)
+
+ def test_add_user_to_group_returns_not_found(self):
+ domain = self._get_domain_fixture()
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.add_user_to_group,
+ new_user['id'],
+ uuid.uuid4().hex)
+
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.add_user_to_group,
+ uuid.uuid4().hex,
+ new_group['id'])
+
+ self.assertRaises(exception.NotFound,
+ self.identity_api.add_user_to_group,
+ uuid.uuid4().hex,
+ uuid.uuid4().hex)
+
+ def test_check_user_in_group(self):
+ domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+ self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
+
+ def test_check_user_not_in_group(self):
+ new_group = unit.new_group_ref(
+ domain_id=CONF.identity.default_domain_id)
+ new_group = self.identity_api.create_group(new_group)
+
+ new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ new_user = self.identity_api.create_user(new_user)
+
+ self.assertRaises(exception.NotFound,
+ self.identity_api.check_user_in_group,
+ new_user['id'],
+ new_group['id'])
+
+ def test_check_user_in_group_returns_not_found(self):
+ new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+ new_user = self.identity_api.create_user(new_user)
+
+ new_group = unit.new_group_ref(
+ domain_id=CONF.identity.default_domain_id)
+ new_group = self.identity_api.create_group(new_group)
+
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.check_user_in_group,
+ uuid.uuid4().hex,
+ new_group['id'])
+
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.check_user_in_group,
+ new_user['id'],
+ uuid.uuid4().hex)
+
+ self.assertRaises(exception.NotFound,
+ self.identity_api.check_user_in_group,
+ uuid.uuid4().hex,
+ uuid.uuid4().hex)
+
+ def test_list_users_in_group(self):
+ domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ # Make sure we get an empty list back on a new group, not an error.
+ user_refs = self.identity_api.list_users_in_group(new_group['id'])
+ self.assertEqual([], user_refs)
+ # Make sure we get the correct users back once they have been added
+ # to the group.
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+ user_refs = self.identity_api.list_users_in_group(new_group['id'])
+ found = False
+ for x in user_refs:
+ if (x['id'] == new_user['id']):
+ found = True
+ self.assertNotIn('password', x)
+ self.assertTrue(found)
+
+ def test_list_users_in_group_returns_not_found(self):
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.list_users_in_group,
+ uuid.uuid4().hex)
+
+ def test_list_groups_for_user(self):
+ domain = self._get_domain_fixture()
+ test_groups = []
+ test_users = []
+ GROUP_COUNT = 3
+ USER_COUNT = 2
+
+ for x in range(0, USER_COUNT):
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ test_users.append(new_user)
+ positive_user = test_users[0]
+ negative_user = test_users[1]
+
+ for x in range(0, USER_COUNT):
+ group_refs = self.identity_api.list_groups_for_user(
+ test_users[x]['id'])
+ self.assertEqual(0, len(group_refs))
+
+ for x in range(0, GROUP_COUNT):
+ before_count = x
+ after_count = x + 1
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ test_groups.append(new_group)
+
+ # add the user to the group and ensure that the
+ # group count increases by one for each
+ group_refs = self.identity_api.list_groups_for_user(
+ positive_user['id'])
+ self.assertEqual(before_count, len(group_refs))
+ self.identity_api.add_user_to_group(
+ positive_user['id'],
+ new_group['id'])
+ group_refs = self.identity_api.list_groups_for_user(
+ positive_user['id'])
+ self.assertEqual(after_count, len(group_refs))
+
+ # Make sure the group count for the unrelated user did not change
+ group_refs = self.identity_api.list_groups_for_user(
+ negative_user['id'])
+ self.assertEqual(0, len(group_refs))
+
+ # remove the user from each group and ensure that
+ # the group count reduces by one for each
+ for x in range(0, 3):
+ before_count = GROUP_COUNT - x
+ after_count = GROUP_COUNT - x - 1
+ group_refs = self.identity_api.list_groups_for_user(
+ positive_user['id'])
+ self.assertEqual(before_count, len(group_refs))
+ self.identity_api.remove_user_from_group(
+ positive_user['id'],
+ test_groups[x]['id'])
+ group_refs = self.identity_api.list_groups_for_user(
+ positive_user['id'])
+ self.assertEqual(after_count, len(group_refs))
+ # Make sure the group count for the unrelated user
+ # did not change
+ group_refs = self.identity_api.list_groups_for_user(
+ negative_user['id'])
+ self.assertEqual(0, len(group_refs))
+
+ def test_remove_user_from_group(self):
+ domain = self._get_domain_fixture()
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ self.identity_api.add_user_to_group(new_user['id'],
+ new_group['id'])
+ groups = self.identity_api.list_groups_for_user(new_user['id'])
+ self.assertIn(new_group['id'], [x['id'] for x in groups])
+ self.identity_api.remove_user_from_group(new_user['id'],
+ new_group['id'])
+ groups = self.identity_api.list_groups_for_user(new_user['id'])
+ self.assertNotIn(new_group['id'], [x['id'] for x in groups])
+
+ def test_remove_user_from_group_returns_not_found(self):
+ domain = self._get_domain_fixture()
+ new_user = unit.new_user_ref(domain_id=domain['id'])
+ new_user = self.identity_api.create_user(new_user)
+ new_group = unit.new_group_ref(domain_id=domain['id'])
+ new_group = self.identity_api.create_group(new_group)
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.remove_user_from_group,
+ new_user['id'],
+ uuid.uuid4().hex)
+
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.remove_user_from_group,
+ uuid.uuid4().hex,
+ new_group['id'])
+
+ self.assertRaises(exception.NotFound,
+ self.identity_api.remove_user_from_group,
+ uuid.uuid4().hex,
+ uuid.uuid4().hex)
+
+ def test_group_crud(self):
+ domain = unit.new_domain_ref()
+ self.resource_api.create_domain(domain['id'], domain)
+ group = unit.new_group_ref(domain_id=domain['id'])
+ group = self.identity_api.create_group(group)
+ group_ref = self.identity_api.get_group(group['id'])
+ self.assertDictContainsSubset(group, group_ref)
+
+ group['name'] = uuid.uuid4().hex
+ self.identity_api.update_group(group['id'], group)
+ group_ref = self.identity_api.get_group(group['id'])
+ self.assertDictContainsSubset(group, group_ref)
+
+ self.identity_api.delete_group(group['id'])
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.get_group,
+ group['id'])
+
+ def test_get_group_by_name(self):
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group_name = group['name']
+ group = self.identity_api.create_group(group)
+ spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ self.identity_api.create_group(spoiler)
+
+ group_ref = self.identity_api.get_group_by_name(
+ group_name, CONF.identity.default_domain_id)
+ self.assertDictEqual(group, group_ref)
+
+ def test_get_group_by_name_returns_not_found(self):
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.get_group_by_name,
+ uuid.uuid4().hex,
+ CONF.identity.default_domain_id)
+
+ @unit.skip_if_cache_disabled('identity')
+ def test_cache_layer_group_crud(self):
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group = self.identity_api.create_group(group)
+ # cache the result
+ group_ref = self.identity_api.get_group(group['id'])
+ # delete the group bypassing identity api.
+ domain_id, driver, entity_id = (
+ self.identity_api._get_domain_driver_and_entity_id(group['id']))
+ driver.delete_group(entity_id)
+
+ self.assertEqual(group_ref, self.identity_api.get_group(group['id']))
+ self.identity_api.get_group.invalidate(self.identity_api, group['id'])
+ self.assertRaises(exception.GroupNotFound,
+ self.identity_api.get_group, group['id'])
+
+ group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group = self.identity_api.create_group(group)
+ # cache the result
+ self.identity_api.get_group(group['id'])
+ group['name'] = uuid.uuid4().hex
+ group_ref = self.identity_api.update_group(group['id'], group)
+ # after updating through identity api, get updated group
+ self.assertDictContainsSubset(self.identity_api.get_group(group['id']),
+ group_ref)
+
+ def test_create_duplicate_group_name_fails(self):
+ group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id,
+ name=group1['name'])
+ group1 = self.identity_api.create_group(group1)
+ self.assertRaises(exception.Conflict,
+ self.identity_api.create_group,
+ group2)
+
+ def test_create_duplicate_group_name_in_different_domains(self):
+ new_domain = unit.new_domain_ref()
+ self.resource_api.create_domain(new_domain['id'], new_domain)
+ group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
+ group2 = unit.new_group_ref(domain_id=new_domain['id'],
+ name=group1['name'])
+ group1 = self.identity_api.create_group(group1)
+ group2 = self.identity_api.create_group(group2)
+
+ def test_move_group_between_domains(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ group = unit.new_group_ref(domain_id=domain1['id'])
+ group = self.identity_api.create_group(group)
+ group['domain_id'] = domain2['id']
+ # Update the group asserting that a deprecation warning is emitted
+ with mock.patch(
+ 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
+ self.identity_api.update_group(group['id'], group)
+ self.assertTrue(mock_dep.called)
+
+ updated_group_ref = self.identity_api.get_group(group['id'])
+ self.assertEqual(domain2['id'], updated_group_ref['domain_id'])
+
+ def test_move_group_between_domains_with_clashing_names_fails(self):
+ domain1 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain1['id'], domain1)
+ domain2 = unit.new_domain_ref()
+ self.resource_api.create_domain(domain2['id'], domain2)
+ # First, create a group in domain1
+ group1 = unit.new_group_ref(domain_id=domain1['id'])
+ group1 = self.identity_api.create_group(group1)
+ # Now create a group in domain2 with a potentially clashing
+ # name - which should work since we have domain separation
+ group2 = unit.new_group_ref(name=group1['name'],
+ domain_id=domain2['id'])
+ group2 = self.identity_api.create_group(group2)
+ # Now try and move group1 into the 2nd domain - which should
+ # fail since the names clash
+ group1['domain_id'] = domain2['id']
+ self.assertRaises(exception.Conflict,
+ self.identity_api.update_group,
+ group1['id'],
+ group1)
+
+ def test_user_crud(self):
+ user_dict = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ del user_dict['id']
+ user = self.identity_api.create_user(user_dict)
+ user_ref = self.identity_api.get_user(user['id'])
+ del user_dict['password']
+ user_ref_dict = {x: user_ref[x] for x in user_ref}
+ self.assertDictContainsSubset(user_dict, user_ref_dict)
+
+ user_dict['password'] = uuid.uuid4().hex
+ self.identity_api.update_user(user['id'], user_dict)
+ user_ref = self.identity_api.get_user(user['id'])
+ del user_dict['password']
+ user_ref_dict = {x: user_ref[x] for x in user_ref}
+ self.assertDictContainsSubset(user_dict, user_ref_dict)
+
+ self.identity_api.delete_user(user['id'])
+ self.assertRaises(exception.UserNotFound,
+ self.identity_api.get_user,
+ user['id'])
+
+ def test_arbitrary_attributes_are_returned_from_create_user(self):
+ attr_value = uuid.uuid4().hex
+ user_data = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id,
+ arbitrary_attr=attr_value)
+
+ user = self.identity_api.create_user(user_data)
+
+ self.assertEqual(attr_value, user['arbitrary_attr'])
+
+ def test_arbitrary_attributes_are_returned_from_get_user(self):
+ attr_value = uuid.uuid4().hex
+ user_data = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id,
+ arbitrary_attr=attr_value)
+
+ user_data = self.identity_api.create_user(user_data)
+
+ user = self.identity_api.get_user(user_data['id'])
+ self.assertEqual(attr_value, user['arbitrary_attr'])
+
+ def test_new_arbitrary_attributes_are_returned_from_update_user(self):
+ user_data = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+
+ user = self.identity_api.create_user(user_data)
+ attr_value = uuid.uuid4().hex
+ user['arbitrary_attr'] = attr_value
+ updated_user = self.identity_api.update_user(user['id'], user)
+
+ self.assertEqual(attr_value, updated_user['arbitrary_attr'])
+
+ def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
+ attr_value = uuid.uuid4().hex
+ user_data = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id,
+ arbitrary_attr=attr_value)
+
+ new_attr_value = uuid.uuid4().hex
+ user = self.identity_api.create_user(user_data)
+ user['arbitrary_attr'] = new_attr_value
+ updated_user = self.identity_api.update_user(user['id'], user)
+
+ self.assertEqual(new_attr_value, updated_user['arbitrary_attr'])
+
+ def test_user_update_and_user_get_return_same_response(self):
+ user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
+
+ user = self.identity_api.create_user(user)
+
+ updated_user = {'enabled': False}
+ updated_user_ref = self.identity_api.update_user(
+ user['id'], updated_user)
+
+ # SQL backend adds 'extra' field
+ updated_user_ref.pop('extra', None)
+
+ self.assertIs(False, updated_user_ref['enabled'])
+
+ user_ref = self.identity_api.get_user(user['id'])
+ self.assertDictEqual(updated_user_ref, user_ref)
+
+
+class FilterTests(filtering.FilterTests):
+ def test_list_entities_filtered(self):
+ for entity in ['user', 'group', 'project']:
+ # Create 20 entities
+ entity_list = self._create_test_data(entity, 20)
+
+ # Try filtering to get one an exact item out of the list
+ hints = driver_hints.Hints()
+ hints.add_filter('name', entity_list[10]['name'])
+ entities = self._list_entities(entity)(hints=hints)
+ self.assertEqual(1, len(entities))
+ self.assertEqual(entity_list[10]['id'], entities[0]['id'])
+ # Check the driver has removed the filter from the list hints
+ self.assertFalse(hints.get_exact_filter_by_name('name'))
+ self._delete_test_data(entity, entity_list)
+
+ def test_list_users_inexact_filtered(self):
+ # Create 20 users, some with specific names. We set the names at create
+ # time (rather than updating them), since the LDAP driver does not
+ # support name updates.
+ user_name_data = {
+ # user index: name for user
+ 5: 'The',
+ 6: 'The Ministry',
+ 7: 'The Ministry of',
+ 8: 'The Ministry of Silly',
+ 9: 'The Ministry of Silly Walks',
+ # ...and one for useful case insensitivity testing
+ 10: 'The ministry of silly walks OF'
+ }
+ user_list = self._create_test_data(
+ 'user', 20, domain_id=CONF.identity.default_domain_id,
+ name_dict=user_name_data)
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'ministry', comparator='contains')
+ users = self.identity_api.list_users(hints=hints)
+ self.assertEqual(5, len(users))
+ self._match_with_list(users, user_list,
+ list_start=6, list_end=11)
+ # TODO(henry-nash) Check inexact filter has been removed.
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'The', comparator='startswith')
+ users = self.identity_api.list_users(hints=hints)
+ self.assertEqual(6, len(users))
+ self._match_with_list(users, user_list,
+ list_start=5, list_end=11)
+ # TODO(henry-nash) Check inexact filter has been removed.
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'of', comparator='endswith')
+ users = self.identity_api.list_users(hints=hints)
+ self.assertEqual(2, len(users))
+ # We can't assume we will get back the users in any particular order
+ self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
+ self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
+ # TODO(henry-nash) Check inexact filter has been removed.
+
+ # TODO(henry-nash): Add some case sensitive tests. However,
+ # these would be hard to validate currently, since:
+ #
+ # For SQL, the issue is that MySQL 0.7, by default, is installed in
+ # case insensitive mode (which is what is run by default for our
+ # SQL backend tests). For production deployments. OpenStack
+ # assumes a case sensitive database. For these tests, therefore, we
+ # need to be able to check the sensitivity of the database so as to
+ # know whether to run case sensitive tests here.
+ #
+ # For LDAP/AD, although dependent on the schema being used, attributes
+ # are typically configured to be case aware, but not case sensitive.
+
+ self._delete_test_data('user', user_list)
+
+ def _groups_for_user_data(self):
+ number_of_groups = 10
+ group_name_data = {
+ # entity index: name for entity
+ 5: 'The',
+ 6: 'The Ministry',
+ 9: 'The Ministry of Silly Walks',
+ }
+ group_list = self._create_test_data(
+ 'group', number_of_groups,
+ domain_id=CONF.identity.default_domain_id,
+ name_dict=group_name_data)
+ user_list = self._create_test_data('user', 2)
+
+ for group in range(7):
+ # Create membership, including with two out of the three groups
+ # with well know names
+ self.identity_api.add_user_to_group(user_list[0]['id'],
+ group_list[group]['id'])
+ # ...and some spoiler memberships
+ for group in range(7, number_of_groups):
+ self.identity_api.add_user_to_group(user_list[1]['id'],
+ group_list[group]['id'])
+
+ return group_list, user_list
+
+ def test_groups_for_user_inexact_filtered(self):
+ """Test use of filtering doesn't break groups_for_user listing.
+
+ Some backends may use filtering to achieve the list of groups for a
+ user, so test that it can combine a second filter.
+
+ Test Plan:
+
+ - Create 10 groups, some with names we can filter on
+ - Create 2 users
+ - Assign 1 of those users to most of the groups, including some of the
+ well known named ones
+ - Assign the other user to other groups as spoilers
+ - Ensure that when we list groups for users with a filter on the group
+ name, both restrictions have been enforced on what is returned.
+
+ """
+ group_list, user_list = self._groups_for_user_data()
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'Ministry', comparator='contains')
+ groups = self.identity_api.list_groups_for_user(
+ user_list[0]['id'], hints=hints)
+ # We should only get back one group, since of the two that contain
+ # 'Ministry' the user only belongs to one.
+ self.assertThat(len(groups), matchers.Equals(1))
+ self.assertEqual(group_list[6]['id'], groups[0]['id'])
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'The', comparator='startswith')
+ groups = self.identity_api.list_groups_for_user(
+ user_list[0]['id'], hints=hints)
+ # We should only get back 2 out of the 3 groups that start with 'The'
+ # hence showing that both "filters" have been applied
+ self.assertThat(len(groups), matchers.Equals(2))
+ self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
+ self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])
+
+ hints.add_filter('name', 'The', comparator='endswith')
+ groups = self.identity_api.list_groups_for_user(
+ user_list[0]['id'], hints=hints)
+ # We should only get back one group since it is the only one that
+ # ends with 'The'
+ self.assertThat(len(groups), matchers.Equals(1))
+ self.assertEqual(group_list[5]['id'], groups[0]['id'])
+
+ self._delete_test_data('user', user_list)
+ self._delete_test_data('group', group_list)
+
+ def test_groups_for_user_exact_filtered(self):
+ """Test exact filters doesn't break groups_for_user listing."""
+ group_list, user_list = self._groups_for_user_data()
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'The Ministry', comparator='equals')
+ groups = self.identity_api.list_groups_for_user(
+ user_list[0]['id'], hints=hints)
+ # We should only get back 1 out of the 3 groups with name 'The
+ # Ministry' hence showing that both "filters" have been applied.
+ self.assertEqual(1, len(groups))
+ self.assertEqual(group_list[6]['id'], groups[0]['id'])
+ self._delete_test_data('user', user_list)
+ self._delete_test_data('group', group_list)
+
+ def _get_user_name_field_size(self):
+ """Return the size of the user name field for the backend.
+
+ Subclasses can override this method to indicate that the user name
+ field is limited in length. The user name is the field used in the test
+ that validates that a filter value works even if it's longer than a
+ field.
+
+ If the backend doesn't limit the value length then return None.
+
+ """
+ return None
+
+ def test_filter_value_wider_than_field(self):
+ # If a filter value is given that's larger than the field in the
+ # backend then no values are returned.
+
+ user_name_field_size = self._get_user_name_field_size()
+
+ if user_name_field_size is None:
+ # The backend doesn't limit the size of the user name, so pass this
+ # test.
+ return
+
+ # Create some users just to make sure would return something if the
+ # filter was ignored.
+ self._create_test_data('user', 2)
+
+ hints = driver_hints.Hints()
+ value = 'A' * (user_name_field_size + 1)
+ hints.add_filter('name', value)
+ users = self.identity_api.list_users(hints=hints)
+ self.assertEqual([], users)
+
+ def _list_users_in_group_data(self):
+ number_of_users = 10
+ user_name_data = {
+ 1: 'Arthur Conan Doyle',
+ 3: 'Arthur Rimbaud',
+ 9: 'Arthur Schopenhauer',
+ }
+ user_list = self._create_test_data(
+ 'user', number_of_users,
+ domain_id=CONF.identity.default_domain_id,
+ name_dict=user_name_data)
+ group = self._create_one_entity(
+ 'group', CONF.identity.default_domain_id, 'Great Writers')
+ for i in range(7):
+ self.identity_api.add_user_to_group(user_list[i]['id'],
+ group['id'])
+
+ return user_list, group
+
+ def test_list_users_in_group_inexact_filtered(self):
+ user_list, group = self._list_users_in_group_data()
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'Arthur', comparator='contains')
+ users = self.identity_api.list_users_in_group(group['id'], hints=hints)
+ self.assertThat(len(users), matchers.Equals(2))
+ self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
+ self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'Arthur', comparator='startswith')
+ users = self.identity_api.list_users_in_group(group['id'], hints=hints)
+ self.assertThat(len(users), matchers.Equals(2))
+ self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
+ self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
+
+ hints = driver_hints.Hints()
+ hints.add_filter('name', 'Doyle', comparator='endswith')
+ users = self.identity_api.list_users_in_group(group['id'], hints=hints)
+ self.assertThat(len(users), matchers.Equals(1))
+ self.assertEqual(user_list[1]['id'], users[0]['id'])
+
+ self._delete_test_data('user', user_list)
+ self._delete_entity('group')(group['id'])
+
+ def test_list_users_in_group_exact_filtered(self):
+ hints = driver_hints.Hints()
+ user_list, group = self._list_users_in_group_data()
+ hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
+ users = self.identity_api.list_users_in_group(group['id'], hints=hints)
+ self.assertEqual(1, len(users))
+ self.assertEqual(user_list[3]['id'], users[0]['id'])
+ self._delete_test_data('user', user_list)
+ self._delete_entity('group')(group['id'])
+
+
+class LimitTests(filtering.FilterTests):
+ ENTITIES = ['user', 'group', 'project']
+
+ def setUp(self):
+ """Setup for Limit Test Cases."""
+ self.entity_lists = {}
+
+ for entity in self.ENTITIES:
+ # Create 20 entities
+ self.entity_lists[entity] = self._create_test_data(entity, 20)
+ self.addCleanup(self.clean_up_entities)
+
+ def clean_up_entities(self):
+ """Clean up entity test data from Limit Test Cases."""
+ for entity in self.ENTITIES:
+ self._delete_test_data(entity, self.entity_lists[entity])
+ del self.entity_lists
+
+ def _test_list_entity_filtered_and_limited(self, entity):
+ self.config_fixture.config(list_limit=10)
+ # Should get back just 10 entities
+ hints = driver_hints.Hints()
+ entities = self._list_entities(entity)(hints=hints)
+ self.assertEqual(hints.limit['limit'], len(entities))
+ self.assertTrue(hints.limit['truncated'])
+
+ # Override with driver specific limit
+ if entity == 'project':
+ self.config_fixture.config(group='resource', list_limit=5)
+ else:
+ self.config_fixture.config(group='identity', list_limit=5)
+
+ # Should get back just 5 users
+ hints = driver_hints.Hints()
+ entities = self._list_entities(entity)(hints=hints)
+ self.assertEqual(hints.limit['limit'], len(entities))
+
+ # Finally, let's pretend we want to get the full list of entities,
+ # even with the limits set, as part of some internal calculation.
+ # Calling the API without a hints list should achieve this, and
+ # return at least the 20 entries we created (there may be other
+ # entities lying around created by other tests/setup).
+ entities = self._list_entities(entity)()
+ self.assertTrue(len(entities) >= 20)
+ self._match_with_list(self.entity_lists[entity], entities)
+
+ def test_list_users_filtered_and_limited(self):
+ self._test_list_entity_filtered_and_limited('user')
+
+ def test_list_groups_filtered_and_limited(self):
+ self._test_list_entity_filtered_and_limited('group')
+
+ def test_list_projects_filtered_and_limited(self):
+ self._test_list_entity_filtered_and_limited('project')