diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_auth.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_auth.py | 3237 |
1 files changed, 1796 insertions, 1441 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_auth.py b/keystone-moon/keystone/tests/unit/test_v3_auth.py index d53a85df..698feeb8 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_auth.py +++ b/keystone-moon/keystone/tests/unit/test_v3_auth.py @@ -14,6 +14,7 @@ import copy import datetime +import itertools import json import operator import uuid @@ -21,6 +22,8 @@ import uuid from keystoneclient.common import cms import mock from oslo_config import cfg +from oslo_log import versionutils +from oslo_utils import fixture from oslo_utils import timeutils from six.moves import http_client from six.moves import range @@ -28,9 +31,12 @@ from testtools import matchers from testtools import testcase from keystone import auth +from keystone.auth.plugins import totp from keystone.common import utils +from keystone.contrib.revoke import routers from keystone import exception from keystone.policy.backends import rules +from keystone.tests.common import auth as common_auth from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 @@ -38,7 +44,7 @@ from keystone.tests.unit import test_v3 CONF = cfg.CONF -class TestAuthInfo(test_v3.AuthTestMixin, testcase.TestCase): +class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase): def setUp(self): super(TestAuthInfo, self).setUp() auth.controllers.load_auth_methods() @@ -121,7 +127,7 @@ class TokenAPITests(object): # resolved in Python for multiple inheritance means that a setUp in this # would get skipped by the testrunner. def doSetUp(self): - r = self.v3_authenticate_token(self.build_authentication_request( + r = self.v3_create_token(self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain_id, password=self.user['password'])) @@ -129,43 +135,372 @@ class TokenAPITests(object): self.v3_token = r.headers.get('X-Subject-Token') self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')} - def test_default_fixture_scope_token(self): - self.assertIsNotNone(self.get_scoped_token()) + def _make_auth_request(self, auth_data): + resp = self.post('/auth/tokens', body=auth_data) + token = resp.headers.get('X-Subject-Token') + return token - def test_v3_v2_intermix_non_default_domain_failed(self): - v3_token = self.get_requested_token(self.build_authentication_request( + def _get_unscoped_token(self): + auth_data = self.build_authentication_request( user_id=self.user['id'], - password=self.user['password'])) + password=self.user['password']) + return self._make_auth_request(auth_data) - # now validate the v3 token with v2 API - self.admin_request( - path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, - method='GET', - expected_status=http_client.UNAUTHORIZED) + def _get_domain_scoped_token(self): + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_id=self.domain_id) + return self._make_auth_request(auth_data) + + def _get_project_scoped_token(self): + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project_id) + return self._make_auth_request(auth_data) + + def _get_trust_scoped_token(self, trustee_user, trust): + auth_data = self.build_authentication_request( + user_id=trustee_user['id'], + password=trustee_user['password'], + trust_id=trust['id']) + return self._make_auth_request(auth_data) + + def _create_trust(self, impersonation=False): + # Create a trustee user + trustee_user = unit.create_user(self.identity_api, + domain_id=self.domain_id) + ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee_user['id'], + project_id=self.project_id, + impersonation=impersonation, + role_ids=[self.role_id]) + + # Create a trust + r = self.post('/OS-TRUST/trusts', body={'trust': ref}) + trust = self.assertValidTrustResponse(r) + return (trustee_user, trust) + + def _validate_token(self, token, expected_status=http_client.OK): + return self.get( + '/auth/tokens', + headers={'X-Subject-Token': token}, + expected_status=expected_status) + + def _revoke_token(self, token, expected_status=http_client.NO_CONTENT): + return self.delete( + '/auth/tokens', + headers={'x-subject-token': token}, + expected_status=expected_status) + + def _set_user_enabled(self, user, enabled=True): + user['enabled'] = enabled + self.identity_api.update_user(user['id'], user) + + def test_validate_unscoped_token(self): + unscoped_token = self._get_unscoped_token() + self._validate_token(unscoped_token) + + def test_revoke_unscoped_token(self): + unscoped_token = self._get_unscoped_token() + self._validate_token(unscoped_token) + self._revoke_token(unscoped_token) + self._validate_token(unscoped_token, + expected_status=http_client.NOT_FOUND) + + def test_unscoped_token_is_invalid_after_disabling_user(self): + unscoped_token = self._get_unscoped_token() + # Make sure the token is valid + self._validate_token(unscoped_token) + # Disable the user + self._set_user_enabled(self.user, enabled=False) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + unscoped_token) + + def test_unscoped_token_is_invalid_after_enabling_disabled_user(self): + unscoped_token = self._get_unscoped_token() + # Make sure the token is valid + self._validate_token(unscoped_token) + # Disable the user + self._set_user_enabled(self.user, enabled=False) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + unscoped_token) + # Enable the user + self._set_user_enabled(self.user) + # Ensure validating a token for a re-enabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + unscoped_token) + + def test_unscoped_token_is_invalid_after_disabling_user_domain(self): + unscoped_token = self._get_unscoped_token() + # Make sure the token is valid + self._validate_token(unscoped_token) + # Disable the user's domain + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + unscoped_token) + + def test_unscoped_token_is_invalid_after_changing_user_password(self): + unscoped_token = self._get_unscoped_token() + # Make sure the token is valid + self._validate_token(unscoped_token) + # Change user's password + self.user['password'] = 'Password1' + self.identity_api.update_user(self.user['id'], self.user) + # Ensure updating user's password revokes existing user's tokens + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + unscoped_token) + + def test_validate_domain_scoped_token(self): + # Grant user access to domain + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + domain_scoped_token = self._get_domain_scoped_token() + resp = self._validate_token(domain_scoped_token) + resp_json = json.loads(resp.body) + self.assertIsNotNone(resp_json['token']['catalog']) + self.assertIsNotNone(resp_json['token']['roles']) + self.assertIsNotNone(resp_json['token']['domain']) + + def test_domain_scoped_token_is_invalid_after_disabling_user(self): + # Grant user access to domain + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + domain_scoped_token = self._get_domain_scoped_token() + # Make sure the token is valid + self._validate_token(domain_scoped_token) + # Disable user + self._set_user_enabled(self.user, enabled=False) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + domain_scoped_token) + + def test_domain_scoped_token_is_invalid_after_deleting_grant(self): + # Grant user access to domain + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + domain_scoped_token = self._get_domain_scoped_token() + # Make sure the token is valid + self._validate_token(domain_scoped_token) + # Delete access to domain + self.assignment_api.delete_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + domain_scoped_token) + + def test_domain_scoped_token_invalid_after_disabling_domain(self): + # Grant user access to domain + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + domain_scoped_token = self._get_domain_scoped_token() + # Make sure the token is valid + self._validate_token(domain_scoped_token) + # Disable domain + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + # Ensure validating a token for a disabled domain fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + domain_scoped_token) + + def test_v2_validate_domain_scoped_token_returns_unauthorized(self): + # Test that validating a domain scoped token in v2.0 returns + # unauthorized. + # Grant user access to domain + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + + scoped_token = self._get_domain_scoped_token() + self.assertRaises(exception.Unauthorized, + self.token_provider_api.validate_v2_token, + scoped_token) + + def test_validate_project_scoped_token(self): + project_scoped_token = self._get_project_scoped_token() + self._validate_token(project_scoped_token) + + def test_revoke_project_scoped_token(self): + project_scoped_token = self._get_project_scoped_token() + self._validate_token(project_scoped_token) + self._revoke_token(project_scoped_token) + self._validate_token(project_scoped_token, + expected_status=http_client.NOT_FOUND) + + def test_project_scoped_token_is_invalid_after_disabling_user(self): + project_scoped_token = self._get_project_scoped_token() + # Make sure the token is valid + self._validate_token(project_scoped_token) + # Disable the user + self._set_user_enabled(self.user, enabled=False) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + project_scoped_token) + + def test_project_scoped_token_invalid_after_changing_user_password(self): + project_scoped_token = self._get_project_scoped_token() + # Make sure the token is valid + self._validate_token(project_scoped_token) + # Update user's password + self.user['password'] = 'Password1' + self.identity_api.update_user(self.user['id'], self.user) + # Ensure updating user's password revokes existing tokens + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + project_scoped_token) + + def test_project_scoped_token_invalid_after_disabling_project(self): + project_scoped_token = self._get_project_scoped_token() + # Make sure the token is valid + self._validate_token(project_scoped_token) + # Disable project + self.project['enabled'] = False + self.resource_api.update_project(self.project['id'], self.project) + # Ensure validating a token for a disabled project fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + project_scoped_token) + + def test_rescope_unscoped_token_with_trust(self): + trustee_user, trust = self._create_trust() + self._get_trust_scoped_token(trustee_user, trust) + + def test_validate_a_trust_scoped_token(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + def test_validate_a_trust_scoped_token_impersonated(self): + trustee_user, trust = self._create_trust(impersonation=True) + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + def test_revoke_trust_scoped_token(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + self._revoke_token(trust_scoped_token) + self._validate_token(trust_scoped_token, + expected_status=http_client.NOT_FOUND) + + def test_trust_scoped_token_is_invalid_after_disabling_trustee(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + # Disable trustee + trustee_update_ref = dict(enabled=False) + self.identity_api.update_user(trustee_user['id'], trustee_update_ref) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + trust_scoped_token) + + def test_trust_scoped_token_invalid_after_changing_trustee_password(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + # Change trustee's password + trustee_update_ref = dict(password='Password1') + self.identity_api.update_user(trustee_user['id'], trustee_update_ref) + # Ensure updating trustee's password revokes existing tokens + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + trust_scoped_token) + + def test_trust_scoped_token_is_invalid_after_disabling_trustor(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + # Disable the trustor + trustor_update_ref = dict(enabled=False) + self.identity_api.update_user(self.user['id'], trustor_update_ref) + # Ensure validating a token for a disabled user fails + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + trust_scoped_token) + + def test_trust_scoped_token_invalid_after_changing_trustor_password(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + # Change trustor's password + trustor_update_ref = dict(password='Password1') + self.identity_api.update_user(self.user['id'], trustor_update_ref) + # Ensure updating trustor's password revokes existing user's tokens + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + trust_scoped_token) + + def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Validate a trust scoped token + self._validate_token(trust_scoped_token) + + # Disable trustor's domain + self.domain['enabled'] = False + self.resource_api.update_domain(self.domain['id'], self.domain) + + trustor_update_ref = dict(password='Password1') + self.identity_api.update_user(self.user['id'], trustor_update_ref) + # Ensure updating trustor's password revokes existing user's tokens + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_token, + trust_scoped_token) + + def test_v2_validate_trust_scoped_token(self): + # Test that validating an trust scoped token in v2.0 returns + # unauthorized. + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + self.assertRaises(exception.Unauthorized, + self.token_provider_api.validate_v2_token, + trust_scoped_token) + + def test_default_fixture_scope_token(self): + self.assertIsNotNone(self.get_scoped_token()) def test_v3_v2_intermix_new_default_domain(self): # If the default_domain_id config option is changed, then should be # able to validate a v3 token with user in the new domain. # 1) Create a new domain for the user. - new_domain = { - 'description': uuid.uuid4().hex, - 'enabled': True, - 'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex, - } + new_domain = unit.new_domain_ref() self.resource_api.create_domain(new_domain['id'], new_domain) # 2) Create user in new domain. - new_user_password = uuid.uuid4().hex - new_user = { - 'name': uuid.uuid4().hex, - 'domain_id': new_domain['id'], - 'password': new_user_password, - 'email': uuid.uuid4().hex, - } - new_user = self.identity_api.create_user(new_user) + new_user = unit.create_user(self.identity_api, + domain_id=new_domain['id']) # 3) Update the default_domain_id config option to the new domain self.config_fixture.config( @@ -175,12 +510,12 @@ class TokenAPITests(object): # 4) Get a token using v3 API. v3_token = self.get_requested_token(self.build_authentication_request( user_id=new_user['id'], - password=new_user_password)) + password=new_user['password'])) # 5) Validate token using v2 API. self.admin_request( path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, + token=self.get_admin_token(), method='GET') def test_v3_v2_intermix_domain_scoped_token_failed(self): @@ -199,10 +534,10 @@ class TokenAPITests(object): self.admin_request( method='GET', path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, + token=self.get_admin_token(), expected_status=http_client.UNAUTHORIZED) - def test_v3_v2_intermix_non_default_project_failed(self): + def test_v3_v2_intermix_non_default_project_succeed(self): # self.project is in a non-default domain v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.default_domain_user['id'], @@ -213,10 +548,9 @@ class TokenAPITests(object): self.admin_request( method='GET', path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, - expected_status=http_client.UNAUTHORIZED) + token=self.get_admin_token()) - def test_v3_v2_intermix_non_default_user_failed(self): + def test_v3_v2_intermix_non_default_user_succeed(self): self.assignment_api.create_grant( self.role['id'], user_id=self.user['id'], @@ -232,8 +566,7 @@ class TokenAPITests(object): self.admin_request( method='GET', path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, - expected_status=http_client.UNAUTHORIZED) + token=self.get_admin_token()) def test_v3_v2_intermix_domain_scope_failed(self): self.assignment_api.create_grant( @@ -249,12 +582,12 @@ class TokenAPITests(object): # v2 cannot reference projects outside the default domain self.admin_request( path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, + token=self.get_admin_token(), method='GET', expected_status=http_client.UNAUTHORIZED) def test_v3_v2_unscoped_token_intermix(self): - r = self.v3_authenticate_token(self.build_authentication_request( + r = self.v3_create_token(self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'])) self.assertValidUnscopedTokenResponse(r) @@ -264,7 +597,7 @@ class TokenAPITests(object): # now validate the v3 token with v2 API r = self.admin_request( path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token, + token=self.get_admin_token(), method='GET') v2_token_data = r.result @@ -278,7 +611,7 @@ class TokenAPITests(object): def test_v3_v2_token_intermix(self): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. - r = self.v3_authenticate_token(self.build_authentication_request( + r = self.v3_create_token(self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project['id'])) @@ -290,7 +623,7 @@ class TokenAPITests(object): r = self.admin_request( method='GET', path='/v2.0/tokens/%s' % v3_token, - token=CONF.admin_token) + token=self.get_admin_token()) v2_token_data = r.result self.assertEqual(v2_token_data['access']['user']['id'], @@ -318,9 +651,7 @@ class TokenAPITests(object): v2_token = v2_token_data['access']['token']['id'] r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token}) - # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are - # missing timezones, so they will not pass this assertion. - # self.assertValidUnscopedTokenResponse(r) + self.assertValidUnscopedTokenResponse(r) v3_token_data = r.result self.assertEqual(v2_token_data['access']['user']['id'], @@ -347,9 +678,7 @@ class TokenAPITests(object): v2_token = v2_token_data['access']['token']['id'] r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token}) - # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are - # missing timezones, so they will not pass this assertion. - # self.assertValidProjectScopedTokenResponse(r) + self.assertValidProjectScopedTokenResponse(r) v3_token_data = r.result self.assertEqual(v2_token_data['access']['user']['id'], @@ -384,9 +713,8 @@ class TokenAPITests(object): v2_token = r.result['access']['token']['id'] # Delete the v2 token using v3. - resp = self.delete( + self.delete( '/auth/tokens', headers={'X-Subject-Token': v2_token}) - self.assertEqual(resp.status_code, 204) # Attempting to use the deleted token on v2 should fail. self.admin_request( @@ -397,7 +725,7 @@ class TokenAPITests(object): expires = self.v3_token_data['token']['expires_at'] # rescope the token - r = self.v3_authenticate_token(self.build_authentication_request( + r = self.v3_create_token(self.build_authentication_request( token=self.v3_token, project_id=self.project_id)) self.assertValidProjectScopedTokenResponse(r) @@ -406,12 +734,24 @@ class TokenAPITests(object): self.assertEqual(expires, r.result['token']['expires_at']) def test_check_token(self): - self.head('/auth/tokens', headers=self.headers, expected_status=200) + self.head('/auth/tokens', headers=self.headers, + expected_status=http_client.OK) def test_validate_token(self): r = self.get('/auth/tokens', headers=self.headers) self.assertValidUnscopedTokenResponse(r) + def test_validate_missing_subject_token(self): + self.get('/auth/tokens', + expected_status=http_client.NOT_FOUND) + + def test_validate_missing_auth_token(self): + self.admin_request( + method='GET', + path='/v3/projects', + token=None, + expected_status=http_client.UNAUTHORIZED) + def test_validate_token_nocatalog(self): v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.user['id'], @@ -422,6 +762,399 @@ class TokenAPITests(object): headers={'X-Subject-Token': v3_token}) self.assertValidProjectScopedTokenResponse(r, require_catalog=False) + def test_is_admin_token_by_ids(self): + self.config_fixture.config( + group='resource', + admin_project_domain_name=self.domain['name'], + admin_project_name=self.project['name']) + r = self.v3_create_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=True) + v3_token = r.headers.get('X-Subject-Token') + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=True) + + def test_is_admin_token_by_names(self): + self.config_fixture.config( + group='resource', + admin_project_domain_name=self.domain['name'], + admin_project_name=self.project['name']) + r = self.v3_create_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_domain_name=self.domain['name'], + project_name=self.project['name'])) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=True) + v3_token = r.headers.get('X-Subject-Token') + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=True) + + def test_token_for_non_admin_project_is_not_admin(self): + self.config_fixture.config( + group='resource', + admin_project_domain_name=self.domain['name'], + admin_project_name=uuid.uuid4().hex) + r = self.v3_create_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + v3_token = r.headers.get('X-Subject-Token') + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + + def test_token_for_non_admin_domain_same_project_name_is_not_admin(self): + self.config_fixture.config( + group='resource', + admin_project_domain_name=uuid.uuid4().hex, + admin_project_name=self.project['name']) + r = self.v3_create_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + v3_token = r.headers.get('X-Subject-Token') + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + + def test_only_admin_project_set_acts_as_non_admin(self): + self.config_fixture.config( + group='resource', + admin_project_name=self.project['name']) + r = self.v3_create_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + v3_token = r.headers.get('X-Subject-Token') + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + self.assertValidProjectScopedTokenResponse(r, is_admin_project=False) + + def _create_role(self, domain_id=None): + """Call ``POST /roles``.""" + ref = unit.new_role_ref(domain_id=domain_id) + r = self.post('/roles', body={'role': ref}) + return self.assertValidRoleResponse(r, ref) + + def _create_implied_role(self, prior_id): + implied = self._create_role() + url = '/roles/%s/implies/%s' % (prior_id, implied['id']) + self.put(url, expected_status=http_client.CREATED) + return implied + + def _delete_implied_role(self, prior_role_id, implied_role_id): + url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id) + self.delete(url) + + def _get_scoped_token_roles(self, is_domain=False): + if is_domain: + v3_token = self.get_domain_scoped_token() + else: + v3_token = self.get_scoped_token() + + r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token}) + v3_token_data = r.result + token_roles = v3_token_data['token']['roles'] + return token_roles + + def _create_implied_role_shows_in_v3_token(self, is_domain): + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(1, len(token_roles)) + + prior = token_roles[0]['id'] + implied1 = self._create_implied_role(prior) + + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(2, len(token_roles)) + + implied2 = self._create_implied_role(prior) + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(3, len(token_roles)) + + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(prior, token_role_ids) + self.assertIn(implied1['id'], token_role_ids) + self.assertIn(implied2['id'], token_role_ids) + + def test_create_implied_role_shows_in_v3_project_token(self): + # regardless of the default chosen, this should always + # test with the option set. + self.config_fixture.config(group='token', infer_roles=True) + self._create_implied_role_shows_in_v3_token(False) + + def test_create_implied_role_shows_in_v3_domain_token(self): + self.config_fixture.config(group='token', infer_roles=True) + self.assignment_api.create_grant(self.role['id'], + user_id=self.user['id'], + domain_id=self.domain['id']) + + self._create_implied_role_shows_in_v3_token(True) + + def test_group_assigned_implied_role_shows_in_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + is_domain = False + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(1, len(token_roles)) + + new_role = self._create_role() + prior = new_role['id'] + + new_group_ref = unit.new_group_ref(domain_id=self.domain['id']) + new_group = self.identity_api.create_group(new_group_ref) + self.assignment_api.create_grant(prior, + group_id=new_group['id'], + project_id=self.project['id']) + + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(1, len(token_roles)) + + self.identity_api.add_user_to_group(self.user['id'], + new_group['id']) + + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(2, len(token_roles)) + + implied1 = self._create_implied_role(prior) + + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(3, len(token_roles)) + + implied2 = self._create_implied_role(prior) + token_roles = self._get_scoped_token_roles(is_domain) + self.assertEqual(4, len(token_roles)) + + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(prior, token_role_ids) + self.assertIn(implied1['id'], token_role_ids) + self.assertIn(implied2['id'], token_role_ids) + + def test_multiple_implied_roles_show_in_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + token_roles = self._get_scoped_token_roles() + self.assertEqual(1, len(token_roles)) + + prior = token_roles[0]['id'] + implied1 = self._create_implied_role(prior) + implied2 = self._create_implied_role(prior) + implied3 = self._create_implied_role(prior) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(4, len(token_roles)) + + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(prior, token_role_ids) + self.assertIn(implied1['id'], token_role_ids) + self.assertIn(implied2['id'], token_role_ids) + self.assertIn(implied3['id'], token_role_ids) + + def test_chained_implied_role_shows_in_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + token_roles = self._get_scoped_token_roles() + self.assertEqual(1, len(token_roles)) + + prior = token_roles[0]['id'] + implied1 = self._create_implied_role(prior) + implied2 = self._create_implied_role(implied1['id']) + implied3 = self._create_implied_role(implied2['id']) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(4, len(token_roles)) + + token_role_ids = [role['id'] for role in token_roles] + + self.assertIn(prior, token_role_ids) + self.assertIn(implied1['id'], token_role_ids) + self.assertIn(implied2['id'], token_role_ids) + self.assertIn(implied3['id'], token_role_ids) + + def test_implied_role_disabled_by_config(self): + self.config_fixture.config(group='token', infer_roles=False) + token_roles = self._get_scoped_token_roles() + self.assertEqual(1, len(token_roles)) + + prior = token_roles[0]['id'] + implied1 = self._create_implied_role(prior) + implied2 = self._create_implied_role(implied1['id']) + self._create_implied_role(implied2['id']) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(1, len(token_roles)) + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(prior, token_role_ids) + + def test_delete_implied_role_do_not_show_in_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + token_roles = self._get_scoped_token_roles() + prior = token_roles[0]['id'] + implied = self._create_implied_role(prior) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(2, len(token_roles)) + self._delete_implied_role(prior, implied['id']) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(1, len(token_roles)) + + def test_unrelated_implied_roles_do_not_change_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + token_roles = self._get_scoped_token_roles() + prior = token_roles[0]['id'] + implied = self._create_implied_role(prior) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(2, len(token_roles)) + + unrelated = self._create_role() + url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id']) + self.put(url, expected_status=http_client.CREATED) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(2, len(token_roles)) + + self._delete_implied_role(unrelated['id'], implied['id']) + token_roles = self._get_scoped_token_roles() + self.assertEqual(2, len(token_roles)) + + def test_domain_scpecific_roles_do_not_show_v3_token(self): + self.config_fixture.config(group='token', infer_roles=True) + initial_token_roles = self._get_scoped_token_roles() + + new_role = self._create_role(domain_id=self.domain_id) + self.assignment_api.create_grant(new_role['id'], + user_id=self.user['id'], + project_id=self.project['id']) + implied = self._create_implied_role(new_role['id']) + + token_roles = self._get_scoped_token_roles() + self.assertEqual(len(initial_token_roles) + 1, len(token_roles)) + + # The implied role from the domain specific role should be in the + # token, but not the domain specific role itself. + token_role_ids = [role['id'] for role in token_roles] + self.assertIn(implied['id'], token_role_ids) + self.assertNotIn(new_role['id'], token_role_ids) + + def test_remove_all_roles_from_scope_result_in_404(self): + # create a new user + new_user = unit.create_user(self.identity_api, + domain_id=self.domain['id']) + + # give the new user a role on a project + path = '/projects/%s/users/%s/roles/%s' % ( + self.project['id'], new_user['id'], self.role['id']) + self.put(path=path) + + # authenticate as the new user and get a project-scoped token + auth_data = self.build_authentication_request( + user_id=new_user['id'], + password=new_user['password'], + project_id=self.project['id']) + subject_token_id = self.v3_create_token(auth_data).headers.get( + 'X-Subject-Token') + + # make sure the project-scoped token is valid + headers = {'X-Subject-Token': subject_token_id} + r = self.get('/auth/tokens', headers=headers) + self.assertValidProjectScopedTokenResponse(r) + + # remove the roles from the user for the given scope + path = '/projects/%s/users/%s/roles/%s' % ( + self.project['id'], new_user['id'], self.role['id']) + self.delete(path=path) + + # token validation should now result in 404 + self.get('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) + + +class TokenDataTests(object): + """Test the data in specific token types.""" + + def test_unscoped_token_format(self): + # ensure the unscoped token response contains the appropriate data + r = self.get('/auth/tokens', headers=self.headers) + self.assertValidUnscopedTokenResponse(r) + + def test_domain_scoped_token_format(self): + # ensure the domain scoped token response contains the appropriate data + self.assignment_api.create_grant( + self.role['id'], + user_id=self.default_domain_user['id'], + domain_id=self.domain['id']) + + domain_scoped_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + domain_id=self.domain['id']) + ) + self.headers['X-Subject-Token'] = domain_scoped_token + r = self.get('/auth/tokens', headers=self.headers) + self.assertValidDomainScopedTokenResponse(r) + + def test_project_scoped_token_format(self): + # ensure project scoped token responses contains the appropriate data + project_scoped_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project['id']) + ) + self.headers['X-Subject-Token'] = project_scoped_token + r = self.get('/auth/tokens', headers=self.headers) + self.assertValidProjectScopedTokenResponse(r) + + def test_extra_data_in_unscoped_token_fails_validation(self): + # ensure unscoped token response contains the appropriate data + r = self.get('/auth/tokens', headers=self.headers) + + # populate the response result with some extra data + r.result['token'][u'extra'] = unicode(uuid.uuid4().hex) + self.assertRaises(exception.SchemaValidationError, + self.assertValidUnscopedTokenResponse, + r) + + def test_extra_data_in_domain_scoped_token_fails_validation(self): + # ensure domain scoped token response contains the appropriate data + self.assignment_api.create_grant( + self.role['id'], + user_id=self.default_domain_user['id'], + domain_id=self.domain['id']) + + domain_scoped_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + domain_id=self.domain['id']) + ) + self.headers['X-Subject-Token'] = domain_scoped_token + r = self.get('/auth/tokens', headers=self.headers) + + # populate the response result with some extra data + r.result['token'][u'extra'] = unicode(uuid.uuid4().hex) + self.assertRaises(exception.SchemaValidationError, + self.assertValidDomainScopedTokenResponse, + r) + + def test_extra_data_in_project_scoped_token_fails_validation(self): + # ensure project scoped token responses contains the appropriate data + project_scoped_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project['id']) + ) + self.headers['X-Subject-Token'] = project_scoped_token + resp = self.get('/auth/tokens', headers=self.headers) + + # populate the response result with some extra data + resp.result['token'][u'extra'] = unicode(uuid.uuid4().hex) + self.assertRaises(exception.SchemaValidationError, + self.assertValidProjectScopedTokenResponse, + resp) + class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): def config_overrides(self): @@ -431,7 +1164,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): allow_rescope_scoped_token=False) def test_rescoping_v3_to_v3_disabled(self): - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( token=self.get_scoped_token(), project_id=self.project_id), @@ -465,7 +1198,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): def test_rescoping_v2_to_v3_disabled(self): token = self._v2_token() - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( token=token['access']['token']['id'], project_id=self.project_id), @@ -481,7 +1214,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): def test_rescoped_domain_token_disabled(self): - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) self.assignment_api.create_grant(self.role['id'], user_id=self.user['id'], @@ -495,14 +1228,14 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): self.build_authentication_request( token=unscoped_token, domain_id=self.domainA['id'])) - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( token=domain_scoped_token, project_id=self.project_id), expected_status=http_client.FORBIDDEN) -class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): +class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests): def config_overrides(self): super(TestPKITokenAPIs, self).config_overrides() self.config_fixture.config(group='token', provider='pki') @@ -518,7 +1251,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - resp = self.v3_authenticate_token(auth_data) + resp = self.v3_create_token(auth_data) token_data = resp.result token_id = resp.headers.get('X-Subject-Token') self.assertIn('expires_at', token_data['token']) @@ -542,7 +1275,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], project_id=self.default_domain_project['id']) - resp = self.v3_authenticate_token(auth_data) + resp = self.v3_create_token(auth_data) token_data = resp.result token = resp.headers.get('X-Subject-Token') @@ -550,7 +1283,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): token = cms.cms_hash_token(token) path = '/v2.0/tokens/%s' % (token) resp = self.admin_request(path=path, - token=CONF.admin_token, + token=self.get_admin_token(), method='GET') v2_token = resp.result self.assertEqual(v2_token['access']['user']['id'], @@ -559,8 +1292,8 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): # just need to make sure the non fraction part agrees self.assertIn(v2_token['access']['token']['expires'][:-1], token_data['token']['expires_at']) - self.assertEqual(v2_token['access']['user']['roles'][0]['id'], - token_data['token']['roles'][0]['id']) + self.assertEqual(v2_token['access']['user']['roles'][0]['name'], + token_data['token']['roles'][0]['name']) class TestPKIZTokenAPIs(TestPKITokenAPIs): @@ -572,7 +1305,8 @@ class TestPKIZTokenAPIs(TestPKITokenAPIs): return cms.pkiz_verify(*args, **kwargs) -class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): +class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, + TokenDataTests): def config_overrides(self): super(TestUUIDTokenAPIs, self).config_overrides() self.config_fixture.config(group='token', provider='uuid') @@ -585,14 +1319,15 @@ class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - resp = self.v3_authenticate_token(auth_data) + resp = self.v3_create_token(auth_data) token_data = resp.result token_id = resp.headers.get('X-Subject-Token') self.assertIn('expires_at', token_data['token']) self.assertFalse(cms.is_asn1_token(token_id)) -class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): +class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests, + TokenDataTests): def config_overrides(self): super(TestFernetTokenAPIs, self).config_overrides() self.config_fixture.config(group='token', provider='fernet') @@ -602,6 +1337,34 @@ class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): super(TestFernetTokenAPIs, self).setUp() self.doSetUp() + def _make_auth_request(self, auth_data): + token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data) + self.assertLess(len(token), 255) + return token + + def test_validate_tampered_unscoped_token_fails(self): + unscoped_token = self._get_unscoped_token() + tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + + unscoped_token[50 + 32:]) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) + + def test_validate_tampered_project_scoped_token_fails(self): + project_scoped_token = self._get_project_scoped_token() + tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + + project_scoped_token[50 + 32:]) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) + + def test_validate_tampered_trust_scoped_token_fails(self): + trustee_user, trust = self._create_trust() + trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) + # Get a trust scoped token + tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + + trust_scoped_token[50 + 32:]) + self._validate_token(tampered_token, + expected_status=http_client.NOT_FOUND) + class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): """Test token revoke using v3 Identity API by token owner and admin.""" @@ -616,29 +1379,22 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): """ super(TestTokenRevokeSelfAndAdmin, self).load_sample_data() # DomainA setup - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.userAdminA = self.new_user_ref(domain_id=self.domainA['id']) - password = self.userAdminA['password'] - self.userAdminA = self.identity_api.create_user(self.userAdminA) - self.userAdminA['password'] = password + self.userAdminA = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) - self.userNormalA = self.new_user_ref( - domain_id=self.domainA['id']) - password = self.userNormalA['password'] - self.userNormalA = self.identity_api.create_user(self.userNormalA) - self.userNormalA['password'] = password + self.userNormalA = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) self.assignment_api.create_grant(self.role['id'], user_id=self.userAdminA['id'], domain_id=self.domainA['id']) - def config_overrides(self): - super(TestTokenRevokeSelfAndAdmin, self).config_overrides() - self.config_fixture.config( - group='oslo_policy', - policy_file=unit.dirs.etc('policy.v3cloudsample.json')) + def _policy_fixture(self): + return ksfixtures.Policy(unit.dirs.etc('policy.v3cloudsample.json'), + self.config_fixture) def test_user_revokes_own_token(self): user_token = self.get_requested_token( @@ -655,11 +1411,13 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): password=self.userAdminA['password'], domain_name=self.domainA['name'])) - self.head('/auth/tokens', headers=headers, expected_status=200, + self.head('/auth/tokens', headers=headers, + expected_status=http_client.OK, token=adminA_token) - self.head('/auth/tokens', headers=headers, expected_status=200, + self.head('/auth/tokens', headers=headers, + expected_status=http_client.OK, token=user_token) - self.delete('/auth/tokens', headers=headers, expected_status=204, + self.delete('/auth/tokens', headers=headers, token=user_token) # invalid X-Auth-Token and invalid X-Subject-Token self.head('/auth/tokens', headers=headers, @@ -693,11 +1451,13 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): password=self.userAdminA['password'], domain_name=self.domainA['name'])) - self.head('/auth/tokens', headers=headers, expected_status=200, + self.head('/auth/tokens', headers=headers, + expected_status=http_client.OK, token=adminA_token) - self.head('/auth/tokens', headers=headers, expected_status=200, + self.head('/auth/tokens', headers=headers, + expected_status=http_client.OK, token=user_token) - self.delete('/auth/tokens', headers=headers, expected_status=204, + self.delete('/auth/tokens', headers=headers, token=adminA_token) # invalid X-Auth-Token and invalid X-Subject-Token self.head('/auth/tokens', headers=headers, @@ -714,14 +1474,12 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): def test_adminB_fails_revoking_userA_token(self): # DomainB setup - self.domainB = self.new_domain_ref() + self.domainB = unit.new_domain_ref() self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.userAdminB = self.new_user_ref(domain_id=self.domainB['id']) - password = self.userAdminB['password'] - self.userAdminB = self.identity_api.create_user(self.userAdminB) - self.userAdminB['password'] = password + userAdminB = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) self.assignment_api.create_grant(self.role['id'], - user_id=self.userAdminB['id'], + user_id=userAdminB['id'], domain_id=self.domainB['id']) user_token = self.get_requested_token( @@ -733,8 +1491,8 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): adminB_token = self.get_requested_token( self.build_authentication_request( - user_id=self.userAdminB['id'], - password=self.userAdminB['password'], + user_id=userAdminB['id'], + password=userAdminB['password'], domain_name=self.domainB['name'])) self.head('/auth/tokens', headers=headers, @@ -750,7 +1508,6 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): def config_overrides(self): super(TestTokenRevokeById, self).config_overrides() - self.config_fixture.config(group='revoke', driver='kvs') self.config_fixture.config( group='token', provider='pki', @@ -782,44 +1539,32 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): super(TestTokenRevokeById, self).setUp() # Start by creating a couple of domains and projects - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = self.new_domain_ref() + self.domainB = unit.new_domain_ref() self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.projectA = self.new_project_ref(domain_id=self.domainA['id']) + self.projectA = unit.new_project_ref(domain_id=self.domainA['id']) self.resource_api.create_project(self.projectA['id'], self.projectA) - self.projectB = self.new_project_ref(domain_id=self.domainA['id']) + self.projectB = unit.new_project_ref(domain_id=self.domainA['id']) self.resource_api.create_project(self.projectB['id'], self.projectB) # Now create some users - self.user1 = self.new_user_ref( - domain_id=self.domainA['id']) - password = self.user1['password'] - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - - self.user2 = self.new_user_ref( - domain_id=self.domainB['id']) - password = self.user2['password'] - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - - self.user3 = self.new_user_ref( - domain_id=self.domainB['id']) - password = self.user3['password'] - self.user3 = self.identity_api.create_user(self.user3) - self.user3['password'] = password - - self.group1 = self.new_group_ref( - domain_id=self.domainA['id']) + self.user1 = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) + + self.user2 = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) + + self.user3 = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) + + self.group1 = unit.new_group_ref(domain_id=self.domainA['id']) self.group1 = self.identity_api.create_group(self.group1) - self.group2 = self.new_group_ref( - domain_id=self.domainA['id']) + self.group2 = unit.new_group_ref(domain_id=self.domainA['id']) self.group2 = self.identity_api.create_group(self.group2) - self.group3 = self.new_group_ref( - domain_id=self.domainB['id']) + self.group3 = unit.new_group_ref(domain_id=self.domainB['id']) self.group3 = self.identity_api.create_group(self.group3) self.identity_api.add_user_to_group(self.user1['id'], @@ -829,9 +1574,9 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.identity_api.add_user_to_group(self.user3['id'], self.group2['id']) - self.role1 = self.new_role_ref() + self.role1 = unit.new_role_ref() self.role_api.create_role(self.role1['id'], self.role1) - self.role2 = self.new_role_ref() + self.role2 = unit.new_role_ref() self.role_api.create_role(self.role2['id'], self.role2) self.assignment_api.create_grant(self.role2['id'], @@ -864,13 +1609,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm both tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, - expected_status=200) + expected_status=http_client.OK) # create a new role - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # assign a new role @@ -883,10 +1628,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # both tokens should remain valid self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': scoped_token}, - expected_status=200) + expected_status=http_client.OK) def test_deleting_user_grant_revokes_token(self): """Test deleting a user grant revokes token. @@ -906,7 +1651,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # Delete the grant, which should invalidate the token grant_url = ( '/projects/%(project_id)s/users/%(user_id)s/' @@ -920,22 +1665,14 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): expected_status=http_client.NOT_FOUND) def role_data_fixtures(self): - self.projectC = self.new_project_ref(domain_id=self.domainA['id']) + self.projectC = unit.new_project_ref(domain_id=self.domainA['id']) self.resource_api.create_project(self.projectC['id'], self.projectC) - self.user4 = self.new_user_ref(domain_id=self.domainB['id']) - password = self.user4['password'] - self.user4 = self.identity_api.create_user(self.user4) - self.user4['password'] = password - self.user5 = self.new_user_ref( - domain_id=self.domainA['id']) - password = self.user5['password'] - self.user5 = self.identity_api.create_user(self.user5) - self.user5['password'] = password - self.user6 = self.new_user_ref( - domain_id=self.domainA['id']) - password = self.user6['password'] - self.user6 = self.identity_api.create_user(self.user6) - self.user6['password'] = password + self.user4 = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) + self.user5 = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) + self.user6 = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) self.identity_api.add_user_to_group(self.user5['id'], self.group1['id']) self.assignment_api.create_grant(self.role1['id'], @@ -954,29 +1691,29 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): def test_deleting_role_revokes_token(self): """Test deleting a role revokes token. - Add some additional test data, namely: - - A third project (project C) - - Three additional users - user4 owned by domainB and user5 and 6 - owned by domainA (different domain ownership should not affect - the test results, just provided to broaden test coverage) - - User5 is a member of group1 - - Group1 gets an additional assignment - role1 on projectB as - well as its existing role1 on projectA - - User4 has role2 on Project C - - User6 has role1 on projectA and domainA - - This allows us to create 5 tokens by virtue of different types - of role assignment: - - user1, scoped to ProjectA by virtue of user role1 assignment - - user5, scoped to ProjectB by virtue of group role1 assignment - - user4, scoped to ProjectC by virtue of user role2 assignment - - user6, scoped to ProjectA by virtue of user role1 assignment - - user6, scoped to DomainA by virtue of user role1 assignment - - role1 is then deleted - - Check the tokens on Project A and B, and DomainA are revoked, - but not the one for Project C + Add some additional test data, namely: + + - A third project (project C) + - Three additional users - user4 owned by domainB and user5 and 6 owned + by domainA (different domain ownership should not affect the test + results, just provided to broaden test coverage) + - User5 is a member of group1 + - Group1 gets an additional assignment - role1 on projectB as well as + its existing role1 on projectA + - User4 has role2 on Project C + - User6 has role1 on projectA and domainA + - This allows us to create 5 tokens by virtue of different types of + role assignment: + - user1, scoped to ProjectA by virtue of user role1 assignment + - user5, scoped to ProjectB by virtue of group role1 assignment + - user4, scoped to ProjectC by virtue of user role2 assignment + - user6, scoped to ProjectA by virtue of user role1 assignment + - user6, scoped to DomainA by virtue of user role1 assignment + - role1 is then deleted + - Check the tokens on Project A and B, and DomainA are revoked, but not + the one for Project C """ - self.role_data_fixtures() # Now we are ready to start issuing requests @@ -1008,19 +1745,19 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': tokenA}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': tokenB}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': tokenC}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': tokenD}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': tokenE}, - expected_status=200) + expected_status=http_client.OK) # Delete the role, which should invalidate the tokens role_url = '/roles/%s' % self.role1['id'] @@ -1043,7 +1780,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # ...but the one using role2 is still valid self.head('/auth/tokens', headers={'X-Subject-Token': tokenC}, - expected_status=200) + expected_status=http_client.OK) def test_domain_user_role_assignment_maintains_token(self): """Test user-domain role assignment maintains existing token. @@ -1063,7 +1800,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # Assign a role, which should not affect the token grant_url = ( '/domains/%(domain_id)s/users/%(user_id)s/' @@ -1074,7 +1811,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) def test_disabling_project_revokes_token(self): token = self.get_requested_token( @@ -1086,7 +1823,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # disable the project, which should invalidate the token self.patch( @@ -1097,7 +1834,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=http_client.NOT_FOUND) - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], @@ -1114,7 +1851,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # delete the project, which should invalidate the token self.delete( @@ -1124,7 +1861,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.head('/auth/tokens', headers={'X-Subject-Token': token}, expected_status=http_client.NOT_FOUND) - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], @@ -1163,13 +1900,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': token3}, - expected_status=200) + expected_status=http_client.OK) # Delete the group grant, which should invalidate the # tokens for user1 and user2 grant_url = ( @@ -1209,7 +1946,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm token is valid self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) # Delete the grant, which should invalidate the token grant_url = ( '/domains/%(domain_id)s/groups/%(group_id)s/' @@ -1220,7 +1957,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.put(grant_url) self.head('/auth/tokens', headers={'X-Subject-Token': token}, - expected_status=200) + expected_status=http_client.OK) def test_group_membership_changes_revokes_token(self): """Test add/removal to/from group revokes token. @@ -1250,10 +1987,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # Confirm tokens are valid self.head('/auth/tokens', headers={'X-Subject-Token': token1}, - expected_status=200) + expected_status=http_client.OK) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=200) + expected_status=http_client.OK) # Remove user1 from group1, which should invalidate # the token self.delete('/groups/%(group_id)s/users/%(user_id)s' % { @@ -1265,18 +2002,17 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # But user2's token should still be valid self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=200) + expected_status=http_client.OK) # Adding user2 to a group should not invalidate token self.put('/groups/%(group_id)s/users/%(user_id)s' % { 'group_id': self.group2['id'], 'user_id': self.user2['id']}) self.head('/auth/tokens', headers={'X-Subject-Token': token2}, - expected_status=200) + expected_status=http_client.OK) def test_removing_role_assignment_does_not_affect_other_users(self): """Revoking a role from one user should not affect other users.""" - # This group grant is not needed for the test self.delete( '/projects/%(project_id)s/groups/%(group_id)s/roles/%(role_id)s' % @@ -1306,7 +2042,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.head('/auth/tokens', headers={'X-Subject-Token': user1_token}, expected_status=http_client.NOT_FOUND) - self.v3_authenticate_token( + self.v3_create_token( self.build_authentication_request( user_id=self.user1['id'], password=self.user1['password'], @@ -1316,8 +2052,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # authorization for the second user should still succeed self.head('/auth/tokens', headers={'X-Subject-Token': user3_token}, - expected_status=200) - self.v3_authenticate_token( + expected_status=http_client.OK) + self.v3_create_token( self.build_authentication_request( user_id=self.user3['id'], password=self.user3['password'], @@ -1338,7 +2074,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.delete( '/projects/%(project_id)s' % {'project_id': self.projectA['id']}) - # Make sure that we get a NotFound(404) when heading that role. + # Make sure that we get a 404 Not Found when heading that role. self.head(role_path, expected_status=http_client.NOT_FOUND) def get_v2_token(self, token=None, project_id=None): @@ -1366,8 +2102,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): token = self.get_v2_token() self.delete('/auth/tokens', - headers={'X-Subject-Token': token}, - expected_status=204) + headers={'X-Subject-Token': token}) self.head('/auth/tokens', headers={'X-Subject-Token': token}, @@ -1397,8 +2132,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # revoke the project-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': project_scoped_token}, - expected_status=204) + headers={'X-Subject-Token': project_scoped_token}) # The project-scoped token is invalidated. self.head('/auth/tokens', @@ -1408,17 +2142,16 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=200) + expected_status=http_client.OK) # The domain-scoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': domain_scoped_token}, - expected_status=200) + expected_status=http_client.OK) # revoke the domain-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': domain_scoped_token}, - expected_status=204) + headers={'X-Subject-Token': domain_scoped_token}) # The domain-scoped token is invalid. self.head('/auth/tokens', @@ -1428,16 +2161,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=200) + expected_status=http_client.OK) def test_revoke_token_from_token_v2(self): # Test that a scoped token can be requested from an unscoped token, # the scoped token can be revoked, and the unscoped token remains # valid. - # FIXME(blk-u): This isn't working correctly. The scoped token should - # be revoked. See bug 1347318. - unscoped_token = self.get_v2_token() # Get a project-scoped token from the unscoped token @@ -1446,8 +2176,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # revoke the project-scoped token. self.delete('/auth/tokens', - headers={'X-Subject-Token': project_scoped_token}, - expected_status=204) + headers={'X-Subject-Token': project_scoped_token}) # The project-scoped token is invalidated. self.head('/auth/tokens', @@ -1457,7 +2186,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): # The unscoped token should still be valid. self.head('/auth/tokens', headers={'X-Subject-Token': unscoped_token}, - expected_status=200) + expected_status=http_client.OK) class TestTokenRevokeByAssignment(TestTokenRevokeById): @@ -1465,9 +2194,6 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById): def config_overrides(self): super(TestTokenRevokeById, self).config_overrides() self.config_fixture.config( - group='revoke', - driver='kvs') - self.config_fixture.config( group='token', provider='uuid', revoke_by_id=True) @@ -1501,7 +2227,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById): # authorization for the projectA should still succeed self.head('/auth/tokens', headers={'X-Subject-Token': other_project_token}, - expected_status=200) + expected_status=http_client.OK) # while token for the projectB should not self.head('/auth/tokens', headers={'X-Subject-Token': project_token}, @@ -1512,14 +2238,21 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById): self.assertIn(project_token, revoked_tokens) -class TestTokenRevokeApi(TestTokenRevokeById): - EXTENSION_NAME = 'revoke' - EXTENSION_TO_ADD = 'revoke_extension' +class RevokeContribTests(test_v3.RestfulTestCase): + @mock.patch.object(versionutils, 'report_deprecated_feature') + def test_exception_happens(self, mock_deprecator): + routers.RevokeExtension(mock.ANY) + mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY) + args, _kwargs = mock_deprecator.call_args + self.assertIn("Remove revoke_extension from", args[1]) + + +class TestTokenRevokeApi(TestTokenRevokeById): """Test token revocation on the v3 Identity API.""" + def config_overrides(self): super(TestTokenRevokeApi, self).config_overrides() - self.config_fixture.config(group='revoke', driver='kvs') self.config_fixture.config( group='token', provider='pki', @@ -1536,15 +2269,19 @@ class TestTokenRevokeApi(TestTokenRevokeById): expected_response = {'events': [{'project_id': project_id}]} self.assertEqual(expected_response, events_response) - def assertDomainInList(self, events_response, domain_id): + def assertDomainAndProjectInList(self, events_response, domain_id): events = events_response['events'] - self.assertEqual(1, len(events)) - self.assertEqual(domain_id, events[0]['domain_id']) + self.assertEqual(2, len(events)) + self.assertEqual(domain_id, events[0]['project_id']) + self.assertEqual(domain_id, events[1]['domain_id']) self.assertIsNotNone(events[0]['issued_before']) + self.assertIsNotNone(events[1]['issued_before']) self.assertIsNotNone(events_response['links']) del (events_response['events'][0]['issued_before']) + del (events_response['events'][1]['issued_before']) del (events_response['links']) - expected_response = {'events': [{'domain_id': domain_id}]} + expected_response = {'events': [{'project_id': domain_id}, + {'domain_id': domain_id}]} self.assertEqual(expected_response, events_response) def assertValidRevokedTokenResponse(self, events_response, **kwargs): @@ -1563,62 +2300,55 @@ class TestTokenRevokeApi(TestTokenRevokeById): def test_revoke_token(self): scoped_token = self.get_scoped_token() headers = {'X-Subject-Token': scoped_token} - response = self.get('/auth/tokens', headers=headers, - expected_status=200).json_body['token'] + response = self.get('/auth/tokens', headers=headers).json_body['token'] - self.delete('/auth/tokens', headers=headers, expected_status=204) + self.delete('/auth/tokens', headers=headers) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - events_response = self.get('/OS-REVOKE/events', - expected_status=200).json_body + events_response = self.get('/OS-REVOKE/events').json_body self.assertValidRevokedTokenResponse(events_response, audit_id=response['audit_ids'][0]) def test_revoke_v2_token(self): token = self.get_v2_token() headers = {'X-Subject-Token': token} - response = self.get('/auth/tokens', headers=headers, - expected_status=200).json_body['token'] - self.delete('/auth/tokens', headers=headers, expected_status=204) + response = self.get('/auth/tokens', + headers=headers).json_body['token'] + self.delete('/auth/tokens', headers=headers) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - events_response = self.get('/OS-REVOKE/events', - expected_status=200).json_body + events_response = self.get('/OS-REVOKE/events').json_body self.assertValidRevokedTokenResponse( events_response, audit_id=response['audit_ids'][0]) - def test_revoke_by_id_false_410(self): + def test_revoke_by_id_false_returns_gone(self): self.get('/auth/tokens/OS-PKI/revoked', expected_status=http_client.GONE) def test_list_delete_project_shows_in_event_list(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events').json_body['events'] self.assertEqual([], events) self.delete( '/projects/%(project_id)s' % {'project_id': self.projectA['id']}) - events_response = self.get('/OS-REVOKE/events', - expected_status=200).json_body + events_response = self.get('/OS-REVOKE/events').json_body self.assertValidDeletedProjectResponse(events_response, self.projectA['id']) def test_disable_domain_shows_in_event_list(self): - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events').json_body['events'] self.assertEqual([], events) disable_body = {'domain': {'enabled': False}} self.patch( '/domains/%(project_id)s' % {'project_id': self.domainA['id']}, body=disable_body) - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body + events = self.get('/OS-REVOKE/events').json_body - self.assertDomainInList(events, self.domainA['id']) + self.assertDomainAndProjectInList(events, self.domainA['id']) def assertEventDataInList(self, events, **kwargs): found = False @@ -1646,30 +2376,31 @@ class TestTokenRevokeApi(TestTokenRevokeById): def test_list_delete_token_shows_in_event_list(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events').json_body['events'] self.assertEqual([], events) scoped_token = self.get_scoped_token() headers = {'X-Subject-Token': scoped_token} auth_req = self.build_authentication_request(token=scoped_token) - response = self.v3_authenticate_token(auth_req) + response = self.v3_create_token(auth_req) token2 = response.json_body['token'] headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']} - response = self.v3_authenticate_token(auth_req) + response = self.v3_create_token(auth_req) response.json_body['token'] headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']} - self.head('/auth/tokens', headers=headers, expected_status=200) - self.head('/auth/tokens', headers=headers2, expected_status=200) - self.head('/auth/tokens', headers=headers3, expected_status=200) + self.head('/auth/tokens', headers=headers, + expected_status=http_client.OK) + self.head('/auth/tokens', headers=headers2, + expected_status=http_client.OK) + self.head('/auth/tokens', headers=headers3, + expected_status=http_client.OK) - self.delete('/auth/tokens', headers=headers, expected_status=204) + self.delete('/auth/tokens', headers=headers) # NOTE(ayoung): not deleting token3, as it should be deleted # by previous - events_response = self.get('/OS-REVOKE/events', - expected_status=200).json_body + events_response = self.get('/OS-REVOKE/events').json_body events = events_response['events'] self.assertEqual(1, len(events)) self.assertEventDataInList( @@ -1677,32 +2408,32 @@ class TestTokenRevokeApi(TestTokenRevokeById): audit_id=token2['audit_ids'][1]) self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) - self.head('/auth/tokens', headers=headers2, expected_status=200) - self.head('/auth/tokens', headers=headers3, expected_status=200) + self.head('/auth/tokens', headers=headers2, + expected_status=http_client.OK) + self.head('/auth/tokens', headers=headers3, + expected_status=http_client.OK) def test_list_with_filter(self): self.role_data_fixtures() - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events').json_body['events'] self.assertEqual(0, len(events)) scoped_token = self.get_scoped_token() headers = {'X-Subject-Token': scoped_token} auth = self.build_authentication_request(token=scoped_token) headers2 = {'X-Subject-Token': self.get_requested_token(auth)} - self.delete('/auth/tokens', headers=headers, expected_status=204) - self.delete('/auth/tokens', headers=headers2, expected_status=204) + self.delete('/auth/tokens', headers=headers) + self.delete('/auth/tokens', headers=headers2) - events = self.get('/OS-REVOKE/events', - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events').json_body['events'] self.assertEqual(2, len(events)) future = utils.isotime(timeutils.utcnow() + datetime.timedelta(seconds=1000)) - events = self.get('/OS-REVOKE/events?since=%s' % (future), - expected_status=200).json_body['events'] + events = self.get('/OS-REVOKE/events?since=%s' % (future) + ).json_body['events'] self.assertEqual(0, len(events)) @@ -1764,7 +2495,7 @@ class TestAuthExternalDomain(test_v3.RestfulTestCase): self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidProjectScopedTokenResponse(r) self.assertEqual(self.user['name'], token['bind']['kerberos']) @@ -1776,7 +2507,7 @@ class TestAuthExternalDomain(test_v3.RestfulTestCase): self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidUnscopedTokenResponse(r) self.assertEqual(self.user['name'], token['bind']['kerberos']) @@ -1820,7 +2551,7 @@ class TestAuthExternalDefaultDomain(test_v3.RestfulTestCase): remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidProjectScopedTokenResponse(r) self.assertEqual(self.default_domain_user['name'], token['bind']['kerberos']) @@ -1831,7 +2562,7 @@ class TestAuthExternalDefaultDomain(test_v3.RestfulTestCase): remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidUnscopedTokenResponse(r) self.assertEqual(self.default_domain_user['name'], token['bind']['kerberos']) @@ -1852,7 +2583,7 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_id(self): @@ -1860,7 +2591,7 @@ class TestAuth(test_v3.RestfulTestCase): username=self.user['name'], user_domain_id=self.domain['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_unscoped_token_with_user_domain_name(self): @@ -1868,7 +2599,7 @@ class TestAuth(test_v3.RestfulTestCase): username=self.user['name'], user_domain_name=self.domain['name'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_project_id_scoped_token_with_user_id(self): @@ -1876,11 +2607,11 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r) def _second_project_as_default(self): - ref = self.new_project_ref(domain_id=self.domain_id) + ref = unit.new_project_ref(domain_id=self.domain_id) r = self.post('/projects', body={'project': ref}) project = self.assertValidProjectResponse(r, ref) @@ -1907,7 +2638,7 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r) self.assertEqual(project['id'], r.result['token']['project']['id']) @@ -1952,7 +2683,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) catalog = r.result['token']['catalog'] self.assertEqual(1, len(catalog)) @@ -1989,13 +2720,12 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertEqual([], r.result['token']['catalog']) def test_auth_catalog_disabled_endpoint(self): """On authenticate, get a catalog that excludes disabled endpoints.""" - # Create a disabled endpoint that's like the enabled one. disabled_endpoint_ref = copy.copy(self.endpoint) disabled_endpoint_id = uuid.uuid4().hex @@ -2011,21 +2741,21 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self._check_disabled_endpoint_result(r.result['token']['catalog'], disabled_endpoint_id) def test_project_id_scoped_token_with_user_id_unauthorized(self): - project = self.new_project_ref(domain_id=self.domain_id) + project = unit.new_project_ref(domain_id=self.domain_id) self.resource_api.create_project(project['id'], project) auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=project['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_user_and_group_roles_scoped_token(self): """Test correct roles are returned in scoped token. @@ -2049,30 +2779,19 @@ class TestAuth(test_v3.RestfulTestCase): tokens """ - - domainA = self.new_domain_ref() + domainA = unit.new_domain_ref() self.resource_api.create_domain(domainA['id'], domainA) - projectA = self.new_project_ref(domain_id=domainA['id']) + projectA = unit.new_project_ref(domain_id=domainA['id']) self.resource_api.create_project(projectA['id'], projectA) - user1 = self.new_user_ref( - domain_id=domainA['id']) - password = user1['password'] - user1 = self.identity_api.create_user(user1) - user1['password'] = password + user1 = unit.create_user(self.identity_api, domain_id=domainA['id']) - user2 = self.new_user_ref( - domain_id=domainA['id']) - password = user2['password'] - user2 = self.identity_api.create_user(user2) - user2['password'] = password + user2 = unit.create_user(self.identity_api, domain_id=domainA['id']) - group1 = self.new_group_ref( - domain_id=domainA['id']) + group1 = unit.new_group_ref(domain_id=domainA['id']) group1 = self.identity_api.create_group(group1) - group2 = self.new_group_ref( - domain_id=domainA['id']) + group2 = unit.new_group_ref(domain_id=domainA['id']) group2 = self.identity_api.create_group(group2) self.identity_api.add_user_to_group(user1['id'], @@ -2083,7 +2802,7 @@ class TestAuth(test_v3.RestfulTestCase): # Now create all the roles and assign them role_list = [] for _ in range(8): - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) role_list.append(role) @@ -2119,7 +2838,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=user1['id'], password=user1['password'], project_id=projectA['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidScopedTokenResponse(r) roles_ids = [] for ref in token['roles']: @@ -2133,7 +2852,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=user1['id'], password=user1['password'], domain_id=domainA['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidScopedTokenResponse(r) roles_ids = [] for ref in token['roles']: @@ -2151,7 +2870,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=user1['id'], password=user1['password'], project_id=projectA['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidScopedTokenResponse(r) roles_ids = [] for ref in token['roles']: @@ -2164,30 +2883,23 @@ class TestAuth(test_v3.RestfulTestCase): def test_auth_token_cross_domain_group_and_project(self): """Verify getting a token in cross domain group/project roles.""" # create domain, project and group and grant roles to user - domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + domain1 = unit.new_domain_ref() self.resource_api.create_domain(domain1['id'], domain1) - project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, - 'domain_id': domain1['id']} + project1 = unit.new_project_ref(domain_id=domain1['id']) self.resource_api.create_project(project1['id'], project1) - user_foo = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) - password = user_foo['password'] - user_foo = self.identity_api.create_user(user_foo) - user_foo['password'] = password - role_member = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} + user_foo = unit.create_user(self.identity_api, + domain_id=test_v3.DEFAULT_DOMAIN_ID) + role_member = unit.new_role_ref() self.role_api.create_role(role_member['id'], role_member) - role_admin = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} + role_admin = unit.new_role_ref() self.role_api.create_role(role_admin['id'], role_admin) - role_foo_domain1 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} + role_foo_domain1 = unit.new_role_ref() self.role_api.create_role(role_foo_domain1['id'], role_foo_domain1) - role_group_domain1 = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} + role_group_domain1 = unit.new_role_ref() self.role_api.create_role(role_group_domain1['id'], role_group_domain1) self.assignment_api.add_user_to_project(project1['id'], user_foo['id']) - new_group = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex} + new_group = unit.new_group_ref(domain_id=domain1['id']) new_group = self.identity_api.create_group(new_group) self.identity_api.add_user_to_group(user_foo['id'], new_group['id']) @@ -2216,7 +2928,7 @@ class TestAuth(test_v3.RestfulTestCase): project_name=project1['name'], project_domain_id=domain1['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) scoped_token = self.assertValidScopedTokenResponse(r) project = scoped_token["project"] roles_ids = [] @@ -2234,7 +2946,7 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_id=self.domain['id'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r) def test_project_id_scoped_token_with_user_domain_name(self): @@ -2243,7 +2955,7 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_name=self.domain['name'], password=self.user['password'], project_id=self.project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidProjectScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_id(self): @@ -2255,7 +2967,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_domain_id(self): @@ -2268,7 +2980,7 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_id=self.domain['id'], password=self.user['password'], domain_id=self.domain['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_id_scoped_token_with_user_domain_name(self): @@ -2281,7 +2993,7 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_name=self.domain['name'], password=self.user['password'], domain_id=self.domain['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_id(self): @@ -2293,7 +3005,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_domain_id(self): @@ -2306,7 +3018,7 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_id=self.domain['id'], password=self.user['password'], domain_name=self.domain['name']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_name_scoped_token_with_user_domain_name(self): @@ -2319,12 +3031,11 @@ class TestAuth(test_v3.RestfulTestCase): user_domain_name=self.domain['name'], password=self.user['password'], domain_name=self.domain['name']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_group_role(self): - group = self.new_group_ref( - domain_id=self.domain_id) + group = unit.new_group_ref(domain_id=self.domain_id) group = self.identity_api.create_group(group) # add user to group @@ -2340,7 +3051,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_token_with_name(self): @@ -2353,7 +3064,7 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_name=self.domain['name']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidDomainScopedTokenResponse(r) def test_domain_scope_failed(self): @@ -2361,21 +3072,21 @@ class TestAuth(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], domain_id=self.domain['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_auth_with_id(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) token = r.headers.get('X-Subject-Token') # test token auth auth_data = self.build_authentication_request(token=token) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def get_v2_token(self, tenant_id=None): @@ -2393,7 +3104,7 @@ class TestAuth(test_v3.RestfulTestCase): def test_validate_v2_unscoped_token_with_v3_api(self): v2_token = self.get_v2_token().result['access']['token']['id'] auth_data = self.build_authentication_request(token=v2_token) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_validate_v2_scoped_token_with_v3_api(self): @@ -2404,46 +3115,46 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( token=v2_token, project_id=self.default_domain_project['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidScopedTokenResponse(r) def test_invalid_user_id(self): auth_data = self.build_authentication_request( user_id=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_user_name(self): auth_data = self.build_authentication_request( username=uuid.uuid4().hex, user_domain_id=self.domain['id'], password=self.user['password']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_domain_id(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_id=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_domain_name(self): auth_data = self.build_authentication_request( username=self.user['name'], user_domain_name=uuid.uuid4().hex, password=self.user['password']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_invalid_password(self): auth_data = self.build_authentication_request( user_id=self.user['id'], password=uuid.uuid4().hex) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_remote_user_no_realm(self): api = auth.controllers.Auth() @@ -2524,7 +3235,7 @@ class TestAuth(test_v3.RestfulTestCase): remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidUnscopedTokenResponse(r) self.assertNotIn('bind', token) @@ -2551,7 +3262,7 @@ class TestAuth(test_v3.RestfulTestCase): remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, 'AUTH_TYPE': 'Negotiate'}) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) # the unscoped token should have bind information in it token = self.assertValidUnscopedTokenResponse(r) @@ -2562,7 +3273,7 @@ class TestAuth(test_v3.RestfulTestCase): # using unscoped token with remote user succeeds auth_params = {'token': token, 'project_id': self.project_id} auth_data = self.build_authentication_request(**auth_params) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = self.assertValidProjectScopedTokenResponse(r) # the bind information should be carried over from the original token @@ -2601,16 +3312,16 @@ class TestAuth(test_v3.RestfulTestCase): token_data['token']['bind']) def test_authenticating_a_user_with_no_password(self): - user = self.new_user_ref(domain_id=self.domain['id']) - user.pop('password', None) # can't have a password for this test + user = unit.new_user_ref(domain_id=self.domain['id']) + del user['password'] # can't have a password for this test user = self.identity_api.create_user(user) auth_data = self.build_authentication_request( user_id=user['id'], password='password') - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_disabled_default_project_result_in_unscoped_token(self): # create a disabled project to work with @@ -2626,11 +3337,11 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_disabled_default_project_domain_result_in_unscoped_token(self): - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() r = self.post('/domains', body={'domain': domain_ref}) domain = self.assertValidDomainResponse(r, domain_ref) @@ -2652,7 +3363,7 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_no_access_to_default_project_result_in_unscoped_token(self): @@ -2664,32 +3375,35 @@ class TestAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) self.assertValidUnscopedTokenResponse(r) def test_disabled_scope_project_domain_result_in_401(self): # create a disabled domain - domain = self.new_domain_ref() - domain['enabled'] = False - self.resource_api.create_domain(domain['id'], domain) + domain = unit.new_domain_ref() + domain = self.resource_api.create_domain(domain['id'], domain) - # create a project in the disabled domain - project = self.new_project_ref(domain_id=domain['id']) + # create a project in the domain + project = unit.new_project_ref(domain_id=domain['id']) self.resource_api.create_project(project['id'], project) - # assign some role to self.user for the project in the disabled domain + # assign some role to self.user for the project in the domain self.assignment_api.add_role_to_user_and_project( self.user['id'], project['id'], self.role_id) + # Disable the domain + domain['enabled'] = False + self.resource_api.update_domain(domain['id'], domain) + # user should not be able to auth with project_id auth_data = self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], project_id=project['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) # user should not be able to auth with project_name & domain auth_data = self.build_authentication_request( @@ -2697,8 +3411,8 @@ class TestAuth(test_v3.RestfulTestCase): password=self.user['password'], project_name=project['name'], project_domain_id=domain['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_auth_methods_with_different_identities_fails(self): # get the token for a user. This is self.user which is different from @@ -2710,8 +3424,124 @@ class TestAuth(test_v3.RestfulTestCase): token=token, user_id=self.default_domain_user['id'], password=self.default_domain_user['password']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_authenticate_fails_if_project_unsafe(self): + """Verify authenticate to a project with unsafe name fails.""" + # Start with url name restrictions off, so we can create the unsafe + # named project + self.config_fixture.config(group='resource', + project_name_url_safe='off') + unsafe_name = 'i am not / safe' + project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID, + name=unsafe_name) + self.resource_api.create_project(project['id'], project) + role_member = unit.new_role_ref() + self.role_api.create_role(role_member['id'], role_member) + self.assignment_api.add_role_to_user_and_project( + self.user['id'], project['id'], role_member['id']) + + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_name=project['name'], + project_domain_id=test_v3.DEFAULT_DOMAIN_ID) + + # Since name url restriction is off, we should be able to autenticate + self.v3_create_token(auth_data) + + # Set the name url restriction to new, which should still allow us to + # authenticate + self.config_fixture.config(group='resource', + project_name_url_safe='new') + self.v3_create_token(auth_data) + + # Set the name url restriction to strict and we should fail to + # authenticate + self.config_fixture.config(group='resource', + project_name_url_safe='strict') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_authenticate_fails_if_domain_unsafe(self): + """Verify authenticate to a domain with unsafe name fails.""" + # Start with url name restrictions off, so we can create the unsafe + # named domain + self.config_fixture.config(group='resource', + domain_name_url_safe='off') + unsafe_name = 'i am not / safe' + domain = unit.new_domain_ref(name=unsafe_name) + self.resource_api.create_domain(domain['id'], domain) + role_member = unit.new_role_ref() + self.role_api.create_role(role_member['id'], role_member) + self.assignment_api.create_grant( + role_member['id'], + user_id=self.user['id'], + domain_id=domain['id']) + + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + domain_name=domain['name']) + + # Since name url restriction is off, we should be able to autenticate + self.v3_create_token(auth_data) + + # Set the name url restriction to new, which should still allow us to + # authenticate + self.config_fixture.config(group='resource', + project_name_url_safe='new') + self.v3_create_token(auth_data) + + # Set the name url restriction to strict and we should fail to + # authenticate + self.config_fixture.config(group='resource', + domain_name_url_safe='strict') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_authenticate_fails_to_project_if_domain_unsafe(self): + """Verify authenticate to a project using unsafe domain name fails.""" + # Start with url name restrictions off, so we can create the unsafe + # named domain + self.config_fixture.config(group='resource', + domain_name_url_safe='off') + unsafe_name = 'i am not / safe' + domain = unit.new_domain_ref(name=unsafe_name) + self.resource_api.create_domain(domain['id'], domain) + # Add a (safely named) project to that domain + project = unit.new_project_ref(domain_id=domain['id']) + self.resource_api.create_project(project['id'], project) + role_member = unit.new_role_ref() + self.role_api.create_role(role_member['id'], role_member) + self.assignment_api.create_grant( + role_member['id'], + user_id=self.user['id'], + project_id=project['id']) + + # An auth request via project ID, but specifying domain by name + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_name=project['name'], + project_domain_name=domain['name']) + + # Since name url restriction is off, we should be able to autenticate + self.v3_create_token(auth_data) + + # Set the name url restriction to new, which should still allow us to + # authenticate + self.config_fixture.config(group='resource', + project_name_url_safe='new') + self.v3_create_token(auth_data) + + # Set the name url restriction to strict and we should fail to + # authenticate + self.config_fixture.config(group='resource', + domain_name_url_safe='strict') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) class TestAuthJSONExternal(test_v3.RestfulTestCase): @@ -2736,7 +3566,7 @@ class TestTrustOptional(test_v3.RestfulTestCase): super(TestTrustOptional, self).config_overrides() self.config_fixture.config(group='trust', enabled=False) - def test_trusts_404(self): + def test_trusts_returns_not_found(self): self.get('/OS-TRUST/trusts', body={'trust': {}}, expected_status=http_client.NOT_FOUND) self.post('/OS-TRUST/trusts', body={'trust': {}}, @@ -2747,11 +3577,11 @@ class TestTrustOptional(test_v3.RestfulTestCase): user_id=self.user['id'], password=self.user['password'], trust_id=uuid.uuid4().hex) - self.v3_authenticate_token(auth_data, - expected_status=http_client.FORBIDDEN) + self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) -class TestTrustRedelegation(test_v3.RestfulTestCase): +class TrustAPIBehavior(test_v3.RestfulTestCase): """Redelegation valid and secure Redelegation is a hierarchical structure of trusts between initial trustor @@ -2778,7 +3608,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): """ def config_overrides(self): - super(TestTrustRedelegation, self).config_overrides() + super(TrustAPIBehavior, self).config_overrides() self.config_fixture.config( group='trust', enabled=True, @@ -2787,14 +3617,13 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): ) def setUp(self): - super(TestTrustRedelegation, self).setUp() + super(TrustAPIBehavior, self).setUp() # Create a trustee to delegate stuff to - trustee_user_ref = self.new_user_ref(domain_id=self.domain_id) - self.trustee_user = self.identity_api.create_user(trustee_user_ref) - self.trustee_user['password'] = trustee_user_ref['password'] + self.trustee_user = unit.create_user(self.identity_api, + domain_id=self.domain_id) # trustor->trustee - self.redelegated_trust_ref = self.new_trust_ref( + self.redelegated_trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user['id'], project_id=self.project_id, @@ -2804,7 +3633,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): allow_redelegation=True) # trustor->trustee (no redelegation) - self.chained_trust_ref = self.new_trust_ref( + self.chained_trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user['id'], project_id=self.project_id, @@ -2865,7 +3694,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): # Attempt to create a redelegated trust supposed to last longer # than the parent trust: let's give it 10 minutes (>1 minute). - too_long_live_chained_trust_ref = self.new_trust_ref( + too_long_live_chained_trust_ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user['id'], project_id=self.project_id, @@ -2894,7 +3723,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): def test_roles_subset(self): # Build second role - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # assign a new role to the user self.assignment_api.create_grant(role_id=role['id'], @@ -2903,6 +3732,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): # Create first trust with extended set of roles ref = self.redelegated_trust_ref + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2032).strftime(unit.TIME_FORMAT) ref['roles'].append({'id': role['id']}) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) @@ -2915,6 +3746,9 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): trust_token = self._get_trust_token(trust) # Chain second trust with roles subset + self.chained_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2028).strftime( + unit.TIME_FORMAT)) r = self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token) @@ -2927,7 +3761,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): def test_redelegate_with_role_by_name(self): # For role by name testing - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user['id'], project_id=self.project_id, @@ -2935,19 +3769,23 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): expires=dict(minutes=1), role_names=[self.role['name']], allow_redelegation=True) + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2032).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) trust = self.assertValidTrustResponse(r) # Ensure we can get a token with this trust trust_token = self._get_trust_token(trust) # Chain second trust with roles subset - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=True, role_names=[self.role['name']], allow_redelegation=True) + ref['expires_at'] = datetime.datetime.utcnow().replace( + year=2028).strftime(unit.TIME_FORMAT) r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=trust_token) @@ -2962,7 +3800,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): trust_token = self._get_trust_token(trust) # Build second trust with a role not in parent's roles - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # assign a new role to the user self.assignment_api.create_grant(role_id=role['id'], @@ -2980,12 +3818,18 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): expected_status=http_client.FORBIDDEN) def test_redelegation_terminator(self): + self.redelegated_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2032).strftime( + unit.TIME_FORMAT)) r = self.post('/OS-TRUST/trusts', body={'trust': self.redelegated_trust_ref}) trust = self.assertValidTrustResponse(r) trust_token = self._get_trust_token(trust) # Build second trust - the terminator + self.chained_trust_ref['expires_at'] = ( + datetime.datetime.utcnow().replace(year=2028).strftime( + unit.TIME_FORMAT)) ref = dict(self.chained_trust_ref, redelegation_count=1, allow_redelegation=False) @@ -3007,215 +3851,64 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): token=trust_token, expected_status=http_client.FORBIDDEN) + def test_redelegation_without_impersonation(self): + # Update trust to not allow impersonation + self.redelegated_trust_ref['impersonation'] = False -class TestTrustChain(test_v3.RestfulTestCase): - - def config_overrides(self): - super(TestTrustChain, self).config_overrides() - self.config_fixture.config( - group='trust', - enabled=True, - allow_redelegation=True, - max_redelegation_count=10 - ) - - def setUp(self): - super(TestTrustChain, self).setUp() - # Create trust chain - self.user_chain = list() - self.trust_chain = list() - for _ in range(3): - user_ref = self.new_user_ref(domain_id=self.domain_id) - user = self.identity_api.create_user(user_ref) - user['password'] = user_ref['password'] - self.user_chain.append(user) - - # trustor->trustee - trustee = self.user_chain[0] - trust_ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=trustee['id'], - project_id=self.project_id, - impersonation=True, - expires=dict(minutes=1), - role_ids=[self.role_id]) - trust_ref.update( - allow_redelegation=True, - redelegation_count=3) - - r = self.post('/OS-TRUST/trusts', - body={'trust': trust_ref}) + # Create trust + resp = self.post('/OS-TRUST/trusts', + body={'trust': self.redelegated_trust_ref}, + expected_status=http_client.CREATED) + trust = self.assertValidTrustResponse(resp) - trust = self.assertValidTrustResponse(r) + # Get trusted token without impersonation auth_data = self.build_authentication_request( - user_id=trustee['id'], - password=trustee['password'], + user_id=self.trustee_user['id'], + password=self.trustee_user['password'], trust_id=trust['id']) trust_token = self.get_requested_token(auth_data) - self.trust_chain.append(trust) - - for trustee in self.user_chain[1:]: - trust_ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=trustee['id'], - project_id=self.project_id, - impersonation=True, - role_ids=[self.role_id]) - trust_ref.update( - allow_redelegation=True) - r = self.post('/OS-TRUST/trusts', - body={'trust': trust_ref}, - token=trust_token) - trust = self.assertValidTrustResponse(r) - auth_data = self.build_authentication_request( - user_id=trustee['id'], - password=trustee['password'], - trust_id=trust['id']) - trust_token = self.get_requested_token(auth_data) - self.trust_chain.append(trust) - - trustee = self.user_chain[-1] - trust = self.trust_chain[-1] - auth_data = self.build_authentication_request( - user_id=trustee['id'], - password=trustee['password'], - trust_id=trust['id']) - - self.last_token = self.get_requested_token(auth_data) - - def assert_user_authenticate(self, user): - auth_data = self.build_authentication_request( - user_id=user['id'], - password=user['password'] - ) - r = self.v3_authenticate_token(auth_data) - self.assertValidTokenResponse(r) - - def assert_trust_tokens_revoked(self, trust_id): - trustee = self.user_chain[0] - auth_data = self.build_authentication_request( - user_id=trustee['id'], - password=trustee['password'] - ) - r = self.v3_authenticate_token(auth_data) - self.assertValidTokenResponse(r) - - revocation_response = self.get('/OS-REVOKE/events') - revocation_events = revocation_response.json_body['events'] - found = False - for event in revocation_events: - if event.get('OS-TRUST:trust_id') == trust_id: - found = True - self.assertTrue(found, 'event with trust_id %s not found in list' % - trust_id) - - def test_delete_trust_cascade(self): - self.assert_user_authenticate(self.user_chain[0]) - self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[0]['id']}, - expected_status=204) - headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, - expected_status=http_client.NOT_FOUND) - self.assert_trust_tokens_revoked(self.trust_chain[0]['id']) - - def test_delete_broken_chain(self): - self.assert_user_authenticate(self.user_chain[0]) - self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[1]['id']}, - expected_status=204) - - self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': self.trust_chain[0]['id']}, - expected_status=204) - - def test_trustor_roles_revoked(self): - self.assert_user_authenticate(self.user_chain[0]) - - self.assignment_api.remove_role_from_user_and_project( - self.user_id, self.project_id, self.role_id - ) - - auth_data = self.build_authentication_request( - token=self.last_token, - trust_id=self.trust_chain[-1]['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.NOT_FOUND) - - def test_intermediate_user_disabled(self): - self.assert_user_authenticate(self.user_chain[0]) - - disabled = self.user_chain[0] - disabled['enabled'] = False - self.identity_api.update_user(disabled['id'], disabled) - - # Bypass policy enforcement - with mock.patch.object(rules, 'enforce', return_value=True): - headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, - expected_status=http_client.FORBIDDEN) - - def test_intermediate_user_deleted(self): - self.assert_user_authenticate(self.user_chain[0]) - - self.identity_api.delete_user(self.user_chain[0]['id']) - - # Bypass policy enforcement - with mock.patch.object(rules, 'enforce', return_value=True): - headers = {'X-Subject-Token': self.last_token} - self.head('/auth/tokens', headers=headers, - expected_status=http_client.FORBIDDEN) - - -class TestTrustAuth(test_v3.RestfulTestCase): - EXTENSION_NAME = 'revoke' - EXTENSION_TO_ADD = 'revoke_extension' - - def config_overrides(self): - super(TestTrustAuth, self).config_overrides() - self.config_fixture.config(group='revoke', driver='kvs') - self.config_fixture.config( - group='token', - provider='pki', - revoke_by_id=False) - self.config_fixture.config(group='trust', enabled=True) + # Create second user for redelegation + trustee_user_2 = unit.create_user(self.identity_api, + domain_id=self.domain_id) - def setUp(self): - super(TestTrustAuth, self).setUp() - - # create a trustee to delegate stuff to - self.trustee_user = self.new_user_ref(domain_id=self.domain_id) - password = self.trustee_user['password'] - self.trustee_user = self.identity_api.create_user(self.trustee_user) - self.trustee_user['password'] = password - self.trustee_user_id = self.trustee_user['id'] + # Trust for redelegation + trust_ref_2 = unit.new_trust_ref( + trustor_user_id=self.trustee_user['id'], + trustee_user_id=trustee_user_2['id'], + project_id=self.project_id, + impersonation=False, + expires=dict(minutes=1), + role_ids=[self.role_id], + allow_redelegation=False) - def test_create_trust_bad_request(self): - # The server returns a 403 Forbidden rather than a 400, see bug 1133435 - self.post('/OS-TRUST/trusts', body={'trust': {}}, - expected_status=http_client.FORBIDDEN) + # Creating a second trust should not be allowed since trustor does not + # have the role to delegate thus returning 404 NOT FOUND. + resp = self.post('/OS-TRUST/trusts', + body={'trust': trust_ref_2}, + token=trust_token, + expected_status=http_client.NOT_FOUND) def test_create_unscoped_trust(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id) + trustee_user_id=self.trustee_user['id']) r = self.post('/OS-TRUST/trusts', body={'trust': ref}) self.assertValidTrustResponse(r, ref) def test_create_trust_no_roles(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=http_client.FORBIDDEN) def _initialize_test_consume_trust(self, count): # Make sure remaining_uses is decremented as we consume the trust - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, remaining_uses=count, role_ids=[self.role_id]) @@ -3223,30 +3916,29 @@ class TestTrustAuth(test_v3.RestfulTestCase): # make sure the trust exists trust = self.assertValidTrustResponse(r, ref) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=200) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) # get a token for the trustee auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = r.headers.get('X-Subject-Token') # get a trust token, consume one use auth_data = self.build_authentication_request( token=token, trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) return trust def test_consume_trust_once(self): trust = self._initialize_test_consume_trust(2) # check decremented value r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=200) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) trust = r.result.get('trust') self.assertIsNotNone(trust) self.assertEqual(1, trust['remaining_uses']) + # FIXME(lbragstad): Assert the role that is returned is the right role. def test_create_one_time_use_trust(self): trust = self._initialize_test_consume_trust(1) @@ -3259,61 +3951,15 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - def test_create_trust_with_bad_values_for_remaining_uses(self): - # negative values for the remaining_uses parameter are forbidden - self._create_trust_with_bad_remaining_use(bad_value=-1) - # 0 is a forbidden value as well - self._create_trust_with_bad_remaining_use(bad_value=0) - # as are non integer values - self._create_trust_with_bad_remaining_use(bad_value="a bad value") - self._create_trust_with_bad_remaining_use(bad_value=7.2) - - def _create_trust_with_bad_remaining_use(self, bad_value): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - remaining_uses=bad_value, - role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', - body={'trust': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_invalid_trust_request_without_impersonation(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - role_ids=[self.role_id]) - - del ref['impersonation'] - - self.post('/OS-TRUST/trusts', - body={'trust': ref}, - expected_status=http_client.BAD_REQUEST) - - def test_invalid_trust_request_without_trustee(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - role_ids=[self.role_id]) - - del ref['trustee_user_id'] - - self.post('/OS-TRUST/trusts', - body={'trust': ref}, - expected_status=http_client.BAD_REQUEST) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_create_unlimited_use_trust(self): # by default trusts are unlimited in terms of tokens that can be # generated from them, this test creates such a trust explicitly - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, remaining_uses=None, role_ids=[self.role_id]) @@ -3321,322 +3967,25 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self.assertValidTrustResponse(r, ref) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=200) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) token = r.headers.get('X-Subject-Token') auth_data = self.build_authentication_request( token=token, trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=200) + '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}) trust = r.result.get('trust') self.assertIsNone(trust['remaining_uses']) - def test_trust_crud(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - role_ids=[self.role_id]) - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r, ref) - - r = self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=200) - self.assertValidTrustResponse(r, ref) - - # validate roles on the trust - r = self.get( - '/OS-TRUST/trusts/%(trust_id)s/roles' % { - 'trust_id': trust['id']}, - expected_status=200) - roles = self.assertValidRoleListResponse(r, self.role) - self.assertIn(self.role['id'], [x['id'] for x in roles]) - self.head( - '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { - 'trust_id': trust['id'], - 'role_id': self.role['id']}, - expected_status=200) - r = self.get( - '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { - 'trust_id': trust['id'], - 'role_id': self.role['id']}, - expected_status=200) - self.assertValidRoleResponse(r, self.role) - - r = self.get('/OS-TRUST/trusts', expected_status=200) - self.assertValidTrustListResponse(r, trust) - - # trusts are immutable - self.patch( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - body={'trust': ref}, - expected_status=http_client.NOT_FOUND) - - self.delete( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=204) - - self.get( - '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']}, - expected_status=http_client.NOT_FOUND) - - def test_create_trust_trustee_404(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=uuid.uuid4().hex, - project_id=self.project_id, - role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=http_client.NOT_FOUND) - - def test_create_trust_trustor_trustee_backwards(self): - ref = self.new_trust_ref( - trustor_user_id=self.trustee_user_id, - trustee_user_id=self.user_id, - project_id=self.project_id, - role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=http_client.FORBIDDEN) - - def test_create_trust_project_404(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=uuid.uuid4().hex, - role_ids=[self.role_id]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=http_client.NOT_FOUND) - - def test_create_trust_role_id_404(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - role_ids=[uuid.uuid4().hex]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=http_client.NOT_FOUND) - - def test_create_trust_role_name_404(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - role_names=[uuid.uuid4().hex]) - self.post('/OS-TRUST/trusts', body={'trust': ref}, - expected_status=http_client.NOT_FOUND) - - def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.default_domain_user_id, - project_id=self.project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.default_domain_user['id'], - password=self.default_domain_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse( - r, self.default_domain_user) - - token = r.headers.get('X-Subject-Token') - - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request( - path=path, token=CONF.admin_token, - method='GET', expected_status=http_client.UNAUTHORIZED) - - def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self): - ref = self.new_trust_ref( - trustor_user_id=self.default_domain_user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.default_domain_project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - auth_data = self.build_authentication_request( - user_id=self.default_domain_user['id'], - password=self.default_domain_user['password'], - project_id=self.default_domain_project_id) - token = self.get_requested_token(auth_data) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse( - r, self.trustee_user) - token = r.headers.get('X-Subject-Token') - - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request( - path=path, token=CONF.admin_token, - method='GET', expected_status=http_client.UNAUTHORIZED) - - def test_v3_v2_intermix_project_not_in_default_domaini_failed(self): - # create a trustee in default domain to delegate stuff to - trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) - password = trustee_user['password'] - trustee_user = self.identity_api.create_user(trustee_user) - trustee_user['password'] = password - trustee_user_id = trustee_user['id'] - - ref = self.new_trust_ref( - trustor_user_id=self.default_domain_user_id, - trustee_user_id=trustee_user_id, - project_id=self.project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - auth_data = self.build_authentication_request( - user_id=self.default_domain_user['id'], - password=self.default_domain_user['password'], - project_id=self.default_domain_project_id) - token = self.get_requested_token(auth_data) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=trustee_user['id'], - password=trustee_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse( - r, trustee_user) - token = r.headers.get('X-Subject-Token') - - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request( - path=path, token=CONF.admin_token, - method='GET', expected_status=http_client.UNAUTHORIZED) - - def test_v3_v2_intermix(self): - # create a trustee in default domain to delegate stuff to - trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID) - password = trustee_user['password'] - trustee_user = self.identity_api.create_user(trustee_user) - trustee_user['password'] = password - trustee_user_id = trustee_user['id'] - - ref = self.new_trust_ref( - trustor_user_id=self.default_domain_user_id, - trustee_user_id=trustee_user_id, - project_id=self.default_domain_project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - auth_data = self.build_authentication_request( - user_id=self.default_domain_user['id'], - password=self.default_domain_user['password'], - project_id=self.default_domain_project_id) - token = self.get_requested_token(auth_data) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=trustee_user['id'], - password=trustee_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse( - r, trustee_user) - token = r.headers.get('X-Subject-Token') - - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request( - path=path, token=CONF.admin_token, - method='GET', expected_status=200) - - def test_exercise_trust_scoped_token_without_impersonation(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user) - self.assertEqual(self.trustee_user['id'], - r.result['token']['user']['id']) - self.assertEqual(self.trustee_user['name'], - r.result['token']['user']['name']) - self.assertEqual(self.domain['id'], - r.result['token']['user']['domain']['id']) - self.assertEqual(self.domain['name'], - r.result['token']['user']['domain']['name']) - self.assertEqual(self.project['id'], - r.result['token']['project']['id']) - self.assertEqual(self.project['name'], - r.result['token']['project']['name']) - - def test_exercise_trust_scoped_token_with_impersonation(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - impersonation=True, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password'], - trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse(r, self.user) - self.assertEqual(self.user['id'], r.result['token']['user']['id']) - self.assertEqual(self.user['name'], r.result['token']['user']['name']) - self.assertEqual(self.domain['id'], - r.result['token']['user']['domain']['id']) - self.assertEqual(self.domain['name'], - r.result['token']['user']['domain']['name']) - self.assertEqual(self.project['id'], - r.result['token']['project']['id']) - self.assertEqual(self.project['name'], - r.result['token']['project']['name']) - def test_impersonation_token_cannot_create_new_trust(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=True, expires=dict(minutes=1), @@ -3653,9 +4002,9 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_token = self.get_requested_token(auth_data) # Build second trust - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=True, expires=dict(minutes=1), @@ -3668,7 +4017,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): def test_trust_deleted_grant(self): # create a new role - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) grant_url = ( @@ -3682,9 +4031,9 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.put(grant_url) # create a trust that delegates the new role - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -3702,8 +4051,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data, - expected_status=http_client.FORBIDDEN) + r = self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) def test_trust_chained(self): """Test that a trust token can't be used to execute another trust. @@ -3713,28 +4062,26 @@ class TestTrustAuth(test_v3.RestfulTestCase): """ # create a sub-trustee user - sub_trustee_user = self.new_user_ref( + sub_trustee_user = unit.create_user( + self.identity_api, domain_id=test_v3.DEFAULT_DOMAIN_ID) - password = sub_trustee_user['password'] - sub_trustee_user = self.identity_api.create_user(sub_trustee_user) - sub_trustee_user['password'] = password sub_trustee_user_id = sub_trustee_user['id'] # create a new role - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # assign the new role to trustee self.put( '/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % { 'project_id': self.project_id, - 'user_id': self.trustee_user_id, + 'user_id': self.trustee_user['id'], 'role_id': role['id']}) # create a trust from trustor -> trustee - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=True, expires=dict(minutes=1), @@ -3744,14 +4091,14 @@ class TestTrustAuth(test_v3.RestfulTestCase): # authenticate as trustee so we can create a second trust auth_data = self.build_authentication_request( - user_id=self.trustee_user_id, + user_id=self.trustee_user['id'], password=self.trustee_user['password'], project_id=self.project_id) token = self.get_requested_token(auth_data) # create a trust from trustee -> sub-trustee - ref = self.new_trust_ref( - trustor_user_id=self.trustee_user_id, + ref = unit.new_trust_ref( + trustor_user_id=self.trustee_user['id'], trustee_user_id=sub_trustee_user_id, project_id=self.project_id, impersonation=True, @@ -3771,12 +4118,11 @@ class TestTrustAuth(test_v3.RestfulTestCase): auth_data = self.build_authentication_request( token=trust_token, trust_id=trust1['id']) - r = self.v3_authenticate_token(auth_data, - expected_status=http_client.FORBIDDEN) + r = self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) def assertTrustTokensRevoked(self, trust_id): - revocation_response = self.get('/OS-REVOKE/events', - expected_status=200) + revocation_response = self.get('/OS-REVOKE/events') revocation_events = revocation_response.json_body['events'] found = False for event in revocation_events: @@ -3786,9 +4132,9 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_id) def test_delete_trust_revokes_tokens(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -3800,13 +4146,12 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust_id) - r = self.v3_authenticate_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse( + r = self.v3_create_token(auth_data) + self.assertValidProjectScopedTokenResponse( r, self.trustee_user) trust_token = r.headers['X-Subject-Token'] self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust_id}, - expected_status=204) + 'trust_id': trust_id}) headers = {'X-Subject-Token': trust_token} self.head('/auth/tokens', headers=headers, expected_status=http_client.NOT_FOUND) @@ -3817,9 +4162,9 @@ class TestTrustAuth(test_v3.RestfulTestCase): self.identity_api.update_user(user['id'], user) def test_trust_get_token_fails_if_trustor_disabled(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -3833,7 +4178,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=201) + self.v3_create_token(auth_data) self.disable_user(self.user) @@ -3841,13 +4186,13 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.FORBIDDEN) + self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) def test_trust_get_token_fails_if_trustee_disabled(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -3861,7 +4206,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=201) + self.v3_create_token(auth_data) self.disable_user(self.trustee_user) @@ -3869,13 +4214,13 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_delete_trust(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -3886,57 +4231,19 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust = self.assertValidTrustResponse(r, ref) self.delete('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust['id']}, - expected_status=204) - - self.get('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust['id']}, - expected_status=http_client.NOT_FOUND) - - self.get('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust['id']}, - expected_status=http_client.NOT_FOUND) + 'trust_id': trust['id']}) auth_data = self.build_authentication_request( user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - self.v3_authenticate_token(auth_data, - expected_status=http_client.UNAUTHORIZED) - - def test_list_trusts(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - impersonation=False, - expires=dict(minutes=1), - role_ids=[self.role_id]) - - for i in range(3): - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - self.assertValidTrustResponse(r, ref) - - r = self.get('/OS-TRUST/trusts', expected_status=200) - trusts = r.result['trusts'] - self.assertEqual(3, len(trusts)) - self.assertValidTrustListResponse(r) - - r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' % - self.user_id, expected_status=200) - trusts = r.result['trusts'] - self.assertEqual(3, len(trusts)) - self.assertValidTrustListResponse(r) - - r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' % - self.user_id, expected_status=200) - trusts = r.result['trusts'] - self.assertEqual(0, len(trusts)) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) def test_change_password_invalidates_trust_tokens(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=True, expires=dict(minutes=1), @@ -3949,64 +4256,52 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.trustee_user['id'], password=self.trustee_user['password'], trust_id=trust['id']) - r = self.v3_authenticate_token(auth_data) + r = self.v3_create_token(auth_data) - self.assertValidProjectTrustScopedTokenResponse(r, self.user) + self.assertValidProjectScopedTokenResponse(r, self.user) trust_token = r.headers.get('X-Subject-Token') self.get('/OS-TRUST/trusts?trustor_user_id=%s' % - self.user_id, expected_status=200, - token=trust_token) + self.user_id, token=trust_token) self.assertValidUserResponse( self.patch('/users/%s' % self.trustee_user['id'], - body={'user': {'password': uuid.uuid4().hex}}, - expected_status=200)) + body={'user': {'password': uuid.uuid4().hex}})) self.get('/OS-TRUST/trusts?trustor_user_id=%s' % self.user_id, expected_status=http_client.UNAUTHORIZED, token=trust_token) def test_trustee_can_do_role_ops(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - impersonation=True, - role_ids=[self.role_id]) - - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password']) + resp = self.post('/OS-TRUST/trusts', + body={'trust': self.redelegated_trust_ref}) + trust = self.assertValidTrustResponse(resp) + trust_token = self._get_trust_token(trust) - r = self.get( + resp = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles' % { 'trust_id': trust['id']}, - auth=auth_data) - self.assertValidRoleListResponse(r, self.role) + token=trust_token) + self.assertValidRoleListResponse(resp, self.role) self.head( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, - auth=auth_data, - expected_status=200) + token=trust_token, + expected_status=http_client.OK) - r = self.get( + resp = self.get( '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % { 'trust_id': trust['id'], 'role_id': self.role['id']}, - auth=auth_data, - expected_status=200) - self.assertValidRoleResponse(r, self.role) + token=trust_token) + self.assertValidRoleResponse(resp, self.role) def test_do_not_consume_remaining_uses_when_get_token_fails(self): - ref = self.new_trust_ref( + ref = unit.new_trust_ref( trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, + trustee_user_id=self.trustee_user['id'], project_id=self.project_id, impersonation=False, expires=dict(minutes=1), @@ -4023,13 +4318,209 @@ class TestTrustAuth(test_v3.RestfulTestCase): user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], trust_id=trust_id) - self.v3_authenticate_token(auth_data, - expected_status=http_client.FORBIDDEN) + self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) r = self.get('/OS-TRUST/trusts/%s' % trust_id) self.assertEqual(3, r.result.get('trust').get('remaining_uses')) +class TestTrustChain(test_v3.RestfulTestCase): + + def config_overrides(self): + super(TestTrustChain, self).config_overrides() + self.config_fixture.config( + group='trust', + enabled=True, + allow_redelegation=True, + max_redelegation_count=10 + ) + + def setUp(self): + super(TestTrustChain, self).setUp() + """Create a trust chain using redelegation. + + A trust chain is a series of trusts that are redelegated. For example, + self.user_list consists of userA, userB, and userC. The first trust in + the trust chain is going to be established between self.user and userA, + call it trustA. Then, userA is going to obtain a trust scoped token + using trustA, and with that token create a trust between userA and + userB called trustB. This pattern will continue with userB creating a + trust with userC. + So the trust chain should look something like: + trustA -> trustB -> trustC + Where: + self.user is trusting userA with trustA + userA is trusting userB with trustB + userB is trusting userC with trustC + + """ + self.user_list = list() + self.trust_chain = list() + for _ in range(3): + user = unit.create_user(self.identity_api, + domain_id=self.domain_id) + self.user_list.append(user) + + # trustor->trustee redelegation with impersonation + trustee = self.user_list[0] + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=trustee['id'], + project_id=self.project_id, + impersonation=True, + expires=dict(minutes=1), + role_ids=[self.role_id], + allow_redelegation=True, + redelegation_count=3) + + # Create a trust between self.user and the first user in the list + r = self.post('/OS-TRUST/trusts', + body={'trust': trust_ref}) + + trust = self.assertValidTrustResponse(r) + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id']) + + # Generate a trusted token for the first user + trust_token = self.get_requested_token(auth_data) + self.trust_chain.append(trust) + + # Loop through the user to create a chain of redelegated trust. + for next_trustee in self.user_list[1:]: + trust_ref = unit.new_trust_ref( + trustor_user_id=self.user_id, + trustee_user_id=next_trustee['id'], + project_id=self.project_id, + impersonation=True, + role_ids=[self.role_id], + allow_redelegation=True) + r = self.post('/OS-TRUST/trusts', + body={'trust': trust_ref}, + token=trust_token) + trust = self.assertValidTrustResponse(r) + auth_data = self.build_authentication_request( + user_id=next_trustee['id'], + password=next_trustee['password'], + trust_id=trust['id']) + trust_token = self.get_requested_token(auth_data) + self.trust_chain.append(trust) + + trustee = self.user_list[-1] + trust = self.trust_chain[-1] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'], + trust_id=trust['id']) + + self.last_token = self.get_requested_token(auth_data) + + def assert_user_authenticate(self, user): + auth_data = self.build_authentication_request( + user_id=user['id'], + password=user['password'] + ) + r = self.v3_create_token(auth_data) + self.assertValidTokenResponse(r) + + def assert_trust_tokens_revoked(self, trust_id): + trustee = self.user_list[0] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password'] + ) + r = self.v3_create_token(auth_data) + self.assertValidTokenResponse(r) + + revocation_response = self.get('/OS-REVOKE/events') + revocation_events = revocation_response.json_body['events'] + found = False + for event in revocation_events: + if event.get('OS-TRUST:trust_id') == trust_id: + found = True + self.assertTrue(found, 'event with trust_id %s not found in list' % + trust_id) + + def test_delete_trust_cascade(self): + self.assert_user_authenticate(self.user_list[0]) + self.delete('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': self.trust_chain[0]['id']}) + + headers = {'X-Subject-Token': self.last_token} + self.head('/auth/tokens', headers=headers, + expected_status=http_client.NOT_FOUND) + self.assert_trust_tokens_revoked(self.trust_chain[0]['id']) + + def test_delete_broken_chain(self): + self.assert_user_authenticate(self.user_list[0]) + self.delete('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': self.trust_chain[0]['id']}) + + # Verify the two remaining trust have been deleted + for i in range(len(self.user_list) - 1): + auth_data = self.build_authentication_request( + user_id=self.user_list[i]['id'], + password=self.user_list[i]['password']) + + auth_token = self.get_requested_token(auth_data) + + # Assert chained trust have been deleted + self.get('/OS-TRUST/trusts/%(trust_id)s' % { + 'trust_id': self.trust_chain[i + 1]['id']}, + token=auth_token, + expected_status=http_client.NOT_FOUND) + + def test_trustor_roles_revoked(self): + self.assert_user_authenticate(self.user_list[0]) + + self.assignment_api.remove_role_from_user_and_project( + self.user_id, self.project_id, self.role_id + ) + + # Verify that users are not allowed to authenticate with trust + for i in range(len(self.user_list[1:])): + trustee = self.user_list[i] + auth_data = self.build_authentication_request( + user_id=trustee['id'], + password=trustee['password']) + + # Attempt to authenticate with trust + token = self.get_requested_token(auth_data) + auth_data = self.build_authentication_request( + token=token, + trust_id=self.trust_chain[i - 1]['id']) + + # Trustee has no delegated roles + self.v3_create_token(auth_data, + expected_status=http_client.FORBIDDEN) + + def test_intermediate_user_disabled(self): + self.assert_user_authenticate(self.user_list[0]) + + disabled = self.user_list[0] + disabled['enabled'] = False + self.identity_api.update_user(disabled['id'], disabled) + + # Bypass policy enforcement + with mock.patch.object(rules, 'enforce', return_value=True): + headers = {'X-Subject-Token': self.last_token} + self.head('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN) + + def test_intermediate_user_deleted(self): + self.assert_user_authenticate(self.user_list[0]) + + self.identity_api.delete_user(self.user_list[0]['id']) + + # Bypass policy enforcement + with mock.patch.object(rules, 'enforce', return_value=True): + headers = {'X-Subject-Token': self.last_token} + self.head('/auth/tokens', headers=headers, + expected_status=http_client.FORBIDDEN) + + class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase): def test_api_protection_with_no_auth_context_in_env(self): auth_data = self.build_authentication_request( @@ -4045,7 +4536,7 @@ class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase): 'query_string': {}, 'environment': {}} r = auth_controller.validate_token(context) - self.assertEqual(200, r.status_code) + self.assertEqual(http_client.OK, r.status_code) class TestAuthContext(unit.TestCase): @@ -4105,9 +4596,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): def test_get_catalog_project_scoped_token(self): """Call ``GET /auth/catalog`` with a project-scoped token.""" - r = self.get( - '/auth/catalog', - expected_status=200) + r = self.get('/auth/catalog') self.assertValidCatalogResponse(r) def test_get_catalog_domain_scoped_token(self): @@ -4141,7 +4630,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): expected_status=http_client.UNAUTHORIZED) def test_get_projects_project_scoped_token(self): - r = self.get('/auth/projects', expected_status=200) + r = self.get('/auth/projects') self.assertThat(r.json['projects'], matchers.HasLength(1)) self.assertValidProjectListResponse(r) @@ -4149,452 +4638,318 @@ class TestAuthSpecificData(test_v3.RestfulTestCase): self.put(path='/domains/%s/users/%s/roles/%s' % ( self.domain['id'], self.user['id'], self.role['id'])) - r = self.get('/auth/domains', expected_status=200) + r = self.get('/auth/domains') self.assertThat(r.json['domains'], matchers.HasLength(1)) self.assertValidDomainListResponse(r) -class TestFernetTokenProvider(test_v3.RestfulTestCase): - def setUp(self): - super(TestFernetTokenProvider, self).setUp() - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - - def _make_auth_request(self, auth_data): - resp = self.post('/auth/tokens', body=auth_data, expected_status=201) - token = resp.headers.get('X-Subject-Token') - self.assertLess(len(token), 255) - return token - - def _get_unscoped_token(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password']) - return self._make_auth_request(auth_data) - - def _get_project_scoped_token(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - project_id=self.project_id) - return self._make_auth_request(auth_data) +class TestTrustAuthPKITokenProvider(TrustAPIBehavior, TestTrustChain): + def config_overrides(self): + super(TestTrustAuthPKITokenProvider, self).config_overrides() + self.config_fixture.config(group='token', + provider='pki', + revoke_by_id=False) + self.config_fixture.config(group='trust', + enabled=True) - def _get_domain_scoped_token(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - domain_id=self.domain_id) - return self._make_auth_request(auth_data) - def _get_trust_scoped_token(self, trustee_user, trust): - auth_data = self.build_authentication_request( - user_id=trustee_user['id'], - password=trustee_user['password'], - trust_id=trust['id']) - return self._make_auth_request(auth_data) - - def _validate_token(self, token, expected_status=200): - return self.get( - '/auth/tokens', - headers={'X-Subject-Token': token}, - expected_status=expected_status) +class TestTrustAuthPKIZTokenProvider(TrustAPIBehavior, TestTrustChain): + def config_overrides(self): + super(TestTrustAuthPKIZTokenProvider, self).config_overrides() + self.config_fixture.config(group='token', + provider='pkiz', + revoke_by_id=False) + self.config_fixture.config(group='trust', + enabled=True) - def _revoke_token(self, token, expected_status=204): - return self.delete( - '/auth/tokens', - headers={'X-Subject-Token': token}, - expected_status=expected_status) - def _set_user_enabled(self, user, enabled=True): - user['enabled'] = enabled - self.identity_api.update_user(user['id'], user) +class TestTrustAuthFernetTokenProvider(TrustAPIBehavior, TestTrustChain): + def config_overrides(self): + super(TestTrustAuthFernetTokenProvider, self).config_overrides() + self.config_fixture.config(group='token', + provider='fernet', + revoke_by_id=False) + self.config_fixture.config(group='trust', + enabled=True) + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - def _create_trust(self): - # Create a trustee user - trustee_user_ref = self.new_user_ref(domain_id=self.domain_id) - trustee_user = self.identity_api.create_user(trustee_user_ref) - trustee_user['password'] = trustee_user_ref['password'] - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=trustee_user['id'], - project_id=self.project_id, - impersonation=False, - role_ids=[self.role_id]) - # Create a trust - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r) - return (trustee_user, trust) +class TestAuthFernetTokenProvider(TestAuth): + def setUp(self): + super(TestAuthFernetTokenProvider, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) def config_overrides(self): - super(TestFernetTokenProvider, self).config_overrides() + super(TestAuthFernetTokenProvider, self).config_overrides() self.config_fixture.config(group='token', provider='fernet') - def test_validate_unscoped_token(self): - unscoped_token = self._get_unscoped_token() - self._validate_token(unscoped_token) + def test_verify_with_bound_token(self): + self.config_fixture.config(group='token', bind='kerberos') + auth_data = self.build_authentication_request( + project_id=self.project['id']) + remote_user = self.default_domain_user['name'] + self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, + 'AUTH_TYPE': 'Negotiate'}) + # Bind not current supported by Fernet, see bug 1433311. + self.v3_create_token(auth_data, + expected_status=http_client.NOT_IMPLEMENTED) - def test_validate_tampered_unscoped_token_fails(self): - unscoped_token = self._get_unscoped_token() - tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + - unscoped_token[50 + 32:]) - self._validate_token(tampered_token, - expected_status=http_client.NOT_FOUND) + def test_v2_v3_bind_token_intermix(self): + self.config_fixture.config(group='token', bind='kerberos') - def test_revoke_unscoped_token(self): - unscoped_token = self._get_unscoped_token() - self._validate_token(unscoped_token) - self._revoke_token(unscoped_token) - self._validate_token(unscoped_token, - expected_status=http_client.NOT_FOUND) + # we need our own user registered to the default domain because of + # the way external auth works. + remote_user = self.default_domain_user['name'] + self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, + 'AUTH_TYPE': 'Negotiate'}) + body = {'auth': {}} + # Bind not current supported by Fernet, see bug 1433311. + self.admin_request(path='/v2.0/tokens', + method='POST', + body=body, + expected_status=http_client.NOT_IMPLEMENTED) - def test_unscoped_token_is_invalid_after_disabling_user(self): - unscoped_token = self._get_unscoped_token() - # Make sure the token is valid - self._validate_token(unscoped_token) - # Disable the user - self._set_user_enabled(self.user, enabled=False) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - unscoped_token) + def test_auth_with_bind_token(self): + self.config_fixture.config(group='token', bind=['kerberos']) - def test_unscoped_token_is_invalid_after_enabling_disabled_user(self): - unscoped_token = self._get_unscoped_token() - # Make sure the token is valid - self._validate_token(unscoped_token) - # Disable the user - self._set_user_enabled(self.user, enabled=False) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - unscoped_token) - # Enable the user - self._set_user_enabled(self.user) - # Ensure validating a token for a re-enabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - unscoped_token) + auth_data = self.build_authentication_request() + remote_user = self.default_domain_user['name'] + self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, + 'AUTH_TYPE': 'Negotiate'}) + # Bind not current supported by Fernet, see bug 1433311. + self.v3_create_token(auth_data, + expected_status=http_client.NOT_IMPLEMENTED) - def test_unscoped_token_is_invalid_after_disabling_user_domain(self): - unscoped_token = self._get_unscoped_token() - # Make sure the token is valid - self._validate_token(unscoped_token) - # Disable the user's domain - self.domain['enabled'] = False - self.resource_api.update_domain(self.domain['id'], self.domain) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - unscoped_token) - def test_unscoped_token_is_invalid_after_changing_user_password(self): - unscoped_token = self._get_unscoped_token() - # Make sure the token is valid - self._validate_token(unscoped_token) - # Change user's password - self.user['password'] = 'Password1' - self.identity_api.update_user(self.user['id'], self.user) - # Ensure updating user's password revokes existing user's tokens - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - unscoped_token) +class TestAuthTOTP(test_v3.RestfulTestCase): - def test_validate_project_scoped_token(self): - project_scoped_token = self._get_project_scoped_token() - self._validate_token(project_scoped_token) + def setUp(self): + super(TestAuthTOTP, self).setUp() - def test_validate_domain_scoped_token(self): - # Grant user access to domain - self.assignment_api.create_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) - domain_scoped_token = self._get_domain_scoped_token() - resp = self._validate_token(domain_scoped_token) - resp_json = json.loads(resp.body) - self.assertIsNotNone(resp_json['token']['catalog']) - self.assertIsNotNone(resp_json['token']['roles']) - self.assertIsNotNone(resp_json['token']['domain']) + ref = unit.new_totp_credential( + user_id=self.default_domain_user['id'], + project_id=self.default_domain_project['id']) - def test_validate_tampered_project_scoped_token_fails(self): - project_scoped_token = self._get_project_scoped_token() - tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + - project_scoped_token[50 + 32:]) - self._validate_token(tampered_token, - expected_status=http_client.NOT_FOUND) + self.secret = ref['blob'] - def test_revoke_project_scoped_token(self): - project_scoped_token = self._get_project_scoped_token() - self._validate_token(project_scoped_token) - self._revoke_token(project_scoped_token) - self._validate_token(project_scoped_token, - expected_status=http_client.NOT_FOUND) + r = self.post('/credentials', body={'credential': ref}) + self.assertValidCredentialResponse(r, ref) - def test_project_scoped_token_is_invalid_after_disabling_user(self): - project_scoped_token = self._get_project_scoped_token() - # Make sure the token is valid - self._validate_token(project_scoped_token) - # Disable the user - self._set_user_enabled(self.user, enabled=False) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - project_scoped_token) + self.addCleanup(self.cleanup) - def test_domain_scoped_token_is_invalid_after_disabling_user(self): - # Grant user access to domain - self.assignment_api.create_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) - domain_scoped_token = self._get_domain_scoped_token() - # Make sure the token is valid - self._validate_token(domain_scoped_token) - # Disable user - self._set_user_enabled(self.user, enabled=False) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - domain_scoped_token) + def auth_plugin_config_override(self): + methods = ['totp', 'token', 'password'] + super(TestAuthTOTP, self).auth_plugin_config_override(methods) - def test_domain_scoped_token_is_invalid_after_deleting_grant(self): - # Grant user access to domain - self.assignment_api.create_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) - domain_scoped_token = self._get_domain_scoped_token() - # Make sure the token is valid - self._validate_token(domain_scoped_token) - # Delete access to domain - self.assignment_api.delete_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - domain_scoped_token) + def _make_credentials(self, cred_type, count=1, user_id=None, + project_id=None, blob=None): + user_id = user_id or self.default_domain_user['id'] + project_id = project_id or self.default_domain_project['id'] - def test_project_scoped_token_invalid_after_changing_user_password(self): - project_scoped_token = self._get_project_scoped_token() - # Make sure the token is valid - self._validate_token(project_scoped_token) - # Update user's password - self.user['password'] = 'Password1' - self.identity_api.update_user(self.user['id'], self.user) - # Ensure updating user's password revokes existing tokens - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - project_scoped_token) + creds = [] + for __ in range(count): + if cred_type == 'totp': + ref = unit.new_totp_credential( + user_id=user_id, project_id=project_id, blob=blob) + else: + ref = unit.new_credential_ref( + user_id=user_id, project_id=project_id) + resp = self.post('/credentials', body={'credential': ref}) + creds.append(resp.json['credential']) + return creds + + def _make_auth_data_by_id(self, passcode, user_id=None): + return self.build_authentication_request( + user_id=user_id or self.default_domain_user['id'], + passcode=passcode, + project_id=self.project['id']) - def test_project_scoped_token_invalid_after_disabling_project(self): - project_scoped_token = self._get_project_scoped_token() - # Make sure the token is valid - self._validate_token(project_scoped_token) - # Disable project - self.project['enabled'] = False - self.resource_api.update_project(self.project['id'], self.project) - # Ensure validating a token for a disabled project fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - project_scoped_token) + def _make_auth_data_by_name(self, passcode, username, user_domain_id): + return self.build_authentication_request( + username=username, + user_domain_id=user_domain_id, + passcode=passcode, + project_id=self.project['id']) - def test_domain_scoped_token_invalid_after_disabling_domain(self): - # Grant user access to domain + def cleanup(self): + totp_creds = self.credential_api.list_credentials_for_user( + self.default_domain_user['id'], type='totp') + + other_creds = self.credential_api.list_credentials_for_user( + self.default_domain_user['id'], type='other') + + for cred in itertools.chain(other_creds, totp_creds): + self.delete('/credentials/%s' % cred['id'], + expected_status=http_client.NO_CONTENT) + + def test_with_a_valid_passcode(self): + creds = self._make_credentials('totp') + secret = creds[-1]['blob'] + auth_data = self._make_auth_data_by_id( + totp._generate_totp_passcode(secret)) + + self.v3_create_token(auth_data, expected_status=http_client.CREATED) + + def test_with_an_invalid_passcode_and_user_credentials(self): + self._make_credentials('totp') + auth_data = self._make_auth_data_by_id('000000') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_with_an_invalid_passcode_with_no_user_credentials(self): + auth_data = self._make_auth_data_by_id('000000') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_with_a_corrupt_totp_credential(self): + self._make_credentials('totp', count=1, blob='0') + auth_data = self._make_auth_data_by_id('000000') + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_with_multiple_credentials(self): + self._make_credentials('other', 3) + creds = self._make_credentials('totp', count=3) + secret = creds[-1]['blob'] + + auth_data = self._make_auth_data_by_id( + totp._generate_totp_passcode(secret)) + self.v3_create_token(auth_data, expected_status=http_client.CREATED) + + def test_with_multiple_users(self): + # make some credentials for the existing user + self._make_credentials('totp', count=3) + + # create a new user and their credentials + user = unit.create_user(self.identity_api, domain_id=self.domain_id) self.assignment_api.create_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) - domain_scoped_token = self._get_domain_scoped_token() - # Make sure the token is valid - self._validate_token(domain_scoped_token) - # Disable domain - self.domain['enabled'] = False - self.resource_api.update_domain(self.domain['id'], self.domain) - # Ensure validating a token for a disabled domain fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - domain_scoped_token) - - def test_rescope_unscoped_token_with_trust(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - self.assertLess(len(trust_scoped_token), 255) - - def test_validate_a_trust_scoped_token(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) - - def test_validate_tampered_trust_scoped_token_fails(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Get a trust scoped token - tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + - trust_scoped_token[50 + 32:]) - self._validate_token(tampered_token, - expected_status=http_client.NOT_FOUND) + user_id=user['id'], + project_id=self.project['id']) + creds = self._make_credentials('totp', count=1, user_id=user['id']) + secret = creds[-1]['blob'] - def test_revoke_trust_scoped_token(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) - self._revoke_token(trust_scoped_token) - self._validate_token(trust_scoped_token, - expected_status=http_client.NOT_FOUND) - - def test_trust_scoped_token_is_invalid_after_disabling_trustee(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) - - # Disable trustee - trustee_update_ref = dict(enabled=False) - self.identity_api.update_user(trustee_user['id'], trustee_update_ref) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - trust_scoped_token) - - def test_trust_scoped_token_invalid_after_changing_trustee_password(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) - # Change trustee's password - trustee_update_ref = dict(password='Password1') - self.identity_api.update_user(trustee_user['id'], trustee_update_ref) - # Ensure updating trustee's password revokes existing tokens - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - trust_scoped_token) - - def test_trust_scoped_token_is_invalid_after_disabling_trustor(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) - - # Disable the trustor - trustor_update_ref = dict(enabled=False) - self.identity_api.update_user(self.user['id'], trustor_update_ref) - # Ensure validating a token for a disabled user fails - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - trust_scoped_token) + # Stop the clock otherwise there is a chance of auth failure due to + # getting a different TOTP between the call here and the call in the + # auth plugin. + self.useFixture(fixture.TimeFixture()) - def test_trust_scoped_token_invalid_after_changing_trustor_password(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) + auth_data = self._make_auth_data_by_id( + totp._generate_totp_passcode(secret), user_id=user['id']) + self.v3_create_token(auth_data, expected_status=http_client.CREATED) - # Change trustor's password - trustor_update_ref = dict(password='Password1') - self.identity_api.update_user(self.user['id'], trustor_update_ref) - # Ensure updating trustor's password revokes existing user's tokens - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - trust_scoped_token) + def test_with_multiple_users_and_invalid_credentials(self): + """Prevent logging in with someone else's credentials. - def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self): - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - # Validate a trust scoped token - self._validate_token(trust_scoped_token) + It's very easy to forget to limit the credentials query by user. + Let's just test it for a sanity check. + """ + # make some credentials for the existing user + self._make_credentials('totp', count=3) - # Disable trustor's domain - self.domain['enabled'] = False - self.resource_api.update_domain(self.domain['id'], self.domain) + # create a new user and their credentials + new_user = unit.create_user(self.identity_api, + domain_id=self.domain_id) + self.assignment_api.create_grant(self.role['id'], + user_id=new_user['id'], + project_id=self.project['id']) + user2_creds = self._make_credentials( + 'totp', count=1, user_id=new_user['id']) + + user_id = self.default_domain_user['id'] # user1 + secret = user2_creds[-1]['blob'] + + auth_data = self._make_auth_data_by_id( + totp._generate_totp_passcode(secret), user_id=user_id) + self.v3_create_token(auth_data, + expected_status=http_client.UNAUTHORIZED) + + def test_with_username_and_domain_id(self): + creds = self._make_credentials('totp') + secret = creds[-1]['blob'] + auth_data = self._make_auth_data_by_name( + totp._generate_totp_passcode(secret), + username=self.default_domain_user['name'], + user_domain_id=self.default_domain_user['domain_id']) - trustor_update_ref = dict(password='Password1') - self.identity_api.update_user(self.user['id'], trustor_update_ref) - # Ensure updating trustor's password revokes existing user's tokens - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_token, - trust_scoped_token) + self.v3_create_token(auth_data, expected_status=http_client.CREATED) - def test_v2_validate_unscoped_token_returns_unauthorized(self): - """Test raised exception when validating unscoped token. - Test that validating an unscoped token in v2.0 of a v3 user of a - non-default domain returns unauthorized. - """ - unscoped_token = self._get_unscoped_token() - self.assertRaises(exception.Unauthorized, - self.token_provider_api.validate_v2_token, - unscoped_token) +class TestFetchRevocationList(test_v3.RestfulTestCase): + """Test fetch token revocation list on the v3 Identity API.""" - def test_v2_validate_domain_scoped_token_returns_unauthorized(self): - """Test raised exception when validating a domain scoped token. + def config_overrides(self): + super(TestFetchRevocationList, self).config_overrides() + self.config_fixture.config(group='token', revoke_by_id=True) + + def test_ids_no_tokens(self): + # When there's no revoked tokens the response is an empty list, and + # the response is signed. + res = self.get('/auth/tokens/OS-PKI/revoked') + signed = res.json['signed'] + clear = cms.cms_verify(signed, CONF.signing.certfile, + CONF.signing.ca_certs) + payload = json.loads(clear) + self.assertEqual({'revoked': []}, payload) + + def test_ids_token(self): + # When there's a revoked token, it's in the response, and the response + # is signed. + token_res = self.v3_create_token( + self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) - Test that validating an domain scoped token in v2.0 - returns unauthorized. - """ + token_id = token_res.headers.get('X-Subject-Token') + token_data = token_res.json['token'] - # Grant user access to domain - self.assignment_api.create_grant(self.role['id'], - user_id=self.user['id'], - domain_id=self.domain['id']) + self.delete('/auth/tokens', headers={'X-Subject-Token': token_id}) - scoped_token = self._get_domain_scoped_token() - self.assertRaises(exception.Unauthorized, - self.token_provider_api.validate_v2_token, - scoped_token) + res = self.get('/auth/tokens/OS-PKI/revoked') + signed = res.json['signed'] + clear = cms.cms_verify(signed, CONF.signing.certfile, + CONF.signing.ca_certs) + payload = json.loads(clear) - def test_v2_validate_trust_scoped_token(self): - """Test raised exception when validating a trust scoped token. + def truncate(ts_str): + return ts_str[:19] + 'Z' # 2016-01-21T15:53:52 == 19 chars. - Test that validating an trust scoped token in v2.0 returns - unauthorized. - """ + exp_token_revoke_data = { + 'id': token_id, + 'audit_id': token_data['audit_ids'][0], + 'expires': truncate(token_data['expires_at']), + } - trustee_user, trust = self._create_trust() - trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust) - self.assertRaises(exception.Unauthorized, - self.token_provider_api.validate_v2_token, - trust_scoped_token) + self.assertEqual({'revoked': [exp_token_revoke_data]}, payload) + def test_audit_id_only_no_tokens(self): + # When there's no revoked tokens and ?audit_id_only is used, the + # response is an empty list and is not signed. + res = self.get('/auth/tokens/OS-PKI/revoked?audit_id_only') + self.assertEqual({'revoked': []}, res.json) -class TestAuthFernetTokenProvider(TestAuth): - def setUp(self): - super(TestAuthFernetTokenProvider, self).setUp() - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + def test_audit_id_only_token(self): + # When there's a revoked token and ?audit_id_only is used, the + # response contains the audit_id of the token and is not signed. + token_res = self.v3_create_token( + self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.project['id'])) - def config_overrides(self): - super(TestAuthFernetTokenProvider, self).config_overrides() - self.config_fixture.config(group='token', provider='fernet') + token_id = token_res.headers.get('X-Subject-Token') + token_data = token_res.json['token'] - def test_verify_with_bound_token(self): - self.config_fixture.config(group='token', bind='kerberos') - auth_data = self.build_authentication_request( - project_id=self.project['id']) - remote_user = self.default_domain_user['name'] - self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, - 'AUTH_TYPE': 'Negotiate'}) - # Bind not current supported by Fernet, see bug 1433311. - self.v3_authenticate_token(auth_data, expected_status=501) + self.delete('/auth/tokens', headers={'X-Subject-Token': token_id}) - def test_v2_v3_bind_token_intermix(self): - self.config_fixture.config(group='token', bind='kerberos') + res = self.get('/auth/tokens/OS-PKI/revoked?audit_id_only') - # we need our own user registered to the default domain because of - # the way external auth works. - remote_user = self.default_domain_user['name'] - self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, - 'AUTH_TYPE': 'Negotiate'}) - body = {'auth': {}} - # Bind not current supported by Fernet, see bug 1433311. - self.admin_request(path='/v2.0/tokens', - method='POST', - body=body, - expected_status=501) + def truncate(ts_str): + return ts_str[:19] + 'Z' # 2016-01-21T15:53:52 == 19 chars. - def test_auth_with_bind_token(self): - self.config_fixture.config(group='token', bind=['kerberos']) + exp_token_revoke_data = { + 'audit_id': token_data['audit_ids'][0], + 'expires': truncate(token_data['expires_at']), + } - auth_data = self.build_authentication_request() - remote_user = self.default_domain_user['name'] - self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, - 'AUTH_TYPE': 'Negotiate'}) - # Bind not current supported by Fernet, see bug 1433311. - self.v3_authenticate_token(auth_data, expected_status=501) + self.assertEqual({'revoked': [exp_token_revoke_data]}, res.json) |