aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/identity/test_backends.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/identity/test_backends.py')
-rw-r--r--keystone-moon/keystone/tests/unit/identity/test_backends.py1297
1 files changed, 0 insertions, 1297 deletions
diff --git a/keystone-moon/keystone/tests/unit/identity/test_backends.py b/keystone-moon/keystone/tests/unit/identity/test_backends.py
deleted file mode 100644
index 8b5c0def..00000000
--- a/keystone-moon/keystone/tests/unit/identity/test_backends.py
+++ /dev/null
@@ -1,1297 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-import mock
-from oslo_config import cfg
-from six.moves import range
-from testtools import matchers
-
-from keystone.common import driver_hints
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import filtering
-
-
-CONF = cfg.CONF
-
-
-class IdentityTests(object):
-
- def _get_domain_fixture(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- return domain
-
- def _set_domain_scope(self, domain_id):
- # We only provide a domain scope if we have multiple drivers
- if CONF.identity.domain_specific_drivers_enabled:
- return domain_id
-
- def test_authenticate_bad_user(self):
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=uuid.uuid4().hex,
- password=self.user_foo['password'])
-
- def test_authenticate_bad_password(self):
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=self.user_foo['id'],
- password=uuid.uuid4().hex)
-
- def test_authenticate(self):
- user_ref = self.identity_api.authenticate(
- context={},
- user_id=self.user_sna['id'],
- password=self.user_sna['password'])
- # NOTE(termie): the password field is left in user_sna to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_sna.pop('password')
- self.user_sna['enabled'] = True
- self.assertDictEqual(self.user_sna, user_ref)
-
- def test_authenticate_and_get_roles_no_metadata(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- # Remove user id. It is ignored by create_user() and will break the
- # subset test below.
- del user['id']
-
- new_user = self.identity_api.create_user(user)
- self.assignment_api.add_user_to_project(self.tenant_baz['id'],
- new_user['id'])
- user_ref = self.identity_api.authenticate(
- context={},
- user_id=new_user['id'],
- password=user['password'])
- self.assertNotIn('password', user_ref)
- # NOTE(termie): the password field is left in user_sna to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- user.pop('password')
- self.assertDictContainsSubset(user, user_ref)
- role_list = self.assignment_api.get_roles_for_user_and_project(
- new_user['id'], self.tenant_baz['id'])
- self.assertEqual(1, len(role_list))
- self.assertIn(CONF.member_role_id, role_list)
-
- def test_authenticate_if_no_password_set(self):
- id_ = uuid.uuid4().hex
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
-
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=id_,
- password='password')
-
- def test_create_unicode_user_name(self):
- unicode_name = u'name \u540d\u5b57'
- user = unit.new_user_ref(name=unicode_name,
- domain_id=CONF.identity.default_domain_id)
- ref = self.identity_api.create_user(user)
- self.assertEqual(unicode_name, ref['name'])
-
- def test_get_user(self):
- user_ref = self.identity_api.get_user(self.user_foo['id'])
- # NOTE(termie): the password field is left in user_foo to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_foo.pop('password')
- self.assertDictEqual(self.user_foo, user_ref)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_get_user(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- # cache the result.
- self.identity_api.get_user(ref['id'])
- # delete bypassing identity api
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(ref['id']))
- driver.delete_user(entity_id)
-
- self.assertDictEqual(ref, self.identity_api.get_user(ref['id']))
- self.identity_api.get_user.invalidate(self.identity_api, ref['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user, ref['id'])
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- user['description'] = uuid.uuid4().hex
- # cache the result.
- self.identity_api.get_user(ref['id'])
- # update using identity api and get back updated user.
- user_updated = self.identity_api.update_user(ref['id'], user)
- self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
- user_updated)
- self.assertDictContainsSubset(
- self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
- user_updated)
-
- def test_get_user_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- uuid.uuid4().hex)
-
- def test_get_user_by_name(self):
- user_ref = self.identity_api.get_user_by_name(
- self.user_foo['name'], CONF.identity.default_domain_id)
- # NOTE(termie): the password field is left in user_foo to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_foo.pop('password')
- self.assertDictEqual(self.user_foo, user_ref)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_get_user_by_name(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- # delete bypassing the identity api.
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(ref['id']))
- driver.delete_user(entity_id)
-
- self.assertDictEqual(ref, self.identity_api.get_user_by_name(
- user['name'], CONF.identity.default_domain_id))
- self.identity_api.get_user_by_name.invalidate(
- self.identity_api, user['name'], CONF.identity.default_domain_id)
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user_by_name,
- user['name'], CONF.identity.default_domain_id)
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- user['description'] = uuid.uuid4().hex
- user_updated = self.identity_api.update_user(ref['id'], user)
- self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
- user_updated)
- self.assertDictContainsSubset(
- self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
- user_updated)
-
- def test_get_user_by_name_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user_by_name,
- uuid.uuid4().hex,
- CONF.identity.default_domain_id)
-
- def test_create_duplicate_user_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.assertRaises(exception.Conflict,
- self.identity_api.create_user,
- user)
-
- def test_create_duplicate_user_name_in_different_domains(self):
- new_domain = unit.new_domain_ref()
- self.resource_api.create_domain(new_domain['id'], new_domain)
- user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- user2 = unit.new_user_ref(name=user1['name'],
- domain_id=new_domain['id'])
-
- self.identity_api.create_user(user1)
- self.identity_api.create_user(user2)
-
- def test_move_user_between_domains(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- user = unit.new_user_ref(domain_id=domain1['id'])
- user = self.identity_api.create_user(user)
- user['domain_id'] = domain2['id']
- # Update the user asserting that a deprecation warning is emitted
- with mock.patch(
- 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
- self.identity_api.update_user(user['id'], user)
- self.assertTrue(mock_dep.called)
-
- updated_user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(domain2['id'], updated_user_ref['domain_id'])
-
- def test_move_user_between_domains_with_clashing_names_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- # First, create a user in domain1
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- # Now create a user in domain2 with a potentially clashing
- # name - which should work since we have domain separation
- user2 = unit.new_user_ref(name=user1['name'],
- domain_id=domain2['id'])
- user2 = self.identity_api.create_user(user2)
- # Now try and move user1 into the 2nd domain - which should
- # fail since the names clash
- user1['domain_id'] = domain2['id']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_user,
- user1['id'],
- user1)
-
- def test_rename_duplicate_user_name_fails(self):
- user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user1)
- user2 = self.identity_api.create_user(user2)
- user2['name'] = user1['name']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_user,
- user2['id'],
- user2)
-
- def test_update_user_id_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- original_id = user['id']
- user['id'] = 'fake2'
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- original_id,
- user)
- user_ref = self.identity_api.get_user(original_id)
- self.assertEqual(original_id, user_ref['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- 'fake2')
-
- def test_delete_user_with_group_project_domain_links(self):
- role1 = unit.new_role_ref()
- self.role_api.create_role(role1['id'], role1)
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
- self.assignment_api.create_grant(user_id=user1['id'],
- project_id=project1['id'],
- role_id=role1['id'])
- self.assignment_api.create_grant(user_id=user1['id'],
- domain_id=domain1['id'],
- role_id=role1['id'])
- self.identity_api.add_user_to_group(user_id=user1['id'],
- group_id=group1['id'])
- roles_ref = self.assignment_api.list_grants(
- user_id=user1['id'],
- project_id=project1['id'])
- self.assertEqual(1, len(roles_ref))
- roles_ref = self.assignment_api.list_grants(
- user_id=user1['id'],
- domain_id=domain1['id'])
- self.assertEqual(1, len(roles_ref))
- self.identity_api.check_user_in_group(
- user_id=user1['id'],
- group_id=group1['id'])
- self.identity_api.delete_user(user1['id'])
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- user1['id'],
- group1['id'])
-
- def test_delete_group_with_user_project_domain_links(self):
- role1 = unit.new_role_ref()
- self.role_api.create_role(role1['id'], role1)
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
-
- self.assignment_api.create_grant(group_id=group1['id'],
- project_id=project1['id'],
- role_id=role1['id'])
- self.assignment_api.create_grant(group_id=group1['id'],
- domain_id=domain1['id'],
- role_id=role1['id'])
- self.identity_api.add_user_to_group(user_id=user1['id'],
- group_id=group1['id'])
- roles_ref = self.assignment_api.list_grants(
- group_id=group1['id'],
- project_id=project1['id'])
- self.assertEqual(1, len(roles_ref))
- roles_ref = self.assignment_api.list_grants(
- group_id=group1['id'],
- domain_id=domain1['id'])
- self.assertEqual(1, len(roles_ref))
- self.identity_api.check_user_in_group(
- user_id=user1['id'],
- group_id=group1['id'])
- self.identity_api.delete_group(group1['id'])
- self.identity_api.get_user(user1['id'])
-
- def test_update_user_returns_not_found(self):
- user_id = uuid.uuid4().hex
- self.assertRaises(exception.UserNotFound,
- self.identity_api.update_user,
- user_id,
- {'id': user_id,
- 'domain_id': CONF.identity.default_domain_id})
-
- def test_delete_user_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.delete_user,
- uuid.uuid4().hex)
-
- def test_create_user_long_name_fails(self):
- user = unit.new_user_ref(name='a' * 256,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_blank_name_fails(self):
- user = unit.new_user_ref(name='',
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_missed_password(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.identity_api.get_user(user['id'])
- # Make sure the user is not allowed to login
- # with a password that is empty string or None
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password='')
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password=None)
-
- def test_create_user_none_password(self):
- user = unit.new_user_ref(password=None,
- domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.identity_api.get_user(user['id'])
- # Make sure the user is not allowed to login
- # with a password that is empty string or None
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password='')
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password=None)
-
- def test_create_user_invalid_name_fails(self):
- user = unit.new_user_ref(name=None,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- user = unit.new_user_ref(name=123,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_invalid_enabled_type_string(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id,
- # invalid string value
- enabled='true')
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_update_user_long_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user['name'] = 'a' * 256
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_update_user_blank_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user['name'] = ''
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_update_user_invalid_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
-
- user['name'] = None
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- user['name'] = 123
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_list_users(self):
- users = self.identity_api.list_users(
- domain_scope=self._set_domain_scope(
- CONF.identity.default_domain_id))
- self.assertEqual(len(default_fixtures.USERS), len(users))
- user_ids = set(user['id'] for user in users)
- expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
- for user in default_fixtures.USERS)
- for user_ref in users:
- self.assertNotIn('password', user_ref)
- self.assertEqual(expected_user_ids, user_ids)
-
- def test_list_groups(self):
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group1 = self.identity_api.create_group(group1)
- group2 = self.identity_api.create_group(group2)
- groups = self.identity_api.list_groups(
- domain_scope=self._set_domain_scope(
- CONF.identity.default_domain_id))
- self.assertEqual(2, len(groups))
- group_ids = []
- for group in groups:
- group_ids.append(group.get('id'))
- self.assertIn(group1['id'], group_ids)
- self.assertIn(group2['id'], group_ids)
-
- def test_create_user_doesnt_modify_passed_in_dict(self):
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- original_user = new_user.copy()
- self.identity_api.create_user(new_user)
- self.assertDictEqual(original_user, new_user)
-
- def test_update_user_enable(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- user['enabled'] = False
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['enabled'], user_ref['enabled'])
-
- # If not present, enabled field should not be updated
- del user['enabled']
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertFalse(user_ref['enabled'])
-
- user['enabled'] = True
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['enabled'], user_ref['enabled'])
-
- del user['enabled']
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- # Integers are valid Python's booleans. Explicitly test it.
- user['enabled'] = 0
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertFalse(user_ref['enabled'])
-
- # Any integers other than 0 are interpreted as True
- user['enabled'] = -42
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- # NOTE(breton): below, attribute `enabled` is explicitly tested to be
- # equal True. assertTrue should not be used, because it converts
- # the passed value to bool().
- self.assertIs(user_ref['enabled'], True)
-
- def test_update_user_name(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['name'], user_ref['name'])
-
- changed_name = user_ref['name'] + '_changed'
- user_ref['name'] = changed_name
- updated_user = self.identity_api.update_user(user_ref['id'], user_ref)
-
- # NOTE(dstanek): the SQL backend adds an 'extra' field containing a
- # dictionary of the extra fields in addition to the
- # fields in the object. For the details see:
- # SqlIdentity.test_update_project_returns_extra
- updated_user.pop('extra', None)
-
- self.assertDictEqual(user_ref, updated_user)
-
- user_ref = self.identity_api.get_user(user_ref['id'])
- self.assertEqual(changed_name, user_ref['name'])
-
- def test_update_user_enable_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- # Strings are not valid boolean values
- user['enabled'] = 'false'
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_add_user_to_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
-
- found = False
- for x in groups:
- if (x['id'] == new_group['id']):
- found = True
- self.assertTrue(found)
-
- def test_add_user_to_group_returns_not_found(self):
- domain = self._get_domain_fixture()
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.add_user_to_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- self.assertRaises(exception.UserNotFound,
- self.identity_api.add_user_to_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.NotFound,
- self.identity_api.add_user_to_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_check_user_in_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
-
- def test_check_user_not_in_group(self):
- new_group = unit.new_group_ref(
- domain_id=CONF.identity.default_domain_id)
- new_group = self.identity_api.create_group(new_group)
-
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- new_user = self.identity_api.create_user(new_user)
-
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- new_user['id'],
- new_group['id'])
-
- def test_check_user_in_group_returns_not_found(self):
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- new_user = self.identity_api.create_user(new_user)
-
- new_group = unit.new_group_ref(
- domain_id=CONF.identity.default_domain_id)
- new_group = self.identity_api.create_group(new_group)
-
- self.assertRaises(exception.UserNotFound,
- self.identity_api.check_user_in_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.check_user_in_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_list_users_in_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- # Make sure we get an empty list back on a new group, not an error.
- user_refs = self.identity_api.list_users_in_group(new_group['id'])
- self.assertEqual([], user_refs)
- # Make sure we get the correct users back once they have been added
- # to the group.
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- user_refs = self.identity_api.list_users_in_group(new_group['id'])
- found = False
- for x in user_refs:
- if (x['id'] == new_user['id']):
- found = True
- self.assertNotIn('password', x)
- self.assertTrue(found)
-
- def test_list_users_in_group_returns_not_found(self):
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.list_users_in_group,
- uuid.uuid4().hex)
-
- def test_list_groups_for_user(self):
- domain = self._get_domain_fixture()
- test_groups = []
- test_users = []
- GROUP_COUNT = 3
- USER_COUNT = 2
-
- for x in range(0, USER_COUNT):
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- test_users.append(new_user)
- positive_user = test_users[0]
- negative_user = test_users[1]
-
- for x in range(0, USER_COUNT):
- group_refs = self.identity_api.list_groups_for_user(
- test_users[x]['id'])
- self.assertEqual(0, len(group_refs))
-
- for x in range(0, GROUP_COUNT):
- before_count = x
- after_count = x + 1
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- test_groups.append(new_group)
-
- # add the user to the group and ensure that the
- # group count increases by one for each
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(before_count, len(group_refs))
- self.identity_api.add_user_to_group(
- positive_user['id'],
- new_group['id'])
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(after_count, len(group_refs))
-
- # Make sure the group count for the unrelated user did not change
- group_refs = self.identity_api.list_groups_for_user(
- negative_user['id'])
- self.assertEqual(0, len(group_refs))
-
- # remove the user from each group and ensure that
- # the group count reduces by one for each
- for x in range(0, 3):
- before_count = GROUP_COUNT - x
- after_count = GROUP_COUNT - x - 1
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(before_count, len(group_refs))
- self.identity_api.remove_user_from_group(
- positive_user['id'],
- test_groups[x]['id'])
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(after_count, len(group_refs))
- # Make sure the group count for the unrelated user
- # did not change
- group_refs = self.identity_api.list_groups_for_user(
- negative_user['id'])
- self.assertEqual(0, len(group_refs))
-
- def test_remove_user_from_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
- self.assertIn(new_group['id'], [x['id'] for x in groups])
- self.identity_api.remove_user_from_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
- self.assertNotIn(new_group['id'], [x['id'] for x in groups])
-
- def test_remove_user_from_group_returns_not_found(self):
- domain = self._get_domain_fixture()
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.remove_user_from_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- self.assertRaises(exception.UserNotFound,
- self.identity_api.remove_user_from_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.NotFound,
- self.identity_api.remove_user_from_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_group_crud(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- group = unit.new_group_ref(domain_id=domain['id'])
- group = self.identity_api.create_group(group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictContainsSubset(group, group_ref)
-
- group['name'] = uuid.uuid4().hex
- self.identity_api.update_group(group['id'], group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictContainsSubset(group, group_ref)
-
- self.identity_api.delete_group(group['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group,
- group['id'])
-
- def test_get_group_by_name(self):
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group_name = group['name']
- group = self.identity_api.create_group(group)
- spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_group(spoiler)
-
- group_ref = self.identity_api.get_group_by_name(
- group_name, CONF.identity.default_domain_id)
- self.assertDictEqual(group, group_ref)
-
- def test_get_group_by_name_returns_not_found(self):
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group_by_name,
- uuid.uuid4().hex,
- CONF.identity.default_domain_id)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_group_crud(self):
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group = self.identity_api.create_group(group)
- # cache the result
- group_ref = self.identity_api.get_group(group['id'])
- # delete the group bypassing identity api.
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(group['id']))
- driver.delete_group(entity_id)
-
- self.assertEqual(group_ref, self.identity_api.get_group(group['id']))
- self.identity_api.get_group.invalidate(self.identity_api, group['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group, group['id'])
-
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group = self.identity_api.create_group(group)
- # cache the result
- self.identity_api.get_group(group['id'])
- group['name'] = uuid.uuid4().hex
- group_ref = self.identity_api.update_group(group['id'], group)
- # after updating through identity api, get updated group
- self.assertDictContainsSubset(self.identity_api.get_group(group['id']),
- group_ref)
-
- def test_create_duplicate_group_name_fails(self):
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id,
- name=group1['name'])
- group1 = self.identity_api.create_group(group1)
- self.assertRaises(exception.Conflict,
- self.identity_api.create_group,
- group2)
-
- def test_create_duplicate_group_name_in_different_domains(self):
- new_domain = unit.new_domain_ref()
- self.resource_api.create_domain(new_domain['id'], new_domain)
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=new_domain['id'],
- name=group1['name'])
- group1 = self.identity_api.create_group(group1)
- group2 = self.identity_api.create_group(group2)
-
- def test_move_group_between_domains(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- group = unit.new_group_ref(domain_id=domain1['id'])
- group = self.identity_api.create_group(group)
- group['domain_id'] = domain2['id']
- # Update the group asserting that a deprecation warning is emitted
- with mock.patch(
- 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
- self.identity_api.update_group(group['id'], group)
- self.assertTrue(mock_dep.called)
-
- updated_group_ref = self.identity_api.get_group(group['id'])
- self.assertEqual(domain2['id'], updated_group_ref['domain_id'])
-
- def test_move_group_between_domains_with_clashing_names_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- # First, create a group in domain1
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
- # Now create a group in domain2 with a potentially clashing
- # name - which should work since we have domain separation
- group2 = unit.new_group_ref(name=group1['name'],
- domain_id=domain2['id'])
- group2 = self.identity_api.create_group(group2)
- # Now try and move group1 into the 2nd domain - which should
- # fail since the names clash
- group1['domain_id'] = domain2['id']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_group,
- group1['id'],
- group1)
-
- def test_user_crud(self):
- user_dict = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id)
- del user_dict['id']
- user = self.identity_api.create_user(user_dict)
- user_ref = self.identity_api.get_user(user['id'])
- del user_dict['password']
- user_ref_dict = {x: user_ref[x] for x in user_ref}
- self.assertDictContainsSubset(user_dict, user_ref_dict)
-
- user_dict['password'] = uuid.uuid4().hex
- self.identity_api.update_user(user['id'], user_dict)
- user_ref = self.identity_api.get_user(user['id'])
- del user_dict['password']
- user_ref_dict = {x: user_ref[x] for x in user_ref}
- self.assertDictContainsSubset(user_dict, user_ref_dict)
-
- self.identity_api.delete_user(user['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- user['id'])
-
- def test_arbitrary_attributes_are_returned_from_create_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- user = self.identity_api.create_user(user_data)
-
- self.assertEqual(attr_value, user['arbitrary_attr'])
-
- def test_arbitrary_attributes_are_returned_from_get_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- user_data = self.identity_api.create_user(user_data)
-
- user = self.identity_api.get_user(user_data['id'])
- self.assertEqual(attr_value, user['arbitrary_attr'])
-
- def test_new_arbitrary_attributes_are_returned_from_update_user(self):
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id)
-
- user = self.identity_api.create_user(user_data)
- attr_value = uuid.uuid4().hex
- user['arbitrary_attr'] = attr_value
- updated_user = self.identity_api.update_user(user['id'], user)
-
- self.assertEqual(attr_value, updated_user['arbitrary_attr'])
-
- def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- new_attr_value = uuid.uuid4().hex
- user = self.identity_api.create_user(user_data)
- user['arbitrary_attr'] = new_attr_value
- updated_user = self.identity_api.update_user(user['id'], user)
-
- self.assertEqual(new_attr_value, updated_user['arbitrary_attr'])
-
- def test_user_update_and_user_get_return_same_response(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- user = self.identity_api.create_user(user)
-
- updated_user = {'enabled': False}
- updated_user_ref = self.identity_api.update_user(
- user['id'], updated_user)
-
- # SQL backend adds 'extra' field
- updated_user_ref.pop('extra', None)
-
- self.assertIs(False, updated_user_ref['enabled'])
-
- user_ref = self.identity_api.get_user(user['id'])
- self.assertDictEqual(updated_user_ref, user_ref)
-
-
-class FilterTests(filtering.FilterTests):
- def test_list_entities_filtered(self):
- for entity in ['user', 'group', 'project']:
- # Create 20 entities
- entity_list = self._create_test_data(entity, 20)
-
- # Try filtering to get one an exact item out of the list
- hints = driver_hints.Hints()
- hints.add_filter('name', entity_list[10]['name'])
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(1, len(entities))
- self.assertEqual(entity_list[10]['id'], entities[0]['id'])
- # Check the driver has removed the filter from the list hints
- self.assertFalse(hints.get_exact_filter_by_name('name'))
- self._delete_test_data(entity, entity_list)
-
- def test_list_users_inexact_filtered(self):
- # Create 20 users, some with specific names. We set the names at create
- # time (rather than updating them), since the LDAP driver does not
- # support name updates.
- user_name_data = {
- # user index: name for user
- 5: 'The',
- 6: 'The Ministry',
- 7: 'The Ministry of',
- 8: 'The Ministry of Silly',
- 9: 'The Ministry of Silly Walks',
- # ...and one for useful case insensitivity testing
- 10: 'The ministry of silly walks OF'
- }
- user_list = self._create_test_data(
- 'user', 20, domain_id=CONF.identity.default_domain_id,
- name_dict=user_name_data)
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'ministry', comparator='contains')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(5, len(users))
- self._match_with_list(users, user_list,
- list_start=6, list_end=11)
- # TODO(henry-nash) Check inexact filter has been removed.
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The', comparator='startswith')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(6, len(users))
- self._match_with_list(users, user_list,
- list_start=5, list_end=11)
- # TODO(henry-nash) Check inexact filter has been removed.
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'of', comparator='endswith')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(2, len(users))
- # We can't assume we will get back the users in any particular order
- self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
- # TODO(henry-nash) Check inexact filter has been removed.
-
- # TODO(henry-nash): Add some case sensitive tests. However,
- # these would be hard to validate currently, since:
- #
- # For SQL, the issue is that MySQL 0.7, by default, is installed in
- # case insensitive mode (which is what is run by default for our
- # SQL backend tests). For production deployments. OpenStack
- # assumes a case sensitive database. For these tests, therefore, we
- # need to be able to check the sensitivity of the database so as to
- # know whether to run case sensitive tests here.
- #
- # For LDAP/AD, although dependent on the schema being used, attributes
- # are typically configured to be case aware, but not case sensitive.
-
- self._delete_test_data('user', user_list)
-
- def _groups_for_user_data(self):
- number_of_groups = 10
- group_name_data = {
- # entity index: name for entity
- 5: 'The',
- 6: 'The Ministry',
- 9: 'The Ministry of Silly Walks',
- }
- group_list = self._create_test_data(
- 'group', number_of_groups,
- domain_id=CONF.identity.default_domain_id,
- name_dict=group_name_data)
- user_list = self._create_test_data('user', 2)
-
- for group in range(7):
- # Create membership, including with two out of the three groups
- # with well know names
- self.identity_api.add_user_to_group(user_list[0]['id'],
- group_list[group]['id'])
- # ...and some spoiler memberships
- for group in range(7, number_of_groups):
- self.identity_api.add_user_to_group(user_list[1]['id'],
- group_list[group]['id'])
-
- return group_list, user_list
-
- def test_groups_for_user_inexact_filtered(self):
- """Test use of filtering doesn't break groups_for_user listing.
-
- Some backends may use filtering to achieve the list of groups for a
- user, so test that it can combine a second filter.
-
- Test Plan:
-
- - Create 10 groups, some with names we can filter on
- - Create 2 users
- - Assign 1 of those users to most of the groups, including some of the
- well known named ones
- - Assign the other user to other groups as spoilers
- - Ensure that when we list groups for users with a filter on the group
- name, both restrictions have been enforced on what is returned.
-
- """
- group_list, user_list = self._groups_for_user_data()
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Ministry', comparator='contains')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back one group, since of the two that contain
- # 'Ministry' the user only belongs to one.
- self.assertThat(len(groups), matchers.Equals(1))
- self.assertEqual(group_list[6]['id'], groups[0]['id'])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The', comparator='startswith')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back 2 out of the 3 groups that start with 'The'
- # hence showing that both "filters" have been applied
- self.assertThat(len(groups), matchers.Equals(2))
- self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
- self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])
-
- hints.add_filter('name', 'The', comparator='endswith')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back one group since it is the only one that
- # ends with 'The'
- self.assertThat(len(groups), matchers.Equals(1))
- self.assertEqual(group_list[5]['id'], groups[0]['id'])
-
- self._delete_test_data('user', user_list)
- self._delete_test_data('group', group_list)
-
- def test_groups_for_user_exact_filtered(self):
- """Test exact filters doesn't break groups_for_user listing."""
- group_list, user_list = self._groups_for_user_data()
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The Ministry', comparator='equals')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back 1 out of the 3 groups with name 'The
- # Ministry' hence showing that both "filters" have been applied.
- self.assertEqual(1, len(groups))
- self.assertEqual(group_list[6]['id'], groups[0]['id'])
- self._delete_test_data('user', user_list)
- self._delete_test_data('group', group_list)
-
- def _get_user_name_field_size(self):
- """Return the size of the user name field for the backend.
-
- Subclasses can override this method to indicate that the user name
- field is limited in length. The user name is the field used in the test
- that validates that a filter value works even if it's longer than a
- field.
-
- If the backend doesn't limit the value length then return None.
-
- """
- return None
-
- def test_filter_value_wider_than_field(self):
- # If a filter value is given that's larger than the field in the
- # backend then no values are returned.
-
- user_name_field_size = self._get_user_name_field_size()
-
- if user_name_field_size is None:
- # The backend doesn't limit the size of the user name, so pass this
- # test.
- return
-
- # Create some users just to make sure would return something if the
- # filter was ignored.
- self._create_test_data('user', 2)
-
- hints = driver_hints.Hints()
- value = 'A' * (user_name_field_size + 1)
- hints.add_filter('name', value)
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual([], users)
-
- def _list_users_in_group_data(self):
- number_of_users = 10
- user_name_data = {
- 1: 'Arthur Conan Doyle',
- 3: 'Arthur Rimbaud',
- 9: 'Arthur Schopenhauer',
- }
- user_list = self._create_test_data(
- 'user', number_of_users,
- domain_id=CONF.identity.default_domain_id,
- name_dict=user_name_data)
- group = self._create_one_entity(
- 'group', CONF.identity.default_domain_id, 'Great Writers')
- for i in range(7):
- self.identity_api.add_user_to_group(user_list[i]['id'],
- group['id'])
-
- return user_list, group
-
- def test_list_users_in_group_inexact_filtered(self):
- user_list, group = self._list_users_in_group_data()
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Arthur', comparator='contains')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(2))
- self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Arthur', comparator='startswith')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(2))
- self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Doyle', comparator='endswith')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(1))
- self.assertEqual(user_list[1]['id'], users[0]['id'])
-
- self._delete_test_data('user', user_list)
- self._delete_entity('group')(group['id'])
-
- def test_list_users_in_group_exact_filtered(self):
- hints = driver_hints.Hints()
- user_list, group = self._list_users_in_group_data()
- hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertEqual(1, len(users))
- self.assertEqual(user_list[3]['id'], users[0]['id'])
- self._delete_test_data('user', user_list)
- self._delete_entity('group')(group['id'])
-
-
-class LimitTests(filtering.FilterTests):
- ENTITIES = ['user', 'group', 'project']
-
- def setUp(self):
- """Setup for Limit Test Cases."""
- self.entity_lists = {}
-
- for entity in self.ENTITIES:
- # Create 20 entities
- self.entity_lists[entity] = self._create_test_data(entity, 20)
- self.addCleanup(self.clean_up_entities)
-
- def clean_up_entities(self):
- """Clean up entity test data from Limit Test Cases."""
- for entity in self.ENTITIES:
- self._delete_test_data(entity, self.entity_lists[entity])
- del self.entity_lists
-
- def _test_list_entity_filtered_and_limited(self, entity):
- self.config_fixture.config(list_limit=10)
- # Should get back just 10 entities
- hints = driver_hints.Hints()
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(hints.limit['limit'], len(entities))
- self.assertTrue(hints.limit['truncated'])
-
- # Override with driver specific limit
- if entity == 'project':
- self.config_fixture.config(group='resource', list_limit=5)
- else:
- self.config_fixture.config(group='identity', list_limit=5)
-
- # Should get back just 5 users
- hints = driver_hints.Hints()
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(hints.limit['limit'], len(entities))
-
- # Finally, let's pretend we want to get the full list of entities,
- # even with the limits set, as part of some internal calculation.
- # Calling the API without a hints list should achieve this, and
- # return at least the 20 entries we created (there may be other
- # entities lying around created by other tests/setup).
- entities = self._list_entities(entity)()
- self.assertTrue(len(entities) >= 20)
- self._match_with_list(self.entity_lists[entity], entities)
-
- def test_list_users_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('user')
-
- def test_list_groups_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('group')
-
- def test_list_projects_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('project')