diff options
author | asteroide <thomas.duval@orange.com> | 2015-09-01 16:03:26 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-09-01 16:04:53 +0200 |
commit | 92fd2dbfb672d7b2b1cdfd5dd5cf89f7716b3e12 (patch) | |
tree | 7ba22297042019e7363fa1d4ad26d1c32c5908c6 /keystone-moon/keystone/tests/unit/test_v3_auth.py | |
parent | 26e753254f3e43399cc76e62892908b7742415e8 (diff) |
Update Keystone code from official Github repository with branch Master on 09/01/2015.
Change-Id: I0ff6099e6e2580f87f502002a998bbfe12673498
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_auth.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_auth.py | 843 |
1 files changed, 448 insertions, 395 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_auth.py b/keystone-moon/keystone/tests/unit/test_v3_auth.py index ec079170..96f0ff1f 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_auth.py +++ b/keystone-moon/keystone/tests/unit/test_v3_auth.py @@ -22,18 +22,18 @@ from keystoneclient.common import cms import mock from oslo_config import cfg from oslo_utils import timeutils -import six +from six.moves import range from testtools import matchers from testtools import testcase from keystone import auth +from keystone.common import utils from keystone import exception from keystone.policy.backends import rules from keystone.tests import unit as tests from keystone.tests.unit import ksfixtures from keystone.tests.unit import test_v3 - CONF = cfg.CONF @@ -97,8 +97,8 @@ class TestAuthInfo(test_v3.AuthTestMixin, testcase.TestCase): 'password', 'password'] context = None auth_info = auth.controllers.AuthInfo.create(context, auth_data) - self.assertEqual(auth_info.get_method_names(), - ['password', 'token']) + self.assertEqual(['password', 'token'], + auth_info.get_method_names()) def test_get_method_data_invalid_method(self): auth_data = self.build_authentication_request( @@ -114,276 +114,294 @@ class TestAuthInfo(test_v3.AuthTestMixin, testcase.TestCase): class TokenAPITests(object): - # Why is this not just setUP? Because TokenAPITests is not a test class + # Why is this not just setUp? Because TokenAPITests is not a test class # itself. If TokenAPITests became a subclass of the testcase, it would get # called by the enumerate-tests-in-file code. The way the functions get # resolved in Python for multiple inheritance means that a setUp in this # would get skipped by the testrunner. def doSetUp(self): - auth_data = self.build_authentication_request( + r = self.v3_authenticate_token(self.build_authentication_request( username=self.user['name'], user_domain_id=self.domain_id, - password=self.user['password']) - resp = self.v3_authenticate_token(auth_data) - self.token_data = resp.result - self.token = resp.headers.get('X-Subject-Token') - self.headers = {'X-Subject-Token': resp.headers.get('X-Subject-Token')} + password=self.user['password'])) + self.v3_token_data = r.result + self.v3_token = r.headers.get('X-Subject-Token') + self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')} def test_default_fixture_scope_token(self): self.assertIsNotNone(self.get_scoped_token()) - def verify_token(self, *args, **kwargs): - return cms.verify_token(*args, **kwargs) - - def test_v3_token_id(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password']) - resp = self.v3_authenticate_token(auth_data) - token_data = resp.result - token_id = resp.headers.get('X-Subject-Token') - self.assertIn('expires_at', token_data['token']) - - decoded_token = self.verify_token(token_id, CONF.signing.certfile, - CONF.signing.ca_certs) - decoded_token_dict = json.loads(decoded_token) - - token_resp_dict = json.loads(resp.body) - - self.assertEqual(decoded_token_dict, token_resp_dict) - # should be able to validate hash PKI token as well - hash_token_id = cms.cms_hash_token(token_id) - headers = {'X-Subject-Token': hash_token_id} - resp = self.get('/auth/tokens', headers=headers) - expected_token_data = resp.result - self.assertDictEqual(expected_token_data, token_data) - def test_v3_v2_intermix_non_default_domain_failed(self): - auth_data = self.build_authentication_request( + v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.user['id'], - password=self.user['password']) - token = self.get_requested_token(auth_data) + password=self.user['password'])) # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request(path=path, - token='ADMIN', - method='GET', - expected_status=401) + self.admin_request( + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + method='GET', + expected_status=401) def test_v3_v2_intermix_new_default_domain(self): # If the default_domain_id config option is changed, then should be # able to validate a v3 token with user in the new domain. # 1) Create a new domain for the user. - new_domain_id = uuid.uuid4().hex new_domain = { 'description': uuid.uuid4().hex, 'enabled': True, - 'id': new_domain_id, + 'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, } - - self.resource_api.create_domain(new_domain_id, new_domain) + self.resource_api.create_domain(new_domain['id'], new_domain) # 2) Create user in new domain. new_user_password = uuid.uuid4().hex new_user = { 'name': uuid.uuid4().hex, - 'domain_id': new_domain_id, + 'domain_id': new_domain['id'], 'password': new_user_password, 'email': uuid.uuid4().hex, } - new_user = self.identity_api.create_user(new_user) # 3) Update the default_domain_id config option to the new domain + self.config_fixture.config( + group='identity', + default_domain_id=new_domain['id']) - self.config_fixture.config(group='identity', - default_domain_id=new_domain_id) - - # 4) Get a token using v3 api. - - auth_data = self.build_authentication_request( + # 4) Get a token using v3 API. + v3_token = self.get_requested_token(self.build_authentication_request( user_id=new_user['id'], - password=new_user_password) - token = self.get_requested_token(auth_data) + password=new_user_password)) - # 5) Authenticate token using v2 api. - - path = '/v2.0/tokens/%s' % (token) - self.admin_request(path=path, - token='ADMIN', - method='GET') + # 5) Validate token using v2 API. + self.admin_request( + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + method='GET') def test_v3_v2_intermix_domain_scoped_token_failed(self): # grant the domain role to user - path = '/domains/%s/users/%s/roles/%s' % ( - self.domain['id'], self.user['id'], self.role['id']) - self.put(path=path) - auth_data = self.build_authentication_request( + self.put( + path='/domains/%s/users/%s/roles/%s' % ( + self.domain['id'], self.user['id'], self.role['id'])) + + # generate a domain-scoped v3 token + v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], - domain_id=self.domain['id']) - token = self.get_requested_token(auth_data) + domain_id=self.domain['id'])) - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request(path=path, - token='ADMIN', - method='GET', - expected_status=401) + # domain-scoped tokens are not supported by v2 + self.admin_request( + method='GET', + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + expected_status=401) def test_v3_v2_intermix_non_default_project_failed(self): - auth_data = self.build_authentication_request( + # self.project is in a non-default domain + v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], - project_id=self.project['id']) - token = self.get_requested_token(auth_data) + project_id=self.project['id'])) - # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - self.admin_request(path=path, - token='ADMIN', - method='GET', - expected_status=401) + # v2 cannot reference projects outside the default domain + self.admin_request( + method='GET', + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + expected_status=401) + + def test_v3_v2_intermix_non_default_user_failed(self): + self.assignment_api.create_grant( + self.role['id'], + user_id=self.user['id'], + project_id=self.default_domain_project['id']) + + # self.user is in a non-default domain + v3_token = self.get_requested_token(self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password'], + project_id=self.default_domain_project['id'])) + + # v2 cannot reference projects outside the default domain + self.admin_request( + method='GET', + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + expected_status=401) + + def test_v3_v2_intermix_domain_scope_failed(self): + self.assignment_api.create_grant( + self.role['id'], + user_id=self.default_domain_user['id'], + domain_id=self.domain['id']) + + v3_token = self.get_requested_token(self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + domain_id=self.domain['id'])) + + # v2 cannot reference projects outside the default domain + self.admin_request( + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + method='GET', + expected_status=401) def test_v3_v2_unscoped_token_intermix(self): - auth_data = self.build_authentication_request( + r = self.v3_authenticate_token(self.build_authentication_request( user_id=self.default_domain_user['id'], - password=self.default_domain_user['password']) - resp = self.v3_authenticate_token(auth_data) - token_data = resp.result - token = resp.headers.get('X-Subject-Token') + password=self.default_domain_user['password'])) + self.assertValidUnscopedTokenResponse(r) + v3_token_data = r.result + v3_token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - resp = self.admin_request(path=path, - token='ADMIN', - method='GET') - v2_token = resp.result - self.assertEqual(v2_token['access']['user']['id'], - token_data['token']['user']['id']) + r = self.admin_request( + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token, + method='GET') + v2_token_data = r.result + + self.assertEqual(v2_token_data['access']['user']['id'], + v3_token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees - self.assertIn(v2_token['access']['token']['expires'][:-1], - token_data['token']['expires_at']) + self.assertIn(v2_token_data['access']['token']['expires'][:-1], + v3_token_data['token']['expires_at']) def test_v3_v2_token_intermix(self): # FIXME(gyee): PKI tokens are not interchangeable because token # data is baked into the token itself. - auth_data = self.build_authentication_request( + r = self.v3_authenticate_token(self.build_authentication_request( user_id=self.default_domain_user['id'], password=self.default_domain_user['password'], - project_id=self.default_domain_project['id']) - resp = self.v3_authenticate_token(auth_data) - token_data = resp.result - token = resp.headers.get('X-Subject-Token') + project_id=self.default_domain_project['id'])) + self.assertValidProjectScopedTokenResponse(r) + v3_token_data = r.result + v3_token = r.headers.get('X-Subject-Token') # now validate the v3 token with v2 API - path = '/v2.0/tokens/%s' % (token) - resp = self.admin_request(path=path, - token='ADMIN', - method='GET') - v2_token = resp.result - self.assertEqual(v2_token['access']['user']['id'], - token_data['token']['user']['id']) - # v2 token time has not fraction of second precision so - # just need to make sure the non fraction part agrees - self.assertIn(v2_token['access']['token']['expires'][:-1], - token_data['token']['expires_at']) - self.assertEqual(v2_token['access']['user']['roles'][0]['id'], - token_data['token']['roles'][0]['id']) + r = self.admin_request( + method='GET', + path='/v2.0/tokens/%s' % v3_token, + token=CONF.admin_token) + v2_token_data = r.result - def test_v3_v2_hashed_pki_token_intermix(self): - auth_data = self.build_authentication_request( - user_id=self.default_domain_user['id'], - password=self.default_domain_user['password'], - project_id=self.default_domain_project['id']) - resp = self.v3_authenticate_token(auth_data) - token_data = resp.result - token = resp.headers.get('X-Subject-Token') - - # should be able to validate a hash PKI token in v2 too - token = cms.cms_hash_token(token) - path = '/v2.0/tokens/%s' % (token) - resp = self.admin_request(path=path, - token='ADMIN', - method='GET') - v2_token = resp.result - self.assertEqual(v2_token['access']['user']['id'], - token_data['token']['user']['id']) + self.assertEqual(v2_token_data['access']['user']['id'], + v3_token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees - self.assertIn(v2_token['access']['token']['expires'][:-1], - token_data['token']['expires_at']) - self.assertEqual(v2_token['access']['user']['roles'][0]['id'], - token_data['token']['roles'][0]['id']) + self.assertIn(v2_token_data['access']['token']['expires'][:-1], + v3_token_data['token']['expires_at']) + self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'], + v3_token_data['token']['roles'][0]['name']) def test_v2_v3_unscoped_token_intermix(self): - body = { - 'auth': { - 'passwordCredentials': { - 'userId': self.user['id'], - 'password': self.user['password'] + r = self.admin_request( + method='POST', + path='/v2.0/tokens', + body={ + 'auth': { + 'passwordCredentials': { + 'userId': self.default_domain_user['id'], + 'password': self.default_domain_user['password'] + } } - }} - resp = self.admin_request(path='/v2.0/tokens', - method='POST', - body=body) - v2_token_data = resp.result + }) + v2_token_data = r.result v2_token = v2_token_data['access']['token']['id'] - headers = {'X-Subject-Token': v2_token} - resp = self.get('/auth/tokens', headers=headers) - token_data = resp.result + + r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token}) + # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are + # missing timezones, so they will not pass this assertion. + # self.assertValidUnscopedTokenResponse(r) + v3_token_data = r.result + self.assertEqual(v2_token_data['access']['user']['id'], - token_data['token']['user']['id']) + v3_token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], - token_data['token']['expires_at']) + v3_token_data['token']['expires_at']) def test_v2_v3_token_intermix(self): - body = { - 'auth': { - 'passwordCredentials': { - 'userId': self.user['id'], - 'password': self.user['password'] - }, - 'tenantId': self.project['id'] - }} - resp = self.admin_request(path='/v2.0/tokens', - method='POST', - body=body) - v2_token_data = resp.result + r = self.admin_request( + path='/v2.0/tokens', + method='POST', + body={ + 'auth': { + 'passwordCredentials': { + 'userId': self.default_domain_user['id'], + 'password': self.default_domain_user['password'] + }, + 'tenantId': self.default_domain_project['id'] + } + }) + v2_token_data = r.result v2_token = v2_token_data['access']['token']['id'] - headers = {'X-Subject-Token': v2_token} - resp = self.get('/auth/tokens', headers=headers) - token_data = resp.result + + r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token}) + # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are + # missing timezones, so they will not pass this assertion. + # self.assertValidProjectScopedTokenResponse(r) + v3_token_data = r.result + self.assertEqual(v2_token_data['access']['user']['id'], - token_data['token']['user']['id']) + v3_token_data['token']['user']['id']) # v2 token time has not fraction of second precision so # just need to make sure the non fraction part agrees self.assertIn(v2_token_data['access']['token']['expires'][-1], - token_data['token']['expires_at']) + v3_token_data['token']['expires_at']) self.assertEqual(v2_token_data['access']['user']['roles'][0]['name'], - token_data['token']['roles'][0]['name']) + v3_token_data['token']['roles'][0]['name']) v2_issued_at = timeutils.parse_isotime( v2_token_data['access']['token']['issued_at']) v3_issued_at = timeutils.parse_isotime( - token_data['token']['issued_at']) + v3_token_data['token']['issued_at']) self.assertEqual(v2_issued_at, v3_issued_at) + def test_v2_token_deleted_on_v3(self): + # Create a v2 token. + body = { + 'auth': { + 'passwordCredentials': { + 'userId': self.default_domain_user['id'], + 'password': self.default_domain_user['password'] + }, + 'tenantId': self.default_domain_project['id'] + } + } + r = self.admin_request( + path='/v2.0/tokens', method='POST', body=body) + v2_token = r.result['access']['token']['id'] + + # Delete the v2 token using v3. + resp = self.delete( + '/auth/tokens', headers={'X-Subject-Token': v2_token}) + self.assertEqual(resp.status_code, 204) + + # Attempting to use the deleted token on v2 should fail. + self.admin_request( + path='/v2.0/tenants', method='GET', token=v2_token, + expected_status=401) + def test_rescoping_token(self): - expires = self.token_data['token']['expires_at'] - auth_data = self.build_authentication_request( - token=self.token, - project_id=self.project_id) - r = self.v3_authenticate_token(auth_data) + expires = self.v3_token_data['token']['expires_at'] + + # rescope the token + r = self.v3_authenticate_token(self.build_authentication_request( + token=self.v3_token, + project_id=self.project_id)) self.assertValidProjectScopedTokenResponse(r) - # make sure expires stayed the same + + # ensure token expiration stayed the same self.assertEqual(expires, r.result['token']['expires_at']) def test_check_token(self): @@ -394,12 +412,13 @@ class TokenAPITests(object): self.assertValidUnscopedTokenResponse(r) def test_validate_token_nocatalog(self): - auth_data = self.build_authentication_request( + v3_token = self.get_requested_token(self.build_authentication_request( user_id=self.user['id'], password=self.user['password'], - project_id=self.project['id']) - headers = {'X-Subject-Token': self.get_requested_token(auth_data)} - r = self.get('/auth/tokens?nocatalog', headers=headers) + project_id=self.project['id'])) + r = self.get( + '/auth/tokens?nocatalog', + headers={'X-Subject-Token': v3_token}) self.assertValidProjectScopedTokenResponse(r, require_catalog=False) @@ -420,10 +439,10 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): def _v2_token(self): body = { 'auth': { - "tenantId": self.project['id'], + "tenantId": self.default_domain_project['id'], 'passwordCredentials': { - 'userId': self.user['id'], - 'password': self.user['password'] + 'userId': self.default_domain_user['id'], + 'password': self.default_domain_user['password'] } }} resp = self.admin_request(path='/v2.0/tokens', @@ -462,7 +481,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): def test_rescoped_domain_token_disabled(self): self.domainA = self.new_domain_ref() - self.assignment_api.create_domain(self.domainA['id'], self.domainA) + self.resource_api.create_domain(self.domainA['id'], self.domainA) self.assignment_api.create_grant(self.role['id'], user_id=self.user['id'], domain_id=self.domainA['id']) @@ -485,37 +504,77 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase): class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests): def config_overrides(self): super(TestPKITokenAPIs, self).config_overrides() - self.config_fixture.config( - group='token', - provider='keystone.token.providers.pki.Provider') + self.config_fixture.config(group='token', provider='pki') def setUp(self): super(TestPKITokenAPIs, self).setUp() self.doSetUp() + def verify_token(self, *args, **kwargs): + return cms.verify_token(*args, **kwargs) -class TestPKIZTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): + def test_v3_token_id(self): + auth_data = self.build_authentication_request( + user_id=self.user['id'], + password=self.user['password']) + resp = self.v3_authenticate_token(auth_data) + token_data = resp.result + token_id = resp.headers.get('X-Subject-Token') + self.assertIn('expires_at', token_data['token']) - def verify_token(self, *args, **kwargs): - return cms.pkiz_verify(*args, **kwargs) + decoded_token = self.verify_token(token_id, CONF.signing.certfile, + CONF.signing.ca_certs) + decoded_token_dict = json.loads(decoded_token) + + token_resp_dict = json.loads(resp.body) + + self.assertEqual(decoded_token_dict, token_resp_dict) + # should be able to validate hash PKI token as well + hash_token_id = cms.cms_hash_token(token_id) + headers = {'X-Subject-Token': hash_token_id} + resp = self.get('/auth/tokens', headers=headers) + expected_token_data = resp.result + self.assertDictEqual(expected_token_data, token_data) + def test_v3_v2_hashed_pki_token_intermix(self): + auth_data = self.build_authentication_request( + user_id=self.default_domain_user['id'], + password=self.default_domain_user['password'], + project_id=self.default_domain_project['id']) + resp = self.v3_authenticate_token(auth_data) + token_data = resp.result + token = resp.headers.get('X-Subject-Token') + + # should be able to validate a hash PKI token in v2 too + token = cms.cms_hash_token(token) + path = '/v2.0/tokens/%s' % (token) + resp = self.admin_request(path=path, + token=CONF.admin_token, + method='GET') + v2_token = resp.result + self.assertEqual(v2_token['access']['user']['id'], + token_data['token']['user']['id']) + # v2 token time has not fraction of second precision so + # just need to make sure the non fraction part agrees + self.assertIn(v2_token['access']['token']['expires'][:-1], + token_data['token']['expires_at']) + self.assertEqual(v2_token['access']['user']['roles'][0]['id'], + token_data['token']['roles'][0]['id']) + + +class TestPKIZTokenAPIs(TestPKITokenAPIs): def config_overrides(self): super(TestPKIZTokenAPIs, self).config_overrides() - self.config_fixture.config( - group='token', - provider='keystone.token.providers.pkiz.Provider') + self.config_fixture.config(group='token', provider='pkiz') - def setUp(self): - super(TestPKIZTokenAPIs, self).setUp() - self.doSetUp() + def verify_token(self, *args, **kwargs): + return cms.pkiz_verify(*args, **kwargs) class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): def config_overrides(self): super(TestUUIDTokenAPIs, self).config_overrides() - self.config_fixture.config( - group='token', - provider='keystone.token.providers.uuid.Provider') + self.config_fixture.config(group='token', provider='uuid') def setUp(self): super(TestUUIDTokenAPIs, self).setUp() @@ -531,10 +590,16 @@ class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): self.assertIn('expires_at', token_data['token']) self.assertFalse(cms.is_asn1_token(token_id)) - def test_v3_v2_hashed_pki_token_intermix(self): - # this test is only applicable for PKI tokens - # skipping it for UUID tokens - pass + +class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests): + def config_overrides(self): + super(TestFernetTokenAPIs, self).config_overrides() + self.config_fixture.config(group='token', provider='fernet') + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + + def setUp(self): + super(TestFernetTokenAPIs, self).setUp() + self.doSetUp() class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase): @@ -675,12 +740,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): def config_overrides(self): super(TestTokenRevokeById, self).config_overrides() - self.config_fixture.config( - group='revoke', - driver='keystone.contrib.revoke.backends.kvs.Revoke') + self.config_fixture.config(group='revoke', driver='kvs') self.config_fixture.config( group='token', - provider='keystone.token.providers.pki.Provider', + provider='pki', revoke_by_id=False) def setUp(self): @@ -1069,7 +1132,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): - Delete the grant group1 has on ProjectA - Check tokens for user1 & user2 are no longer valid, since user1 and user2 are members of group1 - - Check token for user3 is still valid + - Check token for user3 is invalid too """ auth_data = self.build_authentication_request( @@ -1112,10 +1175,11 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): self.head('/auth/tokens', headers={'X-Subject-Token': token2}, expected_status=404) - # But user3's token should still be valid + # But user3's token should be invalid too as revocation is done for + # scope role & project self.head('/auth/tokens', headers={'X-Subject-Token': token3}, - expected_status=200) + expected_status=404) def test_domain_group_role_assignment_maintains_token(self): """Test domain-group role assignment maintains existing token. @@ -1202,6 +1266,14 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): def test_removing_role_assignment_does_not_affect_other_users(self): """Revoking a role from one user should not affect other users.""" + + # This group grant is not needed for the test + self.delete( + '/projects/%(project_id)s/groups/%(group_id)s/roles/%(role_id)s' % + {'project_id': self.projectA['id'], + 'group_id': self.group1['id'], + 'role_id': self.role1['id']}) + user1_token = self.get_requested_token( self.build_authentication_request( user_id=self.user1['id'], @@ -1220,12 +1292,6 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): 'project_id': self.projectA['id'], 'user_id': self.user1['id'], 'role_id': self.role1['id']}) - self.delete( - '/projects/%(project_id)s/groups/%(group_id)s/roles/%(role_id)s' % - {'project_id': self.projectA['id'], - 'group_id': self.group1['id'], - 'role_id': self.role1['id']}) - # authorization for the first user should now fail self.head('/auth/tokens', headers={'X-Subject-Token': user1_token}, @@ -1384,6 +1450,58 @@ class TestTokenRevokeById(test_v3.RestfulTestCase): expected_status=200) +class TestTokenRevokeByAssignment(TestTokenRevokeById): + + def config_overrides(self): + super(TestTokenRevokeById, self).config_overrides() + self.config_fixture.config( + group='revoke', + driver='kvs') + self.config_fixture.config( + group='token', + provider='uuid', + revoke_by_id=True) + + def test_removing_role_assignment_keeps_other_project_token_groups(self): + """Test assignment isolation. + + Revoking a group role from one project should not invalidate all group + users' tokens + """ + self.assignment_api.create_grant(self.role1['id'], + group_id=self.group1['id'], + project_id=self.projectB['id']) + + project_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password'], + project_id=self.projectB['id'])) + + other_project_token = self.get_requested_token( + self.build_authentication_request( + user_id=self.user1['id'], + password=self.user1['password'], + project_id=self.projectA['id'])) + + self.assignment_api.delete_grant(self.role1['id'], + group_id=self.group1['id'], + project_id=self.projectB['id']) + + # authorization for the projectA should still succeed + self.head('/auth/tokens', + headers={'X-Subject-Token': other_project_token}, + expected_status=200) + # while token for the projectB should not + self.head('/auth/tokens', + headers={'X-Subject-Token': project_token}, + expected_status=404) + revoked_tokens = [ + t['id'] for t in self.token_provider_api.list_revoked_tokens()] + # token is in token revocation list + self.assertIn(project_token, revoked_tokens) + + class TestTokenRevokeApi(TestTokenRevokeById): EXTENSION_NAME = 'revoke' EXTENSION_TO_ADD = 'revoke_extension' @@ -1391,12 +1509,10 @@ class TestTokenRevokeApi(TestTokenRevokeById): """Test token revocation on the v3 Identity API.""" def config_overrides(self): super(TestTokenRevokeApi, self).config_overrides() - self.config_fixture.config( - group='revoke', - driver='keystone.contrib.revoke.backends.kvs.Revoke') + self.config_fixture.config(group='revoke', driver='kvs') self.config_fixture.config( group='token', - provider='keystone.token.providers.pki.Provider', + provider='pki', revoke_by_id=False) def assertValidDeletedProjectResponse(self, events_response, project_id): @@ -1424,7 +1540,7 @@ class TestTokenRevokeApi(TestTokenRevokeById): def assertValidRevokedTokenResponse(self, events_response, **kwargs): events = events_response['events'] self.assertEqual(1, len(events)) - for k, v in six.iteritems(kwargs): + for k, v in kwargs.items(): self.assertEqual(v, events[0].get(k)) self.assertIsNotNone(events[0]['issued_before']) self.assertIsNotNone(events_response['links']) @@ -1494,7 +1610,7 @@ class TestTokenRevokeApi(TestTokenRevokeById): def assertEventDataInList(self, events, **kwargs): found = False for e in events: - for key, value in six.iteritems(kwargs): + for key, value in kwargs.items(): try: if e[key] != value: break @@ -1512,8 +1628,7 @@ class TestTokenRevokeApi(TestTokenRevokeById): 'find event with key-value pairs. Expected: ' '"%(expected)s" Events: "%(events)s"' % {'expected': ','.join( - ["'%s=%s'" % (k, v) for k, v in six.iteritems( - kwargs)]), + ["'%s=%s'" % (k, v) for k, v in kwargs.items()]), 'events': events}) def test_list_delete_token_shows_in_event_list(self): @@ -1569,8 +1684,8 @@ class TestTokenRevokeApi(TestTokenRevokeById): expected_status=200).json_body['events'] self.assertEqual(2, len(events)) - future = timeutils.isotime(timeutils.utcnow() + - datetime.timedelta(seconds=1000)) + future = utils.isotime(timeutils.utcnow() + + datetime.timedelta(seconds=1000)) events = self.get('/OS-REVOKE/events?since=%s' % (future), expected_status=200).json_body['events'] @@ -1596,148 +1711,116 @@ class TestAuthExternalDisabled(test_v3.RestfulTestCase): auth_context) -class TestAuthExternalLegacyDefaultDomain(test_v3.RestfulTestCase): - content_type = 'json' - - def config_overrides(self): - super(TestAuthExternalLegacyDefaultDomain, self).config_overrides() - self.auth_plugin_config_override( - methods=['external', 'password', 'token'], - external='keystone.auth.plugins.external.LegacyDefaultDomain', - password='keystone.auth.plugins.password.Password', - token='keystone.auth.plugins.token.Token') - - def test_remote_user_no_realm(self): - self.config_fixture.config(group='auth', methods='external') - api = auth.controllers.Auth() - context, auth_info, auth_context = self.build_external_auth_request( - self.default_domain_user['name']) - api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], - self.default_domain_user['id']) - - def test_remote_user_no_domain(self): - api = auth.controllers.Auth() - context, auth_info, auth_context = self.build_external_auth_request( - self.user['name']) - self.assertRaises(exception.Unauthorized, - api.authenticate, - context, - auth_info, - auth_context) - - -class TestAuthExternalLegacyDomain(test_v3.RestfulTestCase): +class TestAuthExternalDomain(test_v3.RestfulTestCase): content_type = 'json' def config_overrides(self): - super(TestAuthExternalLegacyDomain, self).config_overrides() - self.auth_plugin_config_override( - methods=['external', 'password', 'token'], - external='keystone.auth.plugins.external.LegacyDomain', - password='keystone.auth.plugins.password.Password', - token='keystone.auth.plugins.token.Token') + super(TestAuthExternalDomain, self).config_overrides() + self.kerberos = False + self.auth_plugin_config_override(external='Domain') def test_remote_user_with_realm(self): api = auth.controllers.Auth() - remote_user = '%s@%s' % (self.user['name'], self.domain['name']) + remote_user = self.user['name'] + remote_domain = self.domain['name'] context, auth_info, auth_context = self.build_external_auth_request( - remote_user) + remote_user, remote_domain=remote_domain, kerberos=self.kerberos) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], self.user['id']) + self.assertEqual(self.user['id'], auth_context['user_id']) # Now test to make sure the user name can, itself, contain the # '@' character. user = {'name': 'myname@mydivision'} self.identity_api.update_user(self.user['id'], user) - remote_user = '%s@%s' % (user['name'], self.domain['name']) + remote_user = user['name'] context, auth_info, auth_context = self.build_external_auth_request( - remote_user) + remote_user, remote_domain=remote_domain, kerberos=self.kerberos) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], self.user['id']) + self.assertEqual(self.user['id'], auth_context['user_id']) def test_project_id_scoped_with_remote_user(self): self.config_fixture.config(group='token', bind=['kerberos']) auth_data = self.build_authentication_request( - project_id=self.project['id']) - remote_user = '%s@%s' % (self.user['name'], self.domain['name']) + project_id=self.project['id'], + kerberos=self.kerberos) + remote_user = self.user['name'] + remote_domain = self.domain['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, + 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) r = self.v3_authenticate_token(auth_data) token = self.assertValidProjectScopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], self.user['name']) + self.assertEqual(self.user['name'], token['bind']['kerberos']) def test_unscoped_bind_with_remote_user(self): self.config_fixture.config(group='token', bind=['kerberos']) - auth_data = self.build_authentication_request() - remote_user = '%s@%s' % (self.user['name'], self.domain['name']) + auth_data = self.build_authentication_request(kerberos=self.kerberos) + remote_user = self.user['name'] + remote_domain = self.domain['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, + 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) r = self.v3_authenticate_token(auth_data) token = self.assertValidUnscopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], self.user['name']) + self.assertEqual(self.user['name'], token['bind']['kerberos']) -class TestAuthExternalDomain(test_v3.RestfulTestCase): +class TestAuthExternalDefaultDomain(test_v3.RestfulTestCase): content_type = 'json' def config_overrides(self): - super(TestAuthExternalDomain, self).config_overrides() + super(TestAuthExternalDefaultDomain, self).config_overrides() self.kerberos = False self.auth_plugin_config_override( - methods=['external', 'password', 'token'], - external='keystone.auth.plugins.external.Domain', - password='keystone.auth.plugins.password.Password', - token='keystone.auth.plugins.token.Token') + external='keystone.auth.plugins.external.DefaultDomain') - def test_remote_user_with_realm(self): + def test_remote_user_with_default_domain(self): api = auth.controllers.Auth() - remote_user = self.user['name'] - remote_domain = self.domain['name'] + remote_user = self.default_domain_user['name'] context, auth_info, auth_context = self.build_external_auth_request( - remote_user, remote_domain=remote_domain, kerberos=self.kerberos) + remote_user, kerberos=self.kerberos) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], self.user['id']) + self.assertEqual(self.default_domain_user['id'], + auth_context['user_id']) # Now test to make sure the user name can, itself, contain the # '@' character. user = {'name': 'myname@mydivision'} - self.identity_api.update_user(self.user['id'], user) + self.identity_api.update_user(self.default_domain_user['id'], user) remote_user = user['name'] context, auth_info, auth_context = self.build_external_auth_request( - remote_user, remote_domain=remote_domain, kerberos=self.kerberos) + remote_user, kerberos=self.kerberos) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], self.user['id']) + self.assertEqual(self.default_domain_user['id'], + auth_context['user_id']) def test_project_id_scoped_with_remote_user(self): self.config_fixture.config(group='token', bind=['kerberos']) auth_data = self.build_authentication_request( - project_id=self.project['id'], + project_id=self.default_domain_project['id'], kerberos=self.kerberos) - remote_user = self.user['name'] - remote_domain = self.domain['name'] + remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, - 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) r = self.v3_authenticate_token(auth_data) token = self.assertValidProjectScopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], self.user['name']) + self.assertEqual(self.default_domain_user['name'], + token['bind']['kerberos']) def test_unscoped_bind_with_remote_user(self): self.config_fixture.config(group='token', bind=['kerberos']) auth_data = self.build_authentication_request(kerberos=self.kerberos) - remote_user = self.user['name'] - remote_domain = self.domain['name'] + remote_user = self.default_domain_user['name'] self.admin_app.extra_environ.update({'REMOTE_USER': remote_user, - 'REMOTE_DOMAIN': remote_domain, 'AUTH_TYPE': 'Negotiate'}) r = self.v3_authenticate_token(auth_data) token = self.assertValidUnscopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], self.user['name']) + self.assertEqual(self.default_domain_user['name'], + token['bind']['kerberos']) class TestAuthKerberos(TestAuthExternalDomain): @@ -1746,10 +1829,7 @@ class TestAuthKerberos(TestAuthExternalDomain): super(TestAuthKerberos, self).config_overrides() self.kerberos = True self.auth_plugin_config_override( - methods=['kerberos', 'password', 'token'], - kerberos='keystone.auth.plugins.external.KerberosDomain', - password='keystone.auth.plugins.password.Password', - token='keystone.auth.plugins.token.Token') + methods=['kerberos', 'password', 'token']) class TestAuth(test_v3.RestfulTestCase): @@ -1815,7 +1895,7 @@ class TestAuth(test_v3.RestfulTestCase): password=self.user['password']) r = self.v3_authenticate_token(auth_data) self.assertValidProjectScopedTokenResponse(r) - self.assertEqual(r.result['token']['project']['id'], project['id']) + self.assertEqual(project['id'], r.result['token']['project']['id']) def test_default_project_id_scoped_token_with_user_id_no_catalog(self): project = self._second_project_as_default() @@ -1826,7 +1906,7 @@ class TestAuth(test_v3.RestfulTestCase): password=self.user['password']) r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True) self.assertValidProjectScopedTokenResponse(r, require_catalog=False) - self.assertEqual(r.result['token']['project']['id'], project['id']) + self.assertEqual(project['id'], r.result['token']['project']['id']) def test_explicit_unscoped_token(self): self._second_project_as_default() @@ -1850,8 +1930,8 @@ class TestAuth(test_v3.RestfulTestCase): project_id=self.project['id']) r = self.post('/auth/tokens?nocatalog', body=auth_data, noauth=True) self.assertValidProjectScopedTokenResponse(r, require_catalog=False) - self.assertEqual(r.result['token']['project']['id'], - self.project['id']) + self.assertEqual(self.project['id'], + r.result['token']['project']['id']) def test_auth_catalog_attributes(self): auth_data = self.build_authentication_request( @@ -2345,13 +2425,12 @@ class TestAuth(test_v3.RestfulTestCase): self.v3_authenticate_token(auth_data, expected_status=401) def test_remote_user_no_realm(self): - CONF.auth.methods = 'external' api = auth.controllers.Auth() context, auth_info, auth_context = self.build_external_auth_request( self.default_domain_user['name']) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], - self.default_domain_user['id']) + self.assertEqual(self.default_domain_user['id'], + auth_context['user_id']) # Now test to make sure the user name can, itself, contain the # '@' character. user = {'name': 'myname@mydivision'} @@ -2359,8 +2438,8 @@ class TestAuth(test_v3.RestfulTestCase): context, auth_info, auth_context = self.build_external_auth_request( user["name"]) api.authenticate(context, auth_info, auth_context) - self.assertEqual(auth_context['user_id'], - self.default_domain_user['id']) + self.assertEqual(self.default_domain_user['id'], + auth_context['user_id']) def test_remote_user_no_domain(self): api = auth.controllers.Auth() @@ -2441,8 +2520,8 @@ class TestAuth(test_v3.RestfulTestCase): headers = {'X-Subject-Token': token} r = self.get('/auth/tokens', headers=headers, token=token) token = self.assertValidProjectScopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], - self.default_domain_user['name']) + self.assertEqual(self.default_domain_user['name'], + token['bind']['kerberos']) def test_auth_with_bind_token(self): self.config_fixture.config(group='token', bind=['kerberos']) @@ -2455,7 +2534,7 @@ class TestAuth(test_v3.RestfulTestCase): # the unscoped token should have bind information in it token = self.assertValidUnscopedTokenResponse(r) - self.assertEqual(token['bind']['kerberos'], remote_user) + self.assertEqual(remote_user, token['bind']['kerberos']) token = r.headers.get('X-Subject-Token') @@ -2466,7 +2545,7 @@ class TestAuth(test_v3.RestfulTestCase): token = self.assertValidProjectScopedTokenResponse(r) # the bind information should be carried over from the original token - self.assertEqual(token['bind']['kerberos'], remote_user) + self.assertEqual(remote_user, token['bind']['kerberos']) def test_v2_v3_bind_token_intermix(self): self.config_fixture.config(group='token', bind='kerberos') @@ -2484,7 +2563,7 @@ class TestAuth(test_v3.RestfulTestCase): v2_token_data = resp.result bind = v2_token_data['access']['token']['bind'] - self.assertEqual(bind['kerberos'], self.default_domain_user['name']) + self.assertEqual(self.default_domain_user['name'], bind['kerberos']) v2_token_id = v2_token_data['access']['token']['id'] # NOTE(gyee): self.get() will try to obtain an auth token if one @@ -2613,12 +2692,8 @@ class TestAuth(test_v3.RestfulTestCase): class TestAuthJSONExternal(test_v3.RestfulTestCase): content_type = 'json' - def config_overrides(self): - super(TestAuthJSONExternal, self).config_overrides() - self.config_fixture.config(group='auth', methods='') - def auth_plugin_config_override(self, methods=None, **method_classes): - self.config_fixture.config(group='auth', methods='') + self.config_fixture.config(group='auth', methods=[]) def test_remote_user_no_method(self): api = auth.controllers.Auth() @@ -2787,12 +2862,12 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): self.post('/OS-TRUST/trusts', body={'trust': self.chained_trust_ref}, token=trust_token, - expected_status=403) + expected_status=400) def test_roles_subset(self): # Build second role role = self.new_role_ref() - self.assignment_api.create_role(role['id'], role) + self.role_api.create_role(role['id'], role) # assign a new role to the user self.assignment_api.create_grant(role_id=role['id'], user_id=self.user_id, @@ -2860,7 +2935,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): # Build second trust with a role not in parent's roles role = self.new_role_ref() - self.assignment_api.create_role(role['id'], role) + self.role_api.create_role(role['id'], role) # assign a new role to the user self.assignment_api.create_grant(role_id=role['id'], user_id=self.user_id, @@ -2895,7 +2970,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase): # Check that allow_redelegation == False caused redelegation_count # to be set to 0, while allow_redelegation is removed self.assertNotIn('allow_redelegation', trust) - self.assertEqual(trust['redelegation_count'], 0) + self.assertEqual(0, trust['redelegation_count']) trust_token = self._get_trust_token(trust) # Build third trust, same as second @@ -2921,7 +2996,7 @@ class TestTrustChain(test_v3.RestfulTestCase): # Create trust chain self.user_chain = list() self.trust_chain = list() - for _ in xrange(3): + for _ in range(3): user_ref = self.new_user_ref(domain_id=self.domain_id) user = self.identity_api.create_user(user_ref) user['password'] = user_ref['password'] @@ -3067,12 +3142,10 @@ class TestTrustAuth(test_v3.RestfulTestCase): def config_overrides(self): super(TestTrustAuth, self).config_overrides() - self.config_fixture.config( - group='revoke', - driver='keystone.contrib.revoke.backends.kvs.Revoke') + self.config_fixture.config(group='revoke', driver='kvs') self.config_fixture.config( group='token', - provider='keystone.token.providers.pki.Provider', + provider='pki', revoke_by_id=False) self.config_fixture.config(group='trust', enabled=True) @@ -3139,7 +3212,7 @@ class TestTrustAuth(test_v3.RestfulTestCase): expected_status=200) trust = r.result.get('trust') self.assertIsNotNone(trust) - self.assertEqual(trust['remaining_uses'], 1) + self.assertEqual(1, trust['remaining_uses']) def test_create_one_time_use_trust(self): trust = self._initialize_test_consume_trust(1) @@ -3320,26 +3393,6 @@ class TestTrustAuth(test_v3.RestfulTestCase): role_names=[uuid.uuid4().hex]) self.post('/OS-TRUST/trusts', body={'trust': ref}, expected_status=404) - def test_create_expired_trust(self): - ref = self.new_trust_ref( - trustor_user_id=self.user_id, - trustee_user_id=self.trustee_user_id, - project_id=self.project_id, - expires=dict(seconds=-1), - role_ids=[self.role_id]) - r = self.post('/OS-TRUST/trusts', body={'trust': ref}) - trust = self.assertValidTrustResponse(r, ref) - - self.get('/OS-TRUST/trusts/%(trust_id)s' % { - 'trust_id': trust['id']}, - expected_status=404) - - auth_data = self.build_authentication_request( - user_id=self.trustee_user['id'], - password=self.trustee_user['password'], - trust_id=trust['id']) - self.v3_authenticate_token(auth_data, expected_status=401) - def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self): ref = self.new_trust_ref( trustor_user_id=self.user_id, @@ -3365,7 +3418,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( - path=path, token='ADMIN', method='GET', expected_status=401) + path=path, token=CONF.admin_token, + method='GET', expected_status=401) def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self): ref = self.new_trust_ref( @@ -3397,7 +3451,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( - path=path, token='ADMIN', method='GET', expected_status=401) + path=path, token=CONF.admin_token, + method='GET', expected_status=401) def test_v3_v2_intermix_project_not_in_default_domaini_failed(self): # create a trustee in default domain to delegate stuff to @@ -3436,7 +3491,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( - path=path, token='ADMIN', method='GET', expected_status=401) + path=path, token=CONF.admin_token, + method='GET', expected_status=401) def test_v3_v2_intermix(self): # create a trustee in default domain to delegate stuff to @@ -3474,7 +3530,8 @@ class TestTrustAuth(test_v3.RestfulTestCase): # now validate the v3 token with v2 API path = '/v2.0/tokens/%s' % (token) self.admin_request( - path=path, token='ADMIN', method='GET', expected_status=200) + path=path, token=CONF.admin_token, + method='GET', expected_status=200) def test_exercise_trust_scoped_token_without_impersonation(self): ref = self.new_trust_ref( @@ -3494,18 +3551,18 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_id=trust['id']) r = self.v3_authenticate_token(auth_data) self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user) - self.assertEqual(r.result['token']['user']['id'], - self.trustee_user['id']) - self.assertEqual(r.result['token']['user']['name'], - self.trustee_user['name']) - self.assertEqual(r.result['token']['user']['domain']['id'], - self.domain['id']) - self.assertEqual(r.result['token']['user']['domain']['name'], - self.domain['name']) - self.assertEqual(r.result['token']['project']['id'], - self.project['id']) - self.assertEqual(r.result['token']['project']['name'], - self.project['name']) + self.assertEqual(self.trustee_user['id'], + r.result['token']['user']['id']) + self.assertEqual(self.trustee_user['name'], + r.result['token']['user']['name']) + self.assertEqual(self.domain['id'], + r.result['token']['user']['domain']['id']) + self.assertEqual(self.domain['name'], + r.result['token']['user']['domain']['name']) + self.assertEqual(self.project['id'], + r.result['token']['project']['id']) + self.assertEqual(self.project['name'], + r.result['token']['project']['name']) def test_exercise_trust_scoped_token_with_impersonation(self): ref = self.new_trust_ref( @@ -3525,16 +3582,16 @@ class TestTrustAuth(test_v3.RestfulTestCase): trust_id=trust['id']) r = self.v3_authenticate_token(auth_data) self.assertValidProjectTrustScopedTokenResponse(r, self.user) - self.assertEqual(r.result['token']['user']['id'], self.user['id']) - self.assertEqual(r.result['token']['user']['name'], self.user['name']) - self.assertEqual(r.result['token']['user']['domain']['id'], - self.domain['id']) - self.assertEqual(r.result['token']['user']['domain']['name'], - self.domain['name']) - self.assertEqual(r.result['token']['project']['id'], - self.project['id']) - self.assertEqual(r.result['token']['project']['name'], - self.project['name']) + self.assertEqual(self.user['id'], r.result['token']['user']['id']) + self.assertEqual(self.user['name'], r.result['token']['user']['name']) + self.assertEqual(self.domain['id'], + r.result['token']['user']['domain']['id']) + self.assertEqual(self.domain['name'], + r.result['token']['user']['domain']['name']) + self.assertEqual(self.project['id'], + r.result['token']['project']['id']) + self.assertEqual(self.project['name'], + r.result['token']['project']['name']) def test_impersonation_token_cannot_create_new_trust(self): ref = self.new_trust_ref( @@ -3950,9 +4007,9 @@ class TestAuthContext(tests.TestCase): self.auth_context = auth.controllers.AuthContext() def test_pick_lowest_expires_at(self): - expires_at_1 = timeutils.isotime(timeutils.utcnow()) - expires_at_2 = timeutils.isotime(timeutils.utcnow() + - datetime.timedelta(seconds=10)) + expires_at_1 = utils.isotime(timeutils.utcnow()) + expires_at_2 = utils.isotime(timeutils.utcnow() + + datetime.timedelta(seconds=10)) # make sure auth_context picks the lowest value self.auth_context['expires_at'] = expires_at_1 self.auth_context['expires_at'] = expires_at_2 @@ -4113,7 +4170,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): trustor_user_id=self.user_id, trustee_user_id=trustee_user['id'], project_id=self.project_id, - impersonation=True, + impersonation=False, role_ids=[self.role_id]) # Create a trust @@ -4123,9 +4180,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): def config_overrides(self): super(TestFernetTokenProvider, self).config_overrides() - self.config_fixture.config( - group='token', - provider='keystone.token.providers.fernet.Provider') + self.config_fixture.config(group='token', provider='fernet') def test_validate_unscoped_token(self): unscoped_token = self._get_unscoped_token() @@ -4135,7 +4190,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): unscoped_token = self._get_unscoped_token() tampered_token = (unscoped_token[:50] + uuid.uuid4().hex + unscoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=401) + self._validate_token(tampered_token, expected_status=404) def test_revoke_unscoped_token(self): unscoped_token = self._get_unscoped_token() @@ -4215,7 +4270,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): project_scoped_token = self._get_project_scoped_token() tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex + project_scoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=401) + self._validate_token(tampered_token, expected_status=404) def test_revoke_project_scoped_token(self): project_scoped_token = self._get_project_scoped_token() @@ -4323,7 +4378,7 @@ class TestFernetTokenProvider(test_v3.RestfulTestCase): # Get a trust scoped token tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex + trust_scoped_token[50 + 32:]) - self._validate_token(tampered_token, expected_status=401) + self._validate_token(tampered_token, expected_status=404) def test_revoke_trust_scoped_token(self): trustee_user, trust = self._create_trust() @@ -4454,9 +4509,7 @@ class TestAuthFernetTokenProvider(TestAuth): def config_overrides(self): super(TestAuthFernetTokenProvider, self).config_overrides() - self.config_fixture.config( - group='token', - provider='keystone.token.providers.fernet.Provider') + self.config_fixture.config(group='token', provider='fernet') def test_verify_with_bound_token(self): self.config_fixture.config(group='token', bind='kerberos') |