aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit')
-rw-r--r--keystone-moon/keystone/tests/unit/common/test_notifications.py163
-rw-r--r--keystone-moon/keystone/tests/unit/rest.py2
-rw-r--r--keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py60
-rw-r--r--keystone-moon/keystone/tests/unit/test_backend.py2
-rw-r--r--keystone-moon/keystone/tests/unit/test_backend_ldap.py47
-rw-r--r--keystone-moon/keystone/tests/unit/test_catalog.py89
-rw-r--r--keystone-moon/keystone/tests/unit/test_cert_setup.py11
-rw-r--r--keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py6
-rw-r--r--keystone-moon/keystone/tests/unit/test_policy.py24
-rw-r--r--keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py63
-rw-r--r--keystone-moon/keystone/tests/unit/test_v2.py70
-rw-r--r--keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py4
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3.py50
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_assignment.py17
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_auth.py253
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_catalog.py54
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_credential.py2
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_domain_config.py12
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py42
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_federation.py73
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_identity.py28
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_oauth1.py21
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_protection.py23
-rw-r--r--keystone-moon/keystone/tests/unit/test_versions.py16
-rw-r--r--keystone-moon/keystone/tests/unit/test_wsgi.py11
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_fernet_provider.py276
26 files changed, 969 insertions, 450 deletions
diff --git a/keystone-moon/keystone/tests/unit/common/test_notifications.py b/keystone-moon/keystone/tests/unit/common/test_notifications.py
index ec087c41..1ad8d50d 100644
--- a/keystone-moon/keystone/tests/unit/common/test_notifications.py
+++ b/keystone-moon/keystone/tests/unit/common/test_notifications.py
@@ -279,6 +279,16 @@ class BaseNotificationTest(test_v3.RestfulTestCase):
self.assertEqual(event_type, audit['event_type'])
self.assertTrue(audit['send_notification_called'])
+ def _assert_initiator_data_is_set(self, operation, resource_type, typeURI):
+ self.assertTrue(len(self._audits) > 0)
+ audit = self._audits[-1]
+ payload = audit['payload']
+ self.assertEqual(self.user_id, payload['initiator']['id'])
+ self.assertEqual(self.project_id, payload['initiator']['project_id'])
+ self.assertEqual(typeURI, payload['target']['typeURI'])
+ action = '%s.%s' % (operation, resource_type)
+ self.assertEqual(action, payload['action'])
+
def _assert_notify_not_sent(self, resource_id, operation, resource_type,
public=True):
unexpected = {
@@ -633,11 +643,154 @@ class CADFNotificationsForEntities(NotificationsForEntities):
resource_id = resp.result.get('domain').get('id')
self._assert_last_audit(resource_id, CREATED_OPERATION, 'domain',
cadftaxonomy.SECURITY_DOMAIN)
- self.assertTrue(len(self._audits) > 0)
- audit = self._audits[-1]
- payload = audit['payload']
- self.assertEqual(self.user_id, payload['initiator']['id'])
- self.assertEqual(self.project_id, payload['initiator']['project_id'])
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'domain',
+ cadftaxonomy.SECURITY_DOMAIN)
+
+
+class V2Notifications(BaseNotificationTest):
+
+ def setUp(self):
+ super(V2Notifications, self).setUp()
+ self.config_fixture.config(notification_format='cadf')
+
+ def test_user(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/users',
+ body={
+ 'user': {
+ 'name': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ 'enabled': True,
+ },
+ },
+ token=token,
+ )
+ user_id = resp.result.get('user').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+ # test for delete user
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/users/%s' % user_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'user',
+ cadftaxonomy.SECURITY_ACCOUNT_USER)
+
+ def test_role(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/OS-KSADM/roles',
+ body={
+ 'role': {
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ role_id = resp.result.get('role').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'role',
+ cadftaxonomy.SECURITY_ROLE)
+ # test for delete role
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/OS-KSADM/roles/%s' % role_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'role',
+ cadftaxonomy.SECURITY_ROLE)
+
+ def test_service_and_endpoint(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/OS-KSADM/services',
+ body={
+ 'OS-KSADM:service': {
+ 'name': uuid.uuid4().hex,
+ 'type': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ service_id = resp.result.get('OS-KSADM:service').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'service',
+ cadftaxonomy.SECURITY_SERVICE)
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/endpoints',
+ body={
+ 'endpoint': {
+ 'region': uuid.uuid4().hex,
+ 'service_id': service_id,
+ 'publicurl': uuid.uuid4().hex,
+ 'adminurl': uuid.uuid4().hex,
+ 'internalurl': uuid.uuid4().hex,
+ },
+ },
+ token=token,
+ )
+ endpoint_id = resp.result.get('endpoint').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'endpoint',
+ cadftaxonomy.SECURITY_ENDPOINT)
+ # test for delete endpoint
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/endpoints/%s' % endpoint_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'endpoint',
+ cadftaxonomy.SECURITY_ENDPOINT)
+ # test for delete service
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/OS-KSADM/services/%s' % service_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'service',
+ cadftaxonomy.SECURITY_SERVICE)
+
+ def test_project(self):
+ token = self.get_scoped_token()
+ resp = self.admin_request(
+ method='POST',
+ path='/v2.0/tenants',
+ body={
+ 'tenant': {
+ 'name': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True
+ },
+ },
+ token=token,
+ )
+ project_id = resp.result.get('tenant').get('id')
+ self._assert_initiator_data_is_set(CREATED_OPERATION,
+ 'project',
+ cadftaxonomy.SECURITY_PROJECT)
+ # test for delete project
+ self.admin_request(
+ method='DELETE',
+ path='/v2.0/tenants/%s' % project_id,
+ token=token,
+ )
+ self._assert_initiator_data_is_set(DELETED_OPERATION,
+ 'project',
+ cadftaxonomy.SECURITY_PROJECT)
class TestEventCallbacks(test_v3.RestfulTestCase):
diff --git a/keystone-moon/keystone/tests/unit/rest.py b/keystone-moon/keystone/tests/unit/rest.py
index da24019f..35b47e2b 100644
--- a/keystone-moon/keystone/tests/unit/rest.py
+++ b/keystone-moon/keystone/tests/unit/rest.py
@@ -114,7 +114,7 @@ class RestfulTestCase(unit.TestCase):
example::
- self.assertResponseStatus(response, http_client.NO_CONTENT)
+ self.assertResponseStatus(response, 204)
"""
self.assertEqual(
response.status_code,
diff --git a/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py b/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py
index 4c574549..24fc82dd 100644
--- a/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py
+++ b/keystone-moon/keystone/tests/unit/test_associate_project_endpoint_extension.py
@@ -48,7 +48,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
Valid endpoint and project id test case.
"""
- self.put(self.default_request_url)
+ self.put(self.default_request_url,
+ expected_status=204)
def test_create_endpoint_project_association_with_invalid_project(self):
"""PUT OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@@ -81,7 +82,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
"""
self.put(self.default_request_url,
- body={'project_id': self.default_domain_project_id})
+ body={'project_id': self.default_domain_project_id},
+ expected_status=204)
def test_check_endpoint_project_association(self):
"""HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@@ -89,11 +91,13 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
Valid project and endpoint id test case.
"""
- self.put(self.default_request_url)
+ self.put(self.default_request_url,
+ expected_status=204)
self.head('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.default_domain_project_id,
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
def test_check_endpoint_project_association_with_invalid_project(self):
"""HEAD /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@@ -165,7 +169,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
"""
r = self.get('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' %
- {'endpoint_id': self.endpoint_id})
+ {'endpoint_id': self.endpoint_id},
+ expected_status=200)
self.assertValidProjectListResponse(r, expected_length=0)
def test_list_projects_associated_with_invalid_endpoint(self):
@@ -188,7 +193,8 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
self.delete('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.default_domain_project_id,
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
def test_remove_endpoint_project_association_with_invalid_project(self):
"""DELETE /OS-EP-FILTER/projects/{project_id}/endpoints/{endpoint_id}
@@ -220,26 +226,26 @@ class EndpointFilterCRUDTestCase(TestExtensionCase):
self.put(self.default_request_url)
association_url = ('/OS-EP-FILTER/endpoints/%(endpoint_id)s/projects' %
{'endpoint_id': self.endpoint_id})
- r = self.get(association_url)
+ r = self.get(association_url, expected_status=200)
self.assertValidProjectListResponse(r, expected_length=1)
self.delete('/projects/%(project_id)s' % {
'project_id': self.default_domain_project_id})
- r = self.get(association_url)
+ r = self.get(association_url, expected_status=200)
self.assertValidProjectListResponse(r, expected_length=0)
def test_endpoint_project_association_cleanup_when_endpoint_deleted(self):
self.put(self.default_request_url)
association_url = '/OS-EP-FILTER/projects/%(project_id)s/endpoints' % {
'project_id': self.default_domain_project_id}
- r = self.get(association_url)
+ r = self.get(association_url, expected_status=200)
self.assertValidEndpointListResponse(r, expected_length=1)
self.delete('/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint_id})
- r = self.get(association_url)
+ r = self.get(association_url, expected_status=200)
self.assertValidEndpointListResponse(r, expected_length=0)
@@ -270,7 +276,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
# attempt to authenticate without requesting a project
auth_data = self.build_authentication_request(
@@ -290,7 +297,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
@@ -310,7 +318,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
@@ -329,7 +338,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
# create a second temporary endpoint
self.endpoint_id2 = uuid.uuid4().hex
@@ -343,7 +353,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id2})
+ 'endpoint_id': self.endpoint_id2},
+ expected_status=204)
# remove the temporary reference
# this will create inconsistency in the endpoint filter table
@@ -369,7 +380,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
# Add a disabled endpoint to the default project.
@@ -387,7 +399,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': disabled_endpoint_id})
+ 'endpoint_id': disabled_endpoint_id},
+ expected_status=204)
# Authenticate to get token with catalog
auth_data = self.build_authentication_request(
@@ -416,11 +429,13 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': endpoint_id1})
+ 'endpoint_id': endpoint_id1},
+ expected_status=204)
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': endpoint_id2})
+ 'endpoint_id': endpoint_id2},
+ expected_status=204)
# there should be only two endpoints in token catalog
auth_data = self.build_authentication_request(
@@ -439,7 +454,8 @@ class EndpointFilterTokenRequestTestCase(TestExtensionCase):
self.put('/OS-EP-FILTER/projects/%(project_id)s'
'/endpoints/%(endpoint_id)s' % {
'project_id': self.project['id'],
- 'endpoint_id': self.endpoint_id})
+ 'endpoint_id': self.endpoint_id},
+ expected_status=204)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
@@ -622,7 +638,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
self.DEFAULT_ENDPOINT_GROUP_URL, self.DEFAULT_ENDPOINT_GROUP_BODY)
url = '/OS-EP-FILTER/endpoint_groups/%(endpoint_group_id)s' % {
'endpoint_group_id': endpoint_group_id}
- self.head(url, expected_status=http_client.OK)
+ self.head(url, expected_status=200)
def test_check_invalid_endpoint_group(self):
"""HEAD /OS-EP-FILTER/endpoint_groups/{endpoint_group_id}
@@ -816,7 +832,7 @@ class EndpointGroupCRUDTestCase(TestExtensionCase):
self.project_id)
url = self._get_project_endpoint_group_url(
endpoint_group_id, self.project_id)
- self.head(url, expected_status=http_client.OK)
+ self.head(url, expected_status=200)
def test_check_endpoint_group_to_project_with_invalid_project_id(self):
"""Test HEAD with an invalid endpoint group and project association."""
diff --git a/keystone-moon/keystone/tests/unit/test_backend.py b/keystone-moon/keystone/tests/unit/test_backend.py
index d3b51edd..302fc2c2 100644
--- a/keystone-moon/keystone/tests/unit/test_backend.py
+++ b/keystone-moon/keystone/tests/unit/test_backend.py
@@ -4671,7 +4671,7 @@ class TokenTests(object):
def test_list_revoked_tokens_for_multiple_tokens(self):
self.check_list_revoked_tokens([self.delete_token()
- for x in range(2)])
+ for x in six.moves.range(2)])
def test_flush_expired_token(self):
token_id = uuid.uuid4().hex
diff --git a/keystone-moon/keystone/tests/unit/test_backend_ldap.py b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
index 808922a7..d96ec376 100644
--- a/keystone-moon/keystone/tests/unit/test_backend_ldap.py
+++ b/keystone-moon/keystone/tests/unit/test_backend_ldap.py
@@ -21,7 +21,6 @@ import ldap
import mock
from oslo_config import cfg
import pkg_resources
-from six.moves import http_client
from six.moves import range
from testtools import matchers
@@ -2182,6 +2181,26 @@ class LDAPIdentityEnabledEmulation(LDAPIdentity):
self.skipTest(
"Enabled emulation conflicts with enabled mask")
+ def test_user_enabled_use_group_config(self):
+ self.config_fixture.config(
+ group='ldap',
+ user_enabled_emulation_use_group_config=True,
+ group_member_attribute='uniqueMember',
+ group_objectclass='groupOfUniqueNames')
+ self.ldapdb.clear()
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
+ # Create a user and ensure they are enabled.
+ user1 = {'name': u'fäké1', 'enabled': True,
+ 'domain_id': CONF.identity.default_domain_id}
+ user_ref = self.identity_api.create_user(user1)
+ self.assertIs(True, user_ref['enabled'])
+
+ # Get a user and ensure they are enabled.
+ user_ref = self.identity_api.get_user(user_ref['id'])
+ self.assertIs(True, user_ref['enabled'])
+
def test_user_enabled_invert(self):
self.config_fixture.config(group='ldap', user_enabled_invert=True,
user_enabled_default=False)
@@ -2487,7 +2506,7 @@ class BaseMultiLDAPandSQLIdentity(object):
self.identity_api._get_domain_driver_and_entity_id(
user['id']))
- if expected_status == http_client.OK:
+ if expected_status == 200:
ref = driver.get_user(entity_id)
ref = self.identity_api._set_domain_id_and_mapping(
ref, domain_id, driver, map.EntityType.USER)
@@ -2661,23 +2680,21 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
check_user = self.check_user
check_user(self.users['user0'],
- self.domains['domain_default']['id'], http_client.OK)
+ self.domains['domain_default']['id'], 200)
for domain in [self.domains['domain1']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user0'], domain, exception.UserNotFound)
- check_user(self.users['user1'], self.domains['domain1']['id'],
- http_client.OK)
+ check_user(self.users['user1'], self.domains['domain1']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user1'], domain, exception.UserNotFound)
- check_user(self.users['user2'], self.domains['domain2']['id'],
- http_client.OK)
+ check_user(self.users['user2'], self.domains['domain2']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
self.domains['domain3']['id'],
@@ -2687,14 +2704,10 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, unit.SQLDriverOverrides,
# domain3 and domain4 share the same backend, so you should be
# able to see user3 and user4 from either.
- check_user(self.users['user3'], self.domains['domain3']['id'],
- http_client.OK)
- check_user(self.users['user3'], self.domains['domain4']['id'],
- http_client.OK)
- check_user(self.users['user4'], self.domains['domain3']['id'],
- http_client.OK)
- check_user(self.users['user4'], self.domains['domain4']['id'],
- http_client.OK)
+ check_user(self.users['user3'], self.domains['domain3']['id'], 200)
+ check_user(self.users['user3'], self.domains['domain4']['id'], 200)
+ check_user(self.users['user4'], self.domains['domain3']['id'], 200)
+ check_user(self.users['user4'], self.domains['domain4']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
@@ -3151,12 +3164,12 @@ class DomainSpecificLDAPandSQLIdentity(
# driver, but won't find it via any other domain driver
self.check_user(self.users['user0'],
- self.domains['domain_default']['id'], http_client.OK)
+ self.domains['domain_default']['id'], 200)
self.check_user(self.users['user0'],
self.domains['domain1']['id'], exception.UserNotFound)
self.check_user(self.users['user1'],
- self.domains['domain1']['id'], http_client.OK)
+ self.domains['domain1']['id'], 200)
self.check_user(self.users['user1'],
self.domains['domain_default']['id'],
exception.UserNotFound)
diff --git a/keystone-moon/keystone/tests/unit/test_catalog.py b/keystone-moon/keystone/tests/unit/test_catalog.py
index 85acfedf..ada2de43 100644
--- a/keystone-moon/keystone/tests/unit/test_catalog.py
+++ b/keystone-moon/keystone/tests/unit/test_catalog.py
@@ -53,8 +53,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
"""Applicable only to JSON."""
return r.result['access']['token']['id']
- def _endpoint_create(self, expected_status=http_client.OK,
- service_id=SERVICE_FIXTURE,
+ def _endpoint_create(self, expected_status=200, service_id=SERVICE_FIXTURE,
publicurl='http://localhost:8080',
internalurl='http://localhost:8080',
adminurl='http://localhost:8080'):
@@ -77,6 +76,18 @@ class V2CatalogTestCase(rest.RestfulTestCase):
body=body)
return body, r
+ def _region_create(self):
+ region_id = uuid.uuid4().hex
+ self.catalog_api.create_region({'id': region_id})
+ return region_id
+
+ def _service_create(self):
+ service_id = uuid.uuid4().hex
+ service = unit.new_service_ref()
+ service['id'] = service_id
+ self.catalog_api.create_service(service_id, service)
+ return service_id
+
def test_endpoint_create(self):
req_body, response = self._endpoint_create()
self.assertIn('endpoint', response.result)
@@ -84,6 +95,78 @@ class V2CatalogTestCase(rest.RestfulTestCase):
for field, value in req_body['endpoint'].items():
self.assertEqual(response.result['endpoint'][field], value)
+ def test_pure_v3_endpoint_with_publicurl_visible_from_v2(self):
+ """Test pure v3 endpoint can be fetched via v2 API.
+
+ For those who are using v2 APIs, endpoints created by v3 API should
+ also be visible as there are no differences about the endpoints
+ except the format or the internal implementation.
+ And because public url is required for v2 API, so only the v3 endpoints
+ of the service which has the public interface endpoint will be
+ converted into v2 endpoints.
+ """
+ region_id = self._region_create()
+ service_id = self._service_create()
+ # create a v3 endpoint with three interfaces
+ body = {
+ 'endpoint': unit.new_endpoint_ref(service_id,
+ default_region_id=region_id)
+ }
+ for interface in catalog.controllers.INTERFACES:
+ body['endpoint']['interface'] = interface
+ self.admin_request(method='POST',
+ token=self.get_scoped_token(),
+ path='/v3/endpoints',
+ expected_status=http_client.CREATED,
+ body=body)
+
+ r = self.admin_request(token=self.get_scoped_token(),
+ path='/v2.0/endpoints')
+ # v3 endpoints having public url can be fetched via v2.0 API
+ self.assertEqual(1, len(r.result['endpoints']))
+ v2_endpoint = r.result['endpoints'][0]
+ self.assertEqual(service_id, v2_endpoint['service_id'])
+ # check urls just in case.
+ # This is not the focus of this test, so no different urls are used.
+ self.assertEqual(body['endpoint']['url'], v2_endpoint['publicurl'])
+ self.assertEqual(body['endpoint']['url'], v2_endpoint['adminurl'])
+ self.assertEqual(body['endpoint']['url'], v2_endpoint['internalurl'])
+ self.assertNotIn('name', v2_endpoint)
+
+ v3_endpoint = self.catalog_api.get_endpoint(v2_endpoint['id'])
+ # it's the v3 public endpoint's id as the generated v2 endpoint
+ self.assertEqual('public', v3_endpoint['interface'])
+ self.assertEqual(service_id, v3_endpoint['service_id'])
+
+ def test_pure_v3_endpoint_without_publicurl_invisible_from_v2(self):
+ """Test pure v3 endpoint without public url can't be fetched via v2 API.
+
+ V2 API will return endpoints created by v3 API, but because public url
+ is required for v2 API, so v3 endpoints without public url will be
+ ignored.
+ """
+ region_id = self._region_create()
+ service_id = self._service_create()
+ # create a v3 endpoint without public interface
+ body = {
+ 'endpoint': unit.new_endpoint_ref(service_id,
+ default_region_id=region_id)
+ }
+ for interface in catalog.controllers.INTERFACES:
+ if interface == 'public':
+ continue
+ body['endpoint']['interface'] = interface
+ self.admin_request(method='POST',
+ token=self.get_scoped_token(),
+ path='/v3/endpoints',
+ expected_status=http_client.CREATED,
+ body=body)
+
+ r = self.admin_request(token=self.get_scoped_token(),
+ path='/v2.0/endpoints')
+ # v3 endpoints without public url won't be fetched via v2.0 API
+ self.assertEqual(0, len(r.result['endpoints']))
+
def test_endpoint_create_with_null_adminurl(self):
req_body, response = self._endpoint_create(adminurl=None)
self.assertIsNone(req_body['endpoint']['adminurl'])
@@ -126,7 +209,7 @@ class V2CatalogTestCase(rest.RestfulTestCase):
valid_url = 'http://127.0.0.1:8774/v1.1/$(tenant_id)s'
# baseline tests that all valid URLs works
- self._endpoint_create(expected_status=http_client.OK,
+ self._endpoint_create(expected_status=200,
publicurl=valid_url,
internalurl=valid_url,
adminurl=valid_url)
diff --git a/keystone-moon/keystone/tests/unit/test_cert_setup.py b/keystone-moon/keystone/tests/unit/test_cert_setup.py
index 47a99810..769e7c8e 100644
--- a/keystone-moon/keystone/tests/unit/test_cert_setup.py
+++ b/keystone-moon/keystone/tests/unit/test_cert_setup.py
@@ -17,7 +17,6 @@ import os
import shutil
import mock
-from six.moves import http_client
from testtools import matchers
from keystone.common import environment
@@ -114,13 +113,11 @@ class CertSetupTestCase(rest.RestfulTestCase):
# requests don't have some of the normal information
signing_resp = self.request(self.public_app,
'/v2.0/certificates/signing',
- method='GET',
- expected_status=http_client.OK)
+ method='GET', expected_status=200)
cacert_resp = self.request(self.public_app,
'/v2.0/certificates/ca',
- method='GET',
- expected_status=http_client.OK)
+ method='GET', expected_status=200)
with open(CONF.signing.certfile) as f:
self.assertEqual(f.read(), signing_resp.text)
@@ -136,7 +133,7 @@ class CertSetupTestCase(rest.RestfulTestCase):
for accept in [None, 'text/html', 'application/json', 'text/xml']:
headers = {'Accept': accept} if accept else {}
resp = self.request(self.public_app, path, method='GET',
- expected_status=http_client.OK,
+ expected_status=200,
headers=headers)
self.assertEqual('text/html', resp.content_type)
@@ -149,7 +146,7 @@ class CertSetupTestCase(rest.RestfulTestCase):
def test_failure(self):
for path in ['/v2.0/certificates/signing', '/v2.0/certificates/ca']:
self.request(self.public_app, path, method='GET',
- expected_status=http_client.INTERNAL_SERVER_ERROR)
+ expected_status=500)
def test_pki_certs_rebuild(self):
self.test_create_pki_certs()
diff --git a/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py b/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py
index b241b41b..8664e2c3 100644
--- a/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py
+++ b/keystone-moon/keystone/tests/unit/test_contrib_simple_cert.py
@@ -12,8 +12,6 @@
import uuid
-from six.moves import http_client
-
from keystone.tests.unit import test_v3
@@ -33,7 +31,7 @@ class TestSimpleCert(BaseTestCase):
method='GET',
path=path,
headers={'Accept': content_type},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertEqual(content_type, response.content_type.lower())
self.assertIn('---BEGIN', response.body)
@@ -56,4 +54,4 @@ class TestSimpleCert(BaseTestCase):
self.request(app=self.public_app,
method='GET',
path=path,
- expected_status=http_client.INTERNAL_SERVER_ERROR)
+ expected_status=500)
diff --git a/keystone-moon/keystone/tests/unit/test_policy.py b/keystone-moon/keystone/tests/unit/test_policy.py
index b2f0e525..686e2b70 100644
--- a/keystone-moon/keystone/tests/unit/test_policy.py
+++ b/keystone-moon/keystone/tests/unit/test_policy.py
@@ -16,10 +16,8 @@
import json
import os
-import mock
from oslo_policy import policy as common_policy
import six
-from six.moves.urllib import request as urlrequest
from testtools import matchers
from keystone import exception
@@ -118,28 +116,6 @@ class PolicyTestCase(BasePolicyTestCase):
action = "example:allowed"
rules.enforce(self.credentials, action, self.target)
- def test_enforce_http_true(self):
-
- def fakeurlopen(url, post_data):
- return six.StringIO("True")
-
- action = "example:get_http"
- target = {}
- with mock.patch.object(urlrequest, 'urlopen', fakeurlopen):
- result = rules.enforce(self.credentials, action, target)
- self.assertTrue(result)
-
- def test_enforce_http_false(self):
-
- def fakeurlopen(url, post_data):
- return six.StringIO("False")
-
- action = "example:get_http"
- target = {}
- with mock.patch.object(urlrequest, 'urlopen', fakeurlopen):
- self.assertRaises(exception.ForbiddenAction, rules.enforce,
- self.credentials, action, target)
-
def test_templatized_enforcement(self):
target_mine = {'project_id': 'fake'}
target_not_mine = {'project_id': 'another'}
diff --git a/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py b/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py
index 87b3d48d..f498fe94 100644
--- a/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py
+++ b/keystone-moon/keystone/tests/unit/test_sql_migrate_extensions.py
@@ -180,6 +180,7 @@ class FederationExtension(test_sql_upgrade.SqlMigrateBase):
self.federation_protocol = 'federation_protocol'
self.service_provider = 'service_provider'
self.mapping = 'mapping'
+ self.remote_id_table = 'idp_remote_ids'
def repo_package(self):
return federation
@@ -310,6 +311,68 @@ class FederationExtension(test_sql_upgrade.SqlMigrateBase):
self.assertEqual('', sp.auth_url)
self.assertEqual('', sp.sp_url)
+ def test_propagate_remote_id_to_separate_column(self):
+ """Make sure empty remote_id is not propagated.
+ Test scenario:
+ - Upgrade database to version 6 where identity_provider table has a
+ remote_id column
+ - Add 3 identity provider objects, where idp1 and idp2 have valid
+ remote_id parameter set, and idp3 has it empty (None).
+ - Upgrade database to version 7 and expect migration scripts to
+ properly move data rom identity_provider.remote_id column into
+ separate table idp_remote_ids.
+ - In the idp_remote_ids table expect to find entries for idp1 and idp2
+ and not find anything for idp3 (identitified by idp's id)
+
+ """
+ session = self.Session()
+ idp1 = {'id': uuid.uuid4().hex,
+ 'remote_id': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True}
+ idp2 = {'id': uuid.uuid4().hex,
+ 'remote_id': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True}
+ idp3 = {'id': uuid.uuid4().hex,
+ 'remote_id': None,
+ 'description': uuid.uuid4().hex,
+ 'enabled': True}
+ self.upgrade(6, repository=self.repo_path)
+ self.assertTableColumns(self.identity_provider,
+ ['id', 'description', 'enabled', 'remote_id'])
+
+ self.insert_dict(session, self.identity_provider, idp1)
+ self.insert_dict(session, self.identity_provider, idp2)
+ self.insert_dict(session, self.identity_provider, idp3)
+
+ session.close()
+ self.upgrade(7, repository=self.repo_path)
+
+ self.assertTableColumns(self.identity_provider,
+ ['id', 'description', 'enabled'])
+ remote_id_table = sqlalchemy.Table(self.remote_id_table,
+ self.metadata,
+ autoload=True)
+
+ session = self.Session()
+ self.metadata.clear()
+
+ idp = session.query(remote_id_table).filter(
+ remote_id_table.c.idp_id == idp1['id'])[0]
+ self.assertEqual(idp1['remote_id'], idp.remote_id)
+
+ idp = session.query(remote_id_table).filter(
+ remote_id_table.c.idp_id == idp2['id'])[0]
+ self.assertEqual(idp2['remote_id'], idp.remote_id)
+
+ idp = session.query(remote_id_table).filter(
+ remote_id_table.c.idp_id == idp3['id'])
+ # NOTE(marek-denis): As idp3 had empty 'remote_id' attribute we expect
+ # not to find it in the 'remote_id_table' table, hence count should be
+ # 0.real
+ self.assertEqual(0, idp.count())
+
def test_add_relay_state_column(self):
self.upgrade(8, repository=self.repo_path)
self.assertTableColumns(self.service_provider,
diff --git a/keystone-moon/keystone/tests/unit/test_v2.py b/keystone-moon/keystone/tests/unit/test_v2.py
index 99b5a897..acdfca5f 100644
--- a/keystone-moon/keystone/tests/unit/test_v2.py
+++ b/keystone-moon/keystone/tests/unit/test_v2.py
@@ -132,7 +132,7 @@ class CoreApiTests(object):
'tenantId': self.tenant_bar['id'],
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidAuthenticationResponse(r, require_service_catalog=True)
def test_authenticate_unscoped(self):
@@ -147,7 +147,7 @@ class CoreApiTests(object):
},
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidAuthenticationResponse(r)
def test_get_tenants_for_token(self):
@@ -234,7 +234,7 @@ class CoreApiTests(object):
'token_id': token,
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
def test_endpoints(self):
token = self.get_scoped_token()
@@ -370,7 +370,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
def test_error_response(self):
"""This triggers assertValidErrorResponse by convention."""
@@ -459,7 +459,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
user_id = self._get_user_id(r.result)
@@ -470,7 +470,7 @@ class CoreApiTests(object):
'user_id': user_id
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))
# Create a new tenant
@@ -485,7 +485,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
project_id = self._get_project_id(r.result)
@@ -501,7 +501,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
# 'member_role' should be in new_tenant
r = self.admin_request(
@@ -510,7 +510,7 @@ class CoreApiTests(object):
'user_id': user_id
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertEqual('_member_', self._get_role_name(r.result))
# 'member_role' should not be in tenant_bar any more
@@ -520,7 +520,7 @@ class CoreApiTests(object):
'user_id': user_id
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertNoRoles(r.result)
def test_update_user_with_invalid_tenant(self):
@@ -539,7 +539,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
user_id = self._get_user_id(r.result)
# Update user with an invalid tenant
@@ -571,7 +571,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
user_id = self._get_user_id(r.result)
# Update user with an invalid tenant
@@ -604,7 +604,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
user_id = self._get_user_id(r.result)
@@ -615,7 +615,7 @@ class CoreApiTests(object):
'user_id': user_id
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertEqual(CONF.member_role_name, self._get_role_name(r.result))
# Update user's tenant with old tenant id
@@ -630,7 +630,7 @@ class CoreApiTests(object):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
# 'member_role' should still be in tenant_bar
r = self.admin_request(
@@ -639,7 +639,7 @@ class CoreApiTests(object):
'user_id': user_id
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertEqual('_member_', self._get_role_name(r.result))
def test_authenticating_a_user_with_no_password(self):
@@ -721,7 +721,7 @@ class LegacyV2UsernameTests(object):
path='/v2.0/users',
token=token,
body=body,
- expected_status=http_client.OK)
+ expected_status=200)
def test_create_with_extra_username(self):
"""The response for creating a user will contain the extra fields."""
@@ -772,7 +772,7 @@ class LegacyV2UsernameTests(object):
'enabled': enabled,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -802,7 +802,7 @@ class LegacyV2UsernameTests(object):
'enabled': enabled,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -881,7 +881,7 @@ class LegacyV2UsernameTests(object):
'enabled': enabled,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -911,7 +911,7 @@ class LegacyV2UsernameTests(object):
'enabled': enabled,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -931,7 +931,7 @@ class LegacyV2UsernameTests(object):
'enabled': True,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -956,7 +956,7 @@ class LegacyV2UsernameTests(object):
'enabled': enabled,
},
},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidUserResponse(r)
@@ -1200,7 +1200,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
method='GET',
path='/v2.0/tokens/revoked',
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidRevocationListResponse(r)
def assertValidRevocationListResponse(self, response):
@@ -1231,7 +1231,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
method='GET',
path='/v2.0/tokens/revoked',
token=token1,
- expected_status=http_client.OK)
+ expected_status=200)
signed_text = r.result['signed']
data_json = cms.cms_verify(signed_text, CONF.signing.certfile,
@@ -1333,7 +1333,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
},
},
},
- expected_status=http_client.OK)
+ expected_status=200)
# ensure password doesn't leak
user_id = r.result['user']['id']
@@ -1341,7 +1341,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
method='GET',
path='/v2.0/users/%s' % user_id,
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
self.assertNotIn('OS-KSADM:password', r.result['user'])
def test_updating_a_user_with_an_OSKSADM_password(self):
@@ -1360,7 +1360,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
},
},
token=token,
- expected_status=http_client.OK)
+ expected_status=200)
# successfully authenticate
self.public_request(
@@ -1374,7 +1374,7 @@ class V2TestCase(RestfulTestCase, CoreApiTests, LegacyV2UsernameTests):
},
},
},
- expected_status=http_client.OK)
+ expected_status=200)
class RevokeApiTestCase(V2TestCase):
@@ -1436,7 +1436,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
method='GET',
path=path,
token=admin_token,
- expected_status=http_client.OK)
+ expected_status=200)
def test_authenticate_scoped_token(self):
project_ref = self.new_project_ref()
@@ -1466,7 +1466,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
method='GET',
path=path,
token=admin_token,
- expected_status=http_client.OK)
+ expected_status=200)
def test_token_authentication_and_validation(self):
"""Test token authentication for Fernet token provider.
@@ -1491,7 +1491,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
}
}
},
- expected_status=http_client.OK)
+ expected_status=200)
token_id = self._get_token_id(r)
path = ('/v2.0/tokens/%s?belongsTo=%s' % (token_id, project_ref['id']))
@@ -1500,7 +1500,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
method='GET',
path=path,
token=CONF.admin_token,
- expected_status=http_client.OK)
+ expected_status=200)
def test_rescoped_tokens_maintain_original_expiration(self):
project_ref = self.new_project_ref()
@@ -1522,7 +1522,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
},
# NOTE(lbragstad): This test may need to be refactored if Keystone
# decides to disallow rescoping using a scoped token.
- expected_status=http_client.OK)
+ expected_status=200)
original_token = resp.result['access']['token']['id']
original_expiration = resp.result['access']['token']['expires']
@@ -1537,7 +1537,7 @@ class TestFernetTokenProviderV2(RestfulTestCase):
}
}
},
- expected_status=http_client.OK)
+ expected_status=200)
rescoped_token = resp.result['access']['token']['id']
rescoped_expiration = resp.result['access']['token']['expires']
self.assertNotEqual(original_token, rescoped_token)
diff --git a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
index 8d6d9eb7..2a3fad86 100644
--- a/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
+++ b/keystone-moon/keystone/tests/unit/test_v2_keystoneclient.py
@@ -1137,7 +1137,7 @@ class ClientDrivenTestCase(unit.TestCase):
credentials, signature = self._generate_default_user_ec2_credentials()
credentials['signature'] = signature
resp, token = self._send_ec2_auth_request(credentials)
- self.assertEqual(http_client.OK, resp.status_code)
+ self.assertEqual(200, resp.status_code)
self.assertIn('access', token)
def test_ec2_auth_success_trust(self):
@@ -1169,7 +1169,7 @@ class ClientDrivenTestCase(unit.TestCase):
cred.access, cred.secret)
credentials['signature'] = signature
resp, token = self._send_ec2_auth_request(credentials)
- self.assertEqual(http_client.OK, resp.status_code)
+ self.assertEqual(200, resp.status_code)
self.assertEqual(trust_id, token['access']['trust']['id'])
# TODO(shardy) we really want to check the roles and trustee
# but because of where the stubbing happens we don't seem to
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py
index 7afe6ad8..32c5e295 100644
--- a/keystone-moon/keystone/tests/unit/test_v3.py
+++ b/keystone-moon/keystone/tests/unit/test_v3.py
@@ -18,7 +18,6 @@ import uuid
from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
-from six.moves import http_client
from testtools import matchers
from keystone import auth
@@ -412,7 +411,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
r = self.v3_authenticate_token(auth)
return r.headers.get('X-Subject-Token')
- def v3_authenticate_token(self, auth, expected_status=http_client.CREATED):
+ def v3_authenticate_token(self, auth, expected_status=201):
return self.admin_request(method='POST',
path='/v3/auth/tokens',
body=auth,
@@ -441,31 +440,42 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
return self.admin_request(path=path, token=token, **kwargs)
- def get(self, path, expected_status=http_client.OK, **kwargs):
- return self.v3_request(path, method='GET',
- expected_status=expected_status, **kwargs)
+ def get(self, path, **kwargs):
+ r = self.v3_request(method='GET', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 200)
+ return r
- def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- r = self.v3_request(path, method='HEAD',
- expected_status=expected_status, **kwargs)
+ def head(self, path, **kwargs):
+ r = self.v3_request(method='HEAD', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
self.assertEqual('', r.body)
return r
- def post(self, path, expected_status=http_client.CREATED, **kwargs):
- return self.v3_request(path, method='POST',
- expected_status=expected_status, **kwargs)
+ def post(self, path, **kwargs):
+ r = self.v3_request(method='POST', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 201)
+ return r
- def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- return self.v3_request(path, method='PUT',
- expected_status=expected_status, **kwargs)
+ def put(self, path, **kwargs):
+ r = self.v3_request(method='PUT', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
+ return r
- def patch(self, path, expected_status=http_client.OK, **kwargs):
- return self.v3_request(path, method='PATCH',
- expected_status=expected_status, **kwargs)
+ def patch(self, path, **kwargs):
+ r = self.v3_request(method='PATCH', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 200)
+ return r
- def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- return self.v3_request(path, method='DELETE',
- expected_status=expected_status, **kwargs)
+ def delete(self, path, **kwargs):
+ r = self.v3_request(method='DELETE', path=path, **kwargs)
+ if 'expected_status' not in kwargs:
+ self.assertResponseStatus(r, 204)
+ return r
def assertValidErrorResponse(self, r):
resp = r.result
diff --git a/keystone-moon/keystone/tests/unit/test_v3_assignment.py b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
index f22e9f2b..6b15b1c3 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_assignment.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_assignment.py
@@ -363,13 +363,14 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# validates the returned token and it should be valid.
self.head('/auth/tokens',
headers={'x-subject-token': subject_token},
- expected_status=http_client.OK)
+ expected_status=200)
# now disable the domain
self.domain['enabled'] = False
url = "/domains/%(domain_id)s" % {'domain_id': self.domain['id']}
self.patch(url,
- body={'domain': {'enabled': False}})
+ body={'domain': {'enabled': False}},
+ expected_status=200)
# validates the same token again and it should be 'not found'
# as the domain has already been disabled.
@@ -511,7 +512,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
ref = self.new_project_ref(domain_id=self.domain_id, is_domain=True)
self.post('/projects',
body={'project': ref},
- expected_status=http_client.NOT_IMPLEMENTED)
+ expected_status=501)
@utils.wip('waiting for projects acting as domains implementation')
def test_create_project_without_parent_id_and_without_domain_id(self):
@@ -1289,9 +1290,9 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
member_url = ('%(collection_url)s/%(role_id)s' % {
'collection_url': collection_url,
'role_id': self.role_id})
- self.put(member_url)
+ self.put(member_url, expected_status=204)
# Check the user has the role assigned
- self.head(member_url)
+ self.head(member_url, expected_status=204)
return member_url, user_ref
def test_delete_user_before_removing_role_assignment_succeeds(self):
@@ -1300,7 +1301,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# Delete the user from identity backend
self.identity_api.driver.delete_user(user['id'])
# Clean up the role assignment
- self.delete(member_url)
+ self.delete(member_url, expected_status=204)
# Make sure the role is gone
self.head(member_url, expected_status=http_client.NOT_FOUND)
@@ -1343,7 +1344,7 @@ class AssignmentTestCase(test_v3.RestfulTestCase,
# validates the returned token; it should be valid.
self.head('/auth/tokens',
headers={'x-subject-token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# revokes the grant from group on project.
self.assignment_api.delete_grant(role_id=self.role['id'],
@@ -1868,7 +1869,7 @@ class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase,
self.default_user_id = self.user_ids[0]
self.default_group_id = self.group_ids[0]
- def get_role_assignments(self, expected_status=http_client.OK, **filters):
+ def get_role_assignments(self, expected_status=200, **filters):
"""Returns the result from querying role assignment API + queried URL.
Calls GET /v3/role_assignments?<params> and returns its result, where
diff --git a/keystone-moon/keystone/tests/unit/test_v3_auth.py b/keystone-moon/keystone/tests/unit/test_v3_auth.py
index 496a75c0..d53a85df 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_auth.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_auth.py
@@ -384,8 +384,9 @@ class TokenAPITests(object):
v2_token = r.result['access']['token']['id']
# Delete the v2 token using v3.
- self.delete(
+ resp = self.delete(
'/auth/tokens', headers={'X-Subject-Token': v2_token})
+ self.assertEqual(resp.status_code, 204)
# Attempting to use the deleted token on v2 should fail.
self.admin_request(
@@ -405,8 +406,7 @@ class TokenAPITests(object):
self.assertEqual(expires, r.result['token']['expires_at'])
def test_check_token(self):
- self.head('/auth/tokens', headers=self.headers,
- expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=self.headers, expected_status=200)
def test_validate_token(self):
r = self.get('/auth/tokens', headers=self.headers)
@@ -655,13 +655,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.OK,
+ self.head('/auth/tokens', headers=headers, expected_status=200,
token=adminA_token)
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.OK,
+ self.head('/auth/tokens', headers=headers, expected_status=200,
token=user_token)
- self.delete('/auth/tokens', headers=headers,
+ self.delete('/auth/tokens', headers=headers, expected_status=204,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
@@ -695,13 +693,11 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.OK,
+ self.head('/auth/tokens', headers=headers, expected_status=200,
token=adminA_token)
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.OK,
+ self.head('/auth/tokens', headers=headers, expected_status=200,
token=user_token)
- self.delete('/auth/tokens', headers=headers,
+ self.delete('/auth/tokens', headers=headers, expected_status=204,
token=adminA_token)
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
@@ -868,10 +864,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm both tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
# create a new role
role = self.new_role_ref()
@@ -887,10 +883,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# both tokens should remain valid
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
def test_deleting_user_grant_revokes_token(self):
"""Test deleting a user grant revokes token.
@@ -910,7 +906,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# Delete the grant, which should invalidate the token
grant_url = (
'/projects/%(project_id)s/users/%(user_id)s/'
@@ -1012,19 +1008,19 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE},
- expected_status=http_client.OK)
+ expected_status=200)
# Delete the role, which should invalidate the tokens
role_url = '/roles/%s' % self.role1['id']
@@ -1047,7 +1043,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# ...but the one using role2 is still valid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC},
- expected_status=http_client.OK)
+ expected_status=200)
def test_domain_user_role_assignment_maintains_token(self):
"""Test user-domain role assignment maintains existing token.
@@ -1067,7 +1063,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# Assign a role, which should not affect the token
grant_url = (
'/domains/%(domain_id)s/users/%(user_id)s/'
@@ -1078,7 +1074,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.put(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
def test_disabling_project_revokes_token(self):
token = self.get_requested_token(
@@ -1090,7 +1086,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# disable the project, which should invalidate the token
self.patch(
@@ -1118,7 +1114,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# delete the project, which should invalidate the token
self.delete(
@@ -1167,13 +1163,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': token3},
- expected_status=http_client.OK)
+ expected_status=200)
# Delete the group grant, which should invalidate the
# tokens for user1 and user2
grant_url = (
@@ -1213,7 +1209,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# Delete the grant, which should invalidate the token
grant_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/'
@@ -1224,7 +1220,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.put(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
def test_group_membership_changes_revokes_token(self):
"""Test add/removal to/from group revokes token.
@@ -1254,10 +1250,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
- expected_status=http_client.OK)
+ expected_status=200)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ expected_status=200)
# Remove user1 from group1, which should invalidate
# the token
self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
@@ -1269,14 +1265,14 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# But user2's token should still be valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ expected_status=200)
# Adding user2 to a group should not invalidate token
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group2['id'],
'user_id': self.user2['id']})
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ expected_status=200)
def test_removing_role_assignment_does_not_affect_other_users(self):
"""Revoking a role from one user should not affect other users."""
@@ -1320,7 +1316,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# authorization for the second user should still succeed
self.head('/auth/tokens',
headers={'X-Subject-Token': user3_token},
- expected_status=http_client.OK)
+ expected_status=200)
self.v3_authenticate_token(
self.build_authentication_request(
user_id=self.user3['id'],
@@ -1370,7 +1366,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
token = self.get_v2_token()
self.delete('/auth/tokens',
- headers={'X-Subject-Token': token})
+ headers={'X-Subject-Token': token},
+ expected_status=204)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -1400,7 +1397,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# revoke the project-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': project_scoped_token})
+ headers={'X-Subject-Token': project_scoped_token},
+ expected_status=204)
# The project-scoped token is invalidated.
self.head('/auth/tokens',
@@ -1410,16 +1408,17 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
# The domain-scoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
# revoke the domain-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': domain_scoped_token})
+ headers={'X-Subject-Token': domain_scoped_token},
+ expected_status=204)
# The domain-scoped token is invalid.
self.head('/auth/tokens',
@@ -1429,7 +1428,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
def test_revoke_token_from_token_v2(self):
# Test that a scoped token can be requested from an unscoped token,
@@ -1447,7 +1446,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# revoke the project-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': project_scoped_token})
+ headers={'X-Subject-Token': project_scoped_token},
+ expected_status=204)
# The project-scoped token is invalidated.
self.head('/auth/tokens',
@@ -1457,7 +1457,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=http_client.OK)
+ expected_status=200)
class TestTokenRevokeByAssignment(TestTokenRevokeById):
@@ -1501,7 +1501,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById):
# authorization for the projectA should still succeed
self.head('/auth/tokens',
headers={'X-Subject-Token': other_project_token},
- expected_status=http_client.OK)
+ expected_status=200)
# while token for the projectB should not
self.head('/auth/tokens',
headers={'X-Subject-Token': project_token},
@@ -1563,24 +1563,27 @@ class TestTokenRevokeApi(TestTokenRevokeById):
def test_revoke_token(self):
scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token}
- response = self.get('/auth/tokens', headers=headers).json_body['token']
+ response = self.get('/auth/tokens', headers=headers,
+ expected_status=200).json_body['token']
- self.delete('/auth/tokens', headers=headers)
+ self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- events_response = self.get('/OS-REVOKE/events').json_body
+ events_response = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body
self.assertValidRevokedTokenResponse(events_response,
audit_id=response['audit_ids'][0])
def test_revoke_v2_token(self):
token = self.get_v2_token()
headers = {'X-Subject-Token': token}
- response = self.get('/auth/tokens',
- headers=headers).json_body['token']
- self.delete('/auth/tokens', headers=headers)
+ response = self.get('/auth/tokens', headers=headers,
+ expected_status=200).json_body['token']
+ self.delete('/auth/tokens', headers=headers, expected_status=204)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- events_response = self.get('/OS-REVOKE/events').json_body
+ events_response = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body
self.assertValidRevokedTokenResponse(
events_response,
@@ -1592,24 +1595,28 @@ class TestTokenRevokeApi(TestTokenRevokeById):
def test_list_delete_project_shows_in_event_list(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events').json_body['events']
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body['events']
self.assertEqual([], events)
self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']})
- events_response = self.get('/OS-REVOKE/events').json_body
+ events_response = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body
self.assertValidDeletedProjectResponse(events_response,
self.projectA['id'])
def test_disable_domain_shows_in_event_list(self):
- events = self.get('/OS-REVOKE/events').json_body['events']
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body['events']
self.assertEqual([], events)
disable_body = {'domain': {'enabled': False}}
self.patch(
'/domains/%(project_id)s' % {'project_id': self.domainA['id']},
body=disable_body)
- events = self.get('/OS-REVOKE/events').json_body
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body
self.assertDomainInList(events, self.domainA['id'])
@@ -1639,7 +1646,8 @@ class TestTokenRevokeApi(TestTokenRevokeById):
def test_list_delete_token_shows_in_event_list(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events').json_body['events']
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body['events']
self.assertEqual([], events)
scoped_token = self.get_scoped_token()
@@ -1653,17 +1661,15 @@ class TestTokenRevokeApi(TestTokenRevokeById):
response.json_body['token']
headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.OK)
- self.head('/auth/tokens', headers=headers2,
- expected_status=http_client.OK)
- self.head('/auth/tokens', headers=headers3,
- expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=headers, expected_status=200)
+ self.head('/auth/tokens', headers=headers2, expected_status=200)
+ self.head('/auth/tokens', headers=headers3, expected_status=200)
- self.delete('/auth/tokens', headers=headers)
+ self.delete('/auth/tokens', headers=headers, expected_status=204)
# NOTE(ayoung): not deleting token3, as it should be deleted
# by previous
- events_response = self.get('/OS-REVOKE/events').json_body
+ events_response = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body
events = events_response['events']
self.assertEqual(1, len(events))
self.assertEventDataInList(
@@ -1671,32 +1677,32 @@ class TestTokenRevokeApi(TestTokenRevokeById):
audit_id=token2['audit_ids'][1])
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- self.head('/auth/tokens', headers=headers2,
- expected_status=http_client.OK)
- self.head('/auth/tokens', headers=headers3,
- expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=headers2, expected_status=200)
+ self.head('/auth/tokens', headers=headers3, expected_status=200)
def test_list_with_filter(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events').json_body['events']
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body['events']
self.assertEqual(0, len(events))
scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token}
auth = self.build_authentication_request(token=scoped_token)
headers2 = {'X-Subject-Token': self.get_requested_token(auth)}
- self.delete('/auth/tokens', headers=headers)
- self.delete('/auth/tokens', headers=headers2)
+ self.delete('/auth/tokens', headers=headers, expected_status=204)
+ self.delete('/auth/tokens', headers=headers2, expected_status=204)
- events = self.get('/OS-REVOKE/events').json_body['events']
+ events = self.get('/OS-REVOKE/events',
+ expected_status=200).json_body['events']
self.assertEqual(2, len(events))
future = utils.isotime(timeutils.utcnow() +
datetime.timedelta(seconds=1000))
- events = self.get('/OS-REVOKE/events?since=%s' % (future)
- ).json_body['events']
+ events = self.get('/OS-REVOKE/events?since=%s' % (future),
+ expected_status=200).json_body['events']
self.assertEqual(0, len(events))
@@ -3106,7 +3112,8 @@ class TestTrustChain(test_v3.RestfulTestCase):
def test_delete_trust_cascade(self):
self.assert_user_authenticate(self.user_chain[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[0]['id']})
+ 'trust_id': self.trust_chain[0]['id']},
+ expected_status=204)
headers = {'X-Subject-Token': self.last_token}
self.head('/auth/tokens', headers=headers,
@@ -3116,10 +3123,12 @@ class TestTrustChain(test_v3.RestfulTestCase):
def test_delete_broken_chain(self):
self.assert_user_authenticate(self.user_chain[0])
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[1]['id']})
+ 'trust_id': self.trust_chain[1]['id']},
+ expected_status=204)
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[0]['id']})
+ 'trust_id': self.trust_chain[0]['id']},
+ expected_status=204)
def test_trustor_roles_revoked(self):
self.assert_user_authenticate(self.user_chain[0])
@@ -3214,7 +3223,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
# make sure the trust exists
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
# get a token for the trustee
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
@@ -3232,7 +3242,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self._initialize_test_consume_trust(2)
# check decremented value
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
trust = r.result.get('trust')
self.assertIsNotNone(trust)
self.assertEqual(1, trust['remaining_uses'])
@@ -3310,7 +3321,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'])
@@ -3321,7 +3333,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust_id=trust['id'])
r = self.v3_authenticate_token(auth_data)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
trust = r.result.get('trust')
self.assertIsNone(trust['remaining_uses'])
@@ -3335,27 +3348,30 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=200)
self.assertValidTrustResponse(r, ref)
# validate roles on the trust
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles' % {
- 'trust_id': trust['id']})
+ 'trust_id': trust['id']},
+ expected_status=200)
roles = self.assertValidRoleListResponse(r, self.role)
self.assertIn(self.role['id'], [x['id'] for x in roles])
self.head(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'],
'role_id': self.role['id']},
- expected_status=http_client.OK)
+ expected_status=200)
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'],
- 'role_id': self.role['id']})
+ 'role_id': self.role['id']},
+ expected_status=200)
self.assertValidRoleResponse(r, self.role)
- r = self.get('/OS-TRUST/trusts')
+ r = self.get('/OS-TRUST/trusts', expected_status=200)
self.assertValidTrustListResponse(r, trust)
# trusts are immutable
@@ -3365,7 +3381,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
expected_status=http_client.NOT_FOUND)
self.delete(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
+ expected_status=204)
self.get(
'/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
@@ -3554,7 +3571,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
path = '/v2.0/tokens/%s' % (token)
self.admin_request(
path=path, token=CONF.admin_token,
- method='GET', expected_status=http_client.OK)
+ method='GET', expected_status=200)
def test_exercise_trust_scoped_token_without_impersonation(self):
ref = self.new_trust_ref(
@@ -3758,7 +3775,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
expected_status=http_client.FORBIDDEN)
def assertTrustTokensRevoked(self, trust_id):
- revocation_response = self.get('/OS-REVOKE/events')
+ revocation_response = self.get('/OS-REVOKE/events',
+ expected_status=200)
revocation_events = revocation_response.json_body['events']
found = False
for event in revocation_events:
@@ -3787,7 +3805,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r, self.trustee_user)
trust_token = r.headers['X-Subject-Token']
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust_id})
+ 'trust_id': trust_id},
+ expected_status=204)
headers = {'X-Subject-Token': trust_token}
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
@@ -3814,7 +3833,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data)
+ self.v3_authenticate_token(auth_data, expected_status=201)
self.disable_user(self.user)
@@ -3842,7 +3861,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data)
+ self.v3_authenticate_token(auth_data, expected_status=201)
self.disable_user(self.trustee_user)
@@ -3867,7 +3886,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self.assertValidTrustResponse(r, ref)
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust['id']})
+ 'trust_id': trust['id']},
+ expected_status=204)
self.get('/OS-TRUST/trusts/%(trust_id)s' % {
'trust_id': trust['id']},
@@ -3897,19 +3917,19 @@ class TestTrustAuth(test_v3.RestfulTestCase):
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
self.assertValidTrustResponse(r, ref)
- r = self.get('/OS-TRUST/trusts')
+ r = self.get('/OS-TRUST/trusts', expected_status=200)
trusts = r.result['trusts']
self.assertEqual(3, len(trusts))
self.assertValidTrustListResponse(r)
r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
- self.user_id)
+ self.user_id, expected_status=200)
trusts = r.result['trusts']
self.assertEqual(3, len(trusts))
self.assertValidTrustListResponse(r)
r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' %
- self.user_id)
+ self.user_id, expected_status=200)
trusts = r.result['trusts']
self.assertEqual(0, len(trusts))
@@ -3935,11 +3955,13 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust_token = r.headers.get('X-Subject-Token')
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
- self.user_id, token=trust_token)
+ self.user_id, expected_status=200,
+ token=trust_token)
self.assertValidUserResponse(
self.patch('/users/%s' % self.trustee_user['id'],
- body={'user': {'password': uuid.uuid4().hex}}))
+ body={'user': {'password': uuid.uuid4().hex}},
+ expected_status=200))
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=http_client.UNAUTHORIZED,
@@ -3971,13 +3993,14 @@ class TestTrustAuth(test_v3.RestfulTestCase):
'trust_id': trust['id'],
'role_id': self.role['id']},
auth=auth_data,
- expected_status=http_client.OK)
+ expected_status=200)
r = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'],
'role_id': self.role['id']},
- auth=auth_data)
+ auth=auth_data,
+ expected_status=200)
self.assertValidRoleResponse(r, self.role)
def test_do_not_consume_remaining_uses_when_get_token_fails(self):
@@ -4022,7 +4045,7 @@ class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase):
'query_string': {},
'environment': {}}
r = auth_controller.validate_token(context)
- self.assertEqual(http_client.OK, r.status_code)
+ self.assertEqual(200, r.status_code)
class TestAuthContext(unit.TestCase):
@@ -4082,7 +4105,9 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
def test_get_catalog_project_scoped_token(self):
"""Call ``GET /auth/catalog`` with a project-scoped token."""
- r = self.get('/auth/catalog')
+ r = self.get(
+ '/auth/catalog',
+ expected_status=200)
self.assertValidCatalogResponse(r)
def test_get_catalog_domain_scoped_token(self):
@@ -4116,7 +4141,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
expected_status=http_client.UNAUTHORIZED)
def test_get_projects_project_scoped_token(self):
- r = self.get('/auth/projects')
+ r = self.get('/auth/projects', expected_status=200)
self.assertThat(r.json['projects'], matchers.HasLength(1))
self.assertValidProjectListResponse(r)
@@ -4124,7 +4149,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id']))
- r = self.get('/auth/domains')
+ r = self.get('/auth/domains', expected_status=200)
self.assertThat(r.json['domains'], matchers.HasLength(1))
self.assertValidDomainListResponse(r)
@@ -4135,7 +4160,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
def _make_auth_request(self, auth_data):
- resp = self.post('/auth/tokens', body=auth_data)
+ resp = self.post('/auth/tokens', body=auth_data, expected_status=201)
token = resp.headers.get('X-Subject-Token')
self.assertLess(len(token), 255)
return token
@@ -4167,13 +4192,13 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase):
trust_id=trust['id'])
return self._make_auth_request(auth_data)
- def _validate_token(self, token, expected_status=http_client.OK):
+ def _validate_token(self, token, expected_status=200):
return self.get(
'/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=expected_status)
- def _revoke_token(self, token, expected_status=http_client.NO_CONTENT):
+ def _revoke_token(self, token, expected_status=204):
return self.delete(
'/auth/tokens',
headers={'X-Subject-Token': token},
@@ -4547,8 +4572,7 @@ class TestAuthFernetTokenProvider(TestAuth):
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
# Bind not current supported by Fernet, see bug 1433311.
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.NOT_IMPLEMENTED)
+ self.v3_authenticate_token(auth_data, expected_status=501)
def test_v2_v3_bind_token_intermix(self):
self.config_fixture.config(group='token', bind='kerberos')
@@ -4563,7 +4587,7 @@ class TestAuthFernetTokenProvider(TestAuth):
self.admin_request(path='/v2.0/tokens',
method='POST',
body=body,
- expected_status=http_client.NOT_IMPLEMENTED)
+ expected_status=501)
def test_auth_with_bind_token(self):
self.config_fixture.config(group='token', bind=['kerberos'])
@@ -4573,5 +4597,4 @@ class TestAuthFernetTokenProvider(TestAuth):
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
# Bind not current supported by Fernet, see bug 1433311.
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.NOT_IMPLEMENTED)
+ self.v3_authenticate_token(auth_data, expected_status=501)
diff --git a/keystone-moon/keystone/tests/unit/test_v3_catalog.py b/keystone-moon/keystone/tests/unit/test_v3_catalog.py
index 0d82390d..c536169a 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_catalog.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_catalog.py
@@ -36,7 +36,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
r = self.put(
'/regions/%s' % region_id,
body={'region': ref},
- expected_status=http_client.CREATED)
+ expected_status=201)
self.assertValidRegionResponse(r, ref)
# Double-check that the region ID was kept as-is and not
# populated with a UUID, as is the case with POST /v3/regions
@@ -49,7 +49,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
r = self.put(
'/regions/%s' % region_id,
body={'region': ref},
- expected_status=http_client.CREATED)
+ expected_status=201)
self.assertValidRegionResponse(r, ref)
# Double-check that the region ID was kept as-is and not
# populated with a UUID, as is the case with POST /v3/regions
@@ -60,7 +60,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref = dict(description="my region")
self.put(
'/regions/myregion',
- body={'region': ref}, expected_status=http_client.CREATED)
+ body={'region': ref}, expected_status=201)
# Create region again with duplicate id
self.put(
'/regions/myregion',
@@ -86,7 +86,9 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref = self.new_region_ref()
ref['id'] = ''
- r = self.post('/regions', body={'region': ref})
+ r = self.post(
+ '/regions',
+ body={'region': ref}, expected_status=201)
self.assertValidRegionResponse(r, ref)
self.assertNotEmpty(r.result['region'].get('id'))
@@ -98,7 +100,10 @@ class CatalogTestCase(test_v3.RestfulTestCase):
del ref['id']
# let the service define the ID
- r = self.post('/regions', body={'region': ref})
+ r = self.post(
+ '/regions',
+ body={'region': ref},
+ expected_status=201)
self.assertValidRegionResponse(r, ref)
def test_create_region_without_description(self):
@@ -107,7 +112,10 @@ class CatalogTestCase(test_v3.RestfulTestCase):
del ref['description']
- r = self.post('/regions', body={'region': ref})
+ r = self.post(
+ '/regions',
+ body={'region': ref},
+ expected_status=201)
# Create the description in the reference to compare to since the
# response should now have a description, even though we didn't send
# it with the original reference.
@@ -127,10 +135,16 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref1['description'] = region_desc
ref2['description'] = region_desc
- resp1 = self.post('/regions', body={'region': ref1})
+ resp1 = self.post(
+ '/regions',
+ body={'region': ref1},
+ expected_status=201)
self.assertValidRegionResponse(resp1, ref1)
- resp2 = self.post('/regions', body={'region': ref2})
+ resp2 = self.post(
+ '/regions',
+ body={'region': ref2},
+ expected_status=201)
self.assertValidRegionResponse(resp2, ref2)
def test_create_regions_without_descriptions(self):
@@ -145,9 +159,15 @@ class CatalogTestCase(test_v3.RestfulTestCase):
del ref1['description']
ref2['description'] = None
- resp1 = self.post('/regions', body={'region': ref1})
+ resp1 = self.post(
+ '/regions',
+ body={'region': ref1},
+ expected_status=201)
- resp2 = self.post('/regions', body={'region': ref2})
+ resp2 = self.post(
+ '/regions',
+ body={'region': ref2},
+ expected_status=201)
# Create the descriptions in the references to compare to since the
# responses should now have descriptions, even though we didn't send
# a description with the original references.
@@ -211,14 +231,16 @@ class CatalogTestCase(test_v3.RestfulTestCase):
"""Call ``PATCH /regions/{region_id}``."""
region_ref = self.new_region_ref()
- resp = self.post('/regions', body={'region': region_ref})
+ resp = self.post('/regions', body={'region': region_ref},
+ expected_status=201)
region_updates = {
# update with something that's not the description
'parent_region_id': self.region_id,
}
resp = self.patch('/regions/%s' % region_ref['id'],
- body={'region': region_updates})
+ body={'region': region_updates},
+ expected_status=200)
# NOTE(dstanek): Keystone should keep the original description.
self.assertEqual(region_ref['description'],
@@ -591,7 +613,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref = self.new_endpoint_ref(service_id=self.service_id)
ref["region"] = uuid.uuid4().hex
ref.pop('region_id')
- self.post('/endpoints', body={'endpoint': ref})
+ self.post('/endpoints', body={'endpoint': ref}, expected_status=201)
# Make sure the region is created
self.get('/regions/%(region_id)s' % {
'region_id': ref["region"]})
@@ -600,7 +622,7 @@ class CatalogTestCase(test_v3.RestfulTestCase):
"""EndpointV3 allows to creates the endpoint without region."""
ref = self.new_endpoint_ref(service_id=self.service_id)
ref.pop('region_id')
- self.post('/endpoints', body={'endpoint': ref})
+ self.post('/endpoints', body={'endpoint': ref}, expected_status=201)
def test_create_endpoint_with_empty_url(self):
"""Call ``POST /endpoints``."""
@@ -756,7 +778,9 @@ class CatalogTestCase(test_v3.RestfulTestCase):
ref = self.new_endpoint_ref(self.service_id)
ref['url'] = valid_url
- self.post('/endpoints', body={'endpoint': ref})
+ self.post('/endpoints',
+ body={'endpoint': ref},
+ expected_status=201)
def test_endpoint_create_with_invalid_url(self):
"""Test the invalid cases: substitutions is not exactly right.
diff --git a/keystone-moon/keystone/tests/unit/test_v3_credential.py b/keystone-moon/keystone/tests/unit/test_v3_credential.py
index cf504b00..dd8cf2dd 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_credential.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_credential.py
@@ -382,7 +382,7 @@ class TestCredentialEc2(CredentialBaseTestCase):
r = self.post(
'/ec2tokens',
body={'ec2Credentials': sig_ref},
- expected_status=http_client.OK)
+ expected_status=200)
self.assertValidTokenResponse(r)
def test_ec2_credential_signature_validate(self):
diff --git a/keystone-moon/keystone/tests/unit/test_v3_domain_config.py b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py
index 3f7af87d..701cd3cf 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_domain_config.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_domain_config.py
@@ -40,7 +40,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
url = '/domains/%(domain_id)s/config' % {
'domain_id': self.domain['id']}
r = self.put(url, body={'config': self.config},
- expected_status=http_client.CREATED)
+ expected_status=201)
res = self.domain_config_api.get_config(self.domain['id'])
self.assertEqual(self.config, r.result['config'])
self.assertEqual(self.config, res)
@@ -50,11 +50,11 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
self.put('/domains/%(domain_id)s/config' % {
'domain_id': self.domain['id']},
body={'config': self.config},
- expected_status=http_client.CREATED)
+ expected_status=201)
self.put('/domains/%(domain_id)s/config' % {
'domain_id': self.domain['id']},
body={'config': self.config},
- expected_status=http_client.OK)
+ expected_status=200)
def test_delete_config(self):
"""Call ``DELETE /domains{domain_id}/config``."""
@@ -80,7 +80,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}
r = self.get(url)
self.assertEqual(self.config, r.result['config'])
- self.head(url, expected_status=http_client.OK)
+ self.head(url, expected_status=200)
def test_get_config_by_group(self):
"""Call ``GET & HEAD /domains{domain_id}/config/{group}``."""
@@ -89,7 +89,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
'domain_id': self.domain['id']}
r = self.get(url)
self.assertEqual({'ldap': self.config['ldap']}, r.result['config'])
- self.head(url, expected_status=http_client.OK)
+ self.head(url, expected_status=200)
def test_get_config_by_option(self):
"""Call ``GET & HEAD /domains{domain_id}/config/{group}/{option}``."""
@@ -99,7 +99,7 @@ class DomainConfigTestCase(test_v3.RestfulTestCase):
r = self.get(url)
self.assertEqual({'url': self.config['ldap']['url']},
r.result['config'])
- self.head(url, expected_status=http_client.OK)
+ self.head(url, expected_status=200)
def test_get_non_existant_config(self):
"""Call ``GET /domains{domain_id}/config when no config defined``."""
diff --git a/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py b/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py
index b0c8256e..3423d2d8 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_endpoint_policy.py
@@ -53,14 +53,12 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
url,
expected_status=http_client.NOT_FOUND)
- self.put(url)
+ self.put(url, expected_status=204)
# test that the new resource is accessible.
- self.assert_head_and_get_return_same_response(
- url,
- expected_status=http_client.NO_CONTENT)
+ self.assert_head_and_get_return_same_response(url, expected_status=204)
- self.delete(url)
+ self.delete(url, expected_status=204)
# test that the deleted resource is no longer accessible
self.assert_head_and_get_return_same_response(
@@ -101,16 +99,18 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.put('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
'/endpoints/%(endpoint_id)s' % {
'policy_id': self.policy['id'],
- 'endpoint_id': self.endpoint['id']})
+ 'endpoint_id': self.endpoint['id']},
+ expected_status=204)
self.head('/endpoints/%(endpoint_id)s/OS-ENDPOINT-POLICY'
'/policy' % {
'endpoint_id': self.endpoint['id']},
- expected_status=http_client.OK)
+ expected_status=200)
r = self.get('/endpoints/%(endpoint_id)s/OS-ENDPOINT-POLICY'
'/policy' % {
- 'endpoint_id': self.endpoint['id']})
+ 'endpoint_id': self.endpoint['id']},
+ expected_status=200)
self.assertValidPolicyResponse(r, ref=self.policy)
def test_list_endpoints_for_policy(self):
@@ -119,11 +119,13 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
self.put('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
'/endpoints/%(endpoint_id)s' % {
'policy_id': self.policy['id'],
- 'endpoint_id': self.endpoint['id']})
+ 'endpoint_id': self.endpoint['id']},
+ expected_status=204)
r = self.get('/policies/%(policy_id)s/OS-ENDPOINT-POLICY'
'/endpoints' % {
- 'policy_id': self.policy['id']})
+ 'policy_id': self.policy['id']},
+ expected_status=200)
self.assertValidEndpointListResponse(r, ref=self.endpoint)
self.assertThat(r.result.get('endpoints'), matchers.HasLength(1))
@@ -133,8 +135,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
'policy_id': self.policy['id'],
'endpoint_id': self.endpoint['id']}
- self.put(url)
- self.head(url)
+ self.put(url, expected_status=204)
+ self.head(url, expected_status=204)
self.delete('/endpoints/%(endpoint_id)s' % {
'endpoint_id': self.endpoint['id']})
@@ -148,8 +150,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
'service_id': self.service['id'],
'region_id': self.region['id']}
- self.put(url)
- self.head(url)
+ self.put(url, expected_status=204)
+ self.head(url, expected_status=204)
self.delete('/regions/%(region_id)s' % {
'region_id': self.region['id']})
@@ -163,8 +165,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
'service_id': self.service['id'],
'region_id': self.region['id']}
- self.put(url)
- self.head(url)
+ self.put(url, expected_status=204)
+ self.head(url, expected_status=204)
self.delete('/services/%(service_id)s' % {
'service_id': self.service['id']})
@@ -177,8 +179,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
'policy_id': self.policy['id'],
'service_id': self.service['id']}
- self.put(url)
- self.get(url, expected_status=http_client.NO_CONTENT)
+ self.put(url, expected_status=204)
+ self.get(url, expected_status=204)
self.delete('/policies/%(policy_id)s' % {
'policy_id': self.policy['id']})
@@ -191,8 +193,8 @@ class EndpointPolicyTestCase(test_v3.RestfulTestCase):
'policy_id': self.policy['id'],
'service_id': self.service['id']}
- self.put(url)
- self.get(url, expected_status=http_client.NO_CONTENT)
+ self.put(url, expected_status=204)
+ self.get(url, expected_status=204)
self.delete('/services/%(service_id)s' % {
'service_id': self.service['id']})
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py
index 5717e67b..4d7dcaab 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_federation.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py
@@ -815,7 +815,7 @@ class FederatedIdentityProviderTests(FederationTests):
if body is None:
body = self._http_idp_input()
resp = self.put(url, body={'identity_provider': body},
- expected_status=http_client.CREATED)
+ expected_status=201)
return resp
def _http_idp_input(self, **kwargs):
@@ -1027,7 +1027,7 @@ class FederatedIdentityProviderTests(FederationTests):
url = self.base_url(suffix=uuid.uuid4().hex)
body = self._http_idp_input()
self.put(url, body={'identity_provider': body},
- expected_status=http_client.CREATED)
+ expected_status=201)
self.put(url, body={'identity_provider': body},
expected_status=http_client.CONFLICT)
@@ -1084,7 +1084,7 @@ class FederatedIdentityProviderTests(FederationTests):
idp_url = self.base_url(suffix=idp_id)
# assign protocol to IdP
- kwargs = {'expected_status': http_client.CREATED}
+ kwargs = {'expected_status': 201}
resp, idp_id, proto = self._assign_protocol_to_idp(
url=url,
idp_id=idp_id,
@@ -1179,7 +1179,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_assign_protocol_to_idp(self):
"""Assign a protocol to existing IdP."""
- self._assign_protocol_to_idp(expected_status=http_client.CREATED)
+ self._assign_protocol_to_idp(expected_status=201)
def test_protocol_composite_pk(self):
"""Test whether Keystone let's add two entities with identical
@@ -1193,7 +1193,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': http_client.CREATED}
+ kwargs = {'expected_status': 201}
self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
@@ -1209,7 +1209,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
- kwargs = {'expected_status': http_client.CREATED}
+ kwargs = {'expected_status': 201}
resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
url=url, **kwargs)
kwargs = {'expected_status': http_client.CONFLICT}
@@ -1235,8 +1235,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_get_protocol(self):
"""Create and later fetch protocol tied to IdP."""
- resp, idp_id, proto = self._assign_protocol_to_idp(
- expected_status=http_client.CREATED)
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
url = "%s/protocols/%s" % (idp_id, proto_id)
url = self.base_url(suffix=url)
@@ -1255,14 +1254,12 @@ class FederatedIdentityProviderTests(FederationTests):
Compare input and output id sets.
"""
- resp, idp_id, proto = self._assign_protocol_to_idp(
- expected_status=http_client.CREATED)
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
iterations = random.randint(0, 16)
protocol_ids = []
for _ in range(iterations):
- resp, _, proto = self._assign_protocol_to_idp(
- idp_id=idp_id,
- expected_status=http_client.CREATED)
+ resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id,
+ expected_status=201)
proto_id = self._fetch_attribute_from_response(resp, 'protocol')
proto_id = proto_id['id']
protocol_ids.append(proto_id)
@@ -1281,8 +1278,7 @@ class FederatedIdentityProviderTests(FederationTests):
def test_update_protocols_attribute(self):
"""Update protocol's attribute."""
- resp, idp_id, proto = self._assign_protocol_to_idp(
- expected_status=http_client.CREATED)
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
new_mapping_id = uuid.uuid4().hex
url = "%s/protocols/%s" % (idp_id, proto)
@@ -1303,8 +1299,7 @@ class FederatedIdentityProviderTests(FederationTests):
"""
url = self.base_url(suffix='/%(idp_id)s/'
'protocols/%(protocol_id)s')
- resp, idp_id, proto = self._assign_protocol_to_idp(
- expected_status=http_client.CREATED)
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
url = url % {'idp_id': idp_id,
'protocol_id': proto}
self.delete(url)
@@ -1345,7 +1340,7 @@ class MappingCRUDTests(FederationTests):
url = self.MAPPING_URL + uuid.uuid4().hex
resp = self.put(url,
body={'mapping': mapping_fixtures.MAPPING_LARGE},
- expected_status=http_client.CREATED)
+ expected_status=201)
return resp
def _get_id_from_response(self, resp):
@@ -1362,7 +1357,7 @@ class MappingCRUDTests(FederationTests):
resp = self.get(url)
entities = resp.result.get('mappings')
self.assertIsNotNone(entities)
- self.assertResponseStatus(resp, http_client.OK)
+ self.assertResponseStatus(resp, 200)
self.assertValidListLinks(resp.result.get('links'))
self.assertEqual(1, len(entities))
@@ -1372,7 +1367,7 @@ class MappingCRUDTests(FederationTests):
mapping_id = self._get_id_from_response(resp)
url = url % {'mapping_id': str(mapping_id)}
resp = self.delete(url)
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
self.get(url, expected_status=http_client.NOT_FOUND)
def test_mapping_get(self):
@@ -1976,8 +1971,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
token_id, 'project',
self.project_all['id'])
- self.v3_authenticate_token(
- scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR)
+ self.v3_authenticate_token(scoped_token, expected_status=500)
def test_lists_with_missing_group_in_backend(self):
"""Test a mapping that points to a group that does not exist
@@ -2529,7 +2523,7 @@ class SAMLGenerationTests(FederationTests):
self.sp = self.sp_ref()
url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
self.put(url, body={'service_provider': self.sp},
- expected_status=http_client.CREATED)
+ expected_status=201)
def test_samlize_token_values(self):
"""Test the SAML generator produces a SAML object.
@@ -2763,7 +2757,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=http_client.OK)
+ expected_status=200)
response = etree.fromstring(http_response.result)
issuer = response[0]
@@ -2879,7 +2873,7 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
response_content_type='text/xml',
- expected_status=http_client.OK)
+ expected_status=200)
env_response = etree.fromstring(http_response.result)
header = env_response[0]
@@ -3079,13 +3073,13 @@ class IdPMetadataGenerationTests(FederationTests):
self.generator.generate_metadata)
def test_get_metadata_with_no_metadata_file_configured(self):
- self.get(self.METADATA_URL,
- expected_status=http_client.INTERNAL_SERVER_ERROR)
+ self.get(self.METADATA_URL, expected_status=500)
def test_get_metadata(self):
self.config_fixture.config(
group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
- r = self.get(self.METADATA_URL, response_content_type='text/xml')
+ r = self.get(self.METADATA_URL, response_content_type='text/xml',
+ expected_status=200)
self.assertEqual('text/xml', r.headers.get('Content-Type'))
reference_file = _load_xml('idp_saml2_metadata.xml')
@@ -3108,7 +3102,7 @@ class ServiceProviderTests(FederationTests):
self.SP_REF = self.sp_ref()
self.SERVICE_PROVIDER = self.put(
url, body={'service_provider': self.SP_REF},
- expected_status=http_client.CREATED).result
+ expected_status=201).result
def sp_ref(self):
ref = {
@@ -3127,7 +3121,7 @@ class ServiceProviderTests(FederationTests):
def test_get_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.get(url)
+ resp = self.get(url, expected_status=200)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
@@ -3139,7 +3133,7 @@ class ServiceProviderTests(FederationTests):
url = self.base_url(suffix=uuid.uuid4().hex)
sp = self.sp_ref()
resp = self.put(url, body={'service_provider': sp},
- expected_status=http_client.CREATED)
+ expected_status=201)
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
@@ -3149,7 +3143,7 @@ class ServiceProviderTests(FederationTests):
sp = self.sp_ref()
del sp['relay_state_prefix']
resp = self.put(url, body={'service_provider': sp},
- expected_status=http_client.CREATED)
+ expected_status=201)
sp_result = resp.result['service_provider']
self.assertEqual(CONF.saml.relay_state_prefix,
sp_result['relay_state_prefix'])
@@ -3161,7 +3155,7 @@ class ServiceProviderTests(FederationTests):
non_default_prefix = uuid.uuid4().hex
sp['relay_state_prefix'] = non_default_prefix
resp = self.put(url, body={'service_provider': sp},
- expected_status=http_client.CREATED)
+ expected_status=201)
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
@@ -3188,8 +3182,7 @@ class ServiceProviderTests(FederationTests):
}
for id, sp in ref_service_providers.items():
url = self.base_url(suffix=id)
- self.put(url, body={'service_provider': sp},
- expected_status=http_client.CREATED)
+ self.put(url, body={'service_provider': sp}, expected_status=201)
# Insert ids into service provider object, we will compare it with
# responses from server and those include 'id' attribute.
@@ -3216,14 +3209,15 @@ class ServiceProviderTests(FederationTests):
"""
new_sp_ref = self.sp_ref()
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref})
+ resp = self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=200)
patch_result = resp.result
new_sp_ref['id'] = self.SERVICE_PROVIDER_ID
self.assertValidEntity(patch_result['service_provider'],
ref=new_sp_ref,
keys_to_check=self.SP_KEYS)
- resp = self.get(url)
+ resp = self.get(url, expected_status=200)
get_result = resp.result
self.assertDictEqual(patch_result['service_provider'],
@@ -3261,14 +3255,15 @@ class ServiceProviderTests(FederationTests):
non_default_prefix = uuid.uuid4().hex
new_sp_ref['relay_state_prefix'] = non_default_prefix
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- resp = self.patch(url, body={'service_provider': new_sp_ref})
+ resp = self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=200)
sp_result = resp.result['service_provider']
self.assertEqual(non_default_prefix,
sp_result['relay_state_prefix'])
def test_delete_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
- self.delete(url)
+ self.delete(url, expected_status=204)
def test_delete_service_provider_404(self):
url = self.base_url(suffix=uuid.uuid4().hex)
diff --git a/keystone-moon/keystone/tests/unit/test_v3_identity.py b/keystone-moon/keystone/tests/unit/test_v3_identity.py
index 3d424cea..5a8e4fd5 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_identity.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_identity.py
@@ -295,17 +295,18 @@ class IdentityTestCase(test_v3.RestfulTestCase):
old_password_auth = self.build_authentication_request(
user_id=user_ref['id'],
password=password)
- r = self.v3_authenticate_token(old_password_auth)
+ r = self.v3_authenticate_token(old_password_auth, expected_status=201)
old_token = r.headers.get('X-Subject-Token')
# auth as user with a token should work before a password change
old_token_auth = self.build_authentication_request(token=old_token)
- self.v3_authenticate_token(old_token_auth)
+ self.v3_authenticate_token(old_token_auth, expected_status=201)
# administrative password reset
new_password = uuid.uuid4().hex
self.patch('/users/%s' % user_ref['id'],
- body={'user': {'password': new_password}})
+ body={'user': {'password': new_password}},
+ expected_status=200)
# auth as user with original password should not work after change
self.v3_authenticate_token(old_password_auth,
@@ -319,7 +320,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
new_password_auth = self.build_authentication_request(
user_id=user_ref['id'],
password=new_password)
- self.v3_authenticate_token(new_password_auth)
+ self.v3_authenticate_token(new_password_auth, expected_status=201)
def test_update_user_domain_id(self):
"""Call ``PATCH /users/{user_id}`` with domain_id."""
@@ -370,7 +371,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Confirm token is valid for now
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ expected_status=200)
# Now delete the user
self.delete('/users/%(user_id)s' % {
@@ -473,7 +474,8 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# administrative password reset
new_password = uuid.uuid4().hex
self.patch('/users/%s' % user_ref['id'],
- body={'user': {'password': new_password}})
+ body={'user': {'password': new_password}},
+ expected_status=200)
self.assertNotIn(password, log_fix.output)
self.assertNotIn(new_password, log_fix.output)
@@ -559,8 +561,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
password = self.user_ref['password']
self.user_ref = self.identity_api.create_user(self.user_ref)
self.user_ref['password'] = password
- self.token = self.get_request_token(self.user_ref['password'],
- http_client.CREATED)
+ self.token = self.get_request_token(self.user_ref['password'], 201)
def get_request_token(self, password, expected_status):
auth_data = self.build_authentication_request(
@@ -580,16 +581,16 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
def test_changing_password(self):
# original password works
token_id = self.get_request_token(self.user_ref['password'],
- expected_status=http_client.CREATED)
+ expected_status=201)
# original token works
old_token_auth = self.build_authentication_request(token=token_id)
- self.v3_authenticate_token(old_token_auth)
+ self.v3_authenticate_token(old_token_auth, expected_status=201)
# change password
new_password = uuid.uuid4().hex
self.change_password(password=new_password,
original_password=self.user_ref['password'],
- expected_status=http_client.NO_CONTENT)
+ expected_status=204)
# old password fails
self.get_request_token(self.user_ref['password'],
@@ -600,8 +601,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
expected_status=http_client.NOT_FOUND)
# new password works
- self.get_request_token(new_password,
- expected_status=http_client.CREATED)
+ self.get_request_token(new_password, expected_status=201)
def test_changing_password_with_missing_original_password_fails(self):
r = self.change_password(password=uuid.uuid4().hex,
@@ -640,7 +640,7 @@ class UserSelfServiceChangingPasswordsTestCase(test_v3.RestfulTestCase):
new_password = uuid.uuid4().hex
self.change_password(password=new_password,
original_password=self.user_ref['password'],
- expected_status=http_client.NO_CONTENT)
+ expected_status=204)
self.assertNotIn(self.user_ref['password'], log_fix.output)
self.assertNotIn(new_password, log_fix.output)
diff --git a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
index 3a0d481c..8794a426 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_oauth1.py
@@ -140,7 +140,7 @@ class ConsumerCRUDTests(OAuth1Tests):
consumer = self._create_single_consumer()
consumer_id = consumer['id']
resp = self.delete(self.CONSUMER_URL + '/%s' % consumer_id)
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
def test_consumer_get(self):
consumer = self._create_single_consumer()
@@ -262,7 +262,7 @@ class OAuthFlowTests(OAuth1Tests):
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
+ resp = self.put(url, body=body, expected_status=200)
self.verifier = resp.result['token']['oauth_verifier']
self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier))
self.assertEqual(8, len(self.verifier))
@@ -357,7 +357,7 @@ class AccessTokenCRUDTests(OAuthFlowTests):
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
% {'user': self.user_id,
'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
# List access_token should be 0
resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
@@ -400,7 +400,7 @@ class AuthTokenTests(OAuthFlowTests):
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
% {'user': self.user_id,
'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
# Check Keystone Token no longer exists
headers = {'X-Subject-Token': self.keystone_token_id,
@@ -415,7 +415,7 @@ class AuthTokenTests(OAuthFlowTests):
consumer_id = self.consumer['key']
resp = self.delete('/OS-OAUTH1/consumers/%(consumer_id)s'
% {'consumer_id': consumer_id})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
# List access_token should be 0
resp = self.get('/users/%(user_id)s/OS-OAUTH1/access_tokens'
@@ -645,7 +645,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
+ resp = self.put(url, body=body, expected_status=200)
verifier = resp.result['token']['oauth_verifier']
self.assertIsNotNone(verifier)
@@ -719,7 +719,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
+ resp = self.put(url, body=body, expected_status=200)
self.verifier = resp.result['token']['oauth_verifier']
self.request_token.set_verifier(self.verifier)
@@ -753,8 +753,7 @@ class MaliciousOAuth1Tests(OAuth1Tests):
# NOTE(stevemar): To simulate this error, we remove the Authorization
# header from the post request.
del headers['Authorization']
- self.post(endpoint, headers=headers,
- expected_status=http_client.INTERNAL_SERVER_ERROR)
+ self.post(endpoint, headers=headers, expected_status=500)
class OAuthNotificationTests(OAuth1Tests,
@@ -830,7 +829,7 @@ class OAuthNotificationTests(OAuth1Tests,
url = self._authorize_request_token(request_key)
body = {'roles': [{'id': self.role_id}]}
- resp = self.put(url, body=body, expected_status=http_client.OK)
+ resp = self.put(url, body=body, expected_status=200)
self.verifier = resp.result['token']['oauth_verifier']
self.assertTrue(all(i in core.VERIFIER_CHARS for i in self.verifier))
self.assertEqual(8, len(self.verifier))
@@ -859,7 +858,7 @@ class OAuthNotificationTests(OAuth1Tests,
resp = self.delete('/users/%(user)s/OS-OAUTH1/access_tokens/%(auth)s'
% {'user': self.user_id,
'auth': self.access_token.key})
- self.assertResponseStatus(resp, http_client.NO_CONTENT)
+ self.assertResponseStatus(resp, 204)
# Test to ensure the delete access token notification is sent
self._assert_notify_sent(access_key,
diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py
index 296e1d4b..9922ae5e 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_protection.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_protection.py
@@ -461,8 +461,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
token = self.get_requested_token(auth)
self.head('/auth/tokens', token=token,
- headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': token}, expected_status=200)
def test_user_check_user_token(self):
# A user can check one of their own tokens.
@@ -475,8 +474,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
token2 = self.get_requested_token(auth)
self.head('/auth/tokens', token=token1,
- headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': token2}, expected_status=200)
def test_user_check_other_user_token_rejected(self):
# A user cannot check another user's token.
@@ -512,8 +510,7 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
user_token = self.get_requested_token(user_auth)
self.head('/auth/tokens', token=admin_token,
- headers={'X-Subject-Token': user_token},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': user_token}, expected_status=200)
def test_user_revoke_same_token(self):
# Given a non-admin user token, the token can be used to revoke
@@ -686,8 +683,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# Return the expected return codes for APIs with and without data
# with any specified status overriding the normal values
if expected_status is None:
- return (http_client.OK, http_client.CREATED,
- http_client.NO_CONTENT)
+ return (200, 201, 204)
else:
return (expected_status, expected_status, expected_status)
@@ -1054,7 +1050,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
entity_url = '/domains/%s' % self.domainA['id']
- self.get(entity_url, auth=self.auth)
+ self.get(entity_url, auth=self.auth, expected_status=200)
def test_list_user_credentials(self):
self.credential_user = self.new_credential_ref(self.just_a_user['id'])
@@ -1186,8 +1182,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
token = self.get_requested_token(auth)
self.head('/auth/tokens', token=token,
- headers={'X-Subject-Token': token},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': token}, expected_status=200)
def test_user_check_user_token(self):
# A user can check one of their own tokens.
@@ -1200,8 +1195,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
token2 = self.get_requested_token(auth)
self.head('/auth/tokens', token=token1,
- headers={'X-Subject-Token': token2},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': token2}, expected_status=200)
def test_user_check_other_user_token_rejected(self):
# A user cannot check another user's token.
@@ -1237,8 +1231,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
user_token = self.get_requested_token(user_auth)
self.head('/auth/tokens', token=admin_token,
- headers={'X-Subject-Token': user_token},
- expected_status=http_client.OK)
+ headers={'X-Subject-Token': user_token}, expected_status=200)
def test_user_revoke_same_token(self):
# Given a non-admin user token, the token can be used to revoke
diff --git a/keystone-moon/keystone/tests/unit/test_versions.py b/keystone-moon/keystone/tests/unit/test_versions.py
index fc8051b2..40814588 100644
--- a/keystone-moon/keystone/tests/unit/test_versions.py
+++ b/keystone-moon/keystone/tests/unit/test_versions.py
@@ -751,7 +751,7 @@ class VersionTestCase(unit.TestCase):
def test_public_version_v2(self):
client = TestClient(self.public_app)
resp = client.get('/v2.0/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
@@ -762,7 +762,7 @@ class VersionTestCase(unit.TestCase):
def test_admin_version_v2(self):
client = TestClient(self.admin_app)
resp = client.get('/v2.0/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
@@ -775,7 +775,7 @@ class VersionTestCase(unit.TestCase):
for app in (self.public_app, self.admin_app):
client = TestClient(app)
resp = client.get('/v2.0/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'], 'http://localhost/v2.0/')
@@ -784,7 +784,7 @@ class VersionTestCase(unit.TestCase):
def test_public_version_v3(self):
client = TestClient(self.public_app)
resp = client.get('/v3/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
@@ -796,7 +796,7 @@ class VersionTestCase(unit.TestCase):
def test_admin_version_v3(self):
client = TestClient(self.admin_app)
resp = client.get('/v3/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
@@ -809,7 +809,7 @@ class VersionTestCase(unit.TestCase):
for app in (self.public_app, self.admin_app):
client = TestClient(app)
resp = client.get('/v3/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'], 'http://localhost/v3/')
@@ -824,7 +824,7 @@ class VersionTestCase(unit.TestCase):
# request to /v3 should pass
resp = client.get('/v3/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v3_VERSION_RESPONSE
self._paste_in_port(expected['version'],
@@ -857,7 +857,7 @@ class VersionTestCase(unit.TestCase):
# request to /v2.0 should pass
resp = client.get('/v2.0/')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
data = jsonutils.loads(resp.body)
expected = v2_VERSION_RESPONSE
self._paste_in_port(expected['version'],
diff --git a/keystone-moon/keystone/tests/unit/test_wsgi.py b/keystone-moon/keystone/tests/unit/test_wsgi.py
index 2a5cb386..ed4c67d6 100644
--- a/keystone-moon/keystone/tests/unit/test_wsgi.py
+++ b/keystone-moon/keystone/tests/unit/test_wsgi.py
@@ -112,16 +112,15 @@ class ApplicationTest(BaseWSGITest):
resp = wsgi.render_response(body=data)
self.assertEqual('200 OK', resp.status)
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
self.assertEqual(body, resp.body)
self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
self.assertEqual(str(len(body)), resp.headers.get('Content-Length'))
def test_render_response_custom_status(self):
- resp = wsgi.render_response(
- status=(http_client.NOT_IMPLEMENTED, 'Not Implemented'))
+ resp = wsgi.render_response(status=(501, 'Not Implemented'))
self.assertEqual('501 Not Implemented', resp.status)
- self.assertEqual(http_client.NOT_IMPLEMENTED, resp.status_int)
+ self.assertEqual(501, resp.status_int)
def test_successful_require_attribute(self):
app = FakeAttributeCheckerApp()
@@ -173,14 +172,14 @@ class ApplicationTest(BaseWSGITest):
def test_render_response_no_body(self):
resp = wsgi.render_response()
self.assertEqual('204 No Content', resp.status)
- self.assertEqual(http_client.NO_CONTENT, resp.status_int)
+ self.assertEqual(204, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertEqual('0', resp.headers.get('Content-Length'))
self.assertIsNone(resp.headers.get('Content-Type'))
def test_render_response_head_with_body(self):
resp = wsgi.render_response({'id': uuid.uuid4().hex}, method='HEAD')
- self.assertEqual(http_client.OK, resp.status_int)
+ self.assertEqual(200, resp.status_int)
self.assertEqual(b'', resp.body)
self.assertNotEqual(resp.headers.get('Content-Length'), '0')
self.assertEqual('application/json', resp.headers.get('Content-Type'))
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
index 5f74b430..bfb590db 100644
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
@@ -16,13 +16,17 @@ import hashlib
import os
import uuid
+import msgpack
from oslo_utils import timeutils
+from six.moves import urllib
from keystone.common import config
from keystone.common import utils
+from keystone.contrib.federation import constants as federation_constants
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
+from keystone.tests.unit.ksfixtures import database
from keystone.token import provider
from keystone.token.providers import fernet
from keystone.token.providers.fernet import token_formatters
@@ -57,7 +61,156 @@ class TestFernetTokenProvider(unit.TestCase):
uuid.uuid4().hex)
+class TestValidate(unit.TestCase):
+ def setUp(self):
+ super(TestValidate, self).setUp()
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+ self.useFixture(database.Database())
+ self.load_backends()
+
+ def config_overrides(self):
+ super(TestValidate, self).config_overrides()
+ self.config_fixture.config(group='token', provider='fernet')
+
+ def test_validate_v3_token_simple(self):
+ # Check the fields in the token result when use validate_v3_token
+ # with a simple token.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ method_names = ['password']
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ self.assertIsInstance(token['audit_ids'], list)
+ self.assertIsInstance(token['expires_at'], str)
+ self.assertEqual({}, token['extras'])
+ self.assertIsInstance(token['issued_at'], str)
+ self.assertEqual(method_names, token['methods'])
+ exp_user_info = {
+ 'id': user_ref['id'],
+ 'name': user_ref['name'],
+ 'domain': {
+ 'id': domain_ref['id'],
+ 'name': domain_ref['name'],
+ },
+ }
+ self.assertEqual(exp_user_info, token['user'])
+
+ def test_validate_v3_token_federated_info(self):
+ # Check the user fields in the token result when use validate_v3_token
+ # when the token has federated info.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ method_names = ['mapped']
+
+ group_ids = [uuid.uuid4().hex, ]
+ identity_provider = uuid.uuid4().hex
+ protocol = uuid.uuid4().hex
+ auth_context = {
+ 'user_id': user_ref['id'],
+ 'group_ids': group_ids,
+ federation_constants.IDENTITY_PROVIDER: identity_provider,
+ federation_constants.PROTOCOL: protocol,
+ }
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names, auth_context=auth_context)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ exp_user_info = {
+ 'id': user_ref['id'],
+ 'name': user_ref['id'],
+ 'domain': {'id': CONF.federation.federated_domain_name,
+ 'name': CONF.federation.federated_domain_name, },
+ federation_constants.FEDERATION: {
+ 'groups': [{'id': group_id} for group_id in group_ids],
+ 'identity_provider': {'id': identity_provider, },
+ 'protocol': {'id': protocol, },
+ },
+ }
+ self.assertEqual(exp_user_info, token['user'])
+
+ def test_validate_v3_token_trust(self):
+ # Check the trust fields in the token result when use validate_v3_token
+ # when the token has trust info.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ trustor_user_ref = unit.new_user_ref(domain_ref['id'])
+ trustor_user_ref = self.identity_api.create_user(trustor_user_ref)
+
+ project_ref = unit.new_project_ref(domain_id=domain_ref['id'])
+ project_ref = self.resource_api.create_project(project_ref['id'],
+ project_ref)
+
+ role_ref = unit.new_role_ref()
+ role_ref = self.role_api.create_role(role_ref['id'], role_ref)
+
+ self.assignment_api.create_grant(
+ role_ref['id'], user_id=user_ref['id'],
+ project_id=project_ref['id'])
+
+ self.assignment_api.create_grant(
+ role_ref['id'], user_id=trustor_user_ref['id'],
+ project_id=project_ref['id'])
+
+ trustor_user_id = trustor_user_ref['id']
+ trustee_user_id = user_ref['id']
+ trust_ref = unit.new_trust_ref(
+ trustor_user_id, trustee_user_id, project_id=project_ref['id'],
+ role_ids=[role_ref['id'], ])
+ trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref,
+ trust_ref['roles'])
+
+ method_names = ['password']
+
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names, project_id=project_ref['id'],
+ trust=trust_ref)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ exp_trust_info = {
+ 'id': trust_ref['id'],
+ 'impersonation': False,
+ 'trustee_user': {'id': user_ref['id'], },
+ 'trustor_user': {'id': trustor_user_ref['id'], },
+ }
+ self.assertEqual(exp_trust_info, token['OS-TRUST:trust'])
+
+ def test_validate_v3_token_validation_error_exc(self):
+ # When the token format isn't recognized, TokenNotFound is raised.
+
+ # A uuid string isn't a valid fernet token.
+ token_id = uuid.uuid4().hex
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v3_token, token_id)
+
+
class TestTokenFormatter(unit.TestCase):
+ def setUp(self):
+ super(TestTokenFormatter, self).setUp()
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+
def test_restore_padding(self):
# 'a' will result in '==' padding, 'aa' will result in '=' padding, and
# 'aaa' will result in no padding.
@@ -73,6 +226,39 @@ class TestTokenFormatter(unit.TestCase):
)
self.assertEqual(encoded_string, encoded_str_with_padding_restored)
+ def test_legacy_padding_validation(self):
+ first_value = uuid.uuid4().hex
+ second_value = uuid.uuid4().hex
+ payload = (first_value, second_value)
+ msgpack_payload = msgpack.packb(payload)
+
+ # NOTE(lbragstad): This method perserves the way that keystone used to
+ # percent encode the tokens, prior to bug #1491926.
+ def legacy_pack(payload):
+ tf = token_formatters.TokenFormatter()
+ encrypted_payload = tf.crypto.encrypt(payload)
+
+ # the encrypted_payload is returned with padding appended
+ self.assertTrue(encrypted_payload.endswith('='))
+
+ # using urllib.parse.quote will percent encode the padding, like
+ # keystone did in Kilo.
+ percent_encoded_payload = urllib.parse.quote(encrypted_payload)
+
+ # ensure that the padding was actaully percent encoded
+ self.assertTrue(percent_encoded_payload.endswith('%3D'))
+ return percent_encoded_payload
+
+ token_with_legacy_padding = legacy_pack(msgpack_payload)
+ tf = token_formatters.TokenFormatter()
+
+ # demonstrate the we can validate a payload that has been percent
+ # encoded with the Fernet logic that existed in Kilo
+ serialized_payload = tf.unpack(token_with_legacy_padding)
+ returned_payload = msgpack.unpackb(serialized_payload)
+ self.assertEqual(first_value, returned_payload[0])
+ self.assertEqual(second_value, returned_payload[1])
+
class TestPayloads(unit.TestCase):
def test_uuid_hex_to_byte_conversions(self):
@@ -204,8 +390,7 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
- def test_unscoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
+ def _test_unscoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
@@ -221,30 +406,15 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_project_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
+ def test_unscoped_payload_with_non_uuid_user_id(self):
+ self._test_unscoped_payload_with_user_id('someNonUuidUserId')
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_unscoped_payload_with_user_id('0123456789abcdef')
- def test_project_scoped_payload_with_non_uuid_project_id(self):
- exp_user_id = uuid.uuid4().hex
+ def _test_project_scoped_payload_with_ids(self, exp_user_id,
+ exp_project_id):
exp_methods = ['password']
- exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
@@ -261,8 +431,15 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_domain_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
+ def test_project_scoped_payload_with_non_uuid_user_id(self):
+ self._test_project_scoped_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidProjectId')
+
+ def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_project_scoped_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
+ def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_domain_id = uuid.uuid4().hex
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
@@ -281,32 +458,14 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_trust_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
+ def test_domain_scoped_payload_with_non_uuid_user_id(self):
+ self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
+ def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
- def test_trust_scoped_payload_with_non_uuid_project_id(self):
- exp_user_id = uuid.uuid4().hex
+ def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
exp_methods = ['password']
- exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_trust_id = uuid.uuid4().hex
@@ -325,12 +484,19 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
- def test_federated_payload_with_non_uuid_ids(self):
- exp_user_id = 'someNonUuidUserId'
+ def test_trust_scoped_payload_with_non_uuid_user_id(self):
+ self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidProjectId')
+
+ def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_trust_scoped_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
+ def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
- exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
+ exp_federated_info = {'group_ids': [{'id': exp_group_id}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
@@ -352,6 +518,14 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_federated_info['protocol_id'],
federated_info['protocol_id'])
+ def test_federated_payload_with_non_uuid_ids(self):
+ self._test_federated_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidGroupId')
+
+ def test_federated_payload_with_16_char_non_uuid_ids(self):
+ self._test_federated_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
def test_federated_project_scoped_payload(self):
exp_user_id = 'someNonUuidUserId'
exp_methods = ['token']