diff options
Diffstat (limited to 'keystone-moon/keystone/tests')
26 files changed, 969 insertions, 450 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py index ec087c41..1ad8d50d 100644 --- a/keystone-moon/keystone/tests/unit/common/test_notifications.py +++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py @@ -279,6 +279,16 @@ class BaseNotificationTest(test_v3.RestfulTestCase): self.assertEqual(event_type, audit['event_type']) self.assertTrue(audit['send_notification_called']) + def _assert_initiator_data_is_set(self, operation, resource_type, typeURI): + self.assertTrue(len(self._audits) > 0) + audit = self._audits[-1] + payload = audit['payload'] + self.assertEqual(self.user_id, payload['initiator']['id']) + self.assertEqual(self.project_id, payload['initiator']['project_id']) + self.assertEqual(typeURI, payload['target']['typeURI']) + action = '%s.%s' % (operation, resource_type) + self.assertEqual(action, payload['action']) + def _assert_notify_not_sent(self, resource_id, operation, resource_type, public=True): unexpected = { @@ -633,11 +643,154 @@ class CADFNotificationsForEntities(NotificationsForEntities): resource_id = resp.result.get('domain').get('id') self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain', cadftaxonomy.SECURITY_DOMAIN) - self.assertTrue(len(self._audits) > 0) - audit = self._audits[-1] - payload = audit['payload'] - self.assertEqual(self.user_id, payload['initiator']['id']) - self.assertEqual(self.project_id, payload['initiator']['project_id']) + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'domain', + cadftaxonomy.SECURITY_DOMAIN) + + +class V2Notifications(BaseNotificationTest): + + def setUp(self): + super(V2Notifications, self).setUp() + self.config_fixture.config(notification_format='cadf') + + def test_user(self): + token = self.get_scoped_token() + resp = self.admin_request( + method='POST', + path='/v2.0/users', + body={ + 'user': { + 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + 'enabled': True, + }, + }, + token=token, + ) + user_id = resp.result.get('user').get('id') + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + # test for delete user + self.admin_request( + method='DELETE', + path='/v2.0/users/%s' % user_id, + token=token, + ) + self._assert_initiator_data_is_set(DELETED_OPERATION, + 'user', + cadftaxonomy.SECURITY_ACCOUNT_USER) + + def test_role(self): + token = self.get_scoped_token() + resp = self.admin_request( + method='POST', + path='/v2.0/OS-KSADM/roles', + body={ + 'role': { + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + }, + }, + token=token, + ) + role_id = resp.result.get('role').get('id') + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'role', + cadftaxonomy.SECURITY_ROLE) + # test for delete role + self.admin_request( + method='DELETE', + path='/v2.0/OS-KSADM/roles/%s' % role_id, + token=token, + ) + self._assert_initiator_data_is_set(DELETED_OPERATION, + 'role', + cadftaxonomy.SECURITY_ROLE) + + def test_service_and_endpoint(self): + token = self.get_scoped_token() + resp = self.admin_request( + method='POST', + path='/v2.0/OS-KSADM/services', + body={ + 'OS-KSADM:service': { + 'name': uuid.uuid4().hex, + 'type': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + }, + }, + token=token, + ) + service_id = resp.result.get('OS-KSADM:service').get('id') + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'service', + cadftaxonomy.SECURITY_SERVICE) + resp = self.admin_request( + method='POST', + path='/v2.0/endpoints', + body={ + 'endpoint': { + 'region': uuid.uuid4().hex, + 'service_id': service_id, + 'publicurl': uuid.uuid4().hex, + 'adminurl': uuid.uuid4().hex, + 'internalurl': uuid.uuid4().hex, + }, + }, + token=token, + ) + endpoint_id = resp.result.get('endpoint').get('id') + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'endpoint', + cadftaxonomy.SECURITY_ENDPOINT) + # test for delete endpoint + self.admin_request( + method='DELETE', + path='/v2.0/endpoints/%s' % endpoint_id, + token=token, + ) + self._assert_initiator_data_is_set(DELETED_OPERATION, + 'endpoint', + cadftaxonomy.SECURITY_ENDPOINT) + # test for delete service + self.admin_request( + method='DELETE', + path='/v2.0/OS-KSADM/services/%s' % service_id, + token=token, + ) + self._assert_initiator_data_is_set(DELETED_OPERATION, + 'service', + cadftaxonomy.SECURITY_SERVICE) + + def test_project(self): + token = self.get_scoped_token() + resp = self.admin_request( + method='POST', + path='/v2.0/tenants', + body={ + 'tenant': { + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': True + }, + }, + token=token, + ) + project_id = resp.result.get('tenant').get('id') + self._assert_initiator_data_is_set(CREATED_OPERATION, + 'project', + cadftaxonomy.SECURITY_PROJECT) + # test for delete project + self.admin_request( + method='DELETE', + path='/v2.0/tenants/%s' % project_id, + token=token, + ) + self._assert_initiator_data_is_set(DELETED_OPERATION, + 'project', + cadftaxonomy.SECURITY_PROJECT) class TestEventCallbacks(test_v3.RestfulTestCase): diff --git a/keystone-moon/keystone/tests/unit/rest.py b/keystone-moon/keystone/tests/unit/rest.py index da24019f..35b47e2b 100644 --- a/keystone-moon/keystone/tests/unit/rest.py +++ b/keystone-moon/keystone/tests/unit/rest.py @@ -114,7 +114,7 @@ class RestfulTestCase(unit.TestCase): example:: - self.assertResponseStatus(response, http_client.NO_CONTENT) + self.assertResponseStatus(response, 204) """ self.assertEqual( response.status_code, diff --git a/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py b/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py index 4c574549..24fc82dd 100644 --- a/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py +++ b/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py @@ -48,7 +48,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): Valid endpoint and project id test case. """ - self.put(self.default_request_url) + self.put(self.default_request_url, + expected_status=204) def test_create_endpoint_project_association_with_invalid_project(self): """PUT OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -81,7 +82,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): """ self.put(self.default_request_url, - body={'project_id': self.default_domain_project_id}) + body={'project_id': self.default_domain_project_id}, + expected_status=204) def test_check_endpoint_project_association(self): """HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -89,11 +91,13 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): Valid project and endpoint id test case. """ - self.put(self.default_request_url) + self.put(self.default_request_url, + expected_status=204) self.head('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.default_domain_project_id, - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) def test_check_endpoint_project_association_with_invalid_project(self): """HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -165,7 +169,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): """ r = self.get('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' % - {'endpoint_id': self.endpoint_id}) + {'endpoint_id': self.endpoint_id}, + expected_status=200) self.assertValidProjectListResponse(r, expected_length=0) def test_list_projects_associated_with_invalid_endpoint(self): @@ -188,7 +193,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): self.delete('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.default_domain_project_id, - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) def test_remove_endpoint_project_association_with_invalid_project(self): """DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id} @@ -220,26 +226,26 @@ class EndpointFilterCRUDTestCase(TestExtensionCase): self.put(self.default_request_url) association_url = ('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' % {'endpoint_id': self.endpoint_id}) - r = self.get(association_url) + r = self.get(association_url, expected_status=200) self.assertValidProjectListResponse(r, expected_length=1) self.delete('/projects/%(project_id)s' % { 'project_id': self.default_domain_project_id}) - r = self.get(association_url) + r = self.get(association_url, expected_status=200) self.assertValidProjectListResponse(r, expected_length=0) def test_endpoint_project_association_cleanup_when_endpoint_deleted(self): self.put(self.default_request_url) association_url = '/OS-EP-FILTER/projects/%(project_id)s/endpoints' % { 'project_id': self.default_domain_project_id} - r = self.get(association_url) + r = self.get(association_url, expected_status=200) self.assertValidEndpointListResponse(r, expected_length=1) self.delete('/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint_id}) - r = self.get(association_url) + r = self.get(association_url, expected_status=200) self.assertValidEndpointListResponse(r, expected_length=0) @@ -270,7 +276,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) # attempt to authenticate without requesting a project auth_data = self.build_authentication_request( @@ -290,7 +297,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) auth_data = self.build_authentication_request( user_id=self.user['id'], @@ -310,7 +318,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) auth_data = self.build_authentication_request( user_id=self.user['id'], @@ -329,7 +338,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) # create a second temporary endpoint self.endpoint_id2 = uuid.uuid4().hex @@ -343,7 +353,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id2}) + 'endpoint_id': self.endpoint_id2}, + expected_status=204) # remove the temporary reference # this will create inconsistency in the endpoint filter table @@ -369,7 +380,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) # Add a disabled endpoint to the default project. @@ -387,7 +399,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': disabled_endpoint_id}) + 'endpoint_id': disabled_endpoint_id}, + expected_status=204) # Authenticate to get token with catalog auth_data = self.build_authentication_request( @@ -416,11 +429,13 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': endpoint_id1}) + 'endpoint_id': endpoint_id1}, + expected_status=204) self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': endpoint_id2}) + 'endpoint_id': endpoint_id2}, + expected_status=204) # there should be only two endpoints in token catalog auth_data = self.build_authentication_request( @@ -439,7 +454,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase): self.put('/OS-EP-FILTER/projects/%(project_id)s' '/endpoints/%(endpoint_id)s' % { 'project_id': self.project['id'], - 'endpoint_id': self.endpoint_id}) + 'endpoint_id': self.endpoint_id}, + expected_status=204) auth_data = self.build_authentication_request( user_id=self.user['id'], @@ -622,7 +638,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): self.DEFAULT_ENDPOINT_GROUP_URL, self.DEFAULT_ENDPOINT_GROUP_BODY) url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % { 'endpoint_group_id': endpoint_group_id} - self.head(url, expected_status=http_client.OK) + self.head(url, expected_status=200) def test_check_invalid_endpoint_group(self): """HEAD /OS-EP-FILTER/endpoint_groups/{endpoint_group_id} @@ -816,7 +832,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase): self.project_id) url = self._get_project_endpoint_group_url( endpoint_group_id, self.project_id) - self.head(url, expected_status=http_client.OK) + self.head(url, expected_status=200) def test_check_endpoint_group_to_project_with_invalid_project_id(self): """Test HEAD with an invalid endpoint group and project association.""" diff --git a/keystone-moon/keystone/tests/unit/test_backend.py b/keystone-moon/keystone/tests/unit/test_backend.py index d3b51edd..302fc2c2 100644 --- a/keystone-moon/keystone/tests/unit/test_backend.py +++ b/keystone-moon/keystone/tests/unit/test_backend.py @@ -4671,7 +4671,7 @@ class TokenTests(object): def test_list_revoked_tokens_for_multiple_tokens(self): self.check_list_revoked_tokens([self.delete_token() - for x in range(2)]) + for x in six.moves.range(2)]) def test_flush_expired_token(self): token_id = uuid.uuid4().hex diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py index 808922a7..d96ec376 100644 --- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py +++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py @@ -21,7 +21,6 @@ import ldap import mock from oslo_config import cfg import pkg_resources -from six.moves import http_client from six.moves import range from testtools import matchers @@ -2182,6 +2181,26 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity): self.skipTest( "Enabled emulation conflicts with enabled mask") + def test_user_enabled_use_group_config(self): + self.config_fixture.config( + group='ldap', + user_enabled_emulation_use_group_config=True, + group_member_attribute='uniqueMember', + group_objectclass='groupOfUniqueNames') + self.ldapdb.clear() + self.load_backends() + self.load_fixtures(default_fixtures) + + # Create a user and ensure they are enabled. + user1 = {'name': u'fäké1', 'enabled': True, + 'domain_id': CONF.identity.default_domain_id} + user_ref = self.identity_api.create_user(user1) + self.assertIs(True, user_ref['enabled']) + + # Get a user and ensure they are enabled. + user_ref = self.identity_api.get_user(user_ref['id']) + self.assertIs(True, user_ref['enabled']) + def test_user_enabled_invert(self): self.config_fixture.config(group='ldap', user_enabled_invert=True, user_enabled_default=False) @@ -2487,7 +2506,7 @@ class BaseMultiLDAPandSQLIdentity(object): self.identity_api._get_domain_driver_and_entity_id( user['id'])) - if expected_status == http_client.OK: + if expected_status == 200: ref = driver.get_user(entity_id) ref = self.identity_api._set_domain_id_and_mapping( ref, domain_id, driver, map.EntityType.USER) @@ -2661,23 +2680,21 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, check_user = self.check_user check_user(self.users['user0'], - self.domains['domain_default']['id'], http_client.OK) + self.domains['domain_default']['id'], 200) for domain in [self.domains['domain1']['id'], self.domains['domain2']['id'], self.domains['domain3']['id'], self.domains['domain4']['id']]: check_user(self.users['user0'], domain, exception.UserNotFound) - check_user(self.users['user1'], self.domains['domain1']['id'], - http_client.OK) + check_user(self.users['user1'], self.domains['domain1']['id'], 200) for domain in [self.domains['domain_default']['id'], self.domains['domain2']['id'], self.domains['domain3']['id'], self.domains['domain4']['id']]: check_user(self.users['user1'], domain, exception.UserNotFound) - check_user(self.users['user2'], self.domains['domain2']['id'], - http_client.OK) + check_user(self.users['user2'], self.domains['domain2']['id'], 200) for domain in [self.domains['domain_default']['id'], self.domains['domain1']['id'], self.domains['domain3']['id'], @@ -2687,14 +2704,10 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides, # domain3 and domain4 share the same backend, so you should be # able to see user3 and user4 from either. - check_user(self.users['user3'], self.domains['domain3']['id'], - http_client.OK) - check_user(self.users['user3'], self.domains['domain4']['id'], - http_client.OK) - check_user(self.users['user4'], self.domains['domain3']['id'], - http_client.OK) - check_user(self.users['user4'], self.domains['domain4']['id'], - http_client.OK) + check_user(self.users['user3'], self.domains['domain3']['id'], 200) + check_user(self.users['user3'], self.domains['domain4']['id'], 200) + check_user(self.users['user4'], self.domains['domain3']['id'], 200) + check_user(self.users['user4'], self.domains['domain4']['id'], 200) for domain in [self.domains['domain_default']['id'], self.domains['domain1']['id'], @@ -3151,12 +3164,12 @@ class DomainSpecificLDAPandSQLIdentity( # driver, but won't find it via any other domain driver self.check_user(self.users['user0'], - self.domains['domain_default']['id'], http_client.OK) + self.domains['domain_default']['id'], 200) self.check_user(self.users['user0'], self.domains['domain1']['id'], exception.UserNotFound) self.check_user(self.users['user1'], - self.domains['domain1']['id'], http_client.OK) + self.domains['domain1']['id'], 200) self.check_user(self.users['user1'], self.domains['domain_default']['id'], exception.UserNotFound) diff --git a/keystone-moon/keystone/tests/unit/test_catalog.py b/keystone-moon/keystone/tests/unit/test_catalog.py index 85acfedf..ada2de43 100644 --- a/keystone-moon/keystone/tests/unit/test_catalog.py +++ b/keystone-moon/keystone/tests/unit/test_catalog.py @@ -53,8 +53,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): """Applicable only to JSON.""" return r.result['access']['token']['id'] - def _endpoint_create(self, expected_status=http_client.OK, - service_id=SERVICE_FIXTURE, + def _endpoint_create(self, expected_status=200, service_id=SERVICE_FIXTURE, publicurl='http://localhost:8080', internalurl='http://localhost:8080', adminurl='http://localhost:8080'): @@ -77,6 +76,18 @@ class V2CatalogTestCase(rest.RestfulTestCase): body=body) return body, r + def _region_create(self): + region_id = uuid.uuid4().hex + self.catalog_api.create_region({'id': region_id}) + return region_id + + def _service_create(self): + service_id = uuid.uuid4().hex + service = unit.new_service_ref() + service['id'] = service_id + self.catalog_api.create_service(service_id, service) + return service_id + def test_endpoint_create(self): req_body, response = self._endpoint_create() self.assertIn('endpoint', response.result) @@ -84,6 +95,78 @@ class V2CatalogTestCase(rest.RestfulTestCase): for field, value in req_body['endpoint'].items(): self.assertEqual(response.result['endpoint'][field], value) + def test_pure_v3_endpoint_with_publicurl_visible_from_v2(self): + """Test pure v3 endpoint can be fetched via v2 API. + + For those who are using v2 APIs, endpoints created by v3 API should + also be visible as there are no differences about the endpoints + except the format or the internal implementation. + And because public url is required for v2 API, so only the v3 endpoints + of the service which has the public interface endpoint will be + converted into v2 endpoints. + """ + region_id = self._region_create() + service_id = self._service_create() + # create a v3 endpoint with three interfaces + body = { + 'endpoint': unit.new_endpoint_ref(service_id, + default_region_id=region_id) + } + for interface in catalog.controllers.INTERFACES: + body['endpoint']['interface'] = interface + self.admin_request(method='POST', + token=self.get_scoped_token(), + path='/v3/endpoints', + expected_status=http_client.CREATED, + body=body) + + r = self.admin_request(token=self.get_scoped_token(), + path='/v2.0/endpoints') + # v3 endpoints having public url can be fetched via v2.0 API + self.assertEqual(1, len(r.result['endpoints'])) + v2_endpoint = r.result['endpoints'][0] + self.assertEqual(service_id, v2_endpoint['service_id']) + # check urls just in case. + # This is not the focus of this test, so no different urls are used. + self.assertEqual(body['endpoint']['url'], v2_endpoint['publicurl']) + self.assertEqual(body['endpoint']['url'], v2_endpoint['adminurl']) + self.assertEqual(body['endpoint']['url'], v2_endpoint['internalurl']) + self.assertNotIn('name', v2_endpoint) + + v3_endpoint = self.catalog_api.get_endpoint(v2_endpoint['id']) + # it's the v3 public endpoint's id as the generated v2 endpoint + self.assertEqual('public', v3_endpoint['interface']) + self.assertEqual(service_id, v3_endpoint['service_id']) + + def test_pure_v3_endpoint_without_publicurl_invisible_from_v2(self): + """Test pure v3 endpoint without public url can't be fetched via v2 API. + + V2 API will return endpoints created by v3 API, but because public url + is required for v2 API, so v3 endpoints without public url will be + ignored. + """ + region_id = self._region_create() + service_id = self._service_create() + # create a v3 endpoint without public interface + body = { + 'endpoint': unit.new_endpoint_ref(service_id, + default_region_id=region_id) + } + for interface in catalog.controllers.INTERFACES: + if interface == 'public': + continue + body['endpoint']['interface'] = interface + self.admin_request(method='POST', + token=self.get_scoped_token(), + path='/v3/endpoints', + expected_status=http_client.CREATED, + body=body) + + r = self.admin_request(token=self.get_scoped_token(), + path='/v2.0/endpoints') + # v3 endpoints without public url won't be fetched via v2.0 API + self.assertEqual(0, len(r.result['endpoints'])) + def test_endpoint_create_with_null_adminurl(self): req_body, response = self._endpoint_create(adminurl=None) self.assertIsNone(req_body['endpoint']['adminurl']) @@ -126,7 +209,7 @@ class V2CatalogTestCase(rest.RestfulTestCase): valid_url = 'http://127.0.0.1:8774/v1.1/$(tenant_id)s' # baseline tests that all valid URLs works - self._endpoint_create(expected_status=http_client.OK, + self._endpoint_create(expected_status=200, publicurl=valid_url, internalurl=valid_url, adminurl=valid_url) diff --git a/keystone-moon/keystone/tests/unit/test_cert_setup.py b/keystone-moon/keystone/tests/unit/test_cert_setup.py index 47a99810..769e7c8e 100644 --- a/keystone-moon/keystone/tests/unit/test_cert_setup.py +++ b/keystone-moon/keystone/tests/unit/test_cert_setup.py @@ -17,7 +17,6 @@ import os import shutil import mock -from six.moves import http_client from testtools import matchers from keystone.common import environment @@ -114,13 +113,11 @@ class CertSetupTestCase(rest.RestfulTestCase): # requests don't have some of the normal information signing_resp = self.request(self.public_app, '/v2.0/certificates/signing', - method='GET', - expected_status=http_client.OK) + method='GET', expected_status=200) cacert_resp = self.request(self.public_app, '/v2.0/certificates/ca', - method='GET', - expected_status=http_client.OK) + method='GET', expected_status=200) with open(CONF.signing.certfile) as f: self.assertEqual(f.read(), signing_resp.text) @@ -136,7 +133,7 @@ class CertSetupTestCase(rest.RestfulTestCase): for accept in [None, 'text/html', 'application/json', 'text/xml']: headers = {'Accept': accept} if accept else {} resp = self.request(self.public_app, path, method='GET', - expected_status=http_client.OK, + expected_status=200, headers=headers) self.assertEqual('text/html', resp.content_type) @@ -149,7 +146,7 @@ class CertSetupTestCase(rest.RestfulTestCase): def test_failure(self): for path in ['/v2.0/certificates/signing', '/v2.0/certificates/ca']: self.request(self.public_app, path, method='GET', - expected_status=http_client.INTERNAL_SERVER_ERROR) + expected_status=500) def test_pki_certs_rebuild(self): self.test_create_pki_certs() diff --git a/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py b/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py index b241b41b..8664e2c3 100644 --- a/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py +++ b/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py @@ -12,8 +12,6 @@ import uuid -from six.moves import http_client - from keystone.tests.unit import test_v3 @@ -33,7 +31,7 @@ class TestSimpleCert(BaseTestCase): method='GET', path=path, headers={'Accept': content_type}, - expected_status=http_client.OK) + expected_status=200) self.assertEqual(content_type, response.content_type.lower()) self.assertIn('---BEGIN', response.body) @@ -56,4 +54,4 @@ class TestSimpleCert(BaseTestCase): self.request(app=self.public_app, method='GET', path=path, - expected_status=http_client.INTERNAL_SERVER_ERROR) + expected_status=500) diff --git a/keystone-moon/keystone/tests/unit/test_policy.py b/keystone-moon/keystone/tests/unit/test_policy.py index b2f0e525..686e2b70 100644 --- a/keystone-moon/keystone/tests/unit/test_policy.py +++ b/keystone-moon/keystone/tests/unit/test_policy.py @@ -16,10 +16,8 @@ import json import os -import mock from oslo_policy import policy as common_policy import six -from six.moves.urllib import request as urlrequest from testtools import matchers from keystone import exception @@ -118,28 +116,6 @@ class PolicyTestCase(BasePolicyTestCase): action = "example:allowed" rules.enforce(self.credentials, action, self.target) - def test_enforce_http_true(self): - - def fakeurlopen(url, post_data): - return six.StringIO("True") - - action = "example:get_http" - target = {} - with mock.patch.object(urlrequest, 'urlopen', fakeurlopen): - result = rules.enforce(self.credentials, action, target) - self.assertTrue(result) - - def test_enforce_http_false(self): - - def fakeurlopen(url, post_data): - return six.StringIO("False") - - action = "example:get_http" - target = {} - with mock.patch.object(urlrequest, 'urlopen', fakeurlopen): - self.assertRaises(exception.ForbiddenAction, rules.enforce, - self.credentials, action, target) - def test_templatized_enforcement(self): target_mine = {'project_id': 'fake'} target_not_mine = {'project_id': 'another'} diff --git a/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py b/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py index 87b3d48d..f498fe94 100644 --- a/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py +++ b/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py @@ -180,6 +180,7 @@ class FederationExtension(test_sql_upgrade.SqlMigrateBase): self.federation_protocol = 'federation_protocol' self.service_provider = 'service_provider' self.mapping = 'mapping' + self.remote_id_table = 'idp_remote_ids' def repo_package(self): return federation @@ -310,6 +311,68 @@ class FederationExtension(test_sql_upgrade.SqlMigrateBase): self.assertEqual('', sp.auth_url) self.assertEqual('', sp.sp_url) + def test_propagate_remote_id_to_separate_column(self): + """Make sure empty remote_id is not propagated. + Test scenario: + - Upgrade database to version 6 where identity_provider table has a + remote_id column + - Add 3 identity provider objects, where idp1 and idp2 have valid + remote_id parameter set, and idp3 has it empty (None). + - Upgrade database to version 7 and expect migration scripts to + properly move data rom identity_provider.remote_id column into + separate table idp_remote_ids. + - In the idp_remote_ids table expect to find entries for idp1 and idp2 + and not find anything for idp3 (identitified by idp's id) + + """ + session = self.Session() + idp1 = {'id': uuid.uuid4().hex, + 'remote_id': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': True} + idp2 = {'id': uuid.uuid4().hex, + 'remote_id': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': True} + idp3 = {'id': uuid.uuid4().hex, + 'remote_id': None, + 'description': uuid.uuid4().hex, + 'enabled': True} + self.upgrade(6, repository=self.repo_path) + self.assertTableColumns(self.identity_provider, + ['id', 'description', 'enabled', 'remote_id']) + + self.insert_dict(session, self.identity_provider, idp1) + self.insert_dict(session, self.identity_provider, idp2) + self.insert_dict(session, self.identity_provider, idp3) + + session.close() + self.upgrade(7, repository=self.repo_path) + + self.assertTableColumns(self.identity_provider, + ['id', 'description', 'enabled']) + remote_id_table = sqlalchemy.Table(self.remote_id_table, + self.metadata, + autoload=True) + + session = self.Session() + self.metadata.clear() + + idp = session.query(remote_id_table).filter( + remote_id_table.c.idp_id == idp1['id'])[0] + self.assertEqual(idp1['remote_id'], idp.remote_id) + + idp = session.query(remote_id_table).filter( + remote_id_table.c.idp_id == idp2['id'])[0] + self.assertEqual(idp2['remote_id'], idp.remote_id) + + idp = session.query(remote_id_table).filter( + remote_id_table.c.idp_id == idp3['id']) + # NOTE(marek-denis): As idp3 had empty 'remote_id' attribute we expect + # not to find it in the 'remote_id_table' table, hence count should be + # 0.real + self.assertEqual(0, idp.count()) + def test_add_relay_state_column(self): self.upgrade(8, repository=self.repo_path) self.assertTableColumns(self.service_provider, diff --git a/keystone-moon/keystone/tests/unit/test_v2.py b/keystone-moon/keystone/tests/unit/test_v2.py index 99b5a897..acdfca5f 100644 --- a/keystone-moon/keystone/tests/unit/test_v2.py +++ b/keystone-moon/keystone/tests/unit/test_v2.py @@ -132,7 +132,7 @@ class CoreApiTests(object): 'tenantId': self.tenant_bar['id'], }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidAuthenticationResponse(r, require_service_catalog=True) def test_authenticate_unscoped(self): @@ -147,7 +147,7 @@ class CoreApiTests(object): }, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidAuthenticationResponse(r) def test_get_tenants_for_token(self): @@ -234,7 +234,7 @@ class CoreApiTests(object): 'token_id': token, }, token=token, - expected_status=http_client.OK) + expected_status=200) def test_endpoints(self): token = self.get_scoped_token() @@ -370,7 +370,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) def test_error_response(self): """This triggers assertValidErrorResponse by convention.""" @@ -459,7 +459,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) user_id = self._get_user_id(r.result) @@ -470,7 +470,7 @@ class CoreApiTests(object): 'user_id': user_id }, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertEqual(CONF.member_role_name, self._get_role_name(r.result)) # Create a new tenant @@ -485,7 +485,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) project_id = self._get_project_id(r.result) @@ -501,7 +501,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) # 'member_role' should be in new_tenant r = self.admin_request( @@ -510,7 +510,7 @@ class CoreApiTests(object): 'user_id': user_id }, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertEqual('_member_', self._get_role_name(r.result)) # 'member_role' should not be in tenant_bar any more @@ -520,7 +520,7 @@ class CoreApiTests(object): 'user_id': user_id }, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertNoRoles(r.result) def test_update_user_with_invalid_tenant(self): @@ -539,7 +539,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) user_id = self._get_user_id(r.result) # Update user with an invalid tenant @@ -571,7 +571,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) user_id = self._get_user_id(r.result) # Update user with an invalid tenant @@ -604,7 +604,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) user_id = self._get_user_id(r.result) @@ -615,7 +615,7 @@ class CoreApiTests(object): 'user_id': user_id }, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertEqual(CONF.member_role_name, self._get_role_name(r.result)) # Update user's tenant with old tenant id @@ -630,7 +630,7 @@ class CoreApiTests(object): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) # 'member_role' should still be in tenant_bar r = self.admin_request( @@ -639,7 +639,7 @@ class CoreApiTests(object): 'user_id': user_id }, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertEqual('_member_', self._get_role_name(r.result)) def test_authenticating_a_user_with_no_password(self): @@ -721,7 +721,7 @@ class LegacyV2UsernameTests(object): path='/v2.0/users', token=token, body=body, - expected_status=http_client.OK) + expected_status=200) def test_create_with_extra_username(self): """The response for creating a user will contain the extra fields.""" @@ -772,7 +772,7 @@ class LegacyV2UsernameTests(object): 'enabled': enabled, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -802,7 +802,7 @@ class LegacyV2UsernameTests(object): 'enabled': enabled, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -881,7 +881,7 @@ class LegacyV2UsernameTests(object): 'enabled': enabled, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -911,7 +911,7 @@ class LegacyV2UsernameTests(object): 'enabled': enabled, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -931,7 +931,7 @@ class LegacyV2UsernameTests(object): 'enabled': True, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -956,7 +956,7 @@ class LegacyV2UsernameTests(object): 'enabled': enabled, }, }, - expected_status=http_client.OK) + expected_status=200) self.assertValidUserResponse(r) @@ -1200,7 +1200,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): method='GET', path='/v2.0/tokens/revoked', token=token, - expected_status=http_client.OK) + expected_status=200) self.assertValidRevocationListResponse(r) def assertValidRevocationListResponse(self, response): @@ -1231,7 +1231,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): method='GET', path='/v2.0/tokens/revoked', token=token1, - expected_status=http_client.OK) + expected_status=200) signed_text = r.result['signed'] data_json = cms.cms_verify(signed_text, CONF.signing.certfile, @@ -1333,7 +1333,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): }, }, }, - expected_status=http_client.OK) + expected_status=200) # ensure password doesn't leak user_id = r.result['user']['id'] @@ -1341,7 +1341,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): method='GET', path='/v2.0/users/%s' % user_id, token=token, - expected_status=http_client.OK) + expected_status=200) self.assertNotIn('OS-KSADM:password', r.result['user']) def test_updating_a_user_with_an_OSKSADM_password(self): @@ -1360,7 +1360,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): }, }, token=token, - expected_status=http_client.OK) + expected_status=200) # successfully authenticate self.public_request( @@ -1374,7 +1374,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests): }, }, }, - expected_status=http_client.OK) + expected_status=200) class RevokeApiTestCase(V2TestCase): @@ -1436,7 +1436,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): method='GET', path=path, token=admin_token, - expected_status=http_client.OK) + expected_status=200) def test_authenticate_scoped_token(self): project_ref = self.new_project_ref() @@ -1466,7 +1466,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): method='GET', path=path, token=admin_token, - expected_status=http_client.OK) + expected_status=200) def test_token_authentication_and_validation(self): """Test token authentication for Fernet token provider. @@ -1491,7 +1491,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): } } }, - expected_status=http_client.OK) + expected_status=200) token_id = self._get_token_id(r) path = ('/v2.0/tokens/%s?belongsTo=%s' % (token_id, project_ref['id'])) @@ -1500,7 +1500,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): method='GET', path=path, token=CONF.admin_token, - expected_status=http_client.OK) + expected_status=200) def test_rescoped_tokens_maintain_original_expiration(self): project_ref = self.new_project_ref() @@ -1522,7 +1522,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): }, # NOTE(lbragstad): This test may need to be refactored if Keystone # decides to disallow rescoping using a scoped token. - expected_status=http_client.OK) + expected_status=200) original_token = resp.result['access']['token']['id'] original_expiration = resp.result['access']['token']['expires'] @@ -1537,7 +1537,7 @@ class TestFernetTokenProviderV2(RestfulTestCase): } } }, - expected_status=http_client.OK) + expected_status=200) rescoped_token = resp.result['access']['token']['id'] rescoped_expiration = resp.result['access']['token']['expires'] self.assertNotEqual(original_token, rescoped_token) diff --git a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py index 8d6d9eb7..2a3fad86 100644 --- a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py +++ b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py @@ -1137,7 +1137,7 @@ class ClientDrivenTestCase(unit.TestCase): credentials, signature = self._generate_default_user_ec2_credentials() credentials['signature'] = signature resp, token = self._send_ec2_auth_request(credentials) - self.assertEqual(http_client.OK, resp.status_code) + self.assertEqual(200, resp.status_code) self.assertIn('access', token) def test_ec2_auth_success_trust(self): @@ -1169,7 +1169,7 @@ class ClientDrivenTestCase(unit.TestCase): cred.access, cred.secret) credentials['signature'] = signature resp, token = self._send_ec2_auth_request(credentials) - self.assertEqual(http_client.OK, resp.status_code) + self.assertEqual(200, resp.status_code) self.assertEqual(trust_id, token['access']['trust']['id']) # TODO(shardy) we really want to check the roles and trustee # but because of where the stubbing happens we don't seem to diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py index 7afe6ad8..32c5e295 100644 --- a/keystone-moon/keystone/tests/unit/test_v3.py +++ b/keystone-moon/keystone/tests/unit/test_v3.py @@ -18,7 +18,6 @@ import uuid from oslo_config import cfg from oslo_serialization import jsonutils from oslo_utils import timeutils -from six.moves import http_client from testtools import matchers from keystone import auth @@ -412,7 +411,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, r = self.v3_authenticate_token(auth) return r.headers.get('X-Subject-Token') - def v3_authenticate_token(self, auth, expected_status=http_client.CREATED): + def v3_authenticate_token(self, auth, expected_status=201): return self.admin_request(method='POST', path='/v3/auth/tokens', body=auth, @@ -441,31 +440,42 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, return self.admin_request(path=path, token=token, **kwargs) - def get(self, path, expected_status=http_client.OK, **kwargs): - return self.v3_request(path, method='GET', - expected_status=expected_status, **kwargs) + def get(self, path, **kwargs): + r = self.v3_request(method='GET', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 200) + return r - def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs): - r = self.v3_request(path, method='HEAD', - expected_status=expected_status, **kwargs) + def head(self, path, **kwargs): + r = self.v3_request(method='HEAD', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) self.assertEqual('', r.body) return r - def post(self, path, expected_status=http_client.CREATED, **kwargs): - return self.v3_request(path, method='POST', - expected_status=expected_status, **kwargs) + def post(self, path, **kwargs): + r = self.v3_request(method='POST', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 201) + return r - def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs): - return self.v3_request(path, method='PUT', - expected_status=expected_status, **kwargs) + def put(self, path, **kwargs): + r = self.v3_request(method='PUT', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) + return r - def patch(self, path, expected_status=http_client.OK, **kwargs): - return self.v3_request(path, method='PATCH', - expected_status=expected_status, **kwargs) + def patch(self, path, **kwargs): + r = self.v3_request(method='PATCH', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 200) + return r - def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs): - return self.v3_request(path, method='DELETE', - expected_status=expected_status, **kwargs) + def delete(self, path, **kwargs): + r = self.v3_request(method='DELETE', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) + return r def assertValidErrorResponse(self, r): resp = r.result diff --git a/keystone-moon/keystone/tests/unit/test_v3_assignment.py b/keystone-moon/keystone/tests/unit/test_v3_assignment.py index f22e9f2b..6b15b1c3 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_assignment.py +++ b/keystone-moon/keystone/tests/unit/test_v3_assignment.py @@ -363,13 +363,14 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # validates the returned token and it should be valid. self.head('/auth/tokens', headers={'x-subject-token': subject_token}, - expected_status=http_client.OK) + expected_status=200) # now disable the domain self.domain['enabled'] = False url = "/domains/%(domain_id)s" % {'domain_id': self.domain['id']} self.patch(url, - body={'domain': {'enabled': False}}) + body={'domain': {'enabled': False}}, + expected_status=200) # validates the same token again and it should be 'not found' # as the domain has already been disabled. @@ -511,7 +512,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, ref = self.new_project_ref(domain_id=self.domain_id, is_domain=True) self.post('/projects', body={'project': ref}, - expected_status=http_client.NOT_IMPLEMENTED) + expected_status=501) @utils.wip('waiting for projects acting as domains implementation') def test_create_project_without_parent_id_and_without_domain_id(self): @@ -1289,9 +1290,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase, member_url = ('%(collection_url)s/%(role_id)s' % { 'collection_url': collection_url, 'role_id': self.role_id}) - self.put(member_url) + self.put(member_url, expected_status=204) # Check the user has the role assigned - self.head(member_url) + self.head(member_url, expected_status=204) return member_url, user_ref def test_delete_user_before_removing_role_assignment_succeeds(self): @@ -1300,7 +1301,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # Delete the user from identity backend self.identity_api.driver.delete_user(user['id']) # Clean up the role assignment - self.delete(member_url) + self.delete(member_url, expected_status=204) # Make sure the role is gone self.head(member_url, expected_status=http_client.NOT_FOUND) @@ -1343,7 +1344,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase, # validates the returned token; it should be valid. self.head('/auth/tokens', headers={'x-subject-token': token}, - expected_status=http_client.OK) + expected_status=200) # revokes the grant from group on project. self.assignment_api.delete_grant(role_id=self.role['id'], @@ -1868,7 +1869,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, self.default_user_id = self.user_ids[0] self.default_group_id = self.group_ids[0] - def get_role_assignments(self, expected_status=http_client.OK, **filters): + def get_role_assignments(self, expected_status=200, **filters): """Returns the result from querying role assignment API + queried URL. Calls GET /v3/role_assignments?<params> and returns its result, where diff --git a/keystone-moon/keystone/tests/unit/test_v3_auth.py b/keystone-moon/keystone/tests/unit/test_v3_auth.py index 496a75c0..d53a85df 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_auth.py +++ b/keystone-moon/keystone/tests/unit/test_v3_auth.py @@ -384,8 +384,9 @@ class TokenAPITests(object): v2_token = r.result['access']['token']['id'] # Delete the v2 token using v3. - self.delete( + resp = self.delete( '/auth/tokens', headers={'X-Subject-Token': v2_token}) + self.assertEqual(resp.status_code, 204) # Attempting to use the deleted token on v2 should fail. self.admin_request( @@ -405,8 +406,7 @@ class TokenAPITests(object): self.assertEqual(expires, r.result['token']['expires_at']) def test_check_token(self): - self.head('/auth/tokens', headers=self.headers, - expected_status=http_client.OK) + self.head('/auth/tokens', headers=self.headers, expected_status=200) def test_validate_token(self): r = self.get('/auth/tokens', headers=self.headers) @@ -655,13 +655,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): password=self.userAdminA['password'], domain_name=self.domainA['name'])) - self.head('/auth/tokens', headers=headers, - expected_status=http_client.OK, + self.head('/auth/tokens', headers=headers, expected_status=200, token=adminA_token) - self.head('/auth/tokens', headers=headers, - expected_status=http_client.OK, + self.head('/auth/tokens', headers=headers, expected_status=200, token=user_token) - self.delete('/auth/tokens', headers=headers, + self.delete('/auth/tokens', headers=headers, expected_status=204, token=user_token) # invalid X-Auth-Token and invalid X-Subject-Token self.head('/auth/tokens', headers=headers, @@ -695,13 +693,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): password=self.userAdminA['password'], domain_name=self.domainA['name'])) - self.head('/auth/tokens', headers=headers, - expected_status=http_client.OK, + self.head('/auth/tokens', headers=headers, expected_status=200, token=adminA_token) - self.head('/auth/tokens', headers=headers, - expected_status=http_client.OK, + self.head('/auth/tokens', headers=headers, expected_status=200, token=user_token) - self.delete('/auth/tokens', headers=headers, + self.delete('/auth/tokens', headers=headers, expected_status=204, token=adminA_token) # invalid X-Auth-Token and invalid X-Subject-Token self.head('/auth/tokens', headers=headers, @@ -868,10 +864,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm both tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, - expected_status=http_client.OK) + expected_status=200) # create a new role role = self.new_role_ref() @@ -887,10 +883,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # both tokens should remain valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, - expected_status=http_client.OK) + expected_status=200) def test_deleting_user_grant_revokes_token(self): """Test deleting a user grant revokes token. @@ -910,7 +906,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # Delete the grant, which should invalidate the token grant_url = ( '/projects/%(project_id)s/users/%(user_id)s/' @@ -1012,19 +1008,19 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': tokenA}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': tokenB}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': tokenC}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': tokenD}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': tokenE}, - expected_status=http_client.OK) + expected_status=200) # Delete the role, which should invalidate the tokens role_url = '/roles/%s' % self.role1['id'] @@ -1047,7 +1043,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # ...but the one using role2 is still valid self.head('/auth/tokens', headers={'X-Subject-Token': tokenC}, - expected_status=http_client.OK) + expected_status=200) def test_domain_user_role_assignment_maintains_token(self): """Test user-domain role assignment maintains existing token. @@ -1067,7 +1063,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # Assign a role, which should not affect the token grant_url = ( '/domains/%(domain_id)s/users/%(user_id)s/' @@ -1078,7 +1074,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) def test_disabling_project_revokes_token(self): token = self.get_requested_token( @@ -1090,7 +1086,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # disable the project, which should invalidate the token self.patch( @@ -1118,7 +1114,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # delete the project, which should invalidate the token self.delete( @@ -1167,13 +1163,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': token3}, - expected_status=http_client.OK) + expected_status=200) # Delete the group grant, which should invalidate the # tokens for user1 and user2 grant_url = ( @@ -1213,7 +1209,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # Delete the grant, which should invalidate the token grant_url = ( '/domains/%(domain_id)s/groups/%(group_id)s/' @@ -1224,7 +1220,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) def test_group_membership_changes_revokes_token(self): """Test add/removal to/from group revokes token. @@ -1254,10 +1250,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=http_client.OK) + expected_status=200) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + expected_status=200) # Remove user1 from group1, which should invalidate # the token self.delete('/groups/%(group_id)s/users/%(user_id)s' % { @@ -1269,14 +1265,14 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # But user2's token should still be valid self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + expected_status=200) # Adding user2 to a group should not invalidate token self.put('/groups/%(group_id)s/users/%(user_id)s' % { 'group_id': self.group2['id'], 'user_id': self.user2['id']}) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + expected_status=200) def test_removing_role_assignment_does_not_affect_other_users(self): """Revoking a role from one user should not affect other users.""" @@ -1320,7 +1316,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # authorization for the second user should still succeed self.head('/auth/tokens', headers={'X-Subject-Token': user3_token}, - expected_status=http_client.OK) + expected_status=200) self.v3_authenticate_token( self.build_authentication_request( user_id=self.user3['id'], @@ -1370,7 +1366,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): token = self.get_v2_token() self.delete('/auth/tokens', - headers={'X-Subject-Token': token}) + headers={'X-Subject-Token': token}, + expected_status=204) self.head('/auth/tokens', headers={'X-Subject-Token': token}, @@ -1400,7 +1397,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # revoke the project-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': project_scoped_token}) + headers={'X-Subject-Token': project_scoped_token}, + expected_status=204) # The project-scoped token is invalidated. self.head('/auth/tokens', @@ -1410,16 +1408,17 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=http_client.OK) + expected_status=200) # The domain-scoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': domain_scoped_token}, - expected_status=http_client.OK) + expected_status=200) # revoke the domain-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': domain_scoped_token}) + headers={'X-Subject-Token': domain_scoped_token}, + expected_status=204) # The domain-scoped token is invalid. self.head('/auth/tokens', @@ -1429,7 +1428,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=http_client.OK) + expected_status=200) def test_revoke_token_from_token_v2(self): # Test that a scoped token can be requested from an unscoped token, @@ -1447,7 +1446,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # revoke the project-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': project_scoped_token}) + headers={'X-Subject-Token': project_scoped_token}, + expected_status=204) # The project-scoped token is invalidated. self.head('/auth/tokens', @@ -1457,7 +1457,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=http_client.OK) + expected_status=200) class TestTokenRevokeByAssignment(TestTokenRevokeById): @@ -1501,7 +1501,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById): # authorization for the projectA should still succeed self.head('/auth/tokens', headers={'X-Subject-Token': other_project_token}, - expected_status=http_client.OK) + expected_status=200) # while token for the projectB should not self.head('/auth/tokens', headers={'X-Subject-Token': project_token}, @@ -1563,24 +1563,27 @@ class TestTokenRevokeApi(TestTokenRevokeById): def test_revoke_token(self): scoped_token = self.get_scoped_token() headers = {'X-Subject-Token': scoped_token} - response = self.get('/auth/tokens', headers=headers).json_body['token'] + response = self.get('/auth/tokens', headers=headers, + expected_status=200).json_body['token'] - self.delete('/auth/tokens', headers=headers) + self.delete('/auth/tokens', headers=headers, expected_status=204) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - events_response = self.get('/OS-REVOKE/events').json_body + events_response = self.get('/OS-REVOKE/events', + expected_status=200).json_body self.assertValidRevokedTokenResponse(events_response, audit_id=response['audit_ids'][0]) def test_revoke_v2_token(self): token = self.get_v2_token() headers = {'X-Subject-Token': token} - response = self.get('/auth/tokens', - headers=headers).json_body['token'] - self.delete('/auth/tokens', headers=headers) + response = self.get('/auth/tokens', headers=headers, + expected_status=200).json_body['token'] + self.delete('/auth/tokens', headers=headers, expected_status=204) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - events_response = self.get('/OS-REVOKE/events').json_body + events_response = self.get('/OS-REVOKE/events', + expected_status=200).json_body self.assertValidRevokedTokenResponse( events_response, @@ -1592,24 +1595,28 @@ class TestTokenRevokeApi(TestTokenRevokeById): def test_list_delete_project_shows_in_event_list(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events').json_body['events'] + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body['events'] self.assertEqual([], events) self.delete( '/projects/%(project_id)s' % {'project_id': self.projectA['id']}) - events_response = self.get('/OS-REVOKE/events').json_body + events_response = self.get('/OS-REVOKE/events', + expected_status=200).json_body self.assertValidDeletedProjectResponse(events_response, self.projectA['id']) def test_disable_domain_shows_in_event_list(self): - events = self.get('/OS-REVOKE/events').json_body['events'] + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body['events'] self.assertEqual([], events) disable_body = {'domain': {'enabled': False}} self.patch( '/domains/%(project_id)s' % {'project_id': self.domainA['id']}, body=disable_body) - events = self.get('/OS-REVOKE/events').json_body + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body self.assertDomainInList(events, self.domainA['id']) @@ -1639,7 +1646,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): def test_list_delete_token_shows_in_event_list(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events').json_body['events'] + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body['events'] self.assertEqual([], events) scoped_token = self.get_scoped_token() @@ -1653,17 +1661,15 @@ class TestTokenRevokeApi(TestTokenRevokeById): response.json_body['token'] headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']} - self.head('/auth/tokens', headers=headers, - expected_status=http_client.OK) - self.head('/auth/tokens', headers=headers2, - expected_status=http_client.OK) - self.head('/auth/tokens', headers=headers3, - expected_status=http_client.OK) + self.head('/auth/tokens', headers=headers, expected_status=200) + self.head('/auth/tokens', headers=headers2, expected_status=200) + self.head('/auth/tokens', headers=headers3, expected_status=200) - self.delete('/auth/tokens', headers=headers) + self.delete('/auth/tokens', headers=headers, expected_status=204) # NOTE(ayoung): not deleting token3, as it should be deleted # by previous - events_response = self.get('/OS-REVOKE/events').json_body + events_response = self.get('/OS-REVOKE/events', + expected_status=200).json_body events = events_response['events'] self.assertEqual(1, len(events)) self.assertEventDataInList( @@ -1671,32 +1677,32 @@ class TestTokenRevokeApi(TestTokenRevokeById): audit_id=token2['audit_ids'][1]) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - self.head('/auth/tokens', headers=headers2, - expected_status=http_client.OK) - self.head('/auth/tokens', headers=headers3, - expected_status=http_client.OK) + self.head('/auth/tokens', headers=headers2, expected_status=200) + self.head('/auth/tokens', headers=headers3, expected_status=200) def test_list_with_filter(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events').json_body['events'] + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body['events'] self.assertEqual(0, len(events)) scoped_token = self.get_scoped_token() headers = {'X-Subject-Token': scoped_token} auth = self.build_authentication_request(token=scoped_token) headers2 = {'X-Subject-Token': self.get_requested_token(auth)} - self.delete('/auth/tokens', headers=headers) - self.delete('/auth/tokens', headers=headers2) + self.delete('/auth/tokens', headers=headers, expected_status=204) + self.delete('/auth/tokens', headers=headers2, expected_status=204) - events = self.get('/OS-REVOKE/events').json_body['events'] + events = self.get('/OS-REVOKE/events', + expected_status=200).json_body['events'] self.assertEqual(2, len(events)) future = utils.isotime(timeutils.utcnow() + datetime.timedelta(seconds=1000)) - events = self.get('/OS-REVOKE/events?since=%s' % (future) - ).json_body['events'] + events = self.get('/OS-REVOKE/events?since=%s' % (future), + expected_status=200).json_body['events'] self.assertEqual(0, len(events)) @@ -3106,7 +3112,8 @@ class TestTrustChain(test_v3.RestfulTestCase): def test_delete_trust_cascade(self): self.assert_user_authenticate(self.user_chain[0]) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[0]['id']}) + 'trust_id': self.trust_chain[0]['id']}, + expected_status=204) headers = {'X-Subject-Token': self.last_token} self.head('/auth/tokens', headers=headers, @@ -3116,10 +3123,12 @@ class TestTrustChain(test_v3.RestfulTestCase): def test_delete_broken_chain(self): self.assert_user_authenticate(self.user_chain[0]) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[1]['id']}) + 'trust_id': self.trust_chain[1]['id']}, + expected_status=204) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[0]['id']}) + 'trust_id': self.trust_chain[0]['id']}, + expected_status=204) def test_trustor_roles_revoked(self): self.assert_user_authenticate(self.user_chain[0]) @@ -3214,7 +3223,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): # make sure the trust exists trust = self.assertValidTrustResponse(r, ref) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) # get a token for the trustee auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], @@ -3232,7 +3242,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self._initialize_test_consume_trust(2) # check decremented value r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) trust = r.result.get('trust') self.assertIsNotNone(trust) self.assertEqual(1, trust['remaining_uses']) @@ -3310,7 +3321,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self.assertValidTrustResponse(r, ref) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password']) @@ -3321,7 +3333,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_id=trust['id']) r = self.v3_authenticate_token(auth_data) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) trust = r.result.get('trust') self.assertIsNone(trust['remaining_uses']) @@ -3335,27 +3348,30 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self.assertValidTrustResponse(r, ref) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=200) self.assertValidTrustResponse(r, ref) # validate roles on the trust r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles' % { - 'trust_id': trust['id']}) + 'trust_id': trust['id']}, + expected_status=200) roles = self.assertValidRoleListResponse(r, self.role) self.assertIn(self.role['id'], [x['id'] for x in roles]) self.head( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, - expected_status=http_client.OK) + expected_status=200) r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], - 'role_id': self.role['id']}) + 'role_id': self.role['id']}, + expected_status=200) self.assertValidRoleResponse(r, self.role) - r = self.get('/OS-TRUST/trusts') + r = self.get('/OS-TRUST/trusts', expected_status=200) self.assertValidTrustListResponse(r, trust) # trusts are immutable @@ -3365,7 +3381,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): expected_status=http_client.NOT_FOUND) self.delete( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, + expected_status=204) self.get( '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, @@ -3554,7 +3571,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): path = '/v2.0/tokens/%s' % (token) self.admin_request( path=path, token=CONF.admin_token, - method='GET', expected_status=http_client.OK) + method='GET', expected_status=200) def test_exercise_trust_scoped_token_without_impersonation(self): ref = self.new_trust_ref( @@ -3758,7 +3775,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): expected_status=http_client.FORBIDDEN) def assertTrustTokensRevoked(self, trust_id): - revocation_response = self.get('/OS-REVOKE/events') + revocation_response = self.get('/OS-REVOKE/events', + expected_status=200) revocation_events = revocation_response.json_body['events'] found = False for event in revocation_events: @@ -3787,7 +3805,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): r, self.trustee_user) trust_token = r.headers['X-Subject-Token'] self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust_id}) + 'trust_id': trust_id}, + expected_status=204) headers = {'X-Subject-Token': trust_token} self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) @@ -3814,7 +3833,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data) + self.v3_authenticate_token(auth_data, expected_status=201) self.disable_user(self.user) @@ -3842,7 +3861,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data) + self.v3_authenticate_token(auth_data, expected_status=201) self.disable_user(self.trustee_user) @@ -3867,7 +3886,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self.assertValidTrustResponse(r, ref) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust['id']}) + 'trust_id': trust['id']}, + expected_status=204) self.get('/OS-TRUST/trusts/%(trust_id)s' % { 'trust_id': trust['id']}, @@ -3897,19 +3917,19 @@ class TestTrustAuth(test_v3.RestfulTestCase): r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) - r = self.get('/OS-TRUST/trusts') + r = self.get('/OS-TRUST/trusts', expected_status=200) trusts = r.result['trusts'] self.assertEqual(3, len(trusts)) self.assertValidTrustListResponse(r) r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' % - self.user_id) + self.user_id, expected_status=200) trusts = r.result['trusts'] self.assertEqual(3, len(trusts)) self.assertValidTrustListResponse(r) r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' % - self.user_id) + self.user_id, expected_status=200) trusts = r.result['trusts'] self.assertEqual(0, len(trusts)) @@ -3935,11 +3955,13 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_token = r.headers.get('X-Subject-Token') self.get('/OS-TRUST/trusts?trustor_user_id=%s' % - self.user_id, token=trust_token) + self.user_id, expected_status=200, + token=trust_token) self.assertValidUserResponse( self.patch('/users/%s' % self.trustee_user['id'], - body={'user': {'password': uuid.uuid4().hex}})) + body={'user': {'password': uuid.uuid4().hex}}, + expected_status=200)) self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id, expected_status=http_client.UNAUTHORIZED, @@ -3971,13 +3993,14 @@ class TestTrustAuth(test_v3.RestfulTestCase): 'trust_id': trust['id'], 'role_id': self.role['id']}, auth=auth_data, - expected_status=http_client.OK) + expected_status=200) r = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, - auth=auth_data) + auth=auth_data, + expected_status=200) self.assertValidRoleResponse(r, self.role) def test_do_not_consume_remaining_uses_when_get_token_fails(self): @@ -4022,7 +4045,7 @@ class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase): 'query_string': {}, 'environment': {}} r = auth_controller.validate_token(context) - self.assertEqual(http_client.OK, r.status_code) + self.assertEqual(200, r.status_code) class TestAuthContext(unit.TestCase): @@ -4082,7 +4105,9 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): def test_get_catalog_project_scoped_token(self): """Call ``GET /auth/catalog`` with a project-scoped token.""" - r = self.get('/auth/catalog') + r = self.get( + '/auth/catalog', + expected_status=200) self.assertValidCatalogResponse(r) def test_get_catalog_domain_scoped_token(self): @@ -4116,7 +4141,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): expected_status=http_client.UNAUTHORIZED) def test_get_projects_project_scoped_token(self): - r = self.get('/auth/projects') + r = self.get('/auth/projects', expected_status=200) self.assertThat(r.json['projects'], matchers.HasLength(1)) self.assertValidProjectListResponse(r) @@ -4124,7 +4149,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): self.put(path='/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id'])) - r = self.get('/auth/domains') + r = self.get('/auth/domains', expected_status=200) self.assertThat(r.json['domains'], matchers.HasLength(1)) self.assertValidDomainListResponse(r) @@ -4135,7 +4160,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) def _make_auth_request(self, auth_data): - resp = self.post('/auth/tokens', body=auth_data) + resp = self.post('/auth/tokens', body=auth_data, expected_status=201) token = resp.headers.get('X-Subject-Token') self.assertLess(len(token), 255) return token @@ -4167,13 +4192,13 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): trust_id=trust['id']) return self._make_auth_request(auth_data) - def _validate_token(self, token, expected_status=http_client.OK): + def _validate_token(self, token, expected_status=200): return self.get( '/auth/tokens', headers={'X-Subject-Token': token}, expected_status=expected_status) - def _revoke_token(self, token, expected_status=http_client.NO_CONTENT): + def _revoke_token(self, token, expected_status=204): return self.delete( '/auth/tokens', headers={'X-Subject-Token': token}, @@ -4547,8 +4572,7 @@ class TestAuthFernetTokenProvider(TestAuth): self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) # Bind not current supported by Fernet, see bug 1433311. - self.v3_authenticate_token(auth_data, - expected_status=http_client.NOT_IMPLEMENTED) + self.v3_authenticate_token(auth_data, expected_status=501) def test_v2_v3_bind_token_intermix(self): self.config_fixture.config(group='token', bind='kerberos') @@ -4563,7 +4587,7 @@ class TestAuthFernetTokenProvider(TestAuth): self.admin_request(path='/v2.0/tokens', method='POST', body=body, - expected_status=http_client.NOT_IMPLEMENTED) + expected_status=501) def test_auth_with_bind_token(self): self.config_fixture.config(group='token', bind=['kerberos']) @@ -4573,5 +4597,4 @@ class TestAuthFernetTokenProvider(TestAuth): self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) # Bind not current supported by Fernet, see bug 1433311. - self.v3_authenticate_token(auth_data, - expected_status=http_client.NOT_IMPLEMENTED) + self.v3_authenticate_token(auth_data, expected_status=501) diff --git a/keystone-moon/keystone/tests/unit/test_v3_catalog.py b/keystone-moon/keystone/tests/unit/test_v3_catalog.py index 0d82390d..c536169a 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_catalog.py +++ b/keystone-moon/keystone/tests/unit/test_v3_catalog.py @@ -36,7 +36,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): r = self.put( '/regions/%s' % region_id, body={'region': ref}, - expected_status=http_client.CREATED) + expected_status=201) self.assertValidRegionResponse(r, ref) # Double-check that the region ID was kept as-is and not # populated with a UUID, as is the case with POST /v3/regions @@ -49,7 +49,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): r = self.put( '/regions/%s' % region_id, body={'region': ref}, - expected_status=http_client.CREATED) + expected_status=201) self.assertValidRegionResponse(r, ref) # Double-check that the region ID was kept as-is and not # populated with a UUID, as is the case with POST /v3/regions @@ -60,7 +60,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref = dict(description="my region") self.put( '/regions/myregion', - body={'region': ref}, expected_status=http_client.CREATED) + body={'region': ref}, expected_status=201) # Create region again with duplicate id self.put( '/regions/myregion', @@ -86,7 +86,9 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref = self.new_region_ref() ref['id'] = '' - r = self.post('/regions', body={'region': ref}) + r = self.post( + '/regions', + body={'region': ref}, expected_status=201) self.assertValidRegionResponse(r, ref) self.assertNotEmpty(r.result['region'].get('id')) @@ -98,7 +100,10 @@ class CatalogTestCase(test_v3.RestfulTestCase): del ref['id'] # let the service define the ID - r = self.post('/regions', body={'region': ref}) + r = self.post( + '/regions', + body={'region': ref}, + expected_status=201) self.assertValidRegionResponse(r, ref) def test_create_region_without_description(self): @@ -107,7 +112,10 @@ class CatalogTestCase(test_v3.RestfulTestCase): del ref['description'] - r = self.post('/regions', body={'region': ref}) + r = self.post( + '/regions', + body={'region': ref}, + expected_status=201) # Create the description in the reference to compare to since the # response should now have a description, even though we didn't send # it with the original reference. @@ -127,10 +135,16 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref1['description'] = region_desc ref2['description'] = region_desc - resp1 = self.post('/regions', body={'region': ref1}) + resp1 = self.post( + '/regions', + body={'region': ref1}, + expected_status=201) self.assertValidRegionResponse(resp1, ref1) - resp2 = self.post('/regions', body={'region': ref2}) + resp2 = self.post( + '/regions', + body={'region': ref2}, + expected_status=201) self.assertValidRegionResponse(resp2, ref2) def test_create_regions_without_descriptions(self): @@ -145,9 +159,15 @@ class CatalogTestCase(test_v3.RestfulTestCase): del ref1['description'] ref2['description'] = None - resp1 = self.post('/regions', body={'region': ref1}) + resp1 = self.post( + '/regions', + body={'region': ref1}, + expected_status=201) - resp2 = self.post('/regions', body={'region': ref2}) + resp2 = self.post( + '/regions', + body={'region': ref2}, + expected_status=201) # Create the descriptions in the references to compare to since the # responses should now have descriptions, even though we didn't send # a description with the original references. @@ -211,14 +231,16 @@ class CatalogTestCase(test_v3.RestfulTestCase): """Call ``PATCH /regions/{region_id}``.""" region_ref = self.new_region_ref() - resp = self.post('/regions', body={'region': region_ref}) + resp = self.post('/regions', body={'region': region_ref}, + expected_status=201) region_updates = { # update with something that's not the description 'parent_region_id': self.region_id, } resp = self.patch('/regions/%s' % region_ref['id'], - body={'region': region_updates}) + body={'region': region_updates}, + expected_status=200) # NOTE(dstanek): Keystone should keep the original description. self.assertEqual(region_ref['description'], @@ -591,7 +613,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref = self.new_endpoint_ref(service_id=self.service_id) ref["region"] = uuid.uuid4().hex ref.pop('region_id') - self.post('/endpoints', body={'endpoint': ref}) + self.post('/endpoints', body={'endpoint': ref}, expected_status=201) # Make sure the region is created self.get('/regions/%(region_id)s' % { 'region_id': ref["region"]}) @@ -600,7 +622,7 @@ class CatalogTestCase(test_v3.RestfulTestCase): """EndpointV3 allows to creates the endpoint without region.""" ref = self.new_endpoint_ref(service_id=self.service_id) ref.pop('region_id') - self.post('/endpoints', body={'endpoint': ref}) + self.post('/endpoints', body={'endpoint': ref}, expected_status=201) def test_create_endpoint_with_empty_url(self): """Call ``POST /endpoints``.""" @@ -756,7 +778,9 @@ class CatalogTestCase(test_v3.RestfulTestCase): ref = self.new_endpoint_ref(self.service_id) ref['url'] = valid_url - self.post('/endpoints', body={'endpoint': ref}) + self.post('/endpoints', + body={'endpoint': ref}, + expected_status=201) def test_endpoint_create_with_invalid_url(self): """Test the invalid cases: substitutions is not exactly right. diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py index cf504b00..dd8cf2dd 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_credential.py +++ b/keystone-moon/keystone/tests/unit/test_v3_credential.py @@ -382,7 +382,7 @@ class TestCredentialEc2(CredentialBaseTestCase): r = self.post( '/ec2tokens', body={'ec2Credentials': sig_ref}, - expected_status=http_client.OK) + expected_status=200) self.assertValidTokenResponse(r) def test_ec2_credential_signature_validate(self): diff --git a/keystone-moon/keystone/tests/unit/test_v3_domain_config.py b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py index 3f7af87d..701cd3cf 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_domain_config.py +++ b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py @@ -40,7 +40,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): url = '/domains/%(domain_id)s/config' % { 'domain_id': self.domain['id']} r = self.put(url, body={'config': self.config}, - expected_status=http_client.CREATED) + expected_status=201) res = self.domain_config_api.get_config(self.domain['id']) self.assertEqual(self.config, r.result['config']) self.assertEqual(self.config, res) @@ -50,11 +50,11 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): self.put('/domains/%(domain_id)s/config' % { 'domain_id': self.domain['id']}, body={'config': self.config}, - expected_status=http_client.CREATED) + expected_status=201) self.put('/domains/%(domain_id)s/config' % { 'domain_id': self.domain['id']}, body={'config': self.config}, - expected_status=http_client.OK) + expected_status=200) def test_delete_config(self): """Call ``DELETE /domains{domain_id}/config``.""" @@ -80,7 +80,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): 'domain_id': self.domain['id']} r = self.get(url) self.assertEqual(self.config, r.result['config']) - self.head(url, expected_status=http_client.OK) + self.head(url, expected_status=200) def test_get_config_by_group(self): """Call ``GET & HEAD /domains{domain_id}/config/{group}``.""" @@ -89,7 +89,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): 'domain_id': self.domain['id']} r = self.get(url) self.assertEqual({'ldap': self.config['ldap']}, r.result['config']) - self.head(url, expected_status=http_client.OK) + self.head(url, expected_status=200) def test_get_config_by_option(self): """Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``.""" @@ -99,7 +99,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase): r = self.get(url) self.assertEqual({'url': self.config['ldap']['url']}, r.result['config']) - self.head(url, expected_status=http_client.OK) + self.head(url, expected_status=200) def test_get_non_existant_config(self): """Call ``GET /domains{domain_id}/config when no config defined``.""" diff --git a/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py b/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py index b0c8256e..3423d2d8 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py +++ b/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py @@ -53,14 +53,12 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): url, expected_status=http_client.NOT_FOUND) - self.put(url) + self.put(url, expected_status=204) # test that the new resource is accessible. - self.assert_head_and_get_return_same_response( - url, - expected_status=http_client.NO_CONTENT) + self.assert_head_and_get_return_same_response(url, expected_status=204) - self.delete(url) + self.delete(url, expected_status=204) # test that the deleted resource is no longer accessible self.assert_head_and_get_return_same_response( @@ -101,16 +99,18 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.put('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' '/endpoints/%(endpoint_id)s' % { 'policy_id': self.policy['id'], - 'endpoint_id': self.endpoint['id']}) + 'endpoint_id': self.endpoint['id']}, + expected_status=204) self.head('/endpoints/%(endpoint_id)s/OS-ENDPOINT-POLICY' '/policy' % { 'endpoint_id': self.endpoint['id']}, - expected_status=http_client.OK) + expected_status=200) r = self.get('/endpoints/%(endpoint_id)s/OS-ENDPOINT-POLICY' '/policy' % { - 'endpoint_id': self.endpoint['id']}) + 'endpoint_id': self.endpoint['id']}, + expected_status=200) self.assertValidPolicyResponse(r, ref=self.policy) def test_list_endpoints_for_policy(self): @@ -119,11 +119,13 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): self.put('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' '/endpoints/%(endpoint_id)s' % { 'policy_id': self.policy['id'], - 'endpoint_id': self.endpoint['id']}) + 'endpoint_id': self.endpoint['id']}, + expected_status=204) r = self.get('/policies/%(policy_id)s/OS-ENDPOINT-POLICY' '/endpoints' % { - 'policy_id': self.policy['id']}) + 'policy_id': self.policy['id']}, + expected_status=200) self.assertValidEndpointListResponse(r, ref=self.endpoint) self.assertThat(r.result.get('endpoints'), matchers.HasLength(1)) @@ -133,8 +135,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): 'policy_id': self.policy['id'], 'endpoint_id': self.endpoint['id']} - self.put(url) - self.head(url) + self.put(url, expected_status=204) + self.head(url, expected_status=204) self.delete('/endpoints/%(endpoint_id)s' % { 'endpoint_id': self.endpoint['id']}) @@ -148,8 +150,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): 'service_id': self.service['id'], 'region_id': self.region['id']} - self.put(url) - self.head(url) + self.put(url, expected_status=204) + self.head(url, expected_status=204) self.delete('/regions/%(region_id)s' % { 'region_id': self.region['id']}) @@ -163,8 +165,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): 'service_id': self.service['id'], 'region_id': self.region['id']} - self.put(url) - self.head(url) + self.put(url, expected_status=204) + self.head(url, expected_status=204) self.delete('/services/%(service_id)s' % { 'service_id': self.service['id']}) @@ -177,8 +179,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): 'policy_id': self.policy['id'], 'service_id': self.service['id']} - self.put(url) - self.get(url, expected_status=http_client.NO_CONTENT) + self.put(url, expected_status=204) + self.get(url, expected_status=204) self.delete('/policies/%(policy_id)s' % { 'policy_id': self.policy['id']}) @@ -191,8 +193,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase): 'policy_id': self.policy['id'], 'service_id': self.service['id']} - self.put(url) - self.get(url, expected_status=http_client.NO_CONTENT) + self.put(url, expected_status=204) + self.get(url, expected_status=204) self.delete('/services/%(service_id)s' % { 'service_id': self.service['id']}) diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py index 5717e67b..4d7dcaab 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_federation.py +++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py @@ -815,7 +815,7 @@ class FederatedIdentityProviderTests(FederationTests): if body is None: body = self._http_idp_input() resp = self.put(url, body={'identity_provider': body}, - expected_status=http_client.CREATED) + expected_status=201) return resp def _http_idp_input(self, **kwargs): @@ -1027,7 +1027,7 @@ class FederatedIdentityProviderTests(FederationTests): url = self.base_url(suffix=uuid.uuid4().hex) body = self._http_idp_input() self.put(url, body={'identity_provider': body}, - expected_status=http_client.CREATED) + expected_status=201) self.put(url, body={'identity_provider': body}, expected_status=http_client.CONFLICT) @@ -1084,7 +1084,7 @@ class FederatedIdentityProviderTests(FederationTests): idp_url = self.base_url(suffix=idp_id) # assign protocol to IdP - kwargs = {'expected_status': http_client.CREATED} + kwargs = {'expected_status': 201} resp, idp_id, proto = self._assign_protocol_to_idp( url=url, idp_id=idp_id, @@ -1179,7 +1179,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_assign_protocol_to_idp(self): """Assign a protocol to existing IdP.""" - self._assign_protocol_to_idp(expected_status=http_client.CREATED) + self._assign_protocol_to_idp(expected_status=201) def test_protocol_composite_pk(self): """Test whether Keystone let's add two entities with identical @@ -1193,7 +1193,7 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': http_client.CREATED} + kwargs = {'expected_status': 201} self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) @@ -1209,7 +1209,7 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': http_client.CREATED} + kwargs = {'expected_status': 201} resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) kwargs = {'expected_status': http_client.CONFLICT} @@ -1235,8 +1235,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_get_protocol(self): """Create and later fetch protocol tied to IdP.""" - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] url = "%s/protocols/%s" % (idp_id, proto_id) url = self.base_url(suffix=url) @@ -1255,14 +1254,12 @@ class FederatedIdentityProviderTests(FederationTests): Compare input and output id sets. """ - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) iterations = random.randint(0, 16) protocol_ids = [] for _ in range(iterations): - resp, _, proto = self._assign_protocol_to_idp( - idp_id=idp_id, - expected_status=http_client.CREATED) + resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id, + expected_status=201) proto_id = self._fetch_attribute_from_response(resp, 'protocol') proto_id = proto_id['id'] protocol_ids.append(proto_id) @@ -1281,8 +1278,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_protocols_attribute(self): """Update protocol's attribute.""" - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) new_mapping_id = uuid.uuid4().hex url = "%s/protocols/%s" % (idp_id, proto) @@ -1303,8 +1299,7 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='/%(idp_id)s/' 'protocols/%(protocol_id)s') - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) url = url % {'idp_id': idp_id, 'protocol_id': proto} self.delete(url) @@ -1345,7 +1340,7 @@ class MappingCRUDTests(FederationTests): url = self.MAPPING_URL + uuid.uuid4().hex resp = self.put(url, body={'mapping': mapping_fixtures.MAPPING_LARGE}, - expected_status=http_client.CREATED) + expected_status=201) return resp def _get_id_from_response(self, resp): @@ -1362,7 +1357,7 @@ class MappingCRUDTests(FederationTests): resp = self.get(url) entities = resp.result.get('mappings') self.assertIsNotNone(entities) - self.assertResponseStatus(resp, http_client.OK) + self.assertResponseStatus(resp, 200) self.assertValidListLinks(resp.result.get('links')) self.assertEqual(1, len(entities)) @@ -1372,7 +1367,7 @@ class MappingCRUDTests(FederationTests): mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': str(mapping_id)} resp = self.delete(url) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) self.get(url, expected_status=http_client.NOT_FOUND) def test_mapping_get(self): @@ -1976,8 +1971,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): token_id, 'project', self.project_all['id']) - self.v3_authenticate_token( - scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR) + self.v3_authenticate_token(scoped_token, expected_status=500) def test_lists_with_missing_group_in_backend(self): """Test a mapping that points to a group that does not exist @@ -2529,7 +2523,7 @@ class SAMLGenerationTests(FederationTests): self.sp = self.sp_ref() url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID self.put(url, body={'service_provider': self.sp}, - expected_status=http_client.CREATED) + expected_status=201) def test_samlize_token_values(self): """Test the SAML generator produces a SAML object. @@ -2763,7 +2757,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.SAML_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=http_client.OK) + expected_status=200) response = etree.fromstring(http_response.result) issuer = response[0] @@ -2879,7 +2873,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=http_client.OK) + expected_status=200) env_response = etree.fromstring(http_response.result) header = env_response[0] @@ -3079,13 +3073,13 @@ class IdPMetadataGenerationTests(FederationTests): self.generator.generate_metadata) def test_get_metadata_with_no_metadata_file_configured(self): - self.get(self.METADATA_URL, - expected_status=http_client.INTERNAL_SERVER_ERROR) + self.get(self.METADATA_URL, expected_status=500) def test_get_metadata(self): self.config_fixture.config( group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml') - r = self.get(self.METADATA_URL, response_content_type='text/xml') + r = self.get(self.METADATA_URL, response_content_type='text/xml', + expected_status=200) self.assertEqual('text/xml', r.headers.get('Content-Type')) reference_file = _load_xml('idp_saml2_metadata.xml') @@ -3108,7 +3102,7 @@ class ServiceProviderTests(FederationTests): self.SP_REF = self.sp_ref() self.SERVICE_PROVIDER = self.put( url, body={'service_provider': self.SP_REF}, - expected_status=http_client.CREATED).result + expected_status=201).result def sp_ref(self): ref = { @@ -3127,7 +3121,7 @@ class ServiceProviderTests(FederationTests): def test_get_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.get(url) + resp = self.get(url, expected_status=200) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) @@ -3139,7 +3133,7 @@ class ServiceProviderTests(FederationTests): url = self.base_url(suffix=uuid.uuid4().hex) sp = self.sp_ref() resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) + expected_status=201) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) @@ -3149,7 +3143,7 @@ class ServiceProviderTests(FederationTests): sp = self.sp_ref() del sp['relay_state_prefix'] resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) + expected_status=201) sp_result = resp.result['service_provider'] self.assertEqual(CONF.saml.relay_state_prefix, sp_result['relay_state_prefix']) @@ -3161,7 +3155,7 @@ class ServiceProviderTests(FederationTests): non_default_prefix = uuid.uuid4().hex sp['relay_state_prefix'] = non_default_prefix resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) + expected_status=201) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) @@ -3188,8 +3182,7 @@ class ServiceProviderTests(FederationTests): } for id, sp in ref_service_providers.items(): url = self.base_url(suffix=id) - self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) + self.put(url, body={'service_provider': sp}, expected_status=201) # Insert ids into service provider object, we will compare it with # responses from server and those include 'id' attribute. @@ -3216,14 +3209,15 @@ class ServiceProviderTests(FederationTests): """ new_sp_ref = self.sp_ref() url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}) + resp = self.patch(url, body={'service_provider': new_sp_ref}, + expected_status=200) patch_result = resp.result new_sp_ref['id'] = self.SERVICE_PROVIDER_ID self.assertValidEntity(patch_result['service_provider'], ref=new_sp_ref, keys_to_check=self.SP_KEYS) - resp = self.get(url) + resp = self.get(url, expected_status=200) get_result = resp.result self.assertDictEqual(patch_result['service_provider'], @@ -3261,14 +3255,15 @@ class ServiceProviderTests(FederationTests): non_default_prefix = uuid.uuid4().hex new_sp_ref['relay_state_prefix'] = non_default_prefix url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}) + resp = self.patch(url, body={'service_provider': new_sp_ref}, + expected_status=200) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) def test_delete_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.delete(url) + self.delete(url, expected_status=204) def test_delete_service_provider_404(self): url = self.base_url(suffix=uuid.uuid4().hex) diff --git a/keystone-moon/keystone/tests/unit/test_v3_identity.py b/keystone-moon/keystone/tests/unit/test_v3_identity.py index 3d424cea..5a8e4fd5 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_identity.py +++ b/keystone-moon/keystone/tests/unit/test_v3_identity.py @@ -295,17 +295,18 @@ class IdentityTestCase(test_v3.RestfulTestCase): old_password_auth = self.build_authentication_request( user_id=user_ref['id'], password=password) - r = self.v3_authenticate_token(old_password_auth) + r = self.v3_authenticate_token(old_password_auth, expected_status=201) old_token = r.headers.get('X-Subject-Token') # auth as user with a token should work before a password change old_token_auth = self.build_authentication_request(token=old_token) - self.v3_authenticate_token(old_token_auth) + self.v3_authenticate_token(old_token_auth, expected_status=201) # administrative password reset new_password = uuid.uuid4().hex self.patch('/users/%s' % user_ref['id'], - body={'user': {'password': new_password}}) + body={'user': {'password': new_password}}, + expected_status=200) # auth as user with original password should not work after change self.v3_authenticate_token(old_password_auth, @@ -319,7 +320,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): new_password_auth = self.build_authentication_request( user_id=user_ref['id'], password=new_password) - self.v3_authenticate_token(new_password_auth) + self.v3_authenticate_token(new_password_auth, expected_status=201) def test_update_user_domain_id(self): """Call ``PATCH /users/{user_id}`` with domain_id.""" @@ -370,7 +371,7 @@ class IdentityTestCase(test_v3.RestfulTestCase): # Confirm token is valid for now self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + expected_status=200) # Now delete the user self.delete('/users/%(user_id)s' % { @@ -473,7 +474,8 @@ class IdentityTestCase(test_v3.RestfulTestCase): # administrative password reset new_password = uuid.uuid4().hex self.patch('/users/%s' % user_ref['id'], - body={'user': {'password': new_password}}) + body={'user': {'password': new_password}}, + expected_status=200) self.assertNotIn(password, log_fix.output) self.assertNotIn(new_password, log_fix.output) @@ -559,8 +561,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): password = self.user_ref['password'] self.user_ref = self.identity_api.create_user(self.user_ref) self.user_ref['password'] = password - self.token = self.get_request_token(self.user_ref['password'], - http_client.CREATED) + self.token = self.get_request_token(self.user_ref['password'], 201) def get_request_token(self, password, expected_status): auth_data = self.build_authentication_request( @@ -580,16 +581,16 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): def test_changing_password(self): # original password works token_id = self.get_request_token(self.user_ref['password'], - expected_status=http_client.CREATED) + expected_status=201) # original token works old_token_auth = self.build_authentication_request(token=token_id) - self.v3_authenticate_token(old_token_auth) + self.v3_authenticate_token(old_token_auth, expected_status=201) # change password new_password = uuid.uuid4().hex self.change_password(password=new_password, original_password=self.user_ref['password'], - expected_status=http_client.NO_CONTENT) + expected_status=204) # old password fails self.get_request_token(self.user_ref['password'], @@ -600,8 +601,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): expected_status=http_client.NOT_FOUND) # new password works - self.get_request_token(new_password, - expected_status=http_client.CREATED) + self.get_request_token(new_password, expected_status=201) def test_changing_password_with_missing_original_password_fails(self): r = self.change_password(password=uuid.uuid4().hex, @@ -640,7 +640,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase): new_password = uuid.uuid4().hex self.change_password(password=new_password, original_password=self.user_ref['password'], - expected_status=http_client.NO_CONTENT) + expected_status=204) self.assertNotIn(self.user_ref['password'], log_fix.output) self.assertNotIn(new_password, log_fix.output) diff --git a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py index 3a0d481c..8794a426 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py +++ b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py @@ -140,7 +140,7 @@ class ConsumerCRUDTests(OAuth1Tests): consumer = self._create_single_consumer() consumer_id = consumer['id'] resp = self.delete(self.CONSUMER_URL + '/%s' % consumer_id) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) def test_consumer_get(self): consumer = self._create_single_consumer() @@ -262,7 +262,7 @@ class OAuthFlowTests(OAuth1Tests): url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) + resp = self.put(url, body=body, expected_status=200) self.verifier = resp.result['token']['oauth_verifier'] self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier)) self.assertEqual(8, len(self.verifier)) @@ -357,7 +357,7 @@ class AccessTokenCRUDTests(OAuthFlowTests): resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' % {'user': self.user_id, 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) # List access_token should be 0 resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' @@ -400,7 +400,7 @@ class AuthTokenTests(OAuthFlowTests): resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' % {'user': self.user_id, 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) # Check Keystone Token no longer exists headers = {'X-Subject-Token': self.keystone_token_id, @@ -415,7 +415,7 @@ class AuthTokenTests(OAuthFlowTests): consumer_id = self.consumer['key'] resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s' % {'consumer_id': consumer_id}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) # List access_token should be 0 resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens' @@ -645,7 +645,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) + resp = self.put(url, body=body, expected_status=200) verifier = resp.result['token']['oauth_verifier'] self.assertIsNotNone(verifier) @@ -719,7 +719,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) + resp = self.put(url, body=body, expected_status=200) self.verifier = resp.result['token']['oauth_verifier'] self.request_token.set_verifier(self.verifier) @@ -753,8 +753,7 @@ class MaliciousOAuth1Tests(OAuth1Tests): # NOTE(stevemar): To simulate this error, we remove the Authorization # header from the post request. del headers['Authorization'] - self.post(endpoint, headers=headers, - expected_status=http_client.INTERNAL_SERVER_ERROR) + self.post(endpoint, headers=headers, expected_status=500) class OAuthNotificationTests(OAuth1Tests, @@ -830,7 +829,7 @@ class OAuthNotificationTests(OAuth1Tests, url = self._authorize_request_token(request_key) body = {'roles': [{'id': self.role_id}]} - resp = self.put(url, body=body, expected_status=http_client.OK) + resp = self.put(url, body=body, expected_status=200) self.verifier = resp.result['token']['oauth_verifier'] self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier)) self.assertEqual(8, len(self.verifier)) @@ -859,7 +858,7 @@ class OAuthNotificationTests(OAuth1Tests, resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s' % {'user': self.user_id, 'auth': self.access_token.key}) - self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.assertResponseStatus(resp, 204) # Test to ensure the delete access token notification is sent self._assert_notify_sent(access_key, diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py index 296e1d4b..9922ae5e 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_protection.py +++ b/keystone-moon/keystone/tests/unit/test_v3_protection.py @@ -461,8 +461,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): token = self.get_requested_token(auth) self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + headers={'X-Subject-Token': token}, expected_status=200) def test_user_check_user_token(self): # A user can check one of their own tokens. @@ -475,8 +474,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): token2 = self.get_requested_token(auth) self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + headers={'X-Subject-Token': token2}, expected_status=200) def test_user_check_other_user_token_rejected(self): # A user cannot check another user's token. @@ -512,8 +510,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): user_token = self.get_requested_token(user_auth) self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, - expected_status=http_client.OK) + headers={'X-Subject-Token': user_token}, expected_status=200) def test_user_revoke_same_token(self): # Given a non-admin user token, the token can be used to revoke @@ -686,8 +683,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, # Return the expected return codes for APIs with and without data # with any specified status overriding the normal values if expected_status is None: - return (http_client.OK, http_client.CREATED, - http_client.NO_CONTENT) + return (200, 201, 204) else: return (expected_status, expected_status, expected_status) @@ -1054,7 +1050,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, password=self.domain_admin_user['password'], domain_id=self.domainA['id']) entity_url = '/domains/%s' % self.domainA['id'] - self.get(entity_url, auth=self.auth) + self.get(entity_url, auth=self.auth, expected_status=200) def test_list_user_credentials(self): self.credential_user = self.new_credential_ref(self.just_a_user['id']) @@ -1186,8 +1182,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, token = self.get_requested_token(auth) self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, - expected_status=http_client.OK) + headers={'X-Subject-Token': token}, expected_status=200) def test_user_check_user_token(self): # A user can check one of their own tokens. @@ -1200,8 +1195,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, token2 = self.get_requested_token(auth) self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) + headers={'X-Subject-Token': token2}, expected_status=200) def test_user_check_other_user_token_rejected(self): # A user cannot check another user's token. @@ -1237,8 +1231,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, user_token = self.get_requested_token(user_auth) self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, - expected_status=http_client.OK) + headers={'X-Subject-Token': user_token}, expected_status=200) def test_user_revoke_same_token(self): # Given a non-admin user token, the token can be used to revoke diff --git a/keystone-moon/keystone/tests/unit/test_versions.py b/keystone-moon/keystone/tests/unit/test_versions.py index fc8051b2..40814588 100644 --- a/keystone-moon/keystone/tests/unit/test_versions.py +++ b/keystone-moon/keystone/tests/unit/test_versions.py @@ -751,7 +751,7 @@ class VersionTestCase(unit.TestCase): def test_public_version_v2(self): client = TestClient(self.public_app) resp = client.get('/v2.0/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v2_VERSION_RESPONSE self._paste_in_port(expected['version'], @@ -762,7 +762,7 @@ class VersionTestCase(unit.TestCase): def test_admin_version_v2(self): client = TestClient(self.admin_app) resp = client.get('/v2.0/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v2_VERSION_RESPONSE self._paste_in_port(expected['version'], @@ -775,7 +775,7 @@ class VersionTestCase(unit.TestCase): for app in (self.public_app, self.admin_app): client = TestClient(app) resp = client.get('/v2.0/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v2_VERSION_RESPONSE self._paste_in_port(expected['version'], 'http://localhost/v2.0/') @@ -784,7 +784,7 @@ class VersionTestCase(unit.TestCase): def test_public_version_v3(self): client = TestClient(self.public_app) resp = client.get('/v3/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v3_VERSION_RESPONSE self._paste_in_port(expected['version'], @@ -796,7 +796,7 @@ class VersionTestCase(unit.TestCase): def test_admin_version_v3(self): client = TestClient(self.admin_app) resp = client.get('/v3/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v3_VERSION_RESPONSE self._paste_in_port(expected['version'], @@ -809,7 +809,7 @@ class VersionTestCase(unit.TestCase): for app in (self.public_app, self.admin_app): client = TestClient(app) resp = client.get('/v3/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v3_VERSION_RESPONSE self._paste_in_port(expected['version'], 'http://localhost/v3/') @@ -824,7 +824,7 @@ class VersionTestCase(unit.TestCase): # request to /v3 should pass resp = client.get('/v3/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v3_VERSION_RESPONSE self._paste_in_port(expected['version'], @@ -857,7 +857,7 @@ class VersionTestCase(unit.TestCase): # request to /v2.0 should pass resp = client.get('/v2.0/') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) data = jsonutils.loads(resp.body) expected = v2_VERSION_RESPONSE self._paste_in_port(expected['version'], diff --git a/keystone-moon/keystone/tests/unit/test_wsgi.py b/keystone-moon/keystone/tests/unit/test_wsgi.py index 2a5cb386..ed4c67d6 100644 --- a/keystone-moon/keystone/tests/unit/test_wsgi.py +++ b/keystone-moon/keystone/tests/unit/test_wsgi.py @@ -112,16 +112,15 @@ class ApplicationTest(BaseWSGITest): resp = wsgi.render_response(body=data) self.assertEqual('200 OK', resp.status) - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) self.assertEqual(body, resp.body) self.assertEqual('X-Auth-Token', resp.headers.get('Vary')) self.assertEqual(str(len(body)), resp.headers.get('Content-Length')) def test_render_response_custom_status(self): - resp = wsgi.render_response( - status=(http_client.NOT_IMPLEMENTED, 'Not Implemented')) + resp = wsgi.render_response(status=(501, 'Not Implemented')) self.assertEqual('501 Not Implemented', resp.status) - self.assertEqual(http_client.NOT_IMPLEMENTED, resp.status_int) + self.assertEqual(501, resp.status_int) def test_successful_require_attribute(self): app = FakeAttributeCheckerApp() @@ -173,14 +172,14 @@ class ApplicationTest(BaseWSGITest): def test_render_response_no_body(self): resp = wsgi.render_response() self.assertEqual('204 No Content', resp.status) - self.assertEqual(http_client.NO_CONTENT, resp.status_int) + self.assertEqual(204, resp.status_int) self.assertEqual(b'', resp.body) self.assertEqual('0', resp.headers.get('Content-Length')) self.assertIsNone(resp.headers.get('Content-Type')) def test_render_response_head_with_body(self): resp = wsgi.render_response({'id': uuid.uuid4().hex}, method='HEAD') - self.assertEqual(http_client.OK, resp.status_int) + self.assertEqual(200, resp.status_int) self.assertEqual(b'', resp.body) self.assertNotEqual(resp.headers.get('Content-Length'), '0') self.assertEqual('application/json', resp.headers.get('Content-Type')) diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py index 5f74b430..bfb590db 100644 --- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py +++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py @@ -16,13 +16,17 @@ import hashlib import os import uuid +import msgpack from oslo_utils import timeutils +from six.moves import urllib from keystone.common import config from keystone.common import utils +from keystone.contrib.federation import constants as federation_constants from keystone import exception from keystone.tests import unit from keystone.tests.unit import ksfixtures +from keystone.tests.unit.ksfixtures import database from keystone.token import provider from keystone.token.providers import fernet from keystone.token.providers.fernet import token_formatters @@ -57,7 +61,156 @@ class TestFernetTokenProvider(unit.TestCase): uuid.uuid4().hex) +class TestValidate(unit.TestCase): + def setUp(self): + super(TestValidate, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + self.useFixture(database.Database()) + self.load_backends() + + def config_overrides(self): + super(TestValidate, self).config_overrides() + self.config_fixture.config(group='token', provider='fernet') + + def test_validate_v3_token_simple(self): + # Check the fields in the token result when use validate_v3_token + # with a simple token. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + method_names = ['password'] + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + self.assertIsInstance(token['audit_ids'], list) + self.assertIsInstance(token['expires_at'], str) + self.assertEqual({}, token['extras']) + self.assertIsInstance(token['issued_at'], str) + self.assertEqual(method_names, token['methods']) + exp_user_info = { + 'id': user_ref['id'], + 'name': user_ref['name'], + 'domain': { + 'id': domain_ref['id'], + 'name': domain_ref['name'], + }, + } + self.assertEqual(exp_user_info, token['user']) + + def test_validate_v3_token_federated_info(self): + # Check the user fields in the token result when use validate_v3_token + # when the token has federated info. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + method_names = ['mapped'] + + group_ids = [uuid.uuid4().hex, ] + identity_provider = uuid.uuid4().hex + protocol = uuid.uuid4().hex + auth_context = { + 'user_id': user_ref['id'], + 'group_ids': group_ids, + federation_constants.IDENTITY_PROVIDER: identity_provider, + federation_constants.PROTOCOL: protocol, + } + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names, auth_context=auth_context) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + exp_user_info = { + 'id': user_ref['id'], + 'name': user_ref['id'], + 'domain': {'id': CONF.federation.federated_domain_name, + 'name': CONF.federation.federated_domain_name, }, + federation_constants.FEDERATION: { + 'groups': [{'id': group_id} for group_id in group_ids], + 'identity_provider': {'id': identity_provider, }, + 'protocol': {'id': protocol, }, + }, + } + self.assertEqual(exp_user_info, token['user']) + + def test_validate_v3_token_trust(self): + # Check the trust fields in the token result when use validate_v3_token + # when the token has trust info. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + trustor_user_ref = unit.new_user_ref(domain_ref['id']) + trustor_user_ref = self.identity_api.create_user(trustor_user_ref) + + project_ref = unit.new_project_ref(domain_id=domain_ref['id']) + project_ref = self.resource_api.create_project(project_ref['id'], + project_ref) + + role_ref = unit.new_role_ref() + role_ref = self.role_api.create_role(role_ref['id'], role_ref) + + self.assignment_api.create_grant( + role_ref['id'], user_id=user_ref['id'], + project_id=project_ref['id']) + + self.assignment_api.create_grant( + role_ref['id'], user_id=trustor_user_ref['id'], + project_id=project_ref['id']) + + trustor_user_id = trustor_user_ref['id'] + trustee_user_id = user_ref['id'] + trust_ref = unit.new_trust_ref( + trustor_user_id, trustee_user_id, project_id=project_ref['id'], + role_ids=[role_ref['id'], ]) + trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref, + trust_ref['roles']) + + method_names = ['password'] + + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names, project_id=project_ref['id'], + trust=trust_ref) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + exp_trust_info = { + 'id': trust_ref['id'], + 'impersonation': False, + 'trustee_user': {'id': user_ref['id'], }, + 'trustor_user': {'id': trustor_user_ref['id'], }, + } + self.assertEqual(exp_trust_info, token['OS-TRUST:trust']) + + def test_validate_v3_token_validation_error_exc(self): + # When the token format isn't recognized, TokenNotFound is raised. + + # A uuid string isn't a valid fernet token. + token_id = uuid.uuid4().hex + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v3_token, token_id) + + class TestTokenFormatter(unit.TestCase): + def setUp(self): + super(TestTokenFormatter, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + def test_restore_padding(self): # 'a' will result in '==' padding, 'aa' will result in '=' padding, and # 'aaa' will result in no padding. @@ -73,6 +226,39 @@ class TestTokenFormatter(unit.TestCase): ) self.assertEqual(encoded_string, encoded_str_with_padding_restored) + def test_legacy_padding_validation(self): + first_value = uuid.uuid4().hex + second_value = uuid.uuid4().hex + payload = (first_value, second_value) + msgpack_payload = msgpack.packb(payload) + + # NOTE(lbragstad): This method perserves the way that keystone used to + # percent encode the tokens, prior to bug #1491926. + def legacy_pack(payload): + tf = token_formatters.TokenFormatter() + encrypted_payload = tf.crypto.encrypt(payload) + + # the encrypted_payload is returned with padding appended + self.assertTrue(encrypted_payload.endswith('=')) + + # using urllib.parse.quote will percent encode the padding, like + # keystone did in Kilo. + percent_encoded_payload = urllib.parse.quote(encrypted_payload) + + # ensure that the padding was actaully percent encoded + self.assertTrue(percent_encoded_payload.endswith('%3D')) + return percent_encoded_payload + + token_with_legacy_padding = legacy_pack(msgpack_payload) + tf = token_formatters.TokenFormatter() + + # demonstrate the we can validate a payload that has been percent + # encoded with the Fernet logic that existed in Kilo + serialized_payload = tf.unpack(token_with_legacy_padding) + returned_payload = msgpack.unpackb(serialized_payload) + self.assertEqual(first_value, returned_payload[0]) + self.assertEqual(second_value, returned_payload[1]) + class TestPayloads(unit.TestCase): def test_uuid_hex_to_byte_conversions(self): @@ -204,8 +390,7 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_trust_id, trust_id) - def test_unscoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' + def _test_unscoped_payload_with_user_id(self, exp_user_id): exp_methods = ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] @@ -221,30 +406,15 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_project_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.ProjectScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids) - - (user_id, methods, project_id, expires_at, audit_ids) = ( - token_formatters.ProjectScopedPayload.disassemble(payload)) + def test_unscoped_payload_with_non_uuid_user_id(self): + self._test_unscoped_payload_with_user_id('someNonUuidUserId') - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + def test_unscoped_payload_with_16_char_non_uuid_user_id(self): + self._test_unscoped_payload_with_user_id('0123456789abcdef') - def test_project_scoped_payload_with_non_uuid_project_id(self): - exp_user_id = uuid.uuid4().hex + def _test_project_scoped_payload_with_ids(self, exp_user_id, + exp_project_id): exp_methods = ['password'] - exp_project_id = 'someNonUuidProjectId' exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] @@ -261,8 +431,15 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_domain_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' + def test_project_scoped_payload_with_non_uuid_user_id(self): + self._test_project_scoped_payload_with_ids('someNonUuidUserId', + 'someNonUuidProjectId') + + def test_project_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_project_scoped_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + + def _test_domain_scoped_payload_with_user_id(self, exp_user_id): exp_methods = ['password'] exp_domain_id = uuid.uuid4().hex exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) @@ -281,32 +458,14 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_trust_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - exp_trust_id = uuid.uuid4().hex - - payload = token_formatters.TrustScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids, exp_trust_id) - - (user_id, methods, project_id, expires_at, audit_ids, trust_id) = ( - token_formatters.TrustScopedPayload.disassemble(payload)) + def test_domain_scoped_payload_with_non_uuid_user_id(self): + self._test_domain_scoped_payload_with_user_id('nonUuidUserId') - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_trust_id, trust_id) + def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_domain_scoped_payload_with_user_id('0123456789abcdef') - def test_trust_scoped_payload_with_non_uuid_project_id(self): - exp_user_id = uuid.uuid4().hex + def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id): exp_methods = ['password'] - exp_project_id = 'someNonUuidProjectId' exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] exp_trust_id = uuid.uuid4().hex @@ -325,12 +484,19 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_trust_id, trust_id) - def test_federated_payload_with_non_uuid_ids(self): - exp_user_id = 'someNonUuidUserId' + def test_trust_scoped_payload_with_non_uuid_user_id(self): + self._test_trust_scoped_payload_with_ids('someNonUuidUserId', + 'someNonUuidProjectId') + + def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_trust_scoped_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + + def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id): exp_methods = ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] - exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], + exp_federated_info = {'group_ids': [{'id': exp_group_id}], 'idp_id': uuid.uuid4().hex, 'protocol_id': uuid.uuid4().hex} @@ -352,6 +518,14 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_federated_info['protocol_id'], federated_info['protocol_id']) + def test_federated_payload_with_non_uuid_ids(self): + self._test_federated_payload_with_ids('someNonUuidUserId', + 'someNonUuidGroupId') + + def test_federated_payload_with_16_char_non_uuid_ids(self): + self._test_federated_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + def test_federated_project_scoped_payload(self): exp_user_id = 'someNonUuidUserId' exp_methods = ['token'] |