aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/identity
diff options
context:
space:
mode:
authorRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
committerRHE <rebirthmonkey@gmail.com>2017-11-24 13:54:26 +0100
commit920a49cfa055733d575282973e23558c33087a4a (patch)
treed371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/identity
parentef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff)
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/identity')
-rw-r--r--keystone-moon/keystone/tests/unit/identity/__init__.py0
-rw-r--r--keystone-moon/keystone/tests/unit/identity/test_backends.py1297
-rw-r--r--keystone-moon/keystone/tests/unit/identity/test_controllers.py65
-rw-r--r--keystone-moon/keystone/tests/unit/identity/test_core.py176
4 files changed, 0 insertions, 1538 deletions
diff --git a/keystone-moon/keystone/tests/unit/identity/__init__.py b/keystone-moon/keystone/tests/unit/identity/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/identity/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/identity/test_backends.py b/keystone-moon/keystone/tests/unit/identity/test_backends.py
deleted file mode 100644
index 8b5c0def..00000000
--- a/keystone-moon/keystone/tests/unit/identity/test_backends.py
+++ /dev/null
@@ -1,1297 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-import mock
-from oslo_config import cfg
-from six.moves import range
-from testtools import matchers
-
-from keystone.common import driver_hints
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import default_fixtures
-from keystone.tests.unit import filtering
-
-
-CONF = cfg.CONF
-
-
-class IdentityTests(object):
-
- def _get_domain_fixture(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- return domain
-
- def _set_domain_scope(self, domain_id):
- # We only provide a domain scope if we have multiple drivers
- if CONF.identity.domain_specific_drivers_enabled:
- return domain_id
-
- def test_authenticate_bad_user(self):
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=uuid.uuid4().hex,
- password=self.user_foo['password'])
-
- def test_authenticate_bad_password(self):
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=self.user_foo['id'],
- password=uuid.uuid4().hex)
-
- def test_authenticate(self):
- user_ref = self.identity_api.authenticate(
- context={},
- user_id=self.user_sna['id'],
- password=self.user_sna['password'])
- # NOTE(termie): the password field is left in user_sna to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_sna.pop('password')
- self.user_sna['enabled'] = True
- self.assertDictEqual(self.user_sna, user_ref)
-
- def test_authenticate_and_get_roles_no_metadata(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- # Remove user id. It is ignored by create_user() and will break the
- # subset test below.
- del user['id']
-
- new_user = self.identity_api.create_user(user)
- self.assignment_api.add_user_to_project(self.tenant_baz['id'],
- new_user['id'])
- user_ref = self.identity_api.authenticate(
- context={},
- user_id=new_user['id'],
- password=user['password'])
- self.assertNotIn('password', user_ref)
- # NOTE(termie): the password field is left in user_sna to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- user.pop('password')
- self.assertDictContainsSubset(user, user_ref)
- role_list = self.assignment_api.get_roles_for_user_and_project(
- new_user['id'], self.tenant_baz['id'])
- self.assertEqual(1, len(role_list))
- self.assertIn(CONF.member_role_id, role_list)
-
- def test_authenticate_if_no_password_set(self):
- id_ = uuid.uuid4().hex
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
-
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=id_,
- password='password')
-
- def test_create_unicode_user_name(self):
- unicode_name = u'name \u540d\u5b57'
- user = unit.new_user_ref(name=unicode_name,
- domain_id=CONF.identity.default_domain_id)
- ref = self.identity_api.create_user(user)
- self.assertEqual(unicode_name, ref['name'])
-
- def test_get_user(self):
- user_ref = self.identity_api.get_user(self.user_foo['id'])
- # NOTE(termie): the password field is left in user_foo to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_foo.pop('password')
- self.assertDictEqual(self.user_foo, user_ref)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_get_user(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- # cache the result.
- self.identity_api.get_user(ref['id'])
- # delete bypassing identity api
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(ref['id']))
- driver.delete_user(entity_id)
-
- self.assertDictEqual(ref, self.identity_api.get_user(ref['id']))
- self.identity_api.get_user.invalidate(self.identity_api, ref['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user, ref['id'])
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- user['description'] = uuid.uuid4().hex
- # cache the result.
- self.identity_api.get_user(ref['id'])
- # update using identity api and get back updated user.
- user_updated = self.identity_api.update_user(ref['id'], user)
- self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
- user_updated)
- self.assertDictContainsSubset(
- self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
- user_updated)
-
- def test_get_user_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- uuid.uuid4().hex)
-
- def test_get_user_by_name(self):
- user_ref = self.identity_api.get_user_by_name(
- self.user_foo['name'], CONF.identity.default_domain_id)
- # NOTE(termie): the password field is left in user_foo to make
- # it easier to authenticate in tests, but should
- # not be returned by the api
- self.user_foo.pop('password')
- self.assertDictEqual(self.user_foo, user_ref)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_get_user_by_name(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- # delete bypassing the identity api.
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(ref['id']))
- driver.delete_user(entity_id)
-
- self.assertDictEqual(ref, self.identity_api.get_user_by_name(
- user['name'], CONF.identity.default_domain_id))
- self.identity_api.get_user_by_name.invalidate(
- self.identity_api, user['name'], CONF.identity.default_domain_id)
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user_by_name,
- user['name'], CONF.identity.default_domain_id)
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- ref = self.identity_api.get_user_by_name(user['name'],
- user['domain_id'])
- user['description'] = uuid.uuid4().hex
- user_updated = self.identity_api.update_user(ref['id'], user)
- self.assertDictContainsSubset(self.identity_api.get_user(ref['id']),
- user_updated)
- self.assertDictContainsSubset(
- self.identity_api.get_user_by_name(ref['name'], ref['domain_id']),
- user_updated)
-
- def test_get_user_by_name_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user_by_name,
- uuid.uuid4().hex,
- CONF.identity.default_domain_id)
-
- def test_create_duplicate_user_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.assertRaises(exception.Conflict,
- self.identity_api.create_user,
- user)
-
- def test_create_duplicate_user_name_in_different_domains(self):
- new_domain = unit.new_domain_ref()
- self.resource_api.create_domain(new_domain['id'], new_domain)
- user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- user2 = unit.new_user_ref(name=user1['name'],
- domain_id=new_domain['id'])
-
- self.identity_api.create_user(user1)
- self.identity_api.create_user(user2)
-
- def test_move_user_between_domains(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- user = unit.new_user_ref(domain_id=domain1['id'])
- user = self.identity_api.create_user(user)
- user['domain_id'] = domain2['id']
- # Update the user asserting that a deprecation warning is emitted
- with mock.patch(
- 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
- self.identity_api.update_user(user['id'], user)
- self.assertTrue(mock_dep.called)
-
- updated_user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(domain2['id'], updated_user_ref['domain_id'])
-
- def test_move_user_between_domains_with_clashing_names_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- # First, create a user in domain1
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- # Now create a user in domain2 with a potentially clashing
- # name - which should work since we have domain separation
- user2 = unit.new_user_ref(name=user1['name'],
- domain_id=domain2['id'])
- user2 = self.identity_api.create_user(user2)
- # Now try and move user1 into the 2nd domain - which should
- # fail since the names clash
- user1['domain_id'] = domain2['id']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_user,
- user1['id'],
- user1)
-
- def test_rename_duplicate_user_name_fails(self):
- user1 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user2 = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_user(user1)
- user2 = self.identity_api.create_user(user2)
- user2['name'] = user1['name']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_user,
- user2['id'],
- user2)
-
- def test_update_user_id_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- original_id = user['id']
- user['id'] = 'fake2'
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- original_id,
- user)
- user_ref = self.identity_api.get_user(original_id)
- self.assertEqual(original_id, user_ref['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- 'fake2')
-
- def test_delete_user_with_group_project_domain_links(self):
- role1 = unit.new_role_ref()
- self.role_api.create_role(role1['id'], role1)
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
- self.assignment_api.create_grant(user_id=user1['id'],
- project_id=project1['id'],
- role_id=role1['id'])
- self.assignment_api.create_grant(user_id=user1['id'],
- domain_id=domain1['id'],
- role_id=role1['id'])
- self.identity_api.add_user_to_group(user_id=user1['id'],
- group_id=group1['id'])
- roles_ref = self.assignment_api.list_grants(
- user_id=user1['id'],
- project_id=project1['id'])
- self.assertEqual(1, len(roles_ref))
- roles_ref = self.assignment_api.list_grants(
- user_id=user1['id'],
- domain_id=domain1['id'])
- self.assertEqual(1, len(roles_ref))
- self.identity_api.check_user_in_group(
- user_id=user1['id'],
- group_id=group1['id'])
- self.identity_api.delete_user(user1['id'])
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- user1['id'],
- group1['id'])
-
- def test_delete_group_with_user_project_domain_links(self):
- role1 = unit.new_role_ref()
- self.role_api.create_role(role1['id'], role1)
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- project1 = unit.new_project_ref(domain_id=domain1['id'])
- self.resource_api.create_project(project1['id'], project1)
- user1 = unit.new_user_ref(domain_id=domain1['id'])
- user1 = self.identity_api.create_user(user1)
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
-
- self.assignment_api.create_grant(group_id=group1['id'],
- project_id=project1['id'],
- role_id=role1['id'])
- self.assignment_api.create_grant(group_id=group1['id'],
- domain_id=domain1['id'],
- role_id=role1['id'])
- self.identity_api.add_user_to_group(user_id=user1['id'],
- group_id=group1['id'])
- roles_ref = self.assignment_api.list_grants(
- group_id=group1['id'],
- project_id=project1['id'])
- self.assertEqual(1, len(roles_ref))
- roles_ref = self.assignment_api.list_grants(
- group_id=group1['id'],
- domain_id=domain1['id'])
- self.assertEqual(1, len(roles_ref))
- self.identity_api.check_user_in_group(
- user_id=user1['id'],
- group_id=group1['id'])
- self.identity_api.delete_group(group1['id'])
- self.identity_api.get_user(user1['id'])
-
- def test_update_user_returns_not_found(self):
- user_id = uuid.uuid4().hex
- self.assertRaises(exception.UserNotFound,
- self.identity_api.update_user,
- user_id,
- {'id': user_id,
- 'domain_id': CONF.identity.default_domain_id})
-
- def test_delete_user_returns_not_found(self):
- self.assertRaises(exception.UserNotFound,
- self.identity_api.delete_user,
- uuid.uuid4().hex)
-
- def test_create_user_long_name_fails(self):
- user = unit.new_user_ref(name='a' * 256,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_blank_name_fails(self):
- user = unit.new_user_ref(name='',
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_missed_password(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.identity_api.get_user(user['id'])
- # Make sure the user is not allowed to login
- # with a password that is empty string or None
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password='')
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password=None)
-
- def test_create_user_none_password(self):
- user = unit.new_user_ref(password=None,
- domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- self.identity_api.get_user(user['id'])
- # Make sure the user is not allowed to login
- # with a password that is empty string or None
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password='')
- self.assertRaises(AssertionError,
- self.identity_api.authenticate,
- context={},
- user_id=user['id'],
- password=None)
-
- def test_create_user_invalid_name_fails(self):
- user = unit.new_user_ref(name=None,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- user = unit.new_user_ref(name=123,
- domain_id=CONF.identity.default_domain_id)
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_create_user_invalid_enabled_type_string(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id,
- # invalid string value
- enabled='true')
- self.assertRaises(exception.ValidationError,
- self.identity_api.create_user,
- user)
-
- def test_update_user_long_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user['name'] = 'a' * 256
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_update_user_blank_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user['name'] = ''
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_update_user_invalid_name_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
-
- user['name'] = None
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- user['name'] = 123
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_list_users(self):
- users = self.identity_api.list_users(
- domain_scope=self._set_domain_scope(
- CONF.identity.default_domain_id))
- self.assertEqual(len(default_fixtures.USERS), len(users))
- user_ids = set(user['id'] for user in users)
- expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
- for user in default_fixtures.USERS)
- for user_ref in users:
- self.assertNotIn('password', user_ref)
- self.assertEqual(expected_user_ids, user_ids)
-
- def test_list_groups(self):
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group1 = self.identity_api.create_group(group1)
- group2 = self.identity_api.create_group(group2)
- groups = self.identity_api.list_groups(
- domain_scope=self._set_domain_scope(
- CONF.identity.default_domain_id))
- self.assertEqual(2, len(groups))
- group_ids = []
- for group in groups:
- group_ids.append(group.get('id'))
- self.assertIn(group1['id'], group_ids)
- self.assertIn(group2['id'], group_ids)
-
- def test_create_user_doesnt_modify_passed_in_dict(self):
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- original_user = new_user.copy()
- self.identity_api.create_user(new_user)
- self.assertDictEqual(original_user, new_user)
-
- def test_update_user_enable(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- user['enabled'] = False
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['enabled'], user_ref['enabled'])
-
- # If not present, enabled field should not be updated
- del user['enabled']
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertFalse(user_ref['enabled'])
-
- user['enabled'] = True
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['enabled'], user_ref['enabled'])
-
- del user['enabled']
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- # Integers are valid Python's booleans. Explicitly test it.
- user['enabled'] = 0
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertFalse(user_ref['enabled'])
-
- # Any integers other than 0 are interpreted as True
- user['enabled'] = -42
- self.identity_api.update_user(user['id'], user)
- user_ref = self.identity_api.get_user(user['id'])
- # NOTE(breton): below, attribute `enabled` is explicitly tested to be
- # equal True. assertTrue should not be used, because it converts
- # the passed value to bool().
- self.assertIs(user_ref['enabled'], True)
-
- def test_update_user_name(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertEqual(user['name'], user_ref['name'])
-
- changed_name = user_ref['name'] + '_changed'
- user_ref['name'] = changed_name
- updated_user = self.identity_api.update_user(user_ref['id'], user_ref)
-
- # NOTE(dstanek): the SQL backend adds an 'extra' field containing a
- # dictionary of the extra fields in addition to the
- # fields in the object. For the details see:
- # SqlIdentity.test_update_project_returns_extra
- updated_user.pop('extra', None)
-
- self.assertDictEqual(user_ref, updated_user)
-
- user_ref = self.identity_api.get_user(user_ref['id'])
- self.assertEqual(changed_name, user_ref['name'])
-
- def test_update_user_enable_fails(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- user = self.identity_api.create_user(user)
- user_ref = self.identity_api.get_user(user['id'])
- self.assertTrue(user_ref['enabled'])
-
- # Strings are not valid boolean values
- user['enabled'] = 'false'
- self.assertRaises(exception.ValidationError,
- self.identity_api.update_user,
- user['id'],
- user)
-
- def test_add_user_to_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
-
- found = False
- for x in groups:
- if (x['id'] == new_group['id']):
- found = True
- self.assertTrue(found)
-
- def test_add_user_to_group_returns_not_found(self):
- domain = self._get_domain_fixture()
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.add_user_to_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- self.assertRaises(exception.UserNotFound,
- self.identity_api.add_user_to_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.NotFound,
- self.identity_api.add_user_to_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_check_user_in_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
-
- def test_check_user_not_in_group(self):
- new_group = unit.new_group_ref(
- domain_id=CONF.identity.default_domain_id)
- new_group = self.identity_api.create_group(new_group)
-
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- new_user = self.identity_api.create_user(new_user)
-
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- new_user['id'],
- new_group['id'])
-
- def test_check_user_in_group_returns_not_found(self):
- new_user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
- new_user = self.identity_api.create_user(new_user)
-
- new_group = unit.new_group_ref(
- domain_id=CONF.identity.default_domain_id)
- new_group = self.identity_api.create_group(new_group)
-
- self.assertRaises(exception.UserNotFound,
- self.identity_api.check_user_in_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.check_user_in_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- self.assertRaises(exception.NotFound,
- self.identity_api.check_user_in_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_list_users_in_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- # Make sure we get an empty list back on a new group, not an error.
- user_refs = self.identity_api.list_users_in_group(new_group['id'])
- self.assertEqual([], user_refs)
- # Make sure we get the correct users back once they have been added
- # to the group.
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- user_refs = self.identity_api.list_users_in_group(new_group['id'])
- found = False
- for x in user_refs:
- if (x['id'] == new_user['id']):
- found = True
- self.assertNotIn('password', x)
- self.assertTrue(found)
-
- def test_list_users_in_group_returns_not_found(self):
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.list_users_in_group,
- uuid.uuid4().hex)
-
- def test_list_groups_for_user(self):
- domain = self._get_domain_fixture()
- test_groups = []
- test_users = []
- GROUP_COUNT = 3
- USER_COUNT = 2
-
- for x in range(0, USER_COUNT):
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- test_users.append(new_user)
- positive_user = test_users[0]
- negative_user = test_users[1]
-
- for x in range(0, USER_COUNT):
- group_refs = self.identity_api.list_groups_for_user(
- test_users[x]['id'])
- self.assertEqual(0, len(group_refs))
-
- for x in range(0, GROUP_COUNT):
- before_count = x
- after_count = x + 1
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- test_groups.append(new_group)
-
- # add the user to the group and ensure that the
- # group count increases by one for each
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(before_count, len(group_refs))
- self.identity_api.add_user_to_group(
- positive_user['id'],
- new_group['id'])
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(after_count, len(group_refs))
-
- # Make sure the group count for the unrelated user did not change
- group_refs = self.identity_api.list_groups_for_user(
- negative_user['id'])
- self.assertEqual(0, len(group_refs))
-
- # remove the user from each group and ensure that
- # the group count reduces by one for each
- for x in range(0, 3):
- before_count = GROUP_COUNT - x
- after_count = GROUP_COUNT - x - 1
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(before_count, len(group_refs))
- self.identity_api.remove_user_from_group(
- positive_user['id'],
- test_groups[x]['id'])
- group_refs = self.identity_api.list_groups_for_user(
- positive_user['id'])
- self.assertEqual(after_count, len(group_refs))
- # Make sure the group count for the unrelated user
- # did not change
- group_refs = self.identity_api.list_groups_for_user(
- negative_user['id'])
- self.assertEqual(0, len(group_refs))
-
- def test_remove_user_from_group(self):
- domain = self._get_domain_fixture()
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- self.identity_api.add_user_to_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
- self.assertIn(new_group['id'], [x['id'] for x in groups])
- self.identity_api.remove_user_from_group(new_user['id'],
- new_group['id'])
- groups = self.identity_api.list_groups_for_user(new_user['id'])
- self.assertNotIn(new_group['id'], [x['id'] for x in groups])
-
- def test_remove_user_from_group_returns_not_found(self):
- domain = self._get_domain_fixture()
- new_user = unit.new_user_ref(domain_id=domain['id'])
- new_user = self.identity_api.create_user(new_user)
- new_group = unit.new_group_ref(domain_id=domain['id'])
- new_group = self.identity_api.create_group(new_group)
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.remove_user_from_group,
- new_user['id'],
- uuid.uuid4().hex)
-
- self.assertRaises(exception.UserNotFound,
- self.identity_api.remove_user_from_group,
- uuid.uuid4().hex,
- new_group['id'])
-
- self.assertRaises(exception.NotFound,
- self.identity_api.remove_user_from_group,
- uuid.uuid4().hex,
- uuid.uuid4().hex)
-
- def test_group_crud(self):
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- group = unit.new_group_ref(domain_id=domain['id'])
- group = self.identity_api.create_group(group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictContainsSubset(group, group_ref)
-
- group['name'] = uuid.uuid4().hex
- self.identity_api.update_group(group['id'], group)
- group_ref = self.identity_api.get_group(group['id'])
- self.assertDictContainsSubset(group, group_ref)
-
- self.identity_api.delete_group(group['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group,
- group['id'])
-
- def test_get_group_by_name(self):
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group_name = group['name']
- group = self.identity_api.create_group(group)
- spoiler = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- self.identity_api.create_group(spoiler)
-
- group_ref = self.identity_api.get_group_by_name(
- group_name, CONF.identity.default_domain_id)
- self.assertDictEqual(group, group_ref)
-
- def test_get_group_by_name_returns_not_found(self):
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group_by_name,
- uuid.uuid4().hex,
- CONF.identity.default_domain_id)
-
- @unit.skip_if_cache_disabled('identity')
- def test_cache_layer_group_crud(self):
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group = self.identity_api.create_group(group)
- # cache the result
- group_ref = self.identity_api.get_group(group['id'])
- # delete the group bypassing identity api.
- domain_id, driver, entity_id = (
- self.identity_api._get_domain_driver_and_entity_id(group['id']))
- driver.delete_group(entity_id)
-
- self.assertEqual(group_ref, self.identity_api.get_group(group['id']))
- self.identity_api.get_group.invalidate(self.identity_api, group['id'])
- self.assertRaises(exception.GroupNotFound,
- self.identity_api.get_group, group['id'])
-
- group = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group = self.identity_api.create_group(group)
- # cache the result
- self.identity_api.get_group(group['id'])
- group['name'] = uuid.uuid4().hex
- group_ref = self.identity_api.update_group(group['id'], group)
- # after updating through identity api, get updated group
- self.assertDictContainsSubset(self.identity_api.get_group(group['id']),
- group_ref)
-
- def test_create_duplicate_group_name_fails(self):
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id,
- name=group1['name'])
- group1 = self.identity_api.create_group(group1)
- self.assertRaises(exception.Conflict,
- self.identity_api.create_group,
- group2)
-
- def test_create_duplicate_group_name_in_different_domains(self):
- new_domain = unit.new_domain_ref()
- self.resource_api.create_domain(new_domain['id'], new_domain)
- group1 = unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
- group2 = unit.new_group_ref(domain_id=new_domain['id'],
- name=group1['name'])
- group1 = self.identity_api.create_group(group1)
- group2 = self.identity_api.create_group(group2)
-
- def test_move_group_between_domains(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- group = unit.new_group_ref(domain_id=domain1['id'])
- group = self.identity_api.create_group(group)
- group['domain_id'] = domain2['id']
- # Update the group asserting that a deprecation warning is emitted
- with mock.patch(
- 'oslo_log.versionutils.report_deprecated_feature') as mock_dep:
- self.identity_api.update_group(group['id'], group)
- self.assertTrue(mock_dep.called)
-
- updated_group_ref = self.identity_api.get_group(group['id'])
- self.assertEqual(domain2['id'], updated_group_ref['domain_id'])
-
- def test_move_group_between_domains_with_clashing_names_fails(self):
- domain1 = unit.new_domain_ref()
- self.resource_api.create_domain(domain1['id'], domain1)
- domain2 = unit.new_domain_ref()
- self.resource_api.create_domain(domain2['id'], domain2)
- # First, create a group in domain1
- group1 = unit.new_group_ref(domain_id=domain1['id'])
- group1 = self.identity_api.create_group(group1)
- # Now create a group in domain2 with a potentially clashing
- # name - which should work since we have domain separation
- group2 = unit.new_group_ref(name=group1['name'],
- domain_id=domain2['id'])
- group2 = self.identity_api.create_group(group2)
- # Now try and move group1 into the 2nd domain - which should
- # fail since the names clash
- group1['domain_id'] = domain2['id']
- self.assertRaises(exception.Conflict,
- self.identity_api.update_group,
- group1['id'],
- group1)
-
- def test_user_crud(self):
- user_dict = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id)
- del user_dict['id']
- user = self.identity_api.create_user(user_dict)
- user_ref = self.identity_api.get_user(user['id'])
- del user_dict['password']
- user_ref_dict = {x: user_ref[x] for x in user_ref}
- self.assertDictContainsSubset(user_dict, user_ref_dict)
-
- user_dict['password'] = uuid.uuid4().hex
- self.identity_api.update_user(user['id'], user_dict)
- user_ref = self.identity_api.get_user(user['id'])
- del user_dict['password']
- user_ref_dict = {x: user_ref[x] for x in user_ref}
- self.assertDictContainsSubset(user_dict, user_ref_dict)
-
- self.identity_api.delete_user(user['id'])
- self.assertRaises(exception.UserNotFound,
- self.identity_api.get_user,
- user['id'])
-
- def test_arbitrary_attributes_are_returned_from_create_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- user = self.identity_api.create_user(user_data)
-
- self.assertEqual(attr_value, user['arbitrary_attr'])
-
- def test_arbitrary_attributes_are_returned_from_get_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- user_data = self.identity_api.create_user(user_data)
-
- user = self.identity_api.get_user(user_data['id'])
- self.assertEqual(attr_value, user['arbitrary_attr'])
-
- def test_new_arbitrary_attributes_are_returned_from_update_user(self):
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id)
-
- user = self.identity_api.create_user(user_data)
- attr_value = uuid.uuid4().hex
- user['arbitrary_attr'] = attr_value
- updated_user = self.identity_api.update_user(user['id'], user)
-
- self.assertEqual(attr_value, updated_user['arbitrary_attr'])
-
- def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
- attr_value = uuid.uuid4().hex
- user_data = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id,
- arbitrary_attr=attr_value)
-
- new_attr_value = uuid.uuid4().hex
- user = self.identity_api.create_user(user_data)
- user['arbitrary_attr'] = new_attr_value
- updated_user = self.identity_api.update_user(user['id'], user)
-
- self.assertEqual(new_attr_value, updated_user['arbitrary_attr'])
-
- def test_user_update_and_user_get_return_same_response(self):
- user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
-
- user = self.identity_api.create_user(user)
-
- updated_user = {'enabled': False}
- updated_user_ref = self.identity_api.update_user(
- user['id'], updated_user)
-
- # SQL backend adds 'extra' field
- updated_user_ref.pop('extra', None)
-
- self.assertIs(False, updated_user_ref['enabled'])
-
- user_ref = self.identity_api.get_user(user['id'])
- self.assertDictEqual(updated_user_ref, user_ref)
-
-
-class FilterTests(filtering.FilterTests):
- def test_list_entities_filtered(self):
- for entity in ['user', 'group', 'project']:
- # Create 20 entities
- entity_list = self._create_test_data(entity, 20)
-
- # Try filtering to get one an exact item out of the list
- hints = driver_hints.Hints()
- hints.add_filter('name', entity_list[10]['name'])
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(1, len(entities))
- self.assertEqual(entity_list[10]['id'], entities[0]['id'])
- # Check the driver has removed the filter from the list hints
- self.assertFalse(hints.get_exact_filter_by_name('name'))
- self._delete_test_data(entity, entity_list)
-
- def test_list_users_inexact_filtered(self):
- # Create 20 users, some with specific names. We set the names at create
- # time (rather than updating them), since the LDAP driver does not
- # support name updates.
- user_name_data = {
- # user index: name for user
- 5: 'The',
- 6: 'The Ministry',
- 7: 'The Ministry of',
- 8: 'The Ministry of Silly',
- 9: 'The Ministry of Silly Walks',
- # ...and one for useful case insensitivity testing
- 10: 'The ministry of silly walks OF'
- }
- user_list = self._create_test_data(
- 'user', 20, domain_id=CONF.identity.default_domain_id,
- name_dict=user_name_data)
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'ministry', comparator='contains')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(5, len(users))
- self._match_with_list(users, user_list,
- list_start=6, list_end=11)
- # TODO(henry-nash) Check inexact filter has been removed.
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The', comparator='startswith')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(6, len(users))
- self._match_with_list(users, user_list,
- list_start=5, list_end=11)
- # TODO(henry-nash) Check inexact filter has been removed.
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'of', comparator='endswith')
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual(2, len(users))
- # We can't assume we will get back the users in any particular order
- self.assertIn(user_list[7]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[10]['id'], [users[0]['id'], users[1]['id']])
- # TODO(henry-nash) Check inexact filter has been removed.
-
- # TODO(henry-nash): Add some case sensitive tests. However,
- # these would be hard to validate currently, since:
- #
- # For SQL, the issue is that MySQL 0.7, by default, is installed in
- # case insensitive mode (which is what is run by default for our
- # SQL backend tests). For production deployments. OpenStack
- # assumes a case sensitive database. For these tests, therefore, we
- # need to be able to check the sensitivity of the database so as to
- # know whether to run case sensitive tests here.
- #
- # For LDAP/AD, although dependent on the schema being used, attributes
- # are typically configured to be case aware, but not case sensitive.
-
- self._delete_test_data('user', user_list)
-
- def _groups_for_user_data(self):
- number_of_groups = 10
- group_name_data = {
- # entity index: name for entity
- 5: 'The',
- 6: 'The Ministry',
- 9: 'The Ministry of Silly Walks',
- }
- group_list = self._create_test_data(
- 'group', number_of_groups,
- domain_id=CONF.identity.default_domain_id,
- name_dict=group_name_data)
- user_list = self._create_test_data('user', 2)
-
- for group in range(7):
- # Create membership, including with two out of the three groups
- # with well know names
- self.identity_api.add_user_to_group(user_list[0]['id'],
- group_list[group]['id'])
- # ...and some spoiler memberships
- for group in range(7, number_of_groups):
- self.identity_api.add_user_to_group(user_list[1]['id'],
- group_list[group]['id'])
-
- return group_list, user_list
-
- def test_groups_for_user_inexact_filtered(self):
- """Test use of filtering doesn't break groups_for_user listing.
-
- Some backends may use filtering to achieve the list of groups for a
- user, so test that it can combine a second filter.
-
- Test Plan:
-
- - Create 10 groups, some with names we can filter on
- - Create 2 users
- - Assign 1 of those users to most of the groups, including some of the
- well known named ones
- - Assign the other user to other groups as spoilers
- - Ensure that when we list groups for users with a filter on the group
- name, both restrictions have been enforced on what is returned.
-
- """
- group_list, user_list = self._groups_for_user_data()
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Ministry', comparator='contains')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back one group, since of the two that contain
- # 'Ministry' the user only belongs to one.
- self.assertThat(len(groups), matchers.Equals(1))
- self.assertEqual(group_list[6]['id'], groups[0]['id'])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The', comparator='startswith')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back 2 out of the 3 groups that start with 'The'
- # hence showing that both "filters" have been applied
- self.assertThat(len(groups), matchers.Equals(2))
- self.assertIn(group_list[5]['id'], [groups[0]['id'], groups[1]['id']])
- self.assertIn(group_list[6]['id'], [groups[0]['id'], groups[1]['id']])
-
- hints.add_filter('name', 'The', comparator='endswith')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back one group since it is the only one that
- # ends with 'The'
- self.assertThat(len(groups), matchers.Equals(1))
- self.assertEqual(group_list[5]['id'], groups[0]['id'])
-
- self._delete_test_data('user', user_list)
- self._delete_test_data('group', group_list)
-
- def test_groups_for_user_exact_filtered(self):
- """Test exact filters doesn't break groups_for_user listing."""
- group_list, user_list = self._groups_for_user_data()
- hints = driver_hints.Hints()
- hints.add_filter('name', 'The Ministry', comparator='equals')
- groups = self.identity_api.list_groups_for_user(
- user_list[0]['id'], hints=hints)
- # We should only get back 1 out of the 3 groups with name 'The
- # Ministry' hence showing that both "filters" have been applied.
- self.assertEqual(1, len(groups))
- self.assertEqual(group_list[6]['id'], groups[0]['id'])
- self._delete_test_data('user', user_list)
- self._delete_test_data('group', group_list)
-
- def _get_user_name_field_size(self):
- """Return the size of the user name field for the backend.
-
- Subclasses can override this method to indicate that the user name
- field is limited in length. The user name is the field used in the test
- that validates that a filter value works even if it's longer than a
- field.
-
- If the backend doesn't limit the value length then return None.
-
- """
- return None
-
- def test_filter_value_wider_than_field(self):
- # If a filter value is given that's larger than the field in the
- # backend then no values are returned.
-
- user_name_field_size = self._get_user_name_field_size()
-
- if user_name_field_size is None:
- # The backend doesn't limit the size of the user name, so pass this
- # test.
- return
-
- # Create some users just to make sure would return something if the
- # filter was ignored.
- self._create_test_data('user', 2)
-
- hints = driver_hints.Hints()
- value = 'A' * (user_name_field_size + 1)
- hints.add_filter('name', value)
- users = self.identity_api.list_users(hints=hints)
- self.assertEqual([], users)
-
- def _list_users_in_group_data(self):
- number_of_users = 10
- user_name_data = {
- 1: 'Arthur Conan Doyle',
- 3: 'Arthur Rimbaud',
- 9: 'Arthur Schopenhauer',
- }
- user_list = self._create_test_data(
- 'user', number_of_users,
- domain_id=CONF.identity.default_domain_id,
- name_dict=user_name_data)
- group = self._create_one_entity(
- 'group', CONF.identity.default_domain_id, 'Great Writers')
- for i in range(7):
- self.identity_api.add_user_to_group(user_list[i]['id'],
- group['id'])
-
- return user_list, group
-
- def test_list_users_in_group_inexact_filtered(self):
- user_list, group = self._list_users_in_group_data()
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Arthur', comparator='contains')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(2))
- self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Arthur', comparator='startswith')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(2))
- self.assertIn(user_list[1]['id'], [users[0]['id'], users[1]['id']])
- self.assertIn(user_list[3]['id'], [users[0]['id'], users[1]['id']])
-
- hints = driver_hints.Hints()
- hints.add_filter('name', 'Doyle', comparator='endswith')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertThat(len(users), matchers.Equals(1))
- self.assertEqual(user_list[1]['id'], users[0]['id'])
-
- self._delete_test_data('user', user_list)
- self._delete_entity('group')(group['id'])
-
- def test_list_users_in_group_exact_filtered(self):
- hints = driver_hints.Hints()
- user_list, group = self._list_users_in_group_data()
- hints.add_filter('name', 'Arthur Rimbaud', comparator='equals')
- users = self.identity_api.list_users_in_group(group['id'], hints=hints)
- self.assertEqual(1, len(users))
- self.assertEqual(user_list[3]['id'], users[0]['id'])
- self._delete_test_data('user', user_list)
- self._delete_entity('group')(group['id'])
-
-
-class LimitTests(filtering.FilterTests):
- ENTITIES = ['user', 'group', 'project']
-
- def setUp(self):
- """Setup for Limit Test Cases."""
- self.entity_lists = {}
-
- for entity in self.ENTITIES:
- # Create 20 entities
- self.entity_lists[entity] = self._create_test_data(entity, 20)
- self.addCleanup(self.clean_up_entities)
-
- def clean_up_entities(self):
- """Clean up entity test data from Limit Test Cases."""
- for entity in self.ENTITIES:
- self._delete_test_data(entity, self.entity_lists[entity])
- del self.entity_lists
-
- def _test_list_entity_filtered_and_limited(self, entity):
- self.config_fixture.config(list_limit=10)
- # Should get back just 10 entities
- hints = driver_hints.Hints()
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(hints.limit['limit'], len(entities))
- self.assertTrue(hints.limit['truncated'])
-
- # Override with driver specific limit
- if entity == 'project':
- self.config_fixture.config(group='resource', list_limit=5)
- else:
- self.config_fixture.config(group='identity', list_limit=5)
-
- # Should get back just 5 users
- hints = driver_hints.Hints()
- entities = self._list_entities(entity)(hints=hints)
- self.assertEqual(hints.limit['limit'], len(entities))
-
- # Finally, let's pretend we want to get the full list of entities,
- # even with the limits set, as part of some internal calculation.
- # Calling the API without a hints list should achieve this, and
- # return at least the 20 entries we created (there may be other
- # entities lying around created by other tests/setup).
- entities = self._list_entities(entity)()
- self.assertTrue(len(entities) >= 20)
- self._match_with_list(self.entity_lists[entity], entities)
-
- def test_list_users_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('user')
-
- def test_list_groups_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('group')
-
- def test_list_projects_filtered_and_limited(self):
- self._test_list_entity_filtered_and_limited('project')
diff --git a/keystone-moon/keystone/tests/unit/identity/test_controllers.py b/keystone-moon/keystone/tests/unit/identity/test_controllers.py
deleted file mode 100644
index ed2fe3ff..00000000
--- a/keystone-moon/keystone/tests/unit/identity/test_controllers.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright 2016 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-from oslo_config import cfg
-
-from keystone import exception
-from keystone.identity import controllers
-from keystone.tests import unit
-from keystone.tests.unit.ksfixtures import database
-
-
-CONF = cfg.CONF
-
-_ADMIN_CONTEXT = {'is_admin': True, 'query_string': {}}
-
-
-class UserTestCaseNoDefaultDomain(unit.TestCase):
-
- def setUp(self):
- super(UserTestCaseNoDefaultDomain, self).setUp()
- self.useFixture(database.Database())
- self.load_backends()
- self.user_controller = controllers.User()
-
- def test_setup(self):
- # Other tests in this class assume there's no default domain, so make
- # sure the setUp worked as expected.
- self.assertRaises(
- exception.DomainNotFound,
- self.resource_api.get_domain, CONF.identity.default_domain_id)
-
- def test_get_users(self):
- # When list_users is done and there's no default domain, the result is
- # an empty list.
- res = self.user_controller.get_users(_ADMIN_CONTEXT)
- self.assertEqual([], res['users'])
-
- def test_get_user_by_name(self):
- # When get_user_by_name is done and there's no default domain, the
- # result is 404 Not Found
- user_name = uuid.uuid4().hex
- self.assertRaises(
- exception.UserNotFound,
- self.user_controller.get_user_by_name, _ADMIN_CONTEXT, user_name)
-
- def test_create_user(self):
- # When a user is created using the v2 controller and there's no default
- # domain, it doesn't fail with can't find domain (a default domain is
- # created)
- user = {'name': uuid.uuid4().hex}
- self.user_controller.create_user(_ADMIN_CONTEXT, user)
- # If the above doesn't fail then this is successful.
diff --git a/keystone-moon/keystone/tests/unit/identity/test_core.py b/keystone-moon/keystone/tests/unit/identity/test_core.py
deleted file mode 100644
index 39f3c701..00000000
--- a/keystone-moon/keystone/tests/unit/identity/test_core.py
+++ /dev/null
@@ -1,176 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Unit tests for core identity behavior."""
-
-import itertools
-import os
-import uuid
-
-import mock
-from oslo_config import cfg
-from oslo_config import fixture as config_fixture
-
-from keystone import exception
-from keystone import identity
-from keystone.tests import unit
-from keystone.tests.unit.ksfixtures import database
-
-
-CONF = cfg.CONF
-
-
-class TestDomainConfigs(unit.BaseTestCase):
-
- def setUp(self):
- super(TestDomainConfigs, self).setUp()
- self.addCleanup(CONF.reset)
-
- self.tmp_dir = unit.dirs.tmp()
-
- self.config_fixture = self.useFixture(config_fixture.Config(CONF))
- self.config_fixture.config(domain_config_dir=self.tmp_dir,
- group='identity')
-
- def test_config_for_nonexistent_domain(self):
- """Having a config for a non-existent domain will be ignored.
-
- There are no assertions in this test because there are no side
- effects. If there is a config file for a domain that does not
- exist it should be ignored.
-
- """
- domain_id = uuid.uuid4().hex
- domain_config_filename = os.path.join(self.tmp_dir,
- 'keystone.%s.conf' % domain_id)
- self.addCleanup(lambda: os.remove(domain_config_filename))
- with open(domain_config_filename, 'w'):
- """Write an empty config file."""
-
- e = exception.DomainNotFound(domain_id=domain_id)
- mock_assignment_api = mock.Mock()
- mock_assignment_api.get_domain_by_name.side_effect = e
-
- domain_config = identity.DomainConfigs()
- fake_standard_driver = None
- domain_config.setup_domain_drivers(fake_standard_driver,
- mock_assignment_api)
-
- def test_config_for_dot_name_domain(self):
- # Ensure we can get the right domain name which has dots within it
- # from filename.
- domain_config_filename = os.path.join(self.tmp_dir,
- 'keystone.abc.def.com.conf')
- with open(domain_config_filename, 'w'):
- """Write an empty config file."""
- self.addCleanup(os.remove, domain_config_filename)
-
- with mock.patch.object(identity.DomainConfigs,
- '_load_config_from_file') as mock_load_config:
- domain_config = identity.DomainConfigs()
- fake_assignment_api = None
- fake_standard_driver = None
- domain_config.setup_domain_drivers(fake_standard_driver,
- fake_assignment_api)
- mock_load_config.assert_called_once_with(fake_assignment_api,
- [domain_config_filename],
- 'abc.def.com')
-
- def test_config_for_multiple_sql_backend(self):
- domains_config = identity.DomainConfigs()
-
- # Create the right sequence of is_sql in the drivers being
- # requested to expose the bug, which is that a False setting
- # means it forgets previous True settings.
- drivers = []
- files = []
- for idx, is_sql in enumerate((True, False, True)):
- drv = mock.Mock(is_sql=is_sql)
- drivers.append(drv)
- name = 'dummy.{0}'.format(idx)
- files.append(''.join((
- identity.DOMAIN_CONF_FHEAD,
- name,
- identity.DOMAIN_CONF_FTAIL)))
-
- walk_fake = lambda *a, **kwa: (
- ('/fake/keystone/domains/config', [], files), )
-
- generic_driver = mock.Mock(is_sql=False)
-
- assignment_api = mock.Mock()
- id_factory = itertools.count()
- assignment_api.get_domain_by_name.side_effect = (
- lambda name: {'id': next(id_factory), '_': 'fake_domain'})
- load_driver_mock = mock.Mock(side_effect=drivers)
-
- with mock.patch.object(os, 'walk', walk_fake):
- with mock.patch.object(identity.cfg, 'ConfigOpts'):
- with mock.patch.object(domains_config, '_load_driver',
- load_driver_mock):
- self.assertRaises(
- exception.MultipleSQLDriversInConfig,
- domains_config.setup_domain_drivers,
- generic_driver, assignment_api)
-
- self.assertEqual(3, load_driver_mock.call_count)
-
-
-class TestDatabaseDomainConfigs(unit.TestCase):
-
- def setUp(self):
- super(TestDatabaseDomainConfigs, self).setUp()
- self.useFixture(database.Database())
- self.load_backends()
-
- def test_domain_config_in_database_disabled_by_default(self):
- self.assertFalse(CONF.identity.domain_configurations_from_database)
-
- def test_loading_config_from_database(self):
- self.config_fixture.config(domain_configurations_from_database=True,
- group='identity')
- domain = unit.new_domain_ref()
- self.resource_api.create_domain(domain['id'], domain)
- # Override two config options for our domain
- conf = {'ldap': {'url': uuid.uuid4().hex,
- 'suffix': uuid.uuid4().hex,
- 'use_tls': 'True'},
- 'identity': {
- 'driver': 'ldap'}}
- self.domain_config_api.create_config(domain['id'], conf)
- fake_standard_driver = None
- domain_config = identity.DomainConfigs()
- domain_config.setup_domain_drivers(fake_standard_driver,
- self.resource_api)
- # Make sure our two overrides are in place, and others are not affected
- res = domain_config.get_domain_conf(domain['id'])
- self.assertEqual(conf['ldap']['url'], res.ldap.url)
- self.assertEqual(conf['ldap']['suffix'], res.ldap.suffix)
- self.assertEqual(CONF.ldap.query_scope, res.ldap.query_scope)
-
- # Make sure the override is not changing the type of the config value
- use_tls_type = type(CONF.ldap.use_tls)
- self.assertEqual(use_tls_type(conf['ldap']['use_tls']),
- res.ldap.use_tls)
-
- # Now turn off using database domain configuration and check that the
- # default config file values are now seen instead of the overrides.
- CONF.set_override('domain_configurations_from_database', False,
- 'identity', enforce_type=True)
- domain_config = identity.DomainConfigs()
- domain_config.setup_domain_drivers(fake_standard_driver,
- self.resource_api)
- res = domain_config.get_domain_conf(domain['id'])
- self.assertEqual(CONF.ldap.url, res.ldap.url)
- self.assertEqual(CONF.ldap.suffix, res.ldap.suffix)
- self.assertEqual(CONF.ldap.use_tls, res.ldap.use_tls)
- self.assertEqual(CONF.ldap.query_scope, res.ldap.query_scope)