diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/identity | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/identity')
4 files changed, 0 insertions, 1538 deletions
diff --git a/keystone-moon/keystone/tests/unit/identity/__init__.py b/keystone-moon/keystone/tests/unit/identity/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/identity/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/identity/test_backends.py b/keystone-moon/keystone/tests/unit/identity/test_backends.py deleted file mode 100644 index 8b5c0def..00000000 --- a/keystone-moon/keystone/tests/unit/identity/test_backends.py +++ /dev/null @@ -1,1297 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -import mock -from oslo_config import cfg -from six.moves import range -from testtools import matchers - -from keystone.common import driver_hints -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import default_fixtures -from keystone.tests.unit import filtering - - -CONF = cfg.CONF - - -class IdentityTests(object): - - def _get_domain_fixture(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - return domain - - def _set_domain_scope(self, domain_id): - # We only provide a domain scope if we have multiple drivers - if CONF.identity.domain_specific_drivers_enabled: - return domain_id - - def test_authenticate_bad_user(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=uuid.uuid4().hex, - password=self.user_foo['password']) - - def test_authenticate_bad_password(self): - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=self.user_foo['id'], - password=uuid.uuid4().hex) - - def test_authenticate(self): - user_ref = self.identity_api.authenticate( - context={}, - user_id=self.user_sna['id'], - password=self.user_sna['password']) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_sna.pop('password') - self.user_sna['enabled'] = True - self.assertDictEqual(self.user_sna, user_ref) - - def test_authenticate_and_get_roles_no_metadata(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - # Remove user id. It is ignored by create_user() and will break the - # subset test below. - del user['id'] - - new_user = self.identity_api.create_user(user) - self.assignment_api.add_user_to_project(self.tenant_baz['id'], - new_user['id']) - user_ref = self.identity_api.authenticate( - context={}, - user_id=new_user['id'], - password=user['password']) - self.assertNotIn('password', user_ref) - # NOTE(termie): the password field is left in user_sna to make - # it easier to authenticate in tests, but should - # not be returned by the api - user.pop('password') - self.assertDictContainsSubset(user, user_ref) - role_list = self.assignment_api.get_roles_for_user_and_project( - new_user['id'], self.tenant_baz['id']) - self.assertEqual(1, len(role_list)) - self.assertIn(CONF.member_role_id, role_list) - - def test_authenticate_if_no_password_set(self): - id_ = uuid.uuid4().hex - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=id_, - password='password') - - def test_create_unicode_user_name(self): - unicode_name = u'name \u540d\u5b57' - user = unit.new_user_ref(name=unicode_name, - domain_id=CONF.identity.default_domain_id) - ref = self.identity_api.create_user(user) - self.assertEqual(unicode_name, ref['name']) - - def test_get_user(self): - user_ref = self.identity_api.get_user(self.user_foo['id']) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(self.user_foo, user_ref) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # cache the result. - self.identity_api.get_user(ref['id']) - # delete bypassing identity api - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user(ref['id'])) - self.identity_api.get_user.invalidate(self.identity_api, ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, ref['id']) - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - # cache the result. - self.identity_api.get_user(ref['id']) - # update using identity api and get back updated user. - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - uuid.uuid4().hex) - - def test_get_user_by_name(self): - user_ref = self.identity_api.get_user_by_name( - self.user_foo['name'], CONF.identity.default_domain_id) - # NOTE(termie): the password field is left in user_foo to make - # it easier to authenticate in tests, but should - # not be returned by the api - self.user_foo.pop('password') - self.assertDictEqual(self.user_foo, user_ref) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_get_user_by_name(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - # delete bypassing the identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(ref['id'])) - driver.delete_user(entity_id) - - self.assertDictEqual(ref, self.identity_api.get_user_by_name( - user['name'], CONF.identity.default_domain_id)) - self.identity_api.get_user_by_name.invalidate( - self.identity_api, user['name'], CONF.identity.default_domain_id) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - user['name'], CONF.identity.default_domain_id) - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - ref = self.identity_api.get_user_by_name(user['name'], - user['domain_id']) - user['description'] = uuid.uuid4().hex - user_updated = self.identity_api.update_user(ref['id'], user) - self.assertDictContainsSubset(self.identity_api.get_user(ref['id']), - user_updated) - self.assertDictContainsSubset( - self.identity_api.get_user_by_name(ref['name'], ref['domain_id']), - user_updated) - - def test_get_user_by_name_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - def test_create_duplicate_user_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.assertRaises(exception.Conflict, - self.identity_api.create_user, - user) - - def test_create_duplicate_user_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - user2 = unit.new_user_ref(name=user1['name'], - domain_id=new_domain['id']) - - self.identity_api.create_user(user1) - self.identity_api.create_user(user2) - - def test_move_user_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - user = unit.new_user_ref(domain_id=domain1['id']) - user = self.identity_api.create_user(user) - user['domain_id'] = domain2['id'] - # Update the user asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.identity_api.update_user(user['id'], user) - self.assertTrue(mock_dep.called) - - updated_user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(domain2['id'], updated_user_ref['domain_id']) - - def test_move_user_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a user in domain1 - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - # Now create a user in domain2 with a potentially clashing - # name - which should work since we have domain separation - user2 = unit.new_user_ref(name=user1['name'], - domain_id=domain2['id']) - user2 = self.identity_api.create_user(user2) - # Now try and move user1 into the 2nd domain - which should - # fail since the names clash - user1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user1['id'], - user1) - - def test_rename_duplicate_user_name_fails(self): - user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_user(user1) - user2 = self.identity_api.create_user(user2) - user2['name'] = user1['name'] - self.assertRaises(exception.Conflict, - self.identity_api.update_user, - user2['id'], - user2) - - def test_update_user_id_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - original_id = user['id'] - user['id'] = 'fake2' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - original_id, - user) - user_ref = self.identity_api.get_user(original_id) - self.assertEqual(original_id, user_ref['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - 'fake2') - - def test_delete_user_with_group_project_domain_links(self): - role1 = unit.new_role_ref() - self.role_api.create_role(role1['id'], role1) - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - self.assignment_api.create_grant(user_id=user1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(user_id=user1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - user_id=user1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_user(user1['id']) - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - user1['id'], - group1['id']) - - def test_delete_group_with_user_project_domain_links(self): - role1 = unit.new_role_ref() - self.role_api.create_role(role1['id'], role1) - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - project1 = unit.new_project_ref(domain_id=domain1['id']) - self.resource_api.create_project(project1['id'], project1) - user1 = unit.new_user_ref(domain_id=domain1['id']) - user1 = self.identity_api.create_user(user1) - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - - self.assignment_api.create_grant(group_id=group1['id'], - project_id=project1['id'], - role_id=role1['id']) - self.assignment_api.create_grant(group_id=group1['id'], - domain_id=domain1['id'], - role_id=role1['id']) - self.identity_api.add_user_to_group(user_id=user1['id'], - group_id=group1['id']) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - project_id=project1['id']) - self.assertEqual(1, len(roles_ref)) - roles_ref = self.assignment_api.list_grants( - group_id=group1['id'], - domain_id=domain1['id']) - self.assertEqual(1, len(roles_ref)) - self.identity_api.check_user_in_group( - user_id=user1['id'], - group_id=group1['id']) - self.identity_api.delete_group(group1['id']) - self.identity_api.get_user(user1['id']) - - def test_update_user_returns_not_found(self): - user_id = uuid.uuid4().hex - self.assertRaises(exception.UserNotFound, - self.identity_api.update_user, - user_id, - {'id': user_id, - 'domain_id': CONF.identity.default_domain_id}) - - def test_delete_user_returns_not_found(self): - self.assertRaises(exception.UserNotFound, - self.identity_api.delete_user, - uuid.uuid4().hex) - - def test_create_user_long_name_fails(self): - user = unit.new_user_ref(name='a' * 256, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_blank_name_fails(self): - user = unit.new_user_ref(name='', - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_missed_password(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_none_password(self): - user = unit.new_user_ref(password=None, - domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - self.identity_api.get_user(user['id']) - # Make sure the user is not allowed to login - # with a password that is empty string or None - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password='') - self.assertRaises(AssertionError, - self.identity_api.authenticate, - context={}, - user_id=user['id'], - password=None) - - def test_create_user_invalid_name_fails(self): - user = unit.new_user_ref(name=None, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - user = unit.new_user_ref(name=123, - domain_id=CONF.identity.default_domain_id) - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_create_user_invalid_enabled_type_string(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id, - # invalid string value - enabled='true') - self.assertRaises(exception.ValidationError, - self.identity_api.create_user, - user) - - def test_update_user_long_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user['name'] = 'a' * 256 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_blank_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user['name'] = '' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_update_user_invalid_name_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - - user['name'] = None - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - user['name'] = 123 - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_list_users(self): - users = self.identity_api.list_users( - domain_scope=self._set_domain_scope( - CONF.identity.default_domain_id)) - self.assertEqual(len(default_fixtures.USERS), len(users)) - user_ids = set(user['id'] for user in users) - expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id'] - for user in default_fixtures.USERS) - for user_ref in users: - self.assertNotIn('password', user_ref) - self.assertEqual(expected_user_ids, user_ids) - - def test_list_groups(self): - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - groups = self.identity_api.list_groups( - domain_scope=self._set_domain_scope( - CONF.identity.default_domain_id)) - self.assertEqual(2, len(groups)) - group_ids = [] - for group in groups: - group_ids.append(group.get('id')) - self.assertIn(group1['id'], group_ids) - self.assertIn(group2['id'], group_ids) - - def test_create_user_doesnt_modify_passed_in_dict(self): - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - original_user = new_user.copy() - self.identity_api.create_user(new_user) - self.assertDictEqual(original_user, new_user) - - def test_update_user_enable(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - user['enabled'] = False - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - # If not present, enabled field should not be updated - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertFalse(user_ref['enabled']) - - user['enabled'] = True - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['enabled'], user_ref['enabled']) - - del user['enabled'] - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - # Integers are valid Python's booleans. Explicitly test it. - user['enabled'] = 0 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - self.assertFalse(user_ref['enabled']) - - # Any integers other than 0 are interpreted as True - user['enabled'] = -42 - self.identity_api.update_user(user['id'], user) - user_ref = self.identity_api.get_user(user['id']) - # NOTE(breton): below, attribute `enabled` is explicitly tested to be - # equal True. assertTrue should not be used, because it converts - # the passed value to bool(). - self.assertIs(user_ref['enabled'], True) - - def test_update_user_name(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertEqual(user['name'], user_ref['name']) - - changed_name = user_ref['name'] + '_changed' - user_ref['name'] = changed_name - updated_user = self.identity_api.update_user(user_ref['id'], user_ref) - - # NOTE(dstanek): the SQL backend adds an 'extra' field containing a - # dictionary of the extra fields in addition to the - # fields in the object. For the details see: - # SqlIdentity.test_update_project_returns_extra - updated_user.pop('extra', None) - - self.assertDictEqual(user_ref, updated_user) - - user_ref = self.identity_api.get_user(user_ref['id']) - self.assertEqual(changed_name, user_ref['name']) - - def test_update_user_enable_fails(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - user = self.identity_api.create_user(user) - user_ref = self.identity_api.get_user(user['id']) - self.assertTrue(user_ref['enabled']) - - # Strings are not valid boolean values - user['enabled'] = 'false' - self.assertRaises(exception.ValidationError, - self.identity_api.update_user, - user['id'], - user) - - def test_add_user_to_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - - found = False - for x in groups: - if (x['id'] == new_group['id']): - found = True - self.assertTrue(found) - - def test_add_user_to_group_returns_not_found(self): - domain = self._get_domain_fixture() - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.assertRaises(exception.GroupNotFound, - self.identity_api.add_user_to_group, - new_user['id'], - uuid.uuid4().hex) - - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.UserNotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.add_user_to_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_check_user_in_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - self.identity_api.check_user_in_group(new_user['id'], new_group['id']) - - def test_check_user_not_in_group(self): - new_group = unit.new_group_ref( - domain_id=CONF.identity.default_domain_id) - new_group = self.identity_api.create_group(new_group) - - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - new_user = self.identity_api.create_user(new_user) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - new_user['id'], - new_group['id']) - - def test_check_user_in_group_returns_not_found(self): - new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - new_user = self.identity_api.create_user(new_user) - - new_group = unit.new_group_ref( - domain_id=CONF.identity.default_domain_id) - new_group = self.identity_api.create_group(new_group) - - self.assertRaises(exception.UserNotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.GroupNotFound, - self.identity_api.check_user_in_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.NotFound, - self.identity_api.check_user_in_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_list_users_in_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - # Make sure we get an empty list back on a new group, not an error. - user_refs = self.identity_api.list_users_in_group(new_group['id']) - self.assertEqual([], user_refs) - # Make sure we get the correct users back once they have been added - # to the group. - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - user_refs = self.identity_api.list_users_in_group(new_group['id']) - found = False - for x in user_refs: - if (x['id'] == new_user['id']): - found = True - self.assertNotIn('password', x) - self.assertTrue(found) - - def test_list_users_in_group_returns_not_found(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.list_users_in_group, - uuid.uuid4().hex) - - def test_list_groups_for_user(self): - domain = self._get_domain_fixture() - test_groups = [] - test_users = [] - GROUP_COUNT = 3 - USER_COUNT = 2 - - for x in range(0, USER_COUNT): - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - test_users.append(new_user) - positive_user = test_users[0] - negative_user = test_users[1] - - for x in range(0, USER_COUNT): - group_refs = self.identity_api.list_groups_for_user( - test_users[x]['id']) - self.assertEqual(0, len(group_refs)) - - for x in range(0, GROUP_COUNT): - before_count = x - after_count = x + 1 - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - test_groups.append(new_group) - - # add the user to the group and ensure that the - # group count increases by one for each - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.add_user_to_group( - positive_user['id'], - new_group['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - - # Make sure the group count for the unrelated user did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - # remove the user from each group and ensure that - # the group count reduces by one for each - for x in range(0, 3): - before_count = GROUP_COUNT - x - after_count = GROUP_COUNT - x - 1 - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(before_count, len(group_refs)) - self.identity_api.remove_user_from_group( - positive_user['id'], - test_groups[x]['id']) - group_refs = self.identity_api.list_groups_for_user( - positive_user['id']) - self.assertEqual(after_count, len(group_refs)) - # Make sure the group count for the unrelated user - # did not change - group_refs = self.identity_api.list_groups_for_user( - negative_user['id']) - self.assertEqual(0, len(group_refs)) - - def test_remove_user_from_group(self): - domain = self._get_domain_fixture() - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - self.identity_api.add_user_to_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertIn(new_group['id'], [x['id'] for x in groups]) - self.identity_api.remove_user_from_group(new_user['id'], - new_group['id']) - groups = self.identity_api.list_groups_for_user(new_user['id']) - self.assertNotIn(new_group['id'], [x['id'] for x in groups]) - - def test_remove_user_from_group_returns_not_found(self): - domain = self._get_domain_fixture() - new_user = unit.new_user_ref(domain_id=domain['id']) - new_user = self.identity_api.create_user(new_user) - new_group = unit.new_group_ref(domain_id=domain['id']) - new_group = self.identity_api.create_group(new_group) - self.assertRaises(exception.GroupNotFound, - self.identity_api.remove_user_from_group, - new_user['id'], - uuid.uuid4().hex) - - self.assertRaises(exception.UserNotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - new_group['id']) - - self.assertRaises(exception.NotFound, - self.identity_api.remove_user_from_group, - uuid.uuid4().hex, - uuid.uuid4().hex) - - def test_group_crud(self): - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - group = unit.new_group_ref(domain_id=domain['id']) - group = self.identity_api.create_group(group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - group['name'] = uuid.uuid4().hex - self.identity_api.update_group(group['id'], group) - group_ref = self.identity_api.get_group(group['id']) - self.assertDictContainsSubset(group, group_ref) - - self.identity_api.delete_group(group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, - group['id']) - - def test_get_group_by_name(self): - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group_name = group['name'] - group = self.identity_api.create_group(group) - spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - self.identity_api.create_group(spoiler) - - group_ref = self.identity_api.get_group_by_name( - group_name, CONF.identity.default_domain_id) - self.assertDictEqual(group, group_ref) - - def test_get_group_by_name_returns_not_found(self): - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group_by_name, - uuid.uuid4().hex, - CONF.identity.default_domain_id) - - @unit.skip_if_cache_disabled('identity') - def test_cache_layer_group_crud(self): - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group = self.identity_api.create_group(group) - # cache the result - group_ref = self.identity_api.get_group(group['id']) - # delete the group bypassing identity api. - domain_id, driver, entity_id = ( - self.identity_api._get_domain_driver_and_entity_id(group['id'])) - driver.delete_group(entity_id) - - self.assertEqual(group_ref, self.identity_api.get_group(group['id'])) - self.identity_api.get_group.invalidate(self.identity_api, group['id']) - self.assertRaises(exception.GroupNotFound, - self.identity_api.get_group, group['id']) - - group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group = self.identity_api.create_group(group) - # cache the result - self.identity_api.get_group(group['id']) - group['name'] = uuid.uuid4().hex - group_ref = self.identity_api.update_group(group['id'], group) - # after updating through identity api, get updated group - self.assertDictContainsSubset(self.identity_api.get_group(group['id']), - group_ref) - - def test_create_duplicate_group_name_fails(self): - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id, - name=group1['name']) - group1 = self.identity_api.create_group(group1) - self.assertRaises(exception.Conflict, - self.identity_api.create_group, - group2) - - def test_create_duplicate_group_name_in_different_domains(self): - new_domain = unit.new_domain_ref() - self.resource_api.create_domain(new_domain['id'], new_domain) - group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id) - group2 = unit.new_group_ref(domain_id=new_domain['id'], - name=group1['name']) - group1 = self.identity_api.create_group(group1) - group2 = self.identity_api.create_group(group2) - - def test_move_group_between_domains(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - group = unit.new_group_ref(domain_id=domain1['id']) - group = self.identity_api.create_group(group) - group['domain_id'] = domain2['id'] - # Update the group asserting that a deprecation warning is emitted - with mock.patch( - 'oslo_log.versionutils.report_deprecated_feature') as mock_dep: - self.identity_api.update_group(group['id'], group) - self.assertTrue(mock_dep.called) - - updated_group_ref = self.identity_api.get_group(group['id']) - self.assertEqual(domain2['id'], updated_group_ref['domain_id']) - - def test_move_group_between_domains_with_clashing_names_fails(self): - domain1 = unit.new_domain_ref() - self.resource_api.create_domain(domain1['id'], domain1) - domain2 = unit.new_domain_ref() - self.resource_api.create_domain(domain2['id'], domain2) - # First, create a group in domain1 - group1 = unit.new_group_ref(domain_id=domain1['id']) - group1 = self.identity_api.create_group(group1) - # Now create a group in domain2 with a potentially clashing - # name - which should work since we have domain separation - group2 = unit.new_group_ref(name=group1['name'], - domain_id=domain2['id']) - group2 = self.identity_api.create_group(group2) - # Now try and move group1 into the 2nd domain - which should - # fail since the names clash - group1['domain_id'] = domain2['id'] - self.assertRaises(exception.Conflict, - self.identity_api.update_group, - group1['id'], - group1) - - def test_user_crud(self): - user_dict = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - del user_dict['id'] - user = self.identity_api.create_user(user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - user_dict['password'] = uuid.uuid4().hex - self.identity_api.update_user(user['id'], user_dict) - user_ref = self.identity_api.get_user(user['id']) - del user_dict['password'] - user_ref_dict = {x: user_ref[x] for x in user_ref} - self.assertDictContainsSubset(user_dict, user_ref_dict) - - self.identity_api.delete_user(user['id']) - self.assertRaises(exception.UserNotFound, - self.identity_api.get_user, - user['id']) - - def test_arbitrary_attributes_are_returned_from_create_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - user = self.identity_api.create_user(user_data) - - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_arbitrary_attributes_are_returned_from_get_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - user_data = self.identity_api.create_user(user_data) - - user = self.identity_api.get_user(user_data['id']) - self.assertEqual(attr_value, user['arbitrary_attr']) - - def test_new_arbitrary_attributes_are_returned_from_update_user(self): - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - - user = self.identity_api.create_user(user_data) - attr_value = uuid.uuid4().hex - user['arbitrary_attr'] = attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(attr_value, updated_user['arbitrary_attr']) - - def test_updated_arbitrary_attributes_are_returned_from_update_user(self): - attr_value = uuid.uuid4().hex - user_data = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id, - arbitrary_attr=attr_value) - - new_attr_value = uuid.uuid4().hex - user = self.identity_api.create_user(user_data) - user['arbitrary_attr'] = new_attr_value - updated_user = self.identity_api.update_user(user['id'], user) - - self.assertEqual(new_attr_value, updated_user['arbitrary_attr']) - - def test_user_update_and_user_get_return_same_response(self): - user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) - - user = self.identity_api.create_user(user) - - updated_user = {'enabled': False} - updated_user_ref = self.identity_api.update_user( - user['id'], updated_user) - - # SQL backend adds 'extra' field - updated_user_ref.pop('extra', None) - - self.assertIs(False, updated_user_ref['enabled']) - - user_ref = self.identity_api.get_user(user['id']) - self.assertDictEqual(updated_user_ref, user_ref) - - -class FilterTests(filtering.FilterTests): - def test_list_entities_filtered(self): - for entity in ['user', 'group', 'project']: - # Create 20 entities - entity_list = self._create_test_data(entity, 20) - - # Try filtering to get one an exact item out of the list - hints = driver_hints.Hints() - hints.add_filter('name', entity_list[10]['name']) - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(1, len(entities)) - self.assertEqual(entity_list[10]['id'], entities[0]['id']) - # Check the driver has removed the filter from the list hints - self.assertFalse(hints.get_exact_filter_by_name('name')) - self._delete_test_data(entity, entity_list) - - def test_list_users_inexact_filtered(self): - # Create 20 users, some with specific names. We set the names at create - # time (rather than updating them), since the LDAP driver does not - # support name updates. - user_name_data = { - # user index: name for user - 5: 'The', - 6: 'The Ministry', - 7: 'The Ministry of', - 8: 'The Ministry of Silly', - 9: 'The Ministry of Silly Walks', - # ...and one for useful case insensitivity testing - 10: 'The ministry of silly walks OF' - } - user_list = self._create_test_data( - 'user', 20, domain_id=CONF.identity.default_domain_id, - name_dict=user_name_data) - - hints = driver_hints.Hints() - hints.add_filter('name', 'ministry', comparator='contains') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(5, len(users)) - self._match_with_list(users, user_list, - list_start=6, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(6, len(users)) - self._match_with_list(users, user_list, - list_start=5, list_end=11) - # TODO(henry-nash) Check inexact filter has been removed. - - hints = driver_hints.Hints() - hints.add_filter('name', 'of', comparator='endswith') - users = self.identity_api.list_users(hints=hints) - self.assertEqual(2, len(users)) - # We can't assume we will get back the users in any particular order - self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']]) - # TODO(henry-nash) Check inexact filter has been removed. - - # TODO(henry-nash): Add some case sensitive tests. However, - # these would be hard to validate currently, since: - # - # For SQL, the issue is that MySQL 0.7, by default, is installed in - # case insensitive mode (which is what is run by default for our - # SQL backend tests). For production deployments. OpenStack - # assumes a case sensitive database. For these tests, therefore, we - # need to be able to check the sensitivity of the database so as to - # know whether to run case sensitive tests here. - # - # For LDAP/AD, although dependent on the schema being used, attributes - # are typically configured to be case aware, but not case sensitive. - - self._delete_test_data('user', user_list) - - def _groups_for_user_data(self): - number_of_groups = 10 - group_name_data = { - # entity index: name for entity - 5: 'The', - 6: 'The Ministry', - 9: 'The Ministry of Silly Walks', - } - group_list = self._create_test_data( - 'group', number_of_groups, - domain_id=CONF.identity.default_domain_id, - name_dict=group_name_data) - user_list = self._create_test_data('user', 2) - - for group in range(7): - # Create membership, including with two out of the three groups - # with well know names - self.identity_api.add_user_to_group(user_list[0]['id'], - group_list[group]['id']) - # ...and some spoiler memberships - for group in range(7, number_of_groups): - self.identity_api.add_user_to_group(user_list[1]['id'], - group_list[group]['id']) - - return group_list, user_list - - def test_groups_for_user_inexact_filtered(self): - """Test use of filtering doesn't break groups_for_user listing. - - Some backends may use filtering to achieve the list of groups for a - user, so test that it can combine a second filter. - - Test Plan: - - - Create 10 groups, some with names we can filter on - - Create 2 users - - Assign 1 of those users to most of the groups, including some of the - well known named ones - - Assign the other user to other groups as spoilers - - Ensure that when we list groups for users with a filter on the group - name, both restrictions have been enforced on what is returned. - - """ - group_list, user_list = self._groups_for_user_data() - - hints = driver_hints.Hints() - hints.add_filter('name', 'Ministry', comparator='contains') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back one group, since of the two that contain - # 'Ministry' the user only belongs to one. - self.assertThat(len(groups), matchers.Equals(1)) - self.assertEqual(group_list[6]['id'], groups[0]['id']) - - hints = driver_hints.Hints() - hints.add_filter('name', 'The', comparator='startswith') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back 2 out of the 3 groups that start with 'The' - # hence showing that both "filters" have been applied - self.assertThat(len(groups), matchers.Equals(2)) - self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']]) - self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']]) - - hints.add_filter('name', 'The', comparator='endswith') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back one group since it is the only one that - # ends with 'The' - self.assertThat(len(groups), matchers.Equals(1)) - self.assertEqual(group_list[5]['id'], groups[0]['id']) - - self._delete_test_data('user', user_list) - self._delete_test_data('group', group_list) - - def test_groups_for_user_exact_filtered(self): - """Test exact filters doesn't break groups_for_user listing.""" - group_list, user_list = self._groups_for_user_data() - hints = driver_hints.Hints() - hints.add_filter('name', 'The Ministry', comparator='equals') - groups = self.identity_api.list_groups_for_user( - user_list[0]['id'], hints=hints) - # We should only get back 1 out of the 3 groups with name 'The - # Ministry' hence showing that both "filters" have been applied. - self.assertEqual(1, len(groups)) - self.assertEqual(group_list[6]['id'], groups[0]['id']) - self._delete_test_data('user', user_list) - self._delete_test_data('group', group_list) - - def _get_user_name_field_size(self): - """Return the size of the user name field for the backend. - - Subclasses can override this method to indicate that the user name - field is limited in length. The user name is the field used in the test - that validates that a filter value works even if it's longer than a - field. - - If the backend doesn't limit the value length then return None. - - """ - return None - - def test_filter_value_wider_than_field(self): - # If a filter value is given that's larger than the field in the - # backend then no values are returned. - - user_name_field_size = self._get_user_name_field_size() - - if user_name_field_size is None: - # The backend doesn't limit the size of the user name, so pass this - # test. - return - - # Create some users just to make sure would return something if the - # filter was ignored. - self._create_test_data('user', 2) - - hints = driver_hints.Hints() - value = 'A' * (user_name_field_size + 1) - hints.add_filter('name', value) - users = self.identity_api.list_users(hints=hints) - self.assertEqual([], users) - - def _list_users_in_group_data(self): - number_of_users = 10 - user_name_data = { - 1: 'Arthur Conan Doyle', - 3: 'Arthur Rimbaud', - 9: 'Arthur Schopenhauer', - } - user_list = self._create_test_data( - 'user', number_of_users, - domain_id=CONF.identity.default_domain_id, - name_dict=user_name_data) - group = self._create_one_entity( - 'group', CONF.identity.default_domain_id, 'Great Writers') - for i in range(7): - self.identity_api.add_user_to_group(user_list[i]['id'], - group['id']) - - return user_list, group - - def test_list_users_in_group_inexact_filtered(self): - user_list, group = self._list_users_in_group_data() - - hints = driver_hints.Hints() - hints.add_filter('name', 'Arthur', comparator='contains') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(2)) - self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']]) - - hints = driver_hints.Hints() - hints.add_filter('name', 'Arthur', comparator='startswith') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(2)) - self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']]) - self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']]) - - hints = driver_hints.Hints() - hints.add_filter('name', 'Doyle', comparator='endswith') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertThat(len(users), matchers.Equals(1)) - self.assertEqual(user_list[1]['id'], users[0]['id']) - - self._delete_test_data('user', user_list) - self._delete_entity('group')(group['id']) - - def test_list_users_in_group_exact_filtered(self): - hints = driver_hints.Hints() - user_list, group = self._list_users_in_group_data() - hints.add_filter('name', 'Arthur Rimbaud', comparator='equals') - users = self.identity_api.list_users_in_group(group['id'], hints=hints) - self.assertEqual(1, len(users)) - self.assertEqual(user_list[3]['id'], users[0]['id']) - self._delete_test_data('user', user_list) - self._delete_entity('group')(group['id']) - - -class LimitTests(filtering.FilterTests): - ENTITIES = ['user', 'group', 'project'] - - def setUp(self): - """Setup for Limit Test Cases.""" - self.entity_lists = {} - - for entity in self.ENTITIES: - # Create 20 entities - self.entity_lists[entity] = self._create_test_data(entity, 20) - self.addCleanup(self.clean_up_entities) - - def clean_up_entities(self): - """Clean up entity test data from Limit Test Cases.""" - for entity in self.ENTITIES: - self._delete_test_data(entity, self.entity_lists[entity]) - del self.entity_lists - - def _test_list_entity_filtered_and_limited(self, entity): - self.config_fixture.config(list_limit=10) - # Should get back just 10 entities - hints = driver_hints.Hints() - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - self.assertTrue(hints.limit['truncated']) - - # Override with driver specific limit - if entity == 'project': - self.config_fixture.config(group='resource', list_limit=5) - else: - self.config_fixture.config(group='identity', list_limit=5) - - # Should get back just 5 users - hints = driver_hints.Hints() - entities = self._list_entities(entity)(hints=hints) - self.assertEqual(hints.limit['limit'], len(entities)) - - # Finally, let's pretend we want to get the full list of entities, - # even with the limits set, as part of some internal calculation. - # Calling the API without a hints list should achieve this, and - # return at least the 20 entries we created (there may be other - # entities lying around created by other tests/setup). - entities = self._list_entities(entity)() - self.assertTrue(len(entities) >= 20) - self._match_with_list(self.entity_lists[entity], entities) - - def test_list_users_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('user') - - def test_list_groups_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('group') - - def test_list_projects_filtered_and_limited(self): - self._test_list_entity_filtered_and_limited('project') diff --git a/keystone-moon/keystone/tests/unit/identity/test_controllers.py b/keystone-moon/keystone/tests/unit/identity/test_controllers.py deleted file mode 100644 index ed2fe3ff..00000000 --- a/keystone-moon/keystone/tests/unit/identity/test_controllers.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright 2016 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslo_config import cfg - -from keystone import exception -from keystone.identity import controllers -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - -_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}} - - -class UserTestCaseNoDefaultDomain(unit.TestCase): - - def setUp(self): - super(UserTestCaseNoDefaultDomain, self).setUp() - self.useFixture(database.Database()) - self.load_backends() - self.user_controller = controllers.User() - - def test_setup(self): - # Other tests in this class assume there's no default domain, so make - # sure the setUp worked as expected. - self.assertRaises( - exception.DomainNotFound, - self.resource_api.get_domain, CONF.identity.default_domain_id) - - def test_get_users(self): - # When list_users is done and there's no default domain, the result is - # an empty list. - res = self.user_controller.get_users(_ADMIN_CONTEXT) - self.assertEqual([], res['users']) - - def test_get_user_by_name(self): - # When get_user_by_name is done and there's no default domain, the - # result is 404 Not Found - user_name = uuid.uuid4().hex - self.assertRaises( - exception.UserNotFound, - self.user_controller.get_user_by_name, _ADMIN_CONTEXT, user_name) - - def test_create_user(self): - # When a user is created using the v2 controller and there's no default - # domain, it doesn't fail with can't find domain (a default domain is - # created) - user = {'name': uuid.uuid4().hex} - self.user_controller.create_user(_ADMIN_CONTEXT, user) - # If the above doesn't fail then this is successful. diff --git a/keystone-moon/keystone/tests/unit/identity/test_core.py b/keystone-moon/keystone/tests/unit/identity/test_core.py deleted file mode 100644 index 39f3c701..00000000 --- a/keystone-moon/keystone/tests/unit/identity/test_core.py +++ /dev/null @@ -1,176 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Unit tests for core identity behavior.""" - -import itertools -import os -import uuid - -import mock -from oslo_config import cfg -from oslo_config import fixture as config_fixture - -from keystone import exception -from keystone import identity -from keystone.tests import unit -from keystone.tests.unit.ksfixtures import database - - -CONF = cfg.CONF - - -class TestDomainConfigs(unit.BaseTestCase): - - def setUp(self): - super(TestDomainConfigs, self).setUp() - self.addCleanup(CONF.reset) - - self.tmp_dir = unit.dirs.tmp() - - self.config_fixture = self.useFixture(config_fixture.Config(CONF)) - self.config_fixture.config(domain_config_dir=self.tmp_dir, - group='identity') - - def test_config_for_nonexistent_domain(self): - """Having a config for a non-existent domain will be ignored. - - There are no assertions in this test because there are no side - effects. If there is a config file for a domain that does not - exist it should be ignored. - - """ - domain_id = uuid.uuid4().hex - domain_config_filename = os.path.join(self.tmp_dir, - 'keystone.%s.conf' % domain_id) - self.addCleanup(lambda: os.remove(domain_config_filename)) - with open(domain_config_filename, 'w'): - """Write an empty config file.""" - - e = exception.DomainNotFound(domain_id=domain_id) - mock_assignment_api = mock.Mock() - mock_assignment_api.get_domain_by_name.side_effect = e - - domain_config = identity.DomainConfigs() - fake_standard_driver = None - domain_config.setup_domain_drivers(fake_standard_driver, - mock_assignment_api) - - def test_config_for_dot_name_domain(self): - # Ensure we can get the right domain name which has dots within it - # from filename. - domain_config_filename = os.path.join(self.tmp_dir, - 'keystone.abc.def.com.conf') - with open(domain_config_filename, 'w'): - """Write an empty config file.""" - self.addCleanup(os.remove, domain_config_filename) - - with mock.patch.object(identity.DomainConfigs, - '_load_config_from_file') as mock_load_config: - domain_config = identity.DomainConfigs() - fake_assignment_api = None - fake_standard_driver = None - domain_config.setup_domain_drivers(fake_standard_driver, - fake_assignment_api) - mock_load_config.assert_called_once_with(fake_assignment_api, - [domain_config_filename], - 'abc.def.com') - - def test_config_for_multiple_sql_backend(self): - domains_config = identity.DomainConfigs() - - # Create the right sequence of is_sql in the drivers being - # requested to expose the bug, which is that a False setting - # means it forgets previous True settings. - drivers = [] - files = [] - for idx, is_sql in enumerate((True, False, True)): - drv = mock.Mock(is_sql=is_sql) - drivers.append(drv) - name = 'dummy.{0}'.format(idx) - files.append(''.join(( - identity.DOMAIN_CONF_FHEAD, - name, - identity.DOMAIN_CONF_FTAIL))) - - walk_fake = lambda *a, **kwa: ( - ('/fake/keystone/domains/config', [], files), ) - - generic_driver = mock.Mock(is_sql=False) - - assignment_api = mock.Mock() - id_factory = itertools.count() - assignment_api.get_domain_by_name.side_effect = ( - lambda name: {'id': next(id_factory), '_': 'fake_domain'}) - load_driver_mock = mock.Mock(side_effect=drivers) - - with mock.patch.object(os, 'walk', walk_fake): - with mock.patch.object(identity.cfg, 'ConfigOpts'): - with mock.patch.object(domains_config, '_load_driver', - load_driver_mock): - self.assertRaises( - exception.MultipleSQLDriversInConfig, - domains_config.setup_domain_drivers, - generic_driver, assignment_api) - - self.assertEqual(3, load_driver_mock.call_count) - - -class TestDatabaseDomainConfigs(unit.TestCase): - - def setUp(self): - super(TestDatabaseDomainConfigs, self).setUp() - self.useFixture(database.Database()) - self.load_backends() - - def test_domain_config_in_database_disabled_by_default(self): - self.assertFalse(CONF.identity.domain_configurations_from_database) - - def test_loading_config_from_database(self): - self.config_fixture.config(domain_configurations_from_database=True, - group='identity') - domain = unit.new_domain_ref() - self.resource_api.create_domain(domain['id'], domain) - # Override two config options for our domain - conf = {'ldap': {'url': uuid.uuid4().hex, - 'suffix': uuid.uuid4().hex, - 'use_tls': 'True'}, - 'identity': { - 'driver': 'ldap'}} - self.domain_config_api.create_config(domain['id'], conf) - fake_standard_driver = None - domain_config = identity.DomainConfigs() - domain_config.setup_domain_drivers(fake_standard_driver, - self.resource_api) - # Make sure our two overrides are in place, and others are not affected - res = domain_config.get_domain_conf(domain['id']) - self.assertEqual(conf['ldap']['url'], res.ldap.url) - self.assertEqual(conf['ldap']['suffix'], res.ldap.suffix) - self.assertEqual(CONF.ldap.query_scope, res.ldap.query_scope) - - # Make sure the override is not changing the type of the config value - use_tls_type = type(CONF.ldap.use_tls) - self.assertEqual(use_tls_type(conf['ldap']['use_tls']), - res.ldap.use_tls) - - # Now turn off using database domain configuration and check that the - # default config file values are now seen instead of the overrides. - CONF.set_override('domain_configurations_from_database', False, - 'identity', enforce_type=True) - domain_config = identity.DomainConfigs() - domain_config.setup_domain_drivers(fake_standard_driver, - self.resource_api) - res = domain_config.get_domain_conf(domain['id']) - self.assertEqual(CONF.ldap.url, res.ldap.url) - self.assertEqual(CONF.ldap.suffix, res.ldap.suffix) - self.assertEqual(CONF.ldap.use_tls, res.ldap.use_tls) - self.assertEqual(CONF.ldap.query_scope, res.ldap.query_scope) |