From 2e7b4f2027a1147ca28301e4f88adf8274b39a1f Mon Sep 17 00:00:00 2001 From: DUVAL Thomas Date: Thu, 9 Jun 2016 09:11:50 +0200 Subject: Update Keystone core to Mitaka. Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db --- .../keystone/tests/unit/test_v3_identity.py | 461 ++++++++++++++------- 1 file changed, 305 insertions(+), 156 deletions(-) (limited to 'keystone-moon/keystone/tests/unit/test_v3_identity.py') diff --git a/keystone-moon/keystone/tests/unit/test_v3_identity.py b/keystone-moon/keystone/tests/unit/test_v3_identity.py index 5a8e4fd5..7d3f6cad 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_identity.py +++ b/keystone-moon/keystone/tests/unit/test_v3_identity.py @@ -30,31 +30,63 @@ from keystone.tests.unit import test_v3 CONF = cfg.CONF +# NOTE(morganfainberg): To be removed when admin_token_auth middleware is +# removed. This was moved to it's own testcase so it can setup the +# admin_token_auth pipeline without impacting other tests. +class IdentityTestCaseStaticAdminToken(test_v3.RestfulTestCase): + EXTENSION_TO_ADD = 'admin_token_auth' + + def config_overrides(self): + super(IdentityTestCaseStaticAdminToken, self).config_overrides() + self.config_fixture.config( + admin_token='ADMIN') + + def test_list_users_with_static_admin_token_and_multiple_backends(self): + # domain-specific operations with the bootstrap ADMIN token is + # disallowed when domain-specific drivers are enabled + self.config_fixture.config(group='identity', + domain_specific_drivers_enabled=True) + self.get('/users', token=CONF.admin_token, + expected_status=exception.Unauthorized.code) + + def test_create_user_with_admin_token_and_no_domain(self): + """Call ``POST /users`` with admin token but no domain id. + + It should not be possible to use the admin token to create a user + while not explicitly passing the domain in the request body. + + """ + # Passing a valid domain id to new_user_ref() since domain_id is + # not an optional parameter. + ref = unit.new_user_ref(domain_id=self.domain_id) + # Delete the domain id before sending the request. + del ref['domain_id'] + self.post('/users', body={'user': ref}, token=CONF.admin_token, + expected_status=http_client.BAD_REQUEST) + + class IdentityTestCase(test_v3.RestfulTestCase): """Test users and groups.""" def setUp(self): super(IdentityTestCase, self).setUp() - self.group = self.new_group_ref( - domain_id=self.domain_id) + self.group = unit.new_group_ref(domain_id=self.domain_id) self.group = self.identity_api.create_group(self.group) self.group_id = self.group['id'] - self.credential_id = uuid.uuid4().hex - self.credential = self.new_credential_ref( + self.credential = unit.new_credential_ref( user_id=self.user['id'], project_id=self.project_id) - self.credential['id'] = self.credential_id - self.credential_api.create_credential( - self.credential_id, - self.credential) + + self.credential_api.create_credential(self.credential['id'], + self.credential) # user crud tests def test_create_user(self): """Call ``POST /users``.""" - ref = self.new_user_ref(domain_id=self.domain_id) + ref = unit.new_user_ref(domain_id=self.domain_id) r = self.post( '/users', body={'user': ref}) @@ -70,17 +102,14 @@ class IdentityTestCase(test_v3.RestfulTestCase): """ # Create a user with a role on the domain so we can get a # domain scoped token - domain = self.new_domain_ref() + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user = self.new_user_ref(domain_id=domain['id']) - password = user['password'] - user = self.identity_api.create_user(user) - user['password'] = password + user = unit.create_user(self.identity_api, domain_id=domain['id']) self.assignment_api.create_grant( role_id=self.role_id, user_id=user['id'], domain_id=domain['id']) - ref = self.new_user_ref(domain_id=domain['id']) + ref = unit.new_user_ref(domain_id=domain['id']) ref_nd = ref.copy() ref_nd.pop('domain_id') auth = self.build_authentication_request( @@ -91,7 +120,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidUserResponse(r, ref) # Now try the same thing without a domain token - which should fail - ref = self.new_user_ref(domain_id=domain['id']) + ref = unit.new_user_ref(domain_id=domain['id']) ref_nd = ref.copy() ref_nd.pop('domain_id') auth = self.build_authentication_request( @@ -112,6 +141,79 @@ class IdentityTestCase(test_v3.RestfulTestCase): ref['domain_id'] = CONF.identity.default_domain_id return self.assertValidUserResponse(r, ref) + def test_create_user_with_admin_token_and_domain(self): + """Call ``POST /users`` with admin token and domain id.""" + ref = unit.new_user_ref(domain_id=self.domain_id) + self.post('/users', body={'user': ref}, token=self.get_admin_token(), + expected_status=http_client.CREATED) + + def test_user_management_normalized_keys(self): + """Illustrate the inconsistent handling of hyphens in keys. + + To quote Morgan in bug 1526244: + + the reason this is converted from "domain-id" to "domain_id" is + because of how we process/normalize data. The way we have to handle + specific data types for known columns requires avoiding "-" in the + actual python code since "-" is not valid for attributes in python + w/o significant use of "getattr" etc. + + In short, historically we handle some things in conversions. The + use of "extras" has long been a poor design choice that leads to + odd/strange inconsistent behaviors because of other choices made in + handling data from within the body. (In many cases we convert from + "-" to "_" throughout openstack) + + Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9 + + """ + # Create two domains to work with. + domain1 = unit.new_domain_ref() + self.resource_api.create_domain(domain1['id'], domain1) + domain2 = unit.new_domain_ref() + self.resource_api.create_domain(domain2['id'], domain2) + + # We can successfully create a normal user without any surprises. + user = unit.new_user_ref(domain_id=domain1['id']) + r = self.post( + '/users', + body={'user': user}) + self.assertValidUserResponse(r, user) + user['id'] = r.json['user']['id'] + + # Query strings are not normalized: so we get all users back (like + # self.user), not just the ones in the specified domain. + r = self.get( + '/users?domain-id=%s' % domain1['id']) + self.assertValidUserListResponse(r, ref=self.user) + self.assertNotEqual(domain1['id'], self.user['domain_id']) + + # When creating a new user, if we move the 'domain_id' into the + # 'domain-id' attribute, the server will normalize the request + # attribute, and effectively "move it back" for us. + user = unit.new_user_ref(domain_id=domain1['id']) + user['domain-id'] = user.pop('domain_id') + r = self.post( + '/users', + body={'user': user}) + self.assertNotIn('domain-id', r.json['user']) + self.assertEqual(domain1['id'], r.json['user']['domain_id']) + # (move this attribute back so we can use assertValidUserResponse) + user['domain_id'] = user.pop('domain-id') + self.assertValidUserResponse(r, user) + user['id'] = r.json['user']['id'] + + # If we try updating the user's 'domain_id' by specifying a + # 'domain-id', then it'll be stored into extras rather than normalized, + # and the user's actual 'domain_id' is not affected. + r = self.patch( + '/users/%s' % user['id'], + body={'user': {'domain-id': domain2['id']}}) + self.assertEqual(domain2['id'], r.json['user']['domain-id']) + self.assertEqual(user['domain_id'], r.json['user']['domain_id']) + self.assertNotEqual(domain2['id'], user['domain_id']) + self.assertValidUserResponse(r, user) + def test_create_user_bad_request(self): """Call ``POST /users``.""" self.post('/users', body={'user': {}}, @@ -134,29 +236,42 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.config_fixture.config(group='identity', domain_specific_drivers_enabled=True) - # Create a user with a role on the domain so we can get a - # domain scoped token - domain = self.new_domain_ref() + # Create a new domain with a new project and user + domain = unit.new_domain_ref() self.resource_api.create_domain(domain['id'], domain) - user = self.new_user_ref(domain_id=domain['id']) - password = user['password'] - user = self.identity_api.create_user(user) - user['password'] = password + + project = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project['id'], project) + + user = unit.create_user(self.identity_api, domain_id=domain['id']) + + # Create both project and domain role grants for the user so we + # can get both project and domain scoped tokens self.assignment_api.create_grant( role_id=self.role_id, user_id=user['id'], domain_id=domain['id']) + self.assignment_api.create_grant( + role_id=self.role_id, user_id=user['id'], + project_id=project['id']) - ref = self.new_user_ref(domain_id=domain['id']) - ref_nd = ref.copy() - ref_nd.pop('domain_id') - auth = self.build_authentication_request( + dom_auth = self.build_authentication_request( user_id=user['id'], password=user['password'], domain_id=domain['id']) + project_auth = self.build_authentication_request( + user_id=user['id'], + password=user['password'], + project_id=project['id']) # First try using a domain scoped token resource_url = '/users' - r = self.get(resource_url, auth=auth) + r = self.get(resource_url, auth=dom_auth) + self.assertValidUserListResponse(r, ref=user, + resource_url=resource_url) + + # Now try using a project scoped token + resource_url = '/users' + r = self.get(resource_url, auth=project_auth) self.assertValidUserListResponse(r, ref=user, resource_url=resource_url) @@ -167,21 +282,9 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.assertValidUserListResponse(r, ref=user, resource_url=resource_url) - # Now try the same thing without a domain token or filter, - # which should fail - r = self.get('/users', expected_status=exception.Unauthorized.code) - - def test_list_users_with_static_admin_token_and_multiple_backends(self): - # domain-specific operations with the bootstrap ADMIN token is - # disallowed when domain-specific drivers are enabled - self.config_fixture.config(group='identity', - domain_specific_drivers_enabled=True) - self.get('/users', token=CONF.admin_token, - expected_status=exception.Unauthorized.code) - def test_list_users_no_default_project(self): """Call ``GET /users`` making sure no default_project_id.""" - user = self.new_user_ref(self.domain_id) + user = unit.new_user_ref(self.domain_id) user = self.identity_api.create_user(user) resource_url = '/users' r = self.get(resource_url) @@ -196,7 +299,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_get_user_with_default_project(self): """Call ``GET /users/{user_id}`` making sure of default_project_id.""" - user = self.new_user_ref(domain_id=self.domain_id, + user = unit.new_user_ref(domain_id=self.domain_id, project_id=self.project_id) user = self.identity_api.create_user(user) r = self.get('/users/%(user_id)s' % {'user_id': user['id']}) @@ -209,45 +312,39 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_list_groups_for_user(self): """Call ``GET /users/{user_id}/groups``.""" + user1 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + user2 = unit.create_user(self.identity_api, + domain_id=self.domain['id']) - self.user1 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user1['password'] - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - self.user2 = self.new_user_ref( - domain_id=self.domain['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password self.put('/groups/%(group_id)s/users/%(user_id)s' % { - 'group_id': self.group_id, 'user_id': self.user1['id']}) + 'group_id': self.group_id, 'user_id': user1['id']}) # Scenarios below are written to test the default policy configuration # One should be allowed to list one's own groups auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password']) + user_id=user1['id'], + password=user1['password']) resource_url = ('/users/%(user_id)s/groups' % - {'user_id': self.user1['id']}) + {'user_id': user1['id']}) r = self.get(resource_url, auth=auth) self.assertValidGroupListResponse(r, ref=self.group, resource_url=resource_url) # Administrator is allowed to list others' groups resource_url = ('/users/%(user_id)s/groups' % - {'user_id': self.user1['id']}) + {'user_id': user1['id']}) r = self.get(resource_url) self.assertValidGroupListResponse(r, ref=self.group, resource_url=resource_url) # Ordinary users should not be allowed to list other's groups auth = self.build_authentication_request( - user_id=self.user2['id'], - password=self.user2['password']) + user_id=user2['id'], + password=user2['password']) r = self.get('/users/%(user_id)s/groups' % { - 'user_id': self.user1['id']}, auth=auth, + 'user_id': user1['id']}, auth=auth, expected_status=exception.ForbiddenAction.code) def test_check_user_in_group(self): @@ -278,7 +375,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_update_user(self): """Call ``PATCH /users/{user_id}``.""" - user = self.new_user_ref(domain_id=self.domain_id) + user = unit.new_user_ref(domain_id=self.domain_id) del user['id'] r = self.patch('/users/%(user_id)s' % { 'user_id': self.user['id']}, @@ -287,44 +384,42 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_admin_password_reset(self): # bootstrap a user as admin - user_ref = self.new_user_ref(domain_id=self.domain['id']) - password = user_ref['password'] - user_ref = self.identity_api.create_user(user_ref) + user_ref = unit.create_user(self.identity_api, + domain_id=self.domain['id']) # auth as user should work before a password change old_password_auth = self.build_authentication_request( user_id=user_ref['id'], - password=password) - r = self.v3_authenticate_token(old_password_auth, expected_status=201) + password=user_ref['password']) + r = self.v3_create_token(old_password_auth) old_token = r.headers.get('X-Subject-Token') # auth as user with a token should work before a password change old_token_auth = self.build_authentication_request(token=old_token) - self.v3_authenticate_token(old_token_auth, expected_status=201) + self.v3_create_token(old_token_auth) # administrative password reset new_password = uuid.uuid4().hex self.patch('/users/%s' % user_ref['id'], - body={'user': {'password': new_password}}, - expected_status=200) + body={'user': {'password': new_password}}) # auth as user with original password should not work after change - self.v3_authenticate_token(old_password_auth, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(old_password_auth, + expected_status=http_client.UNAUTHORIZED) # auth as user with an old token should not work after change - self.v3_authenticate_token(old_token_auth, - expected_status=http_client.NOT_FOUND) + self.v3_create_token(old_token_auth, + expected_status=http_client.NOT_FOUND) # new password should work new_password_auth = self.build_authentication_request( user_id=user_ref['id'], password=new_password) - self.v3_authenticate_token(new_password_auth, expected_status=201) + self.v3_create_token(new_password_auth) def test_update_user_domain_id(self): """Call ``PATCH /users/{user_id}`` with domain_id.""" - user = self.new_user_ref(domain_id=self.domain['id']) + user = unit.new_user_ref(domain_id=self.domain['id']) user = self.identity_api.create_user(user) user['domain_id'] = CONF.identity.default_domain_id r = self.patch('/users/%(user_id)s' % { @@ -349,18 +444,16 @@ class IdentityTestCase(test_v3.RestfulTestCase): """ # First check the credential for this user is present r = self.credential_api.get_credential(self.credential['id']) - self.assertDictEqual(r, self.credential) + self.assertDictEqual(self.credential, r) # Create a second credential with a different user - self.user2 = self.new_user_ref( - domain_id=self.domain['id'], - project_id=self.project['id']) - self.user2 = self.identity_api.create_user(self.user2) - self.credential2 = self.new_credential_ref( - user_id=self.user2['id'], - project_id=self.project['id']) - self.credential_api.create_credential( - self.credential2['id'], - self.credential2) + + user2 = unit.new_user_ref(domain_id=self.domain['id'], + project_id=self.project['id']) + user2 = self.identity_api.create_user(user2) + credential2 = unit.new_credential_ref(user_id=user2['id'], + project_id=self.project['id']) + self.credential_api.create_credential(credential2['id'], credential2) + # Create a token for this user which we can check later # gets deleted auth_data = self.build_authentication_request( @@ -371,7 +464,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): # Confirm token is valid for now self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # Now delete the user self.delete('/users/%(user_id)s' % { @@ -387,14 +480,57 @@ class IdentityTestCase(test_v3.RestfulTestCase): self.user['id']) self.assertEqual(0, len(tokens)) # But the credential for user2 is unaffected - r = self.credential_api.get_credential(self.credential2['id']) - self.assertDictEqual(r, self.credential2) + r = self.credential_api.get_credential(credential2['id']) + self.assertDictEqual(credential2, r) + + # shadow user tests + def test_shadow_federated_user(self): + fed_user = unit.new_federated_user_ref() + user = ( + self.identity_api.shadow_federated_user(fed_user["idp_id"], + fed_user["protocol_id"], + fed_user["unique_id"], + fed_user["display_name"]) + ) + self.assertIsNotNone(user["id"]) + self.assertEqual(len(user.keys()), 4) + self.assertIsNotNone(user['id']) + self.assertIsNotNone(user['name']) + self.assertIsNone(user['domain_id']) + self.assertEqual(user['enabled'], True) + + def test_shadow_existing_federated_user(self): + fed_user = unit.new_federated_user_ref() + + # introduce the user to keystone for the first time + shadow_user1 = self.identity_api.shadow_federated_user( + fed_user["idp_id"], + fed_user["protocol_id"], + fed_user["unique_id"], + fed_user["display_name"]) + self.assertEqual(fed_user['display_name'], shadow_user1['name']) + + # shadow the user again, with another name to invalidate the cache + # internally, this operation causes request to the driver. It should + # not fail. + fed_user['display_name'] = uuid.uuid4().hex + shadow_user2 = self.identity_api.shadow_federated_user( + fed_user["idp_id"], + fed_user["protocol_id"], + fed_user["unique_id"], + fed_user["display_name"]) + self.assertEqual(fed_user['display_name'], shadow_user2['name']) + self.assertNotEqual(shadow_user1['name'], shadow_user2['name']) + + # The shadowed users still share the same unique ID. + self.assertEqual(shadow_user1['id'], shadow_user2['id']) # group crud tests def test_create_group(self): """Call ``POST /groups``.""" - ref = self.new_group_ref(domain_id=self.domain_id) + # Create a new group to avoid a duplicate check failure + ref = unit.new_group_ref(domain_id=self.domain_id) r = self.post( '/groups', body={'group': ref}) @@ -420,7 +556,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_update_group(self): """Call ``PATCH /groups/{group_id}``.""" - group = self.new_group_ref(domain_id=self.domain_id) + group = unit.new_group_ref(domain_id=self.domain_id) del group['id'] r = self.patch('/groups/%(group_id)s' % { 'group_id': self.group_id}, @@ -429,19 +565,17 @@ class IdentityTestCase(test_v3.RestfulTestCase): def test_update_group_domain_id(self): """Call ``PATCH /groups/{group_id}`` with domain_id.""" - group = self.new_group_ref(domain_id=self.domain['id']) - group = self.identity_api.create_group(group) - group['domain_id'] = CONF.identity.default_domain_id + self.group['domain_id'] = CONF.identity.default_domain_id r = self.patch('/groups/%(group_id)s' % { - 'group_id': group['id']}, - body={'group': group}, + 'group_id': self.group['id']}, + body={'group': self.group}, expected_status=exception.ValidationError.code) self.config_fixture.config(domain_id_immutable=False) - group['domain_id'] = self.domain['id'] + self.group['domain_id'] = self.domain['id'] r = self.patch('/groups/%(group_id)s' % { - 'group_id': group['id']}, - body={'group': group}) - self.assertValidGroupResponse(r, group) + 'group_id': self.group['id']}, + body={'group': self.group}) + self.assertValidGroupResponse(r, self.group) def test_delete_group(self): """Call ``DELETE /groups/{group_id}``.""" @@ -453,7 +587,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): log_fix = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) - ref = self.new_user_ref(domain_id=self.domain_id) + ref = unit.new_user_ref(domain_id=self.domain_id) self.post( '/users', body={'user': ref}) @@ -467,108 +601,122 @@ class IdentityTestCase(test_v3.RestfulTestCase): log_fix = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) # bootstrap a user as admin - user_ref = self.new_user_ref(domain_id=self.domain['id']) - password = user_ref['password'] - user_ref = self.identity_api.create_user(user_ref) + user_ref = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + + self.assertNotIn(user_ref['password'], log_fix.output) # administrative password reset new_password = uuid.uuid4().hex self.patch('/users/%s' % user_ref['id'], - body={'user': {'password': new_password}}, - expected_status=200) + body={'user': {'password': new_password}}) - self.assertNotIn(password, log_fix.output) self.assertNotIn(new_password, log_fix.output) class IdentityV3toV2MethodsTestCase(unit.TestCase): """Test users V3 to V2 conversion methods.""" + def new_user_ref(self, **kwargs): + """Construct a bare bones user ref. + + Omits all optional components. + """ + ref = unit.new_user_ref(**kwargs) + # description is already omitted + del ref['email'] + del ref['enabled'] + del ref['password'] + return ref + def setUp(self): super(IdentityV3toV2MethodsTestCase, self).setUp() self.load_backends() - self.user_id = uuid.uuid4().hex - self.default_project_id = uuid.uuid4().hex - self.tenant_id = uuid.uuid4().hex + user_id = uuid.uuid4().hex + project_id = uuid.uuid4().hex + # User with only default_project_id in ref - self.user1 = {'id': self.user_id, - 'name': self.user_id, - 'default_project_id': self.default_project_id, - 'domain_id': CONF.identity.default_domain_id} + self.user1 = self.new_user_ref( + id=user_id, + name=user_id, + project_id=project_id, + domain_id=CONF.identity.default_domain_id) # User without default_project_id or tenantId in ref - self.user2 = {'id': self.user_id, - 'name': self.user_id, - 'domain_id': CONF.identity.default_domain_id} + self.user2 = self.new_user_ref( + id=user_id, + name=user_id, + domain_id=CONF.identity.default_domain_id) # User with both tenantId and default_project_id in ref - self.user3 = {'id': self.user_id, - 'name': self.user_id, - 'default_project_id': self.default_project_id, - 'tenantId': self.tenant_id, - 'domain_id': CONF.identity.default_domain_id} + self.user3 = self.new_user_ref( + id=user_id, + name=user_id, + project_id=project_id, + tenantId=project_id, + domain_id=CONF.identity.default_domain_id) # User with only tenantId in ref - self.user4 = {'id': self.user_id, - 'name': self.user_id, - 'tenantId': self.tenant_id, - 'domain_id': CONF.identity.default_domain_id} + self.user4 = self.new_user_ref( + id=user_id, + name=user_id, + tenantId=project_id, + domain_id=CONF.identity.default_domain_id) # Expected result if the user is meant to have a tenantId element - self.expected_user = {'id': self.user_id, - 'name': self.user_id, - 'username': self.user_id, - 'tenantId': self.default_project_id} + self.expected_user = {'id': user_id, + 'name': user_id, + 'username': user_id, + 'tenantId': project_id} # Expected result if the user is not meant to have a tenantId element - self.expected_user_no_tenant_id = {'id': self.user_id, - 'name': self.user_id, - 'username': self.user_id} + self.expected_user_no_tenant_id = {'id': user_id, + 'name': user_id, + 'username': user_id} def test_v3_to_v2_user_method(self): updated_user1 = controller.V2Controller.v3_to_v2_user(self.user1) self.assertIs(self.user1, updated_user1) - self.assertDictEqual(self.user1, self.expected_user) + self.assertDictEqual(self.expected_user, self.user1) updated_user2 = controller.V2Controller.v3_to_v2_user(self.user2) self.assertIs(self.user2, updated_user2) - self.assertDictEqual(self.user2, self.expected_user_no_tenant_id) + self.assertDictEqual(self.expected_user_no_tenant_id, self.user2) updated_user3 = controller.V2Controller.v3_to_v2_user(self.user3) self.assertIs(self.user3, updated_user3) - self.assertDictEqual(self.user3, self.expected_user) + self.assertDictEqual(self.expected_user, self.user3) updated_user4 = controller.V2Controller.v3_to_v2_user(self.user4) self.assertIs(self.user4, updated_user4) - self.assertDictEqual(self.user4, self.expected_user_no_tenant_id) + self.assertDictEqual(self.expected_user_no_tenant_id, self.user4) def test_v3_to_v2_user_method_list(self): user_list = [self.user1, self.user2, self.user3, self.user4] updated_list = controller.V2Controller.v3_to_v2_user(user_list) - self.assertEqual(len(updated_list), len(user_list)) + self.assertEqual(len(user_list), len(updated_list)) for i, ref in enumerate(updated_list): # Order should not change. self.assertIs(ref, user_list[i]) - self.assertDictEqual(self.user1, self.expected_user) - self.assertDictEqual(self.user2, self.expected_user_no_tenant_id) - self.assertDictEqual(self.user3, self.expected_user) - self.assertDictEqual(self.user4, self.expected_user_no_tenant_id) + self.assertDictEqual(self.expected_user, self.user1) + self.assertDictEqual(self.expected_user_no_tenant_id, self.user2) + self.assertDictEqual(self.expected_user, self.user3) + self.assertDictEqual(self.expected_user_no_tenant_id, self.user4) class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): def setUp(self): super(UserSelfServiceChangingPasswordsTestCase, self).setUp() - self.user_ref = self.new_user_ref(domain_id=self.domain['id']) - password = self.user_ref['password'] - self.user_ref = self.identity_api.create_user(self.user_ref) - self.user_ref['password'] = password - self.token = self.get_request_token(self.user_ref['password'], 201) + self.user_ref = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + self.token = self.get_request_token(self.user_ref['password'], + http_client.CREATED) def get_request_token(self, password, expected_status): auth_data = self.build_authentication_request( user_id=self.user_ref['id'], password=password) - r = self.v3_authenticate_token(auth_data, - expected_status=expected_status) + r = self.v3_create_token(auth_data, + expected_status=expected_status) return r.headers.get('X-Subject-Token') def change_password(self, expected_status, **kwargs): @@ -581,27 +729,28 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): def test_changing_password(self): # original password works token_id = self.get_request_token(self.user_ref['password'], - expected_status=201) + expected_status=http_client.CREATED) # original token works old_token_auth = self.build_authentication_request(token=token_id) - self.v3_authenticate_token(old_token_auth, expected_status=201) + self.v3_create_token(old_token_auth) # change password new_password = uuid.uuid4().hex self.change_password(password=new_password, original_password=self.user_ref['password'], - expected_status=204) + expected_status=http_client.NO_CONTENT) # old password fails self.get_request_token(self.user_ref['password'], expected_status=http_client.UNAUTHORIZED) # old token fails - self.v3_authenticate_token(old_token_auth, - expected_status=http_client.NOT_FOUND) + self.v3_create_token(old_token_auth, + expected_status=http_client.NOT_FOUND) # new password works - self.get_request_token(new_password, expected_status=201) + self.get_request_token(new_password, + expected_status=http_client.CREATED) def test_changing_password_with_missing_original_password_fails(self): r = self.change_password(password=uuid.uuid4().hex, @@ -640,7 +789,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): new_password = uuid.uuid4().hex self.change_password(password=new_password, original_password=self.user_ref['password'], - expected_status=204) + expected_status=http_client.NO_CONTENT) self.assertNotIn(self.user_ref['password'], log_fix.output) self.assertNotIn(new_password, log_fix.output) -- cgit 1.2.3-korg