From 920a49cfa055733d575282973e23558c33087a4a Mon Sep 17 00:00:00 2001 From: RHE Date: Fri, 24 Nov 2017 13:54:26 +0100 Subject: remove keystone-moon Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE --- .../keystone/tests/unit/identity/test_backends.py | 1297 -------------------- 1 file changed, 1297 deletions(-) delete mode 100644 keystone-moon/keystone/tests/unit/identity/test_backends.py (limited to 'keystone-moon/keystone/tests/unit/identity/test_backends.py') diff --git a/keystone-moon/keystone/tests/unit/identity/test_backends.py b/keystone-moon/keystone/tests/unit/identity/test_backends.py deleted file mode 100644 index 8b5c0def..00000000 --- a/keystone-moon/keystone/tests/unit/identity/test_backends.py +++ /dev/null @@ -1,1297 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -import mock -from oslo_config import cfg -from six.moves import range -from testtools import matchers - -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import filtering - - -CONF = cfg.CONF - - -class IdentityTests(object): - - def _get_domain_fixture(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - return domain - - def _set_domain_scope(self, domain_id): - # We only provide a domain scope if we have multiple drivers - if CONF.identity.domain_specific_drivers_enabled: - return domain_id - - def test_authenticate_bad_user(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=uuid.uuid4().hex, - password=self.user_foo['password']) - - def test_authenticate_bad_password(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=self.user_foo['id'], - password=uuid.uuid4().hex) - - def test_authenticate(self): - user_ref = self.identity_api.authenticate( - context={}, - user_id=self.user_sna['id'], - password=self.user_sna['password']) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_sna.pop('password') - self.user_sna['enabled'] = True - self.assertDictEqual(self.user_sna, user_ref) - - def test_authenticate_and_get_roles_no_metadata(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - # Remove user id. It is ignored by create_user() and will break the - # subset test below. - del user['id'] - - new_user = self.identity_api.create_user(user) - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - new_user['id']) - user_ref = self.identity_api.authenticate( - context={}, - user_id=new_user['id'], - password=user['password']) - self.assertNotIn('password', user_ref) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - user.pop('password') - self.assertDictContainsSubset(user, user_ref) - role_list = self.assignment_api.get_roles_for_user_and_project( - new_user['id'], self.tenant_baz['id']) - self.assertEqual(1, len(role_list)) - self.assertIn(CONF.member_role_id, role_list) - - def test_authenticate_if_no_password_set(self): - id_ = uuid.uuid4().hex - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=id_, - password='password') - - def test_create_unicode_user_name(self): - unicode_name = u'name \u540d\u5b57' - user = unit.new_user_ref(name=unicode_name, - domain_id=CONF.identity.default_domain_id) - ref = self.identity_api.create_user(user) - self.assertEqual(unicode_name, ref['name']) - - def test_get_user(self): - user_ref = self.identity_api.get_user(self.user_foo['id']) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(self.user_foo, user_ref) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # cache the result. - self.identity_api.get_user(ref['id']) - # delete bypassing identity api - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user(ref['id'])) - self.identity_api.get_user.invalidate(self.identity_api, ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, ref['id']) - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - # cache the result. - self.identity_api.get_user(ref['id']) - # update using identity api and get back updated user. - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - uuid.uuid4().hex) - - def test_get_user_by_name(self): - user_ref = self.identity_api.get_user_by_name( - self.user_foo['name'], CONF.identity.default_domain_id) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(self.user_foo, user_ref) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user_by_name(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # delete bypassing the identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user_by_name( - user['name'], CONF.identity.default_domain_id)) - self.identity_api.get_user_by_name.invalidate( - self.identity_api, user['name'], CONF.identity.default_domain_id) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - user['name'], CONF.identity.default_domain_id) - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_by_name_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - def test_create_duplicate_user_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.assertRaises(exception.Conflict, - self.identity_api.create_user, - user) - - def test_create_duplicate_user_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - user2 = unit.new_user_ref(name=user1['name'], - domain_id=new_domain['id']) - - self.identity_api.create_user(user1) - self.identity_api.create_user(user2) - - def test_move_user_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - user = unit.new_user_ref(domain_id=domain1['id']) - user = self.identity_api.create_user(user) - user['domain_id'] = domain2['id'] - # Update the user asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.identity_api.update_user(user['id'], user) - self.assertTrue(mock_dep.called) - - updated_user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(domain2['id'], updated_user_ref['domain_id']) - - def test_move_user_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a user in domain1 - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - # Now create a user in domain2 with a potentially clashing - # name - which should work since we have domain separation - user2 = unit.new_user_ref(name=user1['name'], - domain_id=domain2['id']) - user2 = self.identity_api.create_user(user2) - # Now try and move user1 into the 2nd domain - which should - # fail since the names clash - user1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user1['id'], - user1) - - def test_rename_duplicate_user_name_fails(self): - user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user1) - user2 = self.identity_api.create_user(user2) - user2['name'] = user1['name'] - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user2['id'], - user2) - - def test_update_user_id_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - original_id = user['id'] - user['id'] = 'fake2' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - original_id, - user) - user_ref = self.identity_api.get_user(original_id) - self.assertEqual(original_id, user_ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - 'fake2') - - def test_delete_user_with_group_project_domain_links(self): - role1 = unit.new_role_ref() - self.role_api.create_role(role1['id'], role1) - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_user(user1['id']) - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - user1['id'], - group1['id']) - - def test_delete_group_with_user_project_domain_links(self): - role1 = unit.new_role_ref() - self.role_api.create_role(role1['id'], role1) - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_group(group1['id']) - self.identity_api.get_user(user1['id']) - - def test_update_user_returns_not_found(self): - user_id = uuid.uuid4().hex - self.assertRaises(exception.UserNotFound, - self.identity_api.update_user, - user_id, - {'id': user_id, - 'domain_id': CONF.identity.default_domain_id}) - - def test_delete_user_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.delete_user, - uuid.uuid4().hex) - - def test_create_user_long_name_fails(self): - user = unit.new_user_ref(name='a' * 256, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_blank_name_fails(self): - user = unit.new_user_ref(name='', - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_missed_password(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_none_password(self): - user = unit.new_user_ref(password=None, - domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_invalid_name_fails(self): - user = unit.new_user_ref(name=None, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - user = unit.new_user_ref(name=123, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_invalid_enabled_type_string(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id, - # invalid string value - enabled='true') - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_update_user_long_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user['name'] = 'a' * 256 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_blank_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user['name'] = '' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_invalid_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - - user['name'] = None - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - user['name'] = 123 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_list_users(self): - users = self.identity_api.list_users( - domain_scope=self._set_domain_scope( - CONF.identity.default_domain_id)) - self.assertEqual(len(default_fixtures.USERS), len(users)) - user_ids = set(user['id'] for user in users) - expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id'] - for user in default_fixtures.USERS) - for user_ref in users: - self.assertNotIn('password', user_ref) - self.assertEqual(expected_user_ids, user_ids) - - def test_list_groups(self): - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - groups = self.identity_api.list_groups( - domain_scope=self._set_domain_scope( - CONF.identity.default_domain_id)) - self.assertEqual(2, len(groups)) - group_ids = [] - for group in groups: - group_ids.append(group.get('id')) - self.assertIn(group1['id'], group_ids) - self.assertIn(group2['id'], group_ids) - - def test_create_user_doesnt_modify_passed_in_dict(self): - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - original_user = new_user.copy() - self.identity_api.create_user(new_user) - self.assertDictEqual(original_user, new_user) - - def test_update_user_enable(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - user['enabled'] = False - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - # If not present, enabled field should not be updated - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertFalse(user_ref['enabled']) - - user['enabled'] = True - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - # Integers are valid Python's booleans. Explicitly test it. - user['enabled'] = 0 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertFalse(user_ref['enabled']) - - # Any integers other than 0 are interpreted as True - user['enabled'] = -42 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - # NOTE(breton): below, attribute `enabled` is explicitly tested to be - # equal True. assertTrue should not be used, because it converts - # the passed value to bool(). - self.assertIs(user_ref['enabled'], True) - - def test_update_user_name(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['name'], user_ref['name']) - - changed_name = user_ref['name'] + '_changed' - user_ref['name'] = changed_name - updated_user = self.identity_api.update_user(user_ref['id'], user_ref) - - # NOTE(dstanek): the SQL backend adds an 'extra' field containing a - # dictionary of the extra fields in addition to the - # fields in the object. For the details see: - # SqlIdentity.test_update_project_returns_extra - updated_user.pop('extra', None) - - self.assertDictEqual(user_ref, updated_user) - - user_ref = self.identity_api.get_user(user_ref['id']) - self.assertEqual(changed_name, user_ref['name']) - - def test_update_user_enable_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - # Strings are not valid boolean values - user['enabled'] = 'false' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_add_user_to_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - - found = False - for x in groups: - if (x['id'] == new_group['id']): - found = True - self.assertTrue(found) - - def test_add_user_to_group_returns_not_found(self): - domain = self._get_domain_fixture() - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.assertRaises(exception.GroupNotFound, - self.identity_api.add_user_to_group, - new_user['id'], - uuid.uuid4().hex) - - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.UserNotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_check_user_in_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - self.identity_api.check_user_in_group(new_user['id'], new_group['id']) - - def test_check_user_not_in_group(self): - new_group = unit.new_group_ref( - domain_id=CONF.identity.default_domain_id) - new_group = self.identity_api.create_group(new_group) - - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - new_user = self.identity_api.create_user(new_user) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - new_user['id'], - new_group['id']) - - def test_check_user_in_group_returns_not_found(self): - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - new_user = self.identity_api.create_user(new_user) - - new_group = unit.new_group_ref( - domain_id=CONF.identity.default_domain_id) - new_group = self.identity_api.create_group(new_group) - - self.assertRaises(exception.UserNotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.GroupNotFound, - self.identity_api.check_user_in_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_list_users_in_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - # Make sure we get an empty list back on a new group, not an error. - user_refs = self.identity_api.list_users_in_group(new_group['id']) - self.assertEqual([], user_refs) - # Make sure we get the correct users back once they have been added - # to the group. - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - user_refs = self.identity_api.list_users_in_group(new_group['id']) - found = False - for x in user_refs: - if (x['id'] == new_user['id']): - found = True - self.assertNotIn('password', x) - self.assertTrue(found) - - def test_list_users_in_group_returns_not_found(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.list_users_in_group, - uuid.uuid4().hex) - - def test_list_groups_for_user(self): - domain = self._get_domain_fixture() - test_groups = [] - test_users = [] - GROUP_COUNT = 3 - USER_COUNT = 2 - - for x in range(0, USER_COUNT): - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - test_users.append(new_user) - positive_user = test_users[0] - negative_user = test_users[1] - - for x in range(0, USER_COUNT): - group_refs = self.identity_api.list_groups_for_user( - test_users[x]['id']) - self.assertEqual(0, len(group_refs)) - - for x in range(0, GROUP_COUNT): - before_count = x - after_count = x + 1 - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - test_groups.append(new_group) - - # add the user to the group and ensure that the - # group count increases by one for each - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.add_user_to_group( - positive_user['id'], - new_group['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - - # Make sure the group count for the unrelated user did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - # remove the user from each group and ensure that - # the group count reduces by one for each - for x in range(0, 3): - before_count = GROUP_COUNT - x - after_count = GROUP_COUNT - x - 1 - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.remove_user_from_group( - positive_user['id'], - test_groups[x]['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - # Make sure the group count for the unrelated user - # did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - def test_remove_user_from_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertIn(new_group['id'], [x['id'] for x in groups]) - self.identity_api.remove_user_from_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertNotIn(new_group['id'], [x['id'] for x in groups]) - - def test_remove_user_from_group_returns_not_found(self): - domain = self._get_domain_fixture() - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.GroupNotFound, - self.identity_api.remove_user_from_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.UserNotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_group_crud(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - group = unit.new_group_ref(domain_id=domain['id']) - group = self.identity_api.create_group(group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - group['name'] = uuid.uuid4().hex - self.identity_api.update_group(group['id'], group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - self.identity_api.delete_group(group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - group['id']) - - def test_get_group_by_name(self): - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group_name = group['name'] - group = self.identity_api.create_group(group) - spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_group(spoiler) - - group_ref = self.identity_api.get_group_by_name( - group_name, CONF.identity.default_domain_id) - self.assertDictEqual(group, group_ref) - - def test_get_group_by_name_returns_not_found(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_group_crud(self): - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group = self.identity_api.create_group(group) - # cache the result - group_ref = self.identity_api.get_group(group['id']) - # delete the group bypassing identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(group['id'])) - driver.delete_group(entity_id) - - self.assertEqual(group_ref, self.identity_api.get_group(group['id'])) - self.identity_api.get_group.invalidate(self.identity_api, group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, group['id']) - - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group = self.identity_api.create_group(group) - # cache the result - self.identity_api.get_group(group['id']) - group['name'] = uuid.uuid4().hex - group_ref = self.identity_api.update_group(group['id'], group) - # after updating through identity api, get updated group - self.assertDictContainsSubset(self.identity_api.get_group(group['id']), - group_ref) - - def test_create_duplicate_group_name_fails(self): - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id, - name=group1['name']) - group1 = self.identity_api.create_group(group1) - self.assertRaises(exception.Conflict, - self.identity_api.create_group, - group2) - - def test_create_duplicate_group_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=new_domain['id'], - name=group1['name']) - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - - def test_move_group_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - group = unit.new_group_ref(domain_id=domain1['id']) - group = self.identity_api.create_group(group) - group['domain_id'] = domain2['id'] - # Update the group asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.identity_api.update_group(group['id'], group) - self.assertTrue(mock_dep.called) - - updated_group_ref = self.identity_api.get_group(group['id']) - self.assertEqual(domain2['id'], updated_group_ref['domain_id']) - - def test_move_group_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a group in domain1 - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - # Now create a group in domain2 with a potentially clashing - # name - which should work since we have domain separation - group2 = unit.new_group_ref(name=group1['name'], - domain_id=domain2['id']) - group2 = self.identity_api.create_group(group2) - # Now try and move group1 into the 2nd domain - which should - # fail since the names clash - group1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_group, - group1['id'], - group1) - - def test_user_crud(self): - user_dict = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - del user_dict['id'] - user = self.identity_api.create_user(user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - user_dict['password'] = uuid.uuid4().hex - self.identity_api.update_user(user['id'], user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - self.identity_api.delete_user(user['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - user['id']) - - def test_arbitrary_attributes_are_returned_from_create_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - user = self.identity_api.create_user(user_data) - - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_arbitrary_attributes_are_returned_from_get_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - user_data = self.identity_api.create_user(user_data) - - user = self.identity_api.get_user(user_data['id']) - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_new_arbitrary_attributes_are_returned_from_update_user(self): - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - - user = self.identity_api.create_user(user_data) - attr_value = uuid.uuid4().hex - user['arbitrary_attr'] = attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(attr_value, updated_user['arbitrary_attr']) - - def test_updated_arbitrary_attributes_are_returned_from_update_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - new_attr_value = uuid.uuid4().hex - user = self.identity_api.create_user(user_data) - user['arbitrary_attr'] = new_attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(new_attr_value, updated_user['arbitrary_attr']) - - def test_user_update_and_user_get_return_same_response(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - user = self.identity_api.create_user(user) - - updated_user = {'enabled': False} - updated_user_ref = self.identity_api.update_user( - user['id'], updated_user) - - # SQL backend adds 'extra' field - updated_user_ref.pop('extra', None) - - self.assertIs(False, updated_user_ref['enabled']) - - user_ref = self.identity_api.get_user(user['id']) - self.assertDictEqual(updated_user_ref, user_ref) - - -class FilterTests(filtering.FilterTests): - def test_list_entities_filtered(self): - for entity in ['user', 'group', 'project']: - # Create 20 entities - entity_list = self._create_test_data(entity, 20) - - # Try filtering to get one an exact item out of the list - hints = driver_hints.Hints() - hints.add_filter('name', entity_list[10]['name']) - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(1, len(entities)) - self.assertEqual(entity_list[10]['id'], entities[0]['id']) - # Check the driver has removed the filter from the list hints - self.assertFalse(hints.get_exact_filter_by_name('name')) - self._delete_test_data(entity, entity_list) - - def test_list_users_inexact_filtered(self): - # Create 20 users, some with specific names. We set the names at create - # time (rather than updating them), since the LDAP driver does not - # support name updates. - user_name_data = { - # user index: name for user - 5: 'The', - 6: 'The Ministry', - 7: 'The Ministry of', - 8: 'The Ministry of Silly', - 9: 'The Ministry of Silly Walks', - # ...and one for useful case insensitivity testing - 10: 'The ministry of silly walks OF' - } - user_list = self._create_test_data( - 'user', 20, domain_id=CONF.identity.default_domain_id, - name_dict=user_name_data) - - hints = driver_hints.Hints() - hints.add_filter('name', 'ministry', comparator='contains') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(5, len(users)) - self._match_with_list(users, user_list, - list_start=6, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(6, len(users)) - self._match_with_list(users, user_list, - list_start=5, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'of', comparator='endswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(2, len(users)) - # We can't assume we will get back the users in any particular order - self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']]) - # TODO(henry-nash) Check inexact filter has been removed. - - # TODO(henry-nash): Add some case sensitive tests. However, - # these would be hard to validate currently, since: - # - # For SQL, the issue is that MySQL 0.7, by default, is installed in - # case insensitive mode (which is what is run by default for our - # SQL backend tests). For production deployments. OpenStack - # assumes a case sensitive database. For these tests, therefore, we - # need to be able to check the sensitivity of the database so as to - # know whether to run case sensitive tests here. - # - # For LDAP/AD, although dependent on the schema being used, attributes - # are typically configured to be case aware, but not case sensitive. - - self._delete_test_data('user', user_list) - - def _groups_for_user_data(self): - number_of_groups = 10 - group_name_data = { - # entity index: name for entity - 5: 'The', - 6: 'The Ministry', - 9: 'The Ministry of Silly Walks', - } - group_list = self._create_test_data( - 'group', number_of_groups, - domain_id=CONF.identity.default_domain_id, - name_dict=group_name_data) - user_list = self._create_test_data('user', 2) - - for group in range(7): - # Create membership, including with two out of the three groups - # with well know names - self.identity_api.add_user_to_group(user_list[0]['id'], - group_list[group]['id']) - # ...and some spoiler memberships - for group in range(7, number_of_groups): - self.identity_api.add_user_to_group(user_list[1]['id'], - group_list[group]['id']) - - return group_list, user_list - - def test_groups_for_user_inexact_filtered(self): - """Test use of filtering doesn't break groups_for_user listing. - - Some backends may use filtering to achieve the list of groups for a - user, so test that it can combine a second filter. - - Test Plan: - - - Create 10 groups, some with names we can filter on - - Create 2 users - - Assign 1 of those users to most of the groups, including some of the - well known named ones - - Assign the other user to other groups as spoilers - - Ensure that when we list groups for users with a filter on the group - name, both restrictions have been enforced on what is returned. - - """ - group_list, user_list = self._groups_for_user_data() - - hints = driver_hints.Hints() - hints.add_filter('name', 'Ministry', comparator='contains') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back one group, since of the two that contain - # 'Ministry' the user only belongs to one. - self.assertThat(len(groups), matchers.Equals(1)) - self.assertEqual(group_list[6]['id'], groups[0]['id']) - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back 2 out of the 3 groups that start with 'The' - # hence showing that both "filters" have been applied - self.assertThat(len(groups), matchers.Equals(2)) - self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']]) - self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']]) - - hints.add_filter('name', 'The', comparator='endswith') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back one group since it is the only one that - # ends with 'The' - self.assertThat(len(groups), matchers.Equals(1)) - self.assertEqual(group_list[5]['id'], groups[0]['id']) - - self._delete_test_data('user', user_list) - self._delete_test_data('group', group_list) - - def test_groups_for_user_exact_filtered(self): - """Test exact filters doesn't break groups_for_user listing.""" - group_list, user_list = self._groups_for_user_data() - hints = driver_hints.Hints() - hints.add_filter('name', 'The Ministry', comparator='equals') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back 1 out of the 3 groups with name 'The - # Ministry' hence showing that both "filters" have been applied. - self.assertEqual(1, len(groups)) - self.assertEqual(group_list[6]['id'], groups[0]['id']) - self._delete_test_data('user', user_list) - self._delete_test_data('group', group_list) - - def _get_user_name_field_size(self): - """Return the size of the user name field for the backend. - - Subclasses can override this method to indicate that the user name - field is limited in length. The user name is the field used in the test - that validates that a filter value works even if it's longer than a - field. - - If the backend doesn't limit the value length then return None. - - """ - return None - - def test_filter_value_wider_than_field(self): - # If a filter value is given that's larger than the field in the - # backend then no values are returned. - - user_name_field_size = self._get_user_name_field_size() - - if user_name_field_size is None: - # The backend doesn't limit the size of the user name, so pass this - # test. - return - - # Create some users just to make sure would return something if the - # filter was ignored. - self._create_test_data('user', 2) - - hints = driver_hints.Hints() - value = 'A' * (user_name_field_size + 1) - hints.add_filter('name', value) - users = self.identity_api.list_users(hints=hints) - self.assertEqual([], users) - - def _list_users_in_group_data(self): - number_of_users = 10 - user_name_data = { - 1: 'Arthur Conan Doyle', - 3: 'Arthur Rimbaud', - 9: 'Arthur Schopenhauer', - } - user_list = self._create_test_data( - 'user', number_of_users, - domain_id=CONF.identity.default_domain_id, - name_dict=user_name_data) - group = self._create_one_entity( - 'group', CONF.identity.default_domain_id, 'Great Writers') - for i in range(7): - self.identity_api.add_user_to_group(user_list[i]['id'], - group['id']) - - return user_list, group - - def test_list_users_in_group_inexact_filtered(self): - user_list, group = self._list_users_in_group_data() - - hints = driver_hints.Hints() - hints.add_filter('name', 'Arthur', comparator='contains') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(2)) - self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']]) - - hints = driver_hints.Hints() - hints.add_filter('name', 'Arthur', comparator='startswith') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(2)) - self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']]) - - hints = driver_hints.Hints() - hints.add_filter('name', 'Doyle', comparator='endswith') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(1)) - self.assertEqual(user_list[1]['id'], users[0]['id']) - - self._delete_test_data('user', user_list) - self._delete_entity('group')(group['id']) - - def test_list_users_in_group_exact_filtered(self): - hints = driver_hints.Hints() - user_list, group = self._list_users_in_group_data() - hints.add_filter('name', 'Arthur Rimbaud', comparator='equals') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertEqual(1, len(users)) - self.assertEqual(user_list[3]['id'], users[0]['id']) - self._delete_test_data('user', user_list) - self._delete_entity('group')(group['id']) - - -class LimitTests(filtering.FilterTests): - ENTITIES = ['user', 'group', 'project'] - - def setUp(self): - """Setup for Limit Test Cases.""" - self.entity_lists = {} - - for entity in self.ENTITIES: - # Create 20 entities - self.entity_lists[entity] = self._create_test_data(entity, 20) - self.addCleanup(self.clean_up_entities) - - def clean_up_entities(self): - """Clean up entity test data from Limit Test Cases.""" - for entity in self.ENTITIES: - self._delete_test_data(entity, self.entity_lists[entity]) - del self.entity_lists - - def _test_list_entity_filtered_and_limited(self, entity): - self.config_fixture.config(list_limit=10) - # Should get back just 10 entities - hints = driver_hints.Hints() - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - self.assertTrue(hints.limit['truncated']) - - # Override with driver specific limit - if entity == 'project': - self.config_fixture.config(group='resource', list_limit=5) - else: - self.config_fixture.config(group='identity', list_limit=5) - - # Should get back just 5 users - hints = driver_hints.Hints() - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - - # Finally, let's pretend we want to get the full list of entities, - # even with the limits set, as part of some internal calculation. - # Calling the API without a hints list should achieve this, and - # return at least the 20 entries we created (there may be other - # entities lying around created by other tests/setup). - entities = self._list_entities(entity)() - self.assertTrue(len(entities) >= 20) - self._match_with_list(self.entity_lists[entity], entities) - - def test_list_users_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('user') - - def test_list_groups_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('group') - - def test_list_projects_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('project') -- cgit 1.2.3-korg