From 2e7b4f2027a1147ca28301e4f88adf8274b39a1f Mon Sep 17 00:00:00 2001 From: DUVAL Thomas Date: Thu, 9 Jun 2016 09:11:50 +0200 Subject: Update Keystone core to Mitaka. Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db --- .../keystone/tests/unit/token/test_backends.py | 551 +++++++++++++++++++++ .../tests/unit/token/test_fernet_provider.py | 428 ++++++---------- .../keystone/tests/unit/token/test_provider.py | 4 +- .../tests/unit/token/test_token_data_helper.py | 3 +- .../keystone/tests/unit/token/test_token_model.py | 2 +- 5 files changed, 718 insertions(+), 270 deletions(-) create mode 100644 keystone-moon/keystone/tests/unit/token/test_backends.py (limited to 'keystone-moon/keystone/tests/unit/token') diff --git a/keystone-moon/keystone/tests/unit/token/test_backends.py b/keystone-moon/keystone/tests/unit/token/test_backends.py new file mode 100644 index 00000000..feb7e017 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/token/test_backends.py @@ -0,0 +1,551 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import datetime +import hashlib +import uuid + +from keystoneclient.common import cms +from oslo_config import cfg +from oslo_utils import timeutils +import six +from six.moves import range + +from keystone import exception +from keystone.tests import unit +from keystone.tests.unit import utils as test_utils +from keystone.token import provider + + +CONF = cfg.CONF +NULL_OBJECT = object() + + +class TokenTests(object): + def _create_token_id(self): + # Use a token signed by the cms module + token_id = "" + for i in range(1, 20): + token_id += uuid.uuid4().hex + return cms.cms_sign_token(token_id, + CONF.signing.certfile, + CONF.signing.keyfile) + + def _assert_revoked_token_list_matches_token_persistence( + self, revoked_token_id_list): + # Assert that the list passed in matches the list returned by the + # token persistence service + persistence_list = [ + x['id'] + for x in self.token_provider_api.list_revoked_tokens() + ] + self.assertEqual(persistence_list, revoked_token_id_list) + + def test_token_crud(self): + token_id = self._create_token_id() + data = {'id': token_id, 'a': 'b', + 'trust_id': None, + 'user': {'id': 'testuserid'}, + 'token_data': {'access': {'token': { + 'audit_ids': [uuid.uuid4().hex]}}}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + expires = data_ref.pop('expires') + data_ref.pop('user_id') + self.assertIsInstance(expires, datetime.datetime) + data_ref.pop('id') + data.pop('id') + self.assertDictEqual(data, data_ref) + + new_data_ref = self.token_provider_api._persistence.get_token(token_id) + expires = new_data_ref.pop('expires') + self.assertIsInstance(expires, datetime.datetime) + new_data_ref.pop('user_id') + new_data_ref.pop('id') + + self.assertEqual(data, new_data_ref) + + self.token_provider_api._persistence.delete_token(token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api._persistence.get_token, token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api._persistence.delete_token, token_id) + + def create_token_sample_data(self, token_id=None, tenant_id=None, + trust_id=None, user_id=None, expires=None): + if token_id is None: + token_id = self._create_token_id() + if user_id is None: + user_id = 'testuserid' + # FIXME(morganfainberg): These tokens look nothing like "Real" tokens. + # This should be fixed when token issuance is cleaned up. + data = {'id': token_id, 'a': 'b', + 'user': {'id': user_id}, + 'access': {'token': {'audit_ids': [uuid.uuid4().hex]}}} + if tenant_id is not None: + data['tenant'] = {'id': tenant_id, 'name': tenant_id} + if tenant_id is NULL_OBJECT: + data['tenant'] = None + if expires is not None: + data['expires'] = expires + if trust_id is not None: + data['trust_id'] = trust_id + data['access'].setdefault('trust', {}) + # Testuserid2 is used here since a trustee will be different in + # the cases of impersonation and therefore should not match the + # token's user_id. + data['access']['trust']['trustee_user_id'] = 'testuserid2' + data['token_version'] = provider.V2 + # Issue token stores a copy of all token data at token['token_data']. + # This emulates that assumption as part of the test. + data['token_data'] = copy.deepcopy(data) + new_token = self.token_provider_api._persistence.create_token(token_id, + data) + return new_token['id'], data + + def test_delete_tokens(self): + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid') + self.assertEqual(0, len(tokens)) + token_id1, data = self.create_token_sample_data( + tenant_id='testtenantid') + token_id2, data = self.create_token_sample_data( + tenant_id='testtenantid') + token_id3, data = self.create_token_sample_data( + tenant_id='testtenantid', + user_id='testuserid1') + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid') + self.assertEqual(2, len(tokens)) + self.assertIn(token_id2, tokens) + self.assertIn(token_id1, tokens) + self.token_provider_api._persistence.delete_tokens( + user_id='testuserid', + tenant_id='testtenantid') + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid') + self.assertEqual(0, len(tokens)) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + token_id1) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + token_id2) + + self.token_provider_api._persistence.get_token(token_id3) + + def test_delete_tokens_trust(self): + tokens = self.token_provider_api._persistence._list_tokens( + user_id='testuserid') + self.assertEqual(0, len(tokens)) + token_id1, data = self.create_token_sample_data( + tenant_id='testtenantid', + trust_id='testtrustid') + token_id2, data = self.create_token_sample_data( + tenant_id='testtenantid', + user_id='testuserid1', + trust_id='testtrustid1') + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid') + self.assertEqual(1, len(tokens)) + self.assertIn(token_id1, tokens) + self.token_provider_api._persistence.delete_tokens( + user_id='testuserid', + tenant_id='testtenantid', + trust_id='testtrustid') + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + token_id1) + self.token_provider_api._persistence.get_token(token_id2) + + def _test_token_list(self, token_list_fn): + tokens = token_list_fn('testuserid') + self.assertEqual(0, len(tokens)) + token_id1, data = self.create_token_sample_data() + tokens = token_list_fn('testuserid') + self.assertEqual(1, len(tokens)) + self.assertIn(token_id1, tokens) + token_id2, data = self.create_token_sample_data() + tokens = token_list_fn('testuserid') + self.assertEqual(2, len(tokens)) + self.assertIn(token_id2, tokens) + self.assertIn(token_id1, tokens) + self.token_provider_api._persistence.delete_token(token_id1) + tokens = token_list_fn('testuserid') + self.assertIn(token_id2, tokens) + self.assertNotIn(token_id1, tokens) + self.token_provider_api._persistence.delete_token(token_id2) + tokens = token_list_fn('testuserid') + self.assertNotIn(token_id2, tokens) + self.assertNotIn(token_id1, tokens) + + # tenant-specific tokens + tenant1 = uuid.uuid4().hex + tenant2 = uuid.uuid4().hex + token_id3, data = self.create_token_sample_data(tenant_id=tenant1) + token_id4, data = self.create_token_sample_data(tenant_id=tenant2) + # test for existing but empty tenant (LP:1078497) + token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT) + tokens = token_list_fn('testuserid') + self.assertEqual(3, len(tokens)) + self.assertNotIn(token_id1, tokens) + self.assertNotIn(token_id2, tokens) + self.assertIn(token_id3, tokens) + self.assertIn(token_id4, tokens) + self.assertIn(token_id5, tokens) + tokens = token_list_fn('testuserid', tenant2) + self.assertEqual(1, len(tokens)) + self.assertNotIn(token_id1, tokens) + self.assertNotIn(token_id2, tokens) + self.assertNotIn(token_id3, tokens) + self.assertIn(token_id4, tokens) + + def test_token_list(self): + self._test_token_list( + self.token_provider_api._persistence._list_tokens) + + def test_token_list_trust(self): + trust_id = uuid.uuid4().hex + token_id5, data = self.create_token_sample_data(trust_id=trust_id) + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid', trust_id=trust_id) + self.assertEqual(1, len(tokens)) + self.assertIn(token_id5, tokens) + + def test_get_token_returns_not_found(self): + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + uuid.uuid4().hex) + + def test_delete_token_returns_not_found(self): + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.delete_token, + uuid.uuid4().hex) + + def test_expired_token(self): + token_id = uuid.uuid4().hex + expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) + data = {'id_hash': token_id, 'id': token_id, 'a': 'b', + 'expires': expire_time, + 'trust_id': None, + 'user': {'id': 'testuserid'}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + data_ref.pop('user_id') + self.assertDictEqual(data, data_ref) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + token_id) + + def test_null_expires_token(self): + token_id = uuid.uuid4().hex + data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, + 'user': {'id': 'testuserid'}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + self.assertIsNotNone(data_ref['expires']) + new_data_ref = self.token_provider_api._persistence.get_token(token_id) + + # MySQL doesn't store microseconds, so discard them before testing + data_ref['expires'] = data_ref['expires'].replace(microsecond=0) + new_data_ref['expires'] = new_data_ref['expires'].replace( + microsecond=0) + + self.assertEqual(data_ref, new_data_ref) + + def check_list_revoked_tokens(self, token_infos): + revocation_list = self.token_provider_api.list_revoked_tokens() + revoked_ids = [x['id'] for x in revocation_list] + revoked_audit_ids = [x['audit_id'] for x in revocation_list] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + for token_id, audit_id in token_infos: + self.assertIn(token_id, revoked_ids) + self.assertIn(audit_id, revoked_audit_ids) + + def delete_token(self): + token_id = uuid.uuid4().hex + audit_id = uuid.uuid4().hex + data = {'id_hash': token_id, 'id': token_id, 'a': 'b', + 'user': {'id': 'testuserid'}, + 'token_data': {'token': {'audit_ids': [audit_id]}}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + self.token_provider_api._persistence.delete_token(token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + data_ref['id']) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api._persistence.delete_token, + data_ref['id']) + return (token_id, audit_id) + + def test_list_revoked_tokens_returns_empty_list(self): + revoked_ids = [x['id'] + for x in self.token_provider_api.list_revoked_tokens()] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + self.assertEqual([], revoked_ids) + + def test_list_revoked_tokens_for_single_token(self): + self.check_list_revoked_tokens([self.delete_token()]) + + def test_list_revoked_tokens_for_multiple_tokens(self): + self.check_list_revoked_tokens([self.delete_token() + for x in range(2)]) + + def test_flush_expired_token(self): + token_id = uuid.uuid4().hex + expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) + data = {'id_hash': token_id, 'id': token_id, 'a': 'b', + 'expires': expire_time, + 'trust_id': None, + 'user': {'id': 'testuserid'}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + data_ref.pop('user_id') + self.assertDictEqual(data, data_ref) + + token_id = uuid.uuid4().hex + expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) + data = {'id_hash': token_id, 'id': token_id, 'a': 'b', + 'expires': expire_time, + 'trust_id': None, + 'user': {'id': 'testuserid'}} + data_ref = self.token_provider_api._persistence.create_token(token_id, + data) + data_ref.pop('user_id') + self.assertDictEqual(data, data_ref) + + self.token_provider_api._persistence.flush_expired_tokens() + tokens = self.token_provider_api._persistence._list_tokens( + 'testuserid') + self.assertEqual(1, len(tokens)) + self.assertIn(token_id, tokens) + + @unit.skip_if_cache_disabled('token') + def test_revocation_list_cache(self): + expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10) + token_id = uuid.uuid4().hex + token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b', + 'expires': expire_time, + 'trust_id': None, + 'user': {'id': 'testuserid'}, + 'token_data': {'token': { + 'audit_ids': [uuid.uuid4().hex]}}} + token2_id = uuid.uuid4().hex + token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b', + 'expires': expire_time, + 'trust_id': None, + 'user': {'id': 'testuserid'}, + 'token_data': {'token': { + 'audit_ids': [uuid.uuid4().hex]}}} + # Create 2 Tokens. + self.token_provider_api._persistence.create_token(token_id, + token_data) + self.token_provider_api._persistence.create_token(token2_id, + token2_data) + # Verify the revocation list is empty. + self.assertEqual( + [], self.token_provider_api._persistence.list_revoked_tokens()) + self.assertEqual([], self.token_provider_api.list_revoked_tokens()) + # Delete a token directly, bypassing the manager. + self.token_provider_api._persistence.driver.delete_token(token_id) + # Verify the revocation list is still empty. + self.assertEqual( + [], self.token_provider_api._persistence.list_revoked_tokens()) + self.assertEqual([], self.token_provider_api.list_revoked_tokens()) + # Invalidate the revocation list. + self.token_provider_api._persistence.invalidate_revocation_list() + # Verify the deleted token is in the revocation list. + revoked_ids = [x['id'] + for x in self.token_provider_api.list_revoked_tokens()] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + self.assertIn(token_id, revoked_ids) + # Delete the second token, through the manager + self.token_provider_api._persistence.delete_token(token2_id) + revoked_ids = [x['id'] + for x in self.token_provider_api.list_revoked_tokens()] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + # Verify both tokens are in the revocation list. + self.assertIn(token_id, revoked_ids) + self.assertIn(token2_id, revoked_ids) + + def _test_predictable_revoked_pki_token_id(self, hash_fn): + token_id = self._create_token_id() + token_id_hash = hash_fn(token_id.encode('utf-8')).hexdigest() + token = {'user': {'id': uuid.uuid4().hex}, + 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} + + self.token_provider_api._persistence.create_token(token_id, token) + self.token_provider_api._persistence.delete_token(token_id) + + revoked_ids = [x['id'] + for x in self.token_provider_api.list_revoked_tokens()] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + self.assertIn(token_id_hash, revoked_ids) + self.assertNotIn(token_id, revoked_ids) + for t in self.token_provider_api._persistence.list_revoked_tokens(): + self.assertIn('expires', t) + + def test_predictable_revoked_pki_token_id_default(self): + self._test_predictable_revoked_pki_token_id(hashlib.md5) + + def test_predictable_revoked_pki_token_id_sha256(self): + self.config_fixture.config(group='token', hash_algorithm='sha256') + self._test_predictable_revoked_pki_token_id(hashlib.sha256) + + def test_predictable_revoked_uuid_token_id(self): + token_id = uuid.uuid4().hex + token = {'user': {'id': uuid.uuid4().hex}, + 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} + + self.token_provider_api._persistence.create_token(token_id, token) + self.token_provider_api._persistence.delete_token(token_id) + + revoked_tokens = self.token_provider_api.list_revoked_tokens() + revoked_ids = [x['id'] for x in revoked_tokens] + self._assert_revoked_token_list_matches_token_persistence(revoked_ids) + self.assertIn(token_id, revoked_ids) + for t in revoked_tokens: + self.assertIn('expires', t) + + def test_create_unicode_token_id(self): + token_id = six.text_type(self._create_token_id()) + self.create_token_sample_data(token_id=token_id) + self.token_provider_api._persistence.get_token(token_id) + + def test_create_unicode_user_id(self): + user_id = six.text_type(uuid.uuid4().hex) + token_id, data = self.create_token_sample_data(user_id=user_id) + self.token_provider_api._persistence.get_token(token_id) + + def test_token_expire_timezone(self): + + @test_utils.timezone + def _create_token(expire_time): + token_id = uuid.uuid4().hex + user_id = six.text_type(uuid.uuid4().hex) + return self.create_token_sample_data(token_id=token_id, + user_id=user_id, + expires=expire_time) + + for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: + test_utils.TZ = 'UTC' + d + expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) + token_id, data_in = _create_token(expire_time) + data_get = self.token_provider_api._persistence.get_token(token_id) + + self.assertEqual(data_in['id'], data_get['id'], + 'TZ=%s' % test_utils.TZ) + + expire_time_expired = ( + timeutils.utcnow() + datetime.timedelta(minutes=-1)) + token_id, data_in = _create_token(expire_time_expired) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api._persistence.get_token, + data_in['id']) + + +class TokenCacheInvalidation(object): + def _create_test_data(self): + self.user = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id) + self.tenant = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id) + + # Create an equivalent of a scoped token + token_dict = {'user': self.user, 'tenant': self.tenant, + 'metadata': {}, 'id': 'placeholder'} + token_id, data = self.token_provider_api.issue_v2_token(token_dict) + self.scoped_token_id = token_id + + # ..and an un-scoped one + token_dict = {'user': self.user, 'tenant': None, + 'metadata': {}, 'id': 'placeholder'} + token_id, data = self.token_provider_api.issue_v2_token(token_dict) + self.unscoped_token_id = token_id + + # Validate them, in the various ways possible - this will load the + # responses into the token cache. + self._check_scoped_tokens_are_valid() + self._check_unscoped_tokens_are_valid() + + def _check_unscoped_tokens_are_invalid(self): + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_token, + self.unscoped_token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + self.unscoped_token_id) + + def _check_scoped_tokens_are_invalid(self): + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_token, + self.scoped_token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_token, + self.scoped_token_id, + self.tenant['id']) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + self.scoped_token_id) + self.assertRaises( + exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + self.scoped_token_id, + self.tenant['id']) + + def _check_scoped_tokens_are_valid(self): + self.token_provider_api.validate_token(self.scoped_token_id) + self.token_provider_api.validate_token( + self.scoped_token_id, belongs_to=self.tenant['id']) + self.token_provider_api.validate_v2_token(self.scoped_token_id) + self.token_provider_api.validate_v2_token( + self.scoped_token_id, belongs_to=self.tenant['id']) + + def _check_unscoped_tokens_are_valid(self): + self.token_provider_api.validate_token(self.unscoped_token_id) + self.token_provider_api.validate_v2_token(self.unscoped_token_id) + + def test_delete_unscoped_token(self): + self.token_provider_api._persistence.delete_token( + self.unscoped_token_id) + self._check_unscoped_tokens_are_invalid() + self._check_scoped_tokens_are_valid() + + def test_delete_scoped_token_by_id(self): + self.token_provider_api._persistence.delete_token(self.scoped_token_id) + self._check_scoped_tokens_are_invalid() + self._check_unscoped_tokens_are_valid() + + def test_delete_scoped_token_by_user(self): + self.token_provider_api._persistence.delete_tokens(self.user['id']) + # Since we are deleting all tokens for this user, they should all + # now be invalid. + self._check_scoped_tokens_are_invalid() + self._check_unscoped_tokens_are_invalid() + + def test_delete_scoped_token_by_user_and_tenant(self): + self.token_provider_api._persistence.delete_tokens( + self.user['id'], + tenant_id=self.tenant['id']) + self._check_scoped_tokens_are_invalid() + self._check_unscoped_tokens_are_valid() diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py index bfb590db..5f51d7b3 100644 --- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py +++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py @@ -22,8 +22,8 @@ from six.moves import urllib from keystone.common import config from keystone.common import utils -from keystone.contrib.federation import constants as federation_constants from keystone import exception +from keystone.federation import constants as federation_constants from keystone.tests import unit from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import database @@ -48,17 +48,25 @@ class TestFernetTokenProvider(unit.TestCase): def test_needs_persistence_returns_false(self): self.assertFalse(self.provider.needs_persistence()) - def test_invalid_v3_token_raises_404(self): - self.assertRaises( + def test_invalid_v3_token_raises_token_not_found(self): + # NOTE(lbragstad): Here we use the validate_non_persistent_token() + # methods because the validate_v3_token() method is strictly for + # validating UUID formatted tokens. It is written to assume cached + # tokens from a backend, where validate_non_persistent_token() is not. + token_id = uuid.uuid4().hex + e = self.assertRaises( exception.TokenNotFound, - self.provider.validate_v3_token, - uuid.uuid4().hex) + self.provider.validate_non_persistent_token, + token_id) + self.assertIn(token_id, u'%s' % e) - def test_invalid_v2_token_raises_404(self): - self.assertRaises( + def test_invalid_v2_token_raises_token_not_found(self): + token_id = uuid.uuid4().hex + e = self.assertRaises( exception.TokenNotFound, - self.provider.validate_v2_token, - uuid.uuid4().hex) + self.provider.validate_non_persistent_token, + token_id) + self.assertIn(token_id, u'%s' % e) class TestValidate(unit.TestCase): @@ -91,7 +99,6 @@ class TestValidate(unit.TestCase): token = token_data['token'] self.assertIsInstance(token['audit_ids'], list) self.assertIsInstance(token['expires_at'], str) - self.assertEqual({}, token['extras']) self.assertIsInstance(token['issued_at'], str) self.assertEqual(method_names, token['methods']) exp_user_info = { @@ -200,7 +207,7 @@ class TestValidate(unit.TestCase): def test_validate_v3_token_validation_error_exc(self): # When the token format isn't recognized, TokenNotFound is raised. - # A uuid string isn't a valid fernet token. + # A uuid string isn't a valid Fernet token. token_id = uuid.uuid4().hex self.assertRaises(exception.TokenNotFound, self.token_provider_api.validate_v3_token, token_id) @@ -214,10 +221,14 @@ class TestTokenFormatter(unit.TestCase): def test_restore_padding(self): # 'a' will result in '==' padding, 'aa' will result in '=' padding, and # 'aaa' will result in no padding. - strings_to_test = ['a', 'aa', 'aaa'] - - for string in strings_to_test: - encoded_string = base64.urlsafe_b64encode(string) + binary_to_test = [b'a', b'aa', b'aaa'] + + for binary in binary_to_test: + # base64.urlsafe_b64encode takes six.binary_type and returns + # six.binary_type. + encoded_string = base64.urlsafe_b64encode(binary) + encoded_string = encoded_string.decode('utf-8') + # encoded_string is now six.text_type. encoded_str_without_padding = encoded_string.rstrip('=') self.assertFalse(encoded_str_without_padding.endswith('=')) encoded_str_with_padding_restored = ( @@ -231,36 +242,57 @@ class TestTokenFormatter(unit.TestCase): second_value = uuid.uuid4().hex payload = (first_value, second_value) msgpack_payload = msgpack.packb(payload) + # msgpack_payload is six.binary_type. + + tf = token_formatters.TokenFormatter() - # NOTE(lbragstad): This method perserves the way that keystone used to + # NOTE(lbragstad): This method preserves the way that keystone used to # percent encode the tokens, prior to bug #1491926. def legacy_pack(payload): - tf = token_formatters.TokenFormatter() + # payload is six.binary_type. encrypted_payload = tf.crypto.encrypt(payload) + # encrypted_payload is six.binary_type. # the encrypted_payload is returned with padding appended - self.assertTrue(encrypted_payload.endswith('=')) + self.assertTrue(encrypted_payload.endswith(b'=')) # using urllib.parse.quote will percent encode the padding, like # keystone did in Kilo. percent_encoded_payload = urllib.parse.quote(encrypted_payload) + # percent_encoded_payload is six.text_type. - # ensure that the padding was actaully percent encoded + # ensure that the padding was actually percent encoded self.assertTrue(percent_encoded_payload.endswith('%3D')) return percent_encoded_payload token_with_legacy_padding = legacy_pack(msgpack_payload) - tf = token_formatters.TokenFormatter() + # token_with_legacy_padding is six.text_type. # demonstrate the we can validate a payload that has been percent # encoded with the Fernet logic that existed in Kilo serialized_payload = tf.unpack(token_with_legacy_padding) + # serialized_payload is six.binary_type. returned_payload = msgpack.unpackb(serialized_payload) - self.assertEqual(first_value, returned_payload[0]) - self.assertEqual(second_value, returned_payload[1]) + # returned_payload contains six.binary_type. + self.assertEqual(first_value, returned_payload[0].decode('utf-8')) + self.assertEqual(second_value, returned_payload[1].decode('utf-8')) class TestPayloads(unit.TestCase): + def assertTimestampsEqual(self, expected, actual): + # The timestamp that we get back when parsing the payload may not + # exactly match the timestamp that was put in the payload due to + # conversion to and from a float. + + exp_time = timeutils.parse_isotime(expected) + actual_time = timeutils.parse_isotime(actual) + + # the granularity of timestamp string is microseconds and it's only the + # last digit in the representation that's different, so use a delta + # just above nanoseconds. + return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time, + delta=1e-05) + def test_uuid_hex_to_byte_conversions(self): payload_cls = token_formatters.BasePayload @@ -274,249 +306,137 @@ class TestPayloads(unit.TestCase): expected_uuid_in_bytes) self.assertEqual(expected_hex_uuid, actual_hex_uuid) - def test_time_string_to_int_conversions(self): + def test_time_string_to_float_conversions(self): payload_cls = token_formatters.BasePayload - expected_time_str = utils.isotime(subsecond=True) - time_obj = timeutils.parse_isotime(expected_time_str) - expected_time_int = ( + original_time_str = utils.isotime(subsecond=True) + time_obj = timeutils.parse_isotime(original_time_str) + expected_time_float = ( (timeutils.normalize_time(time_obj) - datetime.datetime.utcfromtimestamp(0)).total_seconds()) - actual_time_int = payload_cls._convert_time_string_to_int( - expected_time_str) - self.assertEqual(expected_time_int, actual_time_int) - - actual_time_str = payload_cls._convert_int_to_time_string( - actual_time_int) + # NOTE(lbragstad): The token expiration time for Fernet tokens is + # passed in the payload of the token. This is different from the token + # creation time, which is handled by Fernet and doesn't support + # subsecond precision because it is a timestamp integer. + self.assertIsInstance(expected_time_float, float) + + actual_time_float = payload_cls._convert_time_string_to_float( + original_time_str) + self.assertIsInstance(actual_time_float, float) + self.assertEqual(expected_time_float, actual_time_float) + + # Generate expected_time_str using the same time float. Using + # original_time_str from utils.isotime will occasionally fail due to + # floating point rounding differences. + time_object = datetime.datetime.utcfromtimestamp(actual_time_float) + expected_time_str = utils.isotime(time_object, subsecond=True) + + actual_time_str = payload_cls._convert_float_to_time_string( + actual_time_float) self.assertEqual(expected_time_str, actual_time_str) - def test_unscoped_payload(self): - exp_user_id = uuid.uuid4().hex - exp_methods = ['password'] + def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None, + exp_project_id=None, exp_domain_id=None, + exp_trust_id=None, exp_federated_info=None, + exp_access_token_id=None): + exp_user_id = exp_user_id or uuid.uuid4().hex + exp_methods = exp_methods or ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] - payload = token_formatters.UnscopedPayload.assemble( - exp_user_id, exp_methods, exp_expires_at, exp_audit_ids) + payload = payload_class.assemble( + exp_user_id, exp_methods, exp_project_id, exp_domain_id, + exp_expires_at, exp_audit_ids, exp_trust_id, exp_federated_info, + exp_access_token_id) - (user_id, methods, expires_at, audit_ids) = ( - token_formatters.UnscopedPayload.disassemble(payload)) + (user_id, methods, project_id, + domain_id, expires_at, audit_ids, + trust_id, federated_info, + access_token_id) = payload_class.disassemble(payload) self.assertEqual(exp_user_id, user_id) self.assertEqual(exp_methods, methods) - self.assertEqual(exp_expires_at, expires_at) + self.assertTimestampsEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - - def test_project_scoped_payload(self): - exp_user_id = uuid.uuid4().hex - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.ProjectScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids) - - (user_id, methods, project_id, expires_at, audit_ids) = ( - token_formatters.ProjectScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + self.assertEqual(exp_domain_id, domain_id) + self.assertEqual(exp_trust_id, trust_id) + self.assertEqual(exp_access_token_id, access_token_id) - def test_domain_scoped_payload(self): - exp_user_id = uuid.uuid4().hex - exp_methods = ['password'] - exp_domain_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] + if exp_federated_info: + self.assertDictEqual(exp_federated_info, federated_info) + else: + self.assertIsNone(federated_info) - payload = token_formatters.DomainScopedPayload.assemble( - exp_user_id, exp_methods, exp_domain_id, exp_expires_at, - exp_audit_ids) + def test_unscoped_payload(self): + self._test_payload(token_formatters.UnscopedPayload) - (user_id, methods, domain_id, expires_at, audit_ids) = ( - token_formatters.DomainScopedPayload.disassemble(payload)) + def test_project_scoped_payload(self): + self._test_payload(token_formatters.ProjectScopedPayload, + exp_project_id=uuid.uuid4().hex) - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_domain_id, domain_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + def test_domain_scoped_payload(self): + self._test_payload(token_formatters.DomainScopedPayload, + exp_domain_id=uuid.uuid4().hex) def test_domain_scoped_payload_with_default_domain(self): - exp_user_id = uuid.uuid4().hex - exp_methods = ['password'] - exp_domain_id = CONF.identity.default_domain_id - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.DomainScopedPayload.assemble( - exp_user_id, exp_methods, exp_domain_id, exp_expires_at, - exp_audit_ids) - - (user_id, methods, domain_id, expires_at, audit_ids) = ( - token_formatters.DomainScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_domain_id, domain_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + self._test_payload(token_formatters.DomainScopedPayload, + exp_domain_id=CONF.identity.default_domain_id) def test_trust_scoped_payload(self): - exp_user_id = uuid.uuid4().hex - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - exp_trust_id = uuid.uuid4().hex - - payload = token_formatters.TrustScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids, exp_trust_id) - - (user_id, methods, project_id, expires_at, audit_ids, trust_id) = ( - token_formatters.TrustScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_trust_id, trust_id) - - def _test_unscoped_payload_with_user_id(self, exp_user_id): - exp_methods = ['password'] - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.UnscopedPayload.assemble( - exp_user_id, exp_methods, exp_expires_at, exp_audit_ids) - - (user_id, methods, expires_at, audit_ids) = ( - token_formatters.UnscopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + self._test_payload(token_formatters.TrustScopedPayload, + exp_project_id=uuid.uuid4().hex, + exp_trust_id=uuid.uuid4().hex) def test_unscoped_payload_with_non_uuid_user_id(self): - self._test_unscoped_payload_with_user_id('someNonUuidUserId') + self._test_payload(token_formatters.UnscopedPayload, + exp_user_id='someNonUuidUserId') def test_unscoped_payload_with_16_char_non_uuid_user_id(self): - self._test_unscoped_payload_with_user_id('0123456789abcdef') - - def _test_project_scoped_payload_with_ids(self, exp_user_id, - exp_project_id): - exp_methods = ['password'] - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] + self._test_payload(token_formatters.UnscopedPayload, + exp_user_id='0123456789abcdef') - payload = token_formatters.ProjectScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids) + def test_project_scoped_payload_with_non_uuid_ids(self): + self._test_payload(token_formatters.ProjectScopedPayload, + exp_user_id='someNonUuidUserId', + exp_project_id='someNonUuidProjectId') - (user_id, methods, project_id, expires_at, audit_ids) = ( - token_formatters.ProjectScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - - def test_project_scoped_payload_with_non_uuid_user_id(self): - self._test_project_scoped_payload_with_ids('someNonUuidUserId', - 'someNonUuidProjectId') - - def test_project_scoped_payload_with_16_char_non_uuid_user_id(self): - self._test_project_scoped_payload_with_ids('0123456789abcdef', - '0123456789abcdef') - - def _test_domain_scoped_payload_with_user_id(self, exp_user_id): - exp_methods = ['password'] - exp_domain_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.DomainScopedPayload.assemble( - exp_user_id, exp_methods, exp_domain_id, exp_expires_at, - exp_audit_ids) - - (user_id, methods, domain_id, expires_at, audit_ids) = ( - token_formatters.DomainScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_domain_id, domain_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + def test_project_scoped_payload_with_16_char_non_uuid_ids(self): + self._test_payload(token_formatters.ProjectScopedPayload, + exp_user_id='0123456789abcdef', + exp_project_id='0123456789abcdef') def test_domain_scoped_payload_with_non_uuid_user_id(self): - self._test_domain_scoped_payload_with_user_id('nonUuidUserId') + self._test_payload(token_formatters.DomainScopedPayload, + exp_user_id='nonUuidUserId', + exp_domain_id=uuid.uuid4().hex) def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self): - self._test_domain_scoped_payload_with_user_id('0123456789abcdef') - - def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id): - exp_methods = ['password'] - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - exp_trust_id = uuid.uuid4().hex - - payload = token_formatters.TrustScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids, exp_trust_id) - - (user_id, methods, project_id, expires_at, audit_ids, trust_id) = ( - token_formatters.TrustScopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_trust_id, trust_id) - - def test_trust_scoped_payload_with_non_uuid_user_id(self): - self._test_trust_scoped_payload_with_ids('someNonUuidUserId', - 'someNonUuidProjectId') - - def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self): - self._test_trust_scoped_payload_with_ids('0123456789abcdef', - '0123456789abcdef') + self._test_payload(token_formatters.DomainScopedPayload, + exp_user_id='0123456789abcdef', + exp_domain_id=uuid.uuid4().hex) + + def test_trust_scoped_payload_with_non_uuid_ids(self): + self._test_payload(token_formatters.TrustScopedPayload, + exp_user_id='someNonUuidUserId', + exp_project_id='someNonUuidProjectId', + exp_trust_id=uuid.uuid4().hex) + + def test_trust_scoped_payload_with_16_char_non_uuid_ids(self): + self._test_payload(token_formatters.TrustScopedPayload, + exp_user_id='0123456789abcdef', + exp_project_id='0123456789abcdef', + exp_trust_id=uuid.uuid4().hex) def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id): - exp_methods = ['password'] - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] exp_federated_info = {'group_ids': [{'id': exp_group_id}], 'idp_id': uuid.uuid4().hex, 'protocol_id': uuid.uuid4().hex} - payload = token_formatters.FederatedUnscopedPayload.assemble( - exp_user_id, exp_methods, exp_expires_at, exp_audit_ids, - exp_federated_info) - - (user_id, methods, expires_at, audit_ids, federated_info) = ( - token_formatters.FederatedUnscopedPayload.disassemble(payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_federated_info['group_ids'][0]['id'], - federated_info['group_ids'][0]['id']) - self.assertEqual(exp_federated_info['idp_id'], - federated_info['idp_id']) - self.assertEqual(exp_federated_info['protocol_id'], - federated_info['protocol_id']) + self._test_payload(token_formatters.FederatedUnscopedPayload, + exp_user_id=exp_user_id, + exp_federated_info=exp_federated_info) def test_federated_payload_with_non_uuid_ids(self): self._test_federated_payload_with_ids('someNonUuidUserId', @@ -527,56 +447,31 @@ class TestPayloads(unit.TestCase): '0123456789abcdef') def test_federated_project_scoped_payload(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['token'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], 'idp_id': uuid.uuid4().hex, 'protocol_id': uuid.uuid4().hex} - payload = token_formatters.FederatedProjectScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids, exp_federated_info) - - (user_id, methods, project_id, expires_at, audit_ids, - federated_info) = ( - token_formatters.FederatedProjectScopedPayload.disassemble( - payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertDictEqual(exp_federated_info, federated_info) + self._test_payload(token_formatters.FederatedProjectScopedPayload, + exp_user_id='someNonUuidUserId', + exp_methods=['token'], + exp_project_id=uuid.uuid4().hex, + exp_federated_info=exp_federated_info) def test_federated_domain_scoped_payload(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['token'] - exp_domain_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], 'idp_id': uuid.uuid4().hex, 'protocol_id': uuid.uuid4().hex} - payload = token_formatters.FederatedDomainScopedPayload.assemble( - exp_user_id, exp_methods, exp_domain_id, exp_expires_at, - exp_audit_ids, exp_federated_info) + self._test_payload(token_formatters.FederatedDomainScopedPayload, + exp_user_id='someNonUuidUserId', + exp_methods=['token'], + exp_domain_id=uuid.uuid4().hex, + exp_federated_info=exp_federated_info) - (user_id, methods, domain_id, expires_at, audit_ids, - federated_info) = ( - token_formatters.FederatedDomainScopedPayload.disassemble( - payload)) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_domain_id, domain_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertDictEqual(exp_federated_info, federated_info) + def test_oauth_scoped_payload(self): + self._test_payload(token_formatters.OauthScopedPayload, + exp_project_id=uuid.uuid4().hex, + exp_access_token_id=uuid.uuid4().hex) class TestFernetKeyRotation(unit.TestCase): @@ -610,7 +505,7 @@ class TestFernetKeyRotation(unit.TestCase): static set of keys, and simply shuffling them, would fail such a test). """ - # Load the keys into a list. + # Load the keys into a list, keys is list of six.text_type. keys = fernet_utils.load_keys() # Sort the list of keys by the keys themselves (they were previously @@ -620,7 +515,8 @@ class TestFernetKeyRotation(unit.TestCase): # Create the thumbprint using all keys in the repository. signature = hashlib.sha1() for key in keys: - signature.update(key) + # Need to convert key to six.binary_type for update. + signature.update(key.encode('utf-8')) return signature.hexdigest() def assertRepositoryState(self, expected_size): diff --git a/keystone-moon/keystone/tests/unit/token/test_provider.py b/keystone-moon/keystone/tests/unit/token/test_provider.py index be831484..7093f3ba 100644 --- a/keystone-moon/keystone/tests/unit/token/test_provider.py +++ b/keystone-moon/keystone/tests/unit/token/test_provider.py @@ -24,7 +24,7 @@ class TestRandomStrings(unit.BaseTestCase): def test_strings_can_be_converted_to_bytes(self): s = provider.random_urlsafe_str() - self.assertTrue(isinstance(s, six.string_types)) + self.assertIsInstance(s, six.text_type) b = provider.random_urlsafe_str_to_bytes(s) - self.assertTrue(isinstance(b, bytes)) + self.assertIsInstance(b, six.binary_type) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py index 6114b723..9e8c3889 100644 --- a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py +++ b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py @@ -28,7 +28,8 @@ class TestTokenDataHelper(unit.TestCase): def test_v3_token_data_helper_populate_audit_info_string(self): token_data = {} - audit_info = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2] + audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2] + audit_info = audit_info_bytes.decode('utf-8') self.v3_data_helper._populate_audit_info(token_data, audit_info) self.assertIn(audit_info, token_data['audit_ids']) self.assertThat(token_data['audit_ids'], matchers.HasLength(2)) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_model.py b/keystone-moon/keystone/tests/unit/token/test_token_model.py index f1398491..1cb0ef55 100644 --- a/keystone-moon/keystone/tests/unit/token/test_token_model.py +++ b/keystone-moon/keystone/tests/unit/token/test_token_model.py @@ -17,8 +17,8 @@ from oslo_config import cfg from oslo_utils import timeutils from six.moves import range -from keystone.contrib.federation import constants as federation_constants from keystone import exception +from keystone.federation import constants as federation_constants from keystone.models import token_model from keystone.tests.unit import core from keystone.tests.unit import test_token_provider -- cgit 1.2.3-korg