diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_identity.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_identity.py | 584 |
1 files changed, 584 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_identity.py b/keystone-moon/keystone/tests/unit/test_v3_identity.py new file mode 100644 index 00000000..ac077297 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v3_identity.py @@ -0,0 +1,584 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from oslo_config import cfg +from testtools import matchers + +from keystone.common import controller +from keystone import exception +from keystone.tests import unit as tests +from keystone.tests.unit import test_v3 + + +CONF = cfg.CONF + + +class IdentityTestCase(test_v3.RestfulTestCase): + """Test users and groups.""" + + def setUp(self): + super(IdentityTestCase, self).setUp() + + self.group = self.new_group_ref( + domain_id=self.domain_id) + self.group = self.identity_api.create_group(self.group) + self.group_id = self.group['id'] + + self.credential_id = uuid.uuid4().hex + self.credential = self.new_credential_ref( + user_id=self.user['id'], + project_id=self.project_id) + self.credential['id'] = self.credential_id + self.credential_api.create_credential( + self.credential_id, + self.credential) + + # user crud tests + + def test_create_user(self): + """Call ``POST /users``.""" + ref = self.new_user_ref(domain_id=self.domain_id) + r = self.post( + '/users', + body={'user': ref}) + return self.assertValidUserResponse(r, ref) + + def test_create_user_without_domain(self): + """Call ``POST /users`` without specifying domain. + + According to the identity-api specification, if you do not + explicitly specific the domain_id in the entity, it should + take the domain scope of the token as the domain_id. + + """ + # Create a user with a role on the domain so we can get a + # domain scoped token + domain = self.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user = self.new_user_ref(domain_id=domain['id']) + password = user['password'] + user = self.identity_api.create_user(user) + user['password'] = password + self.assignment_api.create_grant( + role_id=self.role_id, user_id=user['id'], + domain_id=domain['id']) + + ref = self.new_user_ref(domain_id=domain['id']) + ref_nd = ref.copy() + ref_nd.pop('domain_id') + auth = self.build_authentication_request( + user_id=user['id'], + password=user['password'], + domain_id=domain['id']) + r = self.post('/users', body={'user': ref_nd}, auth=auth) + self.assertValidUserResponse(r, ref) + + # Now try the same thing without a domain token - which should fail + ref = self.new_user_ref(domain_id=domain['id']) + ref_nd = ref.copy() + ref_nd.pop('domain_id') + auth = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id']) + r = self.post('/users', body={'user': ref_nd}, auth=auth) + # TODO(henry-nash): Due to bug #1283539 we currently automatically + # use the default domain_id if a domain scoped token is not being + # used. Change the code below to expect a failure once this bug is + # fixed. + ref['domain_id'] = CONF.identity.default_domain_id + return self.assertValidUserResponse(r, ref) + + def test_create_user_400(self): + """Call ``POST /users``.""" + self.post('/users', body={'user': {}}, expected_status=400) + + def test_list_users(self): + """Call ``GET /users``.""" + resource_url = '/users' + r = self.get(resource_url) + self.assertValidUserListResponse(r, ref=self.user, + resource_url=resource_url) + + def test_list_users_with_multiple_backends(self): + """Call ``GET /users`` when multiple backends is enabled. + + In this scenario, the controller requires a domain to be specified + either as a filter or by using a domain scoped token. + + """ + self.config_fixture.config(group='identity', + domain_specific_drivers_enabled=True) + + # Create a user with a role on the domain so we can get a + # domain scoped token + domain = self.new_domain_ref() + self.resource_api.create_domain(domain['id'], domain) + user = self.new_user_ref(domain_id=domain['id']) + password = user['password'] + user = self.identity_api.create_user(user) + user['password'] = password + self.assignment_api.create_grant( + role_id=self.role_id, user_id=user['id'], + domain_id=domain['id']) + + ref = self.new_user_ref(domain_id=domain['id']) + ref_nd = ref.copy() + ref_nd.pop('domain_id') + auth = self.build_authentication_request( + user_id=user['id'], + password=user['password'], + domain_id=domain['id']) + + # First try using a domain scoped token + resource_url = '/users' + r = self.get(resource_url, auth=auth) + self.assertValidUserListResponse(r, ref=user, + resource_url=resource_url) + + # Now try with an explicit filter + resource_url = ('/users?domain_id=%(domain_id)s' % + {'domain_id': domain['id']}) + r = self.get(resource_url) + self.assertValidUserListResponse(r, ref=user, + resource_url=resource_url) + + # Now try the same thing without a domain token or filter, + # which should fail + r = self.get('/users', expected_status=exception.Unauthorized.code) + + def test_list_users_with_static_admin_token_and_multiple_backends(self): + # domain-specific operations with the bootstrap ADMIN token is + # disallowed when domain-specific drivers are enabled + self.config_fixture.config(group='identity', + domain_specific_drivers_enabled=True) + self.get('/users', token=CONF.admin_token, + expected_status=exception.Unauthorized.code) + + def test_list_users_no_default_project(self): + """Call ``GET /users`` making sure no default_project_id.""" + user = self.new_user_ref(self.domain_id) + user = self.identity_api.create_user(user) + resource_url = '/users' + r = self.get(resource_url) + self.assertValidUserListResponse(r, ref=user, + resource_url=resource_url) + + def test_get_user(self): + """Call ``GET /users/{user_id}``.""" + r = self.get('/users/%(user_id)s' % { + 'user_id': self.user['id']}) + self.assertValidUserResponse(r, self.user) + + def test_get_user_with_default_project(self): + """Call ``GET /users/{user_id}`` making sure of default_project_id.""" + user = self.new_user_ref(domain_id=self.domain_id, + project_id=self.project_id) + user = self.identity_api.create_user(user) + r = self.get('/users/%(user_id)s' % {'user_id': user['id']}) + self.assertValidUserResponse(r, user) + + def test_add_user_to_group(self): + """Call ``PUT /groups/{group_id}/users/{user_id}``.""" + self.put('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + + def test_list_groups_for_user(self): + """Call ``GET /users/{user_id}/groups``.""" + + self.user1 = self.new_user_ref( + domain_id=self.domain['id']) + password = self.user1['password'] + self.user1 = self.identity_api.create_user(self.user1) + self.user1['password'] = password + self.user2 = self.new_user_ref( + domain_id=self.domain['id']) + password = self.user2['password'] + self.user2 = self.identity_api.create_user(self.user2) + self.user2['password'] = password + self.put('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user1['id']}) + + # Scenarios below are written to test the default policy configuration + + # One should be allowed to list one's own groups + auth = self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password']) + resource_url = ('/users/%(user_id)s/groups' % + {'user_id': self.user1['id']}) + r = self.get(resource_url, auth=auth) + self.assertValidGroupListResponse(r, ref=self.group, + resource_url=resource_url) + + # Administrator is allowed to list others' groups + resource_url = ('/users/%(user_id)s/groups' % + {'user_id': self.user1['id']}) + r = self.get(resource_url) + self.assertValidGroupListResponse(r, ref=self.group, + resource_url=resource_url) + + # Ordinary users should not be allowed to list other's groups + auth = self.build_authentication_request( + user_id=self.user2['id'], + password=self.user2['password']) + r = self.get('/users/%(user_id)s/groups' % { + 'user_id': self.user1['id']}, auth=auth, + expected_status=exception.ForbiddenAction.code) + + def test_check_user_in_group(self): + """Call ``HEAD /groups/{group_id}/users/{user_id}``.""" + self.put('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + self.head('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + + def test_list_users_in_group(self): + """Call ``GET /groups/{group_id}/users``.""" + self.put('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + resource_url = ('/groups/%(group_id)s/users' % + {'group_id': self.group_id}) + r = self.get(resource_url) + self.assertValidUserListResponse(r, ref=self.user, + resource_url=resource_url) + self.assertIn('/groups/%(group_id)s/users' % { + 'group_id': self.group_id}, r.result['links']['self']) + + def test_remove_user_from_group(self): + """Call ``DELETE /groups/{group_id}/users/{user_id}``.""" + self.put('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + self.delete('/groups/%(group_id)s/users/%(user_id)s' % { + 'group_id': self.group_id, 'user_id': self.user['id']}) + + def test_update_user(self): + """Call ``PATCH /users/{user_id}``.""" + user = self.new_user_ref(domain_id=self.domain_id) + del user['id'] + r = self.patch('/users/%(user_id)s' % { + 'user_id': self.user['id']}, + body={'user': user}) + self.assertValidUserResponse(r, user) + + def test_admin_password_reset(self): + # bootstrap a user as admin + user_ref = self.new_user_ref(domain_id=self.domain['id']) + password = user_ref['password'] + user_ref = self.identity_api.create_user(user_ref) + + # auth as user should work before a password change + old_password_auth = self.build_authentication_request( + user_id=user_ref['id'], + password=password) + r = self.v3_authenticate_token(old_password_auth, expected_status=201) + old_token = r.headers.get('X-Subject-Token') + + # auth as user with a token should work before a password change + old_token_auth = self.build_authentication_request(token=old_token) + self.v3_authenticate_token(old_token_auth, expected_status=201) + + # administrative password reset + new_password = uuid.uuid4().hex + self.patch('/users/%s' % user_ref['id'], + body={'user': {'password': new_password}}, + expected_status=200) + + # auth as user with original password should not work after change + self.v3_authenticate_token(old_password_auth, expected_status=401) + + # auth as user with an old token should not work after change + self.v3_authenticate_token(old_token_auth, expected_status=404) + + # new password should work + new_password_auth = self.build_authentication_request( + user_id=user_ref['id'], + password=new_password) + self.v3_authenticate_token(new_password_auth, expected_status=201) + + def test_update_user_domain_id(self): + """Call ``PATCH /users/{user_id}`` with domain_id.""" + user = self.new_user_ref(domain_id=self.domain['id']) + user = self.identity_api.create_user(user) + user['domain_id'] = CONF.identity.default_domain_id + r = self.patch('/users/%(user_id)s' % { + 'user_id': user['id']}, + body={'user': user}, + expected_status=exception.ValidationError.code) + self.config_fixture.config(domain_id_immutable=False) + user['domain_id'] = self.domain['id'] + r = self.patch('/users/%(user_id)s' % { + 'user_id': user['id']}, + body={'user': user}) + self.assertValidUserResponse(r, user) + + def test_delete_user(self): + """Call ``DELETE /users/{user_id}``. + + As well as making sure the delete succeeds, we ensure + that any credentials that reference this user are + also deleted, while other credentials are unaffected. + In addition, no tokens should remain valid for this user. + + """ + # First check the credential for this user is present + r = self.credential_api.get_credential(self.credential['id']) + self.assertDictEqual(r, self.credential) + # Create a second credential with a different user + self.user2 = self.new_user_ref( + domain_id=self.domain['id'], + project_id=self.project['id']) + self.user2 = self.identity_api.create_user(self.user2) + self.credential2 = self.new_credential_ref( + user_id=self.user2['id'], + project_id=self.project['id']) + self.credential_api.create_credential( + self.credential2['id'], + self.credential2) + # Create a token for this user which we can check later + # gets deleted + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id']) + token = self.get_requested_token(auth_data) + # Confirm token is valid for now + self.head('/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=200) + + # Now delete the user + self.delete('/users/%(user_id)s' % { + 'user_id': self.user['id']}) + + # Deleting the user should have deleted any credentials + # that reference this project + self.assertRaises(exception.CredentialNotFound, + self.credential_api.get_credential, + self.credential['id']) + # And the no tokens we remain valid + tokens = self.token_provider_api._persistence._list_tokens( + self.user['id']) + self.assertEqual(0, len(tokens)) + # But the credential for user2 is unaffected + r = self.credential_api.get_credential(self.credential2['id']) + self.assertDictEqual(r, self.credential2) + + # group crud tests + + def test_create_group(self): + """Call ``POST /groups``.""" + ref = self.new_group_ref(domain_id=self.domain_id) + r = self.post( + '/groups', + body={'group': ref}) + return self.assertValidGroupResponse(r, ref) + + def test_create_group_400(self): + """Call ``POST /groups``.""" + self.post('/groups', body={'group': {}}, expected_status=400) + + def test_list_groups(self): + """Call ``GET /groups``.""" + resource_url = '/groups' + r = self.get(resource_url) + self.assertValidGroupListResponse(r, ref=self.group, + resource_url=resource_url) + + def test_get_group(self): + """Call ``GET /groups/{group_id}``.""" + r = self.get('/groups/%(group_id)s' % { + 'group_id': self.group_id}) + self.assertValidGroupResponse(r, self.group) + + def test_update_group(self): + """Call ``PATCH /groups/{group_id}``.""" + group = self.new_group_ref(domain_id=self.domain_id) + del group['id'] + r = self.patch('/groups/%(group_id)s' % { + 'group_id': self.group_id}, + body={'group': group}) + self.assertValidGroupResponse(r, group) + + def test_update_group_domain_id(self): + """Call ``PATCH /groups/{group_id}`` with domain_id.""" + group = self.new_group_ref(domain_id=self.domain['id']) + group = self.identity_api.create_group(group) + group['domain_id'] = CONF.identity.default_domain_id + r = self.patch('/groups/%(group_id)s' % { + 'group_id': group['id']}, + body={'group': group}, + expected_status=exception.ValidationError.code) + self.config_fixture.config(domain_id_immutable=False) + group['domain_id'] = self.domain['id'] + r = self.patch('/groups/%(group_id)s' % { + 'group_id': group['id']}, + body={'group': group}) + self.assertValidGroupResponse(r, group) + + def test_delete_group(self): + """Call ``DELETE /groups/{group_id}``.""" + self.delete('/groups/%(group_id)s' % { + 'group_id': self.group_id}) + + +class IdentityV3toV2MethodsTestCase(tests.TestCase): + """Test users V3 to V2 conversion methods.""" + + def setUp(self): + super(IdentityV3toV2MethodsTestCase, self).setUp() + self.load_backends() + self.user_id = uuid.uuid4().hex + self.default_project_id = uuid.uuid4().hex + self.tenant_id = uuid.uuid4().hex + self.domain_id = uuid.uuid4().hex + # User with only default_project_id in ref + self.user1 = {'id': self.user_id, + 'name': self.user_id, + 'default_project_id': self.default_project_id, + 'domain_id': self.domain_id} + # User without default_project_id or tenantId in ref + self.user2 = {'id': self.user_id, + 'name': self.user_id, + 'domain_id': self.domain_id} + # User with both tenantId and default_project_id in ref + self.user3 = {'id': self.user_id, + 'name': self.user_id, + 'default_project_id': self.default_project_id, + 'tenantId': self.tenant_id, + 'domain_id': self.domain_id} + # User with only tenantId in ref + self.user4 = {'id': self.user_id, + 'name': self.user_id, + 'tenantId': self.tenant_id, + 'domain_id': self.domain_id} + + # Expected result if the user is meant to have a tenantId element + self.expected_user = {'id': self.user_id, + 'name': self.user_id, + 'username': self.user_id, + 'tenantId': self.default_project_id} + + # Expected result if the user is not meant to have a tenantId element + self.expected_user_no_tenant_id = {'id': self.user_id, + 'name': self.user_id, + 'username': self.user_id} + + def test_v3_to_v2_user_method(self): + + updated_user1 = controller.V2Controller.v3_to_v2_user(self.user1) + self.assertIs(self.user1, updated_user1) + self.assertDictEqual(self.user1, self.expected_user) + updated_user2 = controller.V2Controller.v3_to_v2_user(self.user2) + self.assertIs(self.user2, updated_user2) + self.assertDictEqual(self.user2, self.expected_user_no_tenant_id) + updated_user3 = controller.V2Controller.v3_to_v2_user(self.user3) + self.assertIs(self.user3, updated_user3) + self.assertDictEqual(self.user3, self.expected_user) + updated_user4 = controller.V2Controller.v3_to_v2_user(self.user4) + self.assertIs(self.user4, updated_user4) + self.assertDictEqual(self.user4, self.expected_user_no_tenant_id) + + def test_v3_to_v2_user_method_list(self): + user_list = [self.user1, self.user2, self.user3, self.user4] + updated_list = controller.V2Controller.v3_to_v2_user(user_list) + + self.assertEqual(len(updated_list), len(user_list)) + + for i, ref in enumerate(updated_list): + # Order should not change. + self.assertIs(ref, user_list[i]) + + self.assertDictEqual(self.user1, self.expected_user) + self.assertDictEqual(self.user2, self.expected_user_no_tenant_id) + self.assertDictEqual(self.user3, self.expected_user) + self.assertDictEqual(self.user4, self.expected_user_no_tenant_id) + + +class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): + + def setUp(self): + super(UserSelfServiceChangingPasswordsTestCase, self).setUp() + self.user_ref = self.new_user_ref(domain_id=self.domain['id']) + password = self.user_ref['password'] + self.user_ref = self.identity_api.create_user(self.user_ref) + self.user_ref['password'] = password + self.token = self.get_request_token(self.user_ref['password'], 201) + + def get_request_token(self, password, expected_status): + auth_data = self.build_authentication_request( + user_id=self.user_ref['id'], + password=password) + r = self.v3_authenticate_token(auth_data, + expected_status=expected_status) + return r.headers.get('X-Subject-Token') + + def change_password(self, expected_status, **kwargs): + """Returns a test response for a change password request.""" + return self.post('/users/%s/password' % self.user_ref['id'], + body={'user': kwargs}, + token=self.token, + expected_status=expected_status) + + def test_changing_password(self): + # original password works + token_id = self.get_request_token(self.user_ref['password'], + expected_status=201) + # original token works + old_token_auth = self.build_authentication_request(token=token_id) + self.v3_authenticate_token(old_token_auth, expected_status=201) + + # change password + new_password = uuid.uuid4().hex + self.change_password(password=new_password, + original_password=self.user_ref['password'], + expected_status=204) + + # old password fails + self.get_request_token(self.user_ref['password'], expected_status=401) + + # old token fails + self.v3_authenticate_token(old_token_auth, expected_status=404) + + # new password works + self.get_request_token(new_password, expected_status=201) + + def test_changing_password_with_missing_original_password_fails(self): + r = self.change_password(password=uuid.uuid4().hex, + expected_status=400) + self.assertThat(r.result['error']['message'], + matchers.Contains('original_password')) + + def test_changing_password_with_missing_password_fails(self): + r = self.change_password(original_password=self.user_ref['password'], + expected_status=400) + self.assertThat(r.result['error']['message'], + matchers.Contains('password')) + + def test_changing_password_with_incorrect_password_fails(self): + self.change_password(password=uuid.uuid4().hex, + original_password=uuid.uuid4().hex, + expected_status=401) + + def test_changing_password_with_disabled_user_fails(self): + # disable the user account + self.user_ref['enabled'] = False + self.patch('/users/%s' % self.user_ref['id'], + body={'user': self.user_ref}) + + self.change_password(password=uuid.uuid4().hex, + original_password=self.user_ref['password'], + expected_status=401) |