aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/token
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2016-06-09 08:12:34 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2016-06-09 08:12:34 +0000
commit4bc079a2664f9a407e332291f34d174625a9d5ea (patch)
tree7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/token
parent2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff)
parent2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff)
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token')
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_backends.py551
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_fernet_provider.py428
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_provider.py4
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_token_data_helper.py3
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_token_model.py2
5 files changed, 718 insertions, 270 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/test_backends.py b/keystone-moon/keystone/tests/unit/token/test_backends.py
new file mode 100644
index 00000000..feb7e017
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/token/test_backends.py
@@ -0,0 +1,551 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import datetime
+import hashlib
+import uuid
+
+from keystoneclient.common import cms
+from oslo_config import cfg
+from oslo_utils import timeutils
+import six
+from six.moves import range
+
+from keystone import exception
+from keystone.tests import unit
+from keystone.tests.unit import utils as test_utils
+from keystone.token import provider
+
+
+CONF = cfg.CONF
+NULL_OBJECT = object()
+
+
+class TokenTests(object):
+ def _create_token_id(self):
+ # Use a token signed by the cms module
+ token_id = ""
+ for i in range(1, 20):
+ token_id += uuid.uuid4().hex
+ return cms.cms_sign_token(token_id,
+ CONF.signing.certfile,
+ CONF.signing.keyfile)
+
+ def _assert_revoked_token_list_matches_token_persistence(
+ self, revoked_token_id_list):
+ # Assert that the list passed in matches the list returned by the
+ # token persistence service
+ persistence_list = [
+ x['id']
+ for x in self.token_provider_api.list_revoked_tokens()
+ ]
+ self.assertEqual(persistence_list, revoked_token_id_list)
+
+ def test_token_crud(self):
+ token_id = self._create_token_id()
+ data = {'id': token_id, 'a': 'b',
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'},
+ 'token_data': {'access': {'token': {
+ 'audit_ids': [uuid.uuid4().hex]}}}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ expires = data_ref.pop('expires')
+ data_ref.pop('user_id')
+ self.assertIsInstance(expires, datetime.datetime)
+ data_ref.pop('id')
+ data.pop('id')
+ self.assertDictEqual(data, data_ref)
+
+ new_data_ref = self.token_provider_api._persistence.get_token(token_id)
+ expires = new_data_ref.pop('expires')
+ self.assertIsInstance(expires, datetime.datetime)
+ new_data_ref.pop('user_id')
+ new_data_ref.pop('id')
+
+ self.assertEqual(data, new_data_ref)
+
+ self.token_provider_api._persistence.delete_token(token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token, token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api._persistence.delete_token, token_id)
+
+ def create_token_sample_data(self, token_id=None, tenant_id=None,
+ trust_id=None, user_id=None, expires=None):
+ if token_id is None:
+ token_id = self._create_token_id()
+ if user_id is None:
+ user_id = 'testuserid'
+ # FIXME(morganfainberg): These tokens look nothing like "Real" tokens.
+ # This should be fixed when token issuance is cleaned up.
+ data = {'id': token_id, 'a': 'b',
+ 'user': {'id': user_id},
+ 'access': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
+ if tenant_id is not None:
+ data['tenant'] = {'id': tenant_id, 'name': tenant_id}
+ if tenant_id is NULL_OBJECT:
+ data['tenant'] = None
+ if expires is not None:
+ data['expires'] = expires
+ if trust_id is not None:
+ data['trust_id'] = trust_id
+ data['access'].setdefault('trust', {})
+ # Testuserid2 is used here since a trustee will be different in
+ # the cases of impersonation and therefore should not match the
+ # token's user_id.
+ data['access']['trust']['trustee_user_id'] = 'testuserid2'
+ data['token_version'] = provider.V2
+ # Issue token stores a copy of all token data at token['token_data'].
+ # This emulates that assumption as part of the test.
+ data['token_data'] = copy.deepcopy(data)
+ new_token = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ return new_token['id'], data
+
+ def test_delete_tokens(self):
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid')
+ self.assertEqual(0, len(tokens))
+ token_id1, data = self.create_token_sample_data(
+ tenant_id='testtenantid')
+ token_id2, data = self.create_token_sample_data(
+ tenant_id='testtenantid')
+ token_id3, data = self.create_token_sample_data(
+ tenant_id='testtenantid',
+ user_id='testuserid1')
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid')
+ self.assertEqual(2, len(tokens))
+ self.assertIn(token_id2, tokens)
+ self.assertIn(token_id1, tokens)
+ self.token_provider_api._persistence.delete_tokens(
+ user_id='testuserid',
+ tenant_id='testtenantid')
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid')
+ self.assertEqual(0, len(tokens))
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ token_id1)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ token_id2)
+
+ self.token_provider_api._persistence.get_token(token_id3)
+
+ def test_delete_tokens_trust(self):
+ tokens = self.token_provider_api._persistence._list_tokens(
+ user_id='testuserid')
+ self.assertEqual(0, len(tokens))
+ token_id1, data = self.create_token_sample_data(
+ tenant_id='testtenantid',
+ trust_id='testtrustid')
+ token_id2, data = self.create_token_sample_data(
+ tenant_id='testtenantid',
+ user_id='testuserid1',
+ trust_id='testtrustid1')
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid')
+ self.assertEqual(1, len(tokens))
+ self.assertIn(token_id1, tokens)
+ self.token_provider_api._persistence.delete_tokens(
+ user_id='testuserid',
+ tenant_id='testtenantid',
+ trust_id='testtrustid')
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ token_id1)
+ self.token_provider_api._persistence.get_token(token_id2)
+
+ def _test_token_list(self, token_list_fn):
+ tokens = token_list_fn('testuserid')
+ self.assertEqual(0, len(tokens))
+ token_id1, data = self.create_token_sample_data()
+ tokens = token_list_fn('testuserid')
+ self.assertEqual(1, len(tokens))
+ self.assertIn(token_id1, tokens)
+ token_id2, data = self.create_token_sample_data()
+ tokens = token_list_fn('testuserid')
+ self.assertEqual(2, len(tokens))
+ self.assertIn(token_id2, tokens)
+ self.assertIn(token_id1, tokens)
+ self.token_provider_api._persistence.delete_token(token_id1)
+ tokens = token_list_fn('testuserid')
+ self.assertIn(token_id2, tokens)
+ self.assertNotIn(token_id1, tokens)
+ self.token_provider_api._persistence.delete_token(token_id2)
+ tokens = token_list_fn('testuserid')
+ self.assertNotIn(token_id2, tokens)
+ self.assertNotIn(token_id1, tokens)
+
+ # tenant-specific tokens
+ tenant1 = uuid.uuid4().hex
+ tenant2 = uuid.uuid4().hex
+ token_id3, data = self.create_token_sample_data(tenant_id=tenant1)
+ token_id4, data = self.create_token_sample_data(tenant_id=tenant2)
+ # test for existing but empty tenant (LP:1078497)
+ token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT)
+ tokens = token_list_fn('testuserid')
+ self.assertEqual(3, len(tokens))
+ self.assertNotIn(token_id1, tokens)
+ self.assertNotIn(token_id2, tokens)
+ self.assertIn(token_id3, tokens)
+ self.assertIn(token_id4, tokens)
+ self.assertIn(token_id5, tokens)
+ tokens = token_list_fn('testuserid', tenant2)
+ self.assertEqual(1, len(tokens))
+ self.assertNotIn(token_id1, tokens)
+ self.assertNotIn(token_id2, tokens)
+ self.assertNotIn(token_id3, tokens)
+ self.assertIn(token_id4, tokens)
+
+ def test_token_list(self):
+ self._test_token_list(
+ self.token_provider_api._persistence._list_tokens)
+
+ def test_token_list_trust(self):
+ trust_id = uuid.uuid4().hex
+ token_id5, data = self.create_token_sample_data(trust_id=trust_id)
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid', trust_id=trust_id)
+ self.assertEqual(1, len(tokens))
+ self.assertIn(token_id5, tokens)
+
+ def test_get_token_returns_not_found(self):
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ uuid.uuid4().hex)
+
+ def test_delete_token_returns_not_found(self):
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.delete_token,
+ uuid.uuid4().hex)
+
+ def test_expired_token(self):
+ token_id = uuid.uuid4().hex
+ expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)
+ data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
+ 'expires': expire_time,
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ data_ref.pop('user_id')
+ self.assertDictEqual(data, data_ref)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ token_id)
+
+ def test_null_expires_token(self):
+ token_id = uuid.uuid4().hex
+ data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None,
+ 'user': {'id': 'testuserid'}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ self.assertIsNotNone(data_ref['expires'])
+ new_data_ref = self.token_provider_api._persistence.get_token(token_id)
+
+ # MySQL doesn't store microseconds, so discard them before testing
+ data_ref['expires'] = data_ref['expires'].replace(microsecond=0)
+ new_data_ref['expires'] = new_data_ref['expires'].replace(
+ microsecond=0)
+
+ self.assertEqual(data_ref, new_data_ref)
+
+ def check_list_revoked_tokens(self, token_infos):
+ revocation_list = self.token_provider_api.list_revoked_tokens()
+ revoked_ids = [x['id'] for x in revocation_list]
+ revoked_audit_ids = [x['audit_id'] for x in revocation_list]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ for token_id, audit_id in token_infos:
+ self.assertIn(token_id, revoked_ids)
+ self.assertIn(audit_id, revoked_audit_ids)
+
+ def delete_token(self):
+ token_id = uuid.uuid4().hex
+ audit_id = uuid.uuid4().hex
+ data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
+ 'user': {'id': 'testuserid'},
+ 'token_data': {'token': {'audit_ids': [audit_id]}}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ self.token_provider_api._persistence.delete_token(token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ data_ref['id'])
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api._persistence.delete_token,
+ data_ref['id'])
+ return (token_id, audit_id)
+
+ def test_list_revoked_tokens_returns_empty_list(self):
+ revoked_ids = [x['id']
+ for x in self.token_provider_api.list_revoked_tokens()]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ self.assertEqual([], revoked_ids)
+
+ def test_list_revoked_tokens_for_single_token(self):
+ self.check_list_revoked_tokens([self.delete_token()])
+
+ def test_list_revoked_tokens_for_multiple_tokens(self):
+ self.check_list_revoked_tokens([self.delete_token()
+ for x in range(2)])
+
+ def test_flush_expired_token(self):
+ token_id = uuid.uuid4().hex
+ expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)
+ data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
+ 'expires': expire_time,
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ data_ref.pop('user_id')
+ self.assertDictEqual(data, data_ref)
+
+ token_id = uuid.uuid4().hex
+ expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)
+ data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
+ 'expires': expire_time,
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'}}
+ data_ref = self.token_provider_api._persistence.create_token(token_id,
+ data)
+ data_ref.pop('user_id')
+ self.assertDictEqual(data, data_ref)
+
+ self.token_provider_api._persistence.flush_expired_tokens()
+ tokens = self.token_provider_api._persistence._list_tokens(
+ 'testuserid')
+ self.assertEqual(1, len(tokens))
+ self.assertIn(token_id, tokens)
+
+ @unit.skip_if_cache_disabled('token')
+ def test_revocation_list_cache(self):
+ expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10)
+ token_id = uuid.uuid4().hex
+ token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
+ 'expires': expire_time,
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'},
+ 'token_data': {'token': {
+ 'audit_ids': [uuid.uuid4().hex]}}}
+ token2_id = uuid.uuid4().hex
+ token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b',
+ 'expires': expire_time,
+ 'trust_id': None,
+ 'user': {'id': 'testuserid'},
+ 'token_data': {'token': {
+ 'audit_ids': [uuid.uuid4().hex]}}}
+ # Create 2 Tokens.
+ self.token_provider_api._persistence.create_token(token_id,
+ token_data)
+ self.token_provider_api._persistence.create_token(token2_id,
+ token2_data)
+ # Verify the revocation list is empty.
+ self.assertEqual(
+ [], self.token_provider_api._persistence.list_revoked_tokens())
+ self.assertEqual([], self.token_provider_api.list_revoked_tokens())
+ # Delete a token directly, bypassing the manager.
+ self.token_provider_api._persistence.driver.delete_token(token_id)
+ # Verify the revocation list is still empty.
+ self.assertEqual(
+ [], self.token_provider_api._persistence.list_revoked_tokens())
+ self.assertEqual([], self.token_provider_api.list_revoked_tokens())
+ # Invalidate the revocation list.
+ self.token_provider_api._persistence.invalidate_revocation_list()
+ # Verify the deleted token is in the revocation list.
+ revoked_ids = [x['id']
+ for x in self.token_provider_api.list_revoked_tokens()]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ self.assertIn(token_id, revoked_ids)
+ # Delete the second token, through the manager
+ self.token_provider_api._persistence.delete_token(token2_id)
+ revoked_ids = [x['id']
+ for x in self.token_provider_api.list_revoked_tokens()]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ # Verify both tokens are in the revocation list.
+ self.assertIn(token_id, revoked_ids)
+ self.assertIn(token2_id, revoked_ids)
+
+ def _test_predictable_revoked_pki_token_id(self, hash_fn):
+ token_id = self._create_token_id()
+ token_id_hash = hash_fn(token_id.encode('utf-8')).hexdigest()
+ token = {'user': {'id': uuid.uuid4().hex},
+ 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
+
+ self.token_provider_api._persistence.create_token(token_id, token)
+ self.token_provider_api._persistence.delete_token(token_id)
+
+ revoked_ids = [x['id']
+ for x in self.token_provider_api.list_revoked_tokens()]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ self.assertIn(token_id_hash, revoked_ids)
+ self.assertNotIn(token_id, revoked_ids)
+ for t in self.token_provider_api._persistence.list_revoked_tokens():
+ self.assertIn('expires', t)
+
+ def test_predictable_revoked_pki_token_id_default(self):
+ self._test_predictable_revoked_pki_token_id(hashlib.md5)
+
+ def test_predictable_revoked_pki_token_id_sha256(self):
+ self.config_fixture.config(group='token', hash_algorithm='sha256')
+ self._test_predictable_revoked_pki_token_id(hashlib.sha256)
+
+ def test_predictable_revoked_uuid_token_id(self):
+ token_id = uuid.uuid4().hex
+ token = {'user': {'id': uuid.uuid4().hex},
+ 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
+
+ self.token_provider_api._persistence.create_token(token_id, token)
+ self.token_provider_api._persistence.delete_token(token_id)
+
+ revoked_tokens = self.token_provider_api.list_revoked_tokens()
+ revoked_ids = [x['id'] for x in revoked_tokens]
+ self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
+ self.assertIn(token_id, revoked_ids)
+ for t in revoked_tokens:
+ self.assertIn('expires', t)
+
+ def test_create_unicode_token_id(self):
+ token_id = six.text_type(self._create_token_id())
+ self.create_token_sample_data(token_id=token_id)
+ self.token_provider_api._persistence.get_token(token_id)
+
+ def test_create_unicode_user_id(self):
+ user_id = six.text_type(uuid.uuid4().hex)
+ token_id, data = self.create_token_sample_data(user_id=user_id)
+ self.token_provider_api._persistence.get_token(token_id)
+
+ def test_token_expire_timezone(self):
+
+ @test_utils.timezone
+ def _create_token(expire_time):
+ token_id = uuid.uuid4().hex
+ user_id = six.text_type(uuid.uuid4().hex)
+ return self.create_token_sample_data(token_id=token_id,
+ user_id=user_id,
+ expires=expire_time)
+
+ for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:
+ test_utils.TZ = 'UTC' + d
+ expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)
+ token_id, data_in = _create_token(expire_time)
+ data_get = self.token_provider_api._persistence.get_token(token_id)
+
+ self.assertEqual(data_in['id'], data_get['id'],
+ 'TZ=%s' % test_utils.TZ)
+
+ expire_time_expired = (
+ timeutils.utcnow() + datetime.timedelta(minutes=-1))
+ token_id, data_in = _create_token(expire_time_expired)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api._persistence.get_token,
+ data_in['id'])
+
+
+class TokenCacheInvalidation(object):
+ def _create_test_data(self):
+ self.user = unit.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.tenant = unit.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+
+ # Create an equivalent of a scoped token
+ token_dict = {'user': self.user, 'tenant': self.tenant,
+ 'metadata': {}, 'id': 'placeholder'}
+ token_id, data = self.token_provider_api.issue_v2_token(token_dict)
+ self.scoped_token_id = token_id
+
+ # ..and an un-scoped one
+ token_dict = {'user': self.user, 'tenant': None,
+ 'metadata': {}, 'id': 'placeholder'}
+ token_id, data = self.token_provider_api.issue_v2_token(token_dict)
+ self.unscoped_token_id = token_id
+
+ # Validate them, in the various ways possible - this will load the
+ # responses into the token cache.
+ self._check_scoped_tokens_are_valid()
+ self._check_unscoped_tokens_are_valid()
+
+ def _check_unscoped_tokens_are_invalid(self):
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ self.unscoped_token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ self.unscoped_token_id)
+
+ def _check_scoped_tokens_are_invalid(self):
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ self.scoped_token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ self.scoped_token_id,
+ self.tenant['id'])
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ self.scoped_token_id)
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ self.scoped_token_id,
+ self.tenant['id'])
+
+ def _check_scoped_tokens_are_valid(self):
+ self.token_provider_api.validate_token(self.scoped_token_id)
+ self.token_provider_api.validate_token(
+ self.scoped_token_id, belongs_to=self.tenant['id'])
+ self.token_provider_api.validate_v2_token(self.scoped_token_id)
+ self.token_provider_api.validate_v2_token(
+ self.scoped_token_id, belongs_to=self.tenant['id'])
+
+ def _check_unscoped_tokens_are_valid(self):
+ self.token_provider_api.validate_token(self.unscoped_token_id)
+ self.token_provider_api.validate_v2_token(self.unscoped_token_id)
+
+ def test_delete_unscoped_token(self):
+ self.token_provider_api._persistence.delete_token(
+ self.unscoped_token_id)
+ self._check_unscoped_tokens_are_invalid()
+ self._check_scoped_tokens_are_valid()
+
+ def test_delete_scoped_token_by_id(self):
+ self.token_provider_api._persistence.delete_token(self.scoped_token_id)
+ self._check_scoped_tokens_are_invalid()
+ self._check_unscoped_tokens_are_valid()
+
+ def test_delete_scoped_token_by_user(self):
+ self.token_provider_api._persistence.delete_tokens(self.user['id'])
+ # Since we are deleting all tokens for this user, they should all
+ # now be invalid.
+ self._check_scoped_tokens_are_invalid()
+ self._check_unscoped_tokens_are_invalid()
+
+ def test_delete_scoped_token_by_user_and_tenant(self):
+ self.token_provider_api._persistence.delete_tokens(
+ self.user['id'],
+ tenant_id=self.tenant['id'])
+ self._check_scoped_tokens_are_invalid()
+ self._check_unscoped_tokens_are_valid()
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
index bfb590db..5f51d7b3 100644
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
@@ -22,8 +22,8 @@ from six.moves import urllib
from keystone.common import config
from keystone.common import utils
-from keystone.contrib.federation import constants as federation_constants
from keystone import exception
+from keystone.federation import constants as federation_constants
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
@@ -48,17 +48,25 @@ class TestFernetTokenProvider(unit.TestCase):
def test_needs_persistence_returns_false(self):
self.assertFalse(self.provider.needs_persistence())
- def test_invalid_v3_token_raises_404(self):
- self.assertRaises(
+ def test_invalid_v3_token_raises_token_not_found(self):
+ # NOTE(lbragstad): Here we use the validate_non_persistent_token()
+ # methods because the validate_v3_token() method is strictly for
+ # validating UUID formatted tokens. It is written to assume cached
+ # tokens from a backend, where validate_non_persistent_token() is not.
+ token_id = uuid.uuid4().hex
+ e = self.assertRaises(
exception.TokenNotFound,
- self.provider.validate_v3_token,
- uuid.uuid4().hex)
+ self.provider.validate_non_persistent_token,
+ token_id)
+ self.assertIn(token_id, u'%s' % e)
- def test_invalid_v2_token_raises_404(self):
- self.assertRaises(
+ def test_invalid_v2_token_raises_token_not_found(self):
+ token_id = uuid.uuid4().hex
+ e = self.assertRaises(
exception.TokenNotFound,
- self.provider.validate_v2_token,
- uuid.uuid4().hex)
+ self.provider.validate_non_persistent_token,
+ token_id)
+ self.assertIn(token_id, u'%s' % e)
class TestValidate(unit.TestCase):
@@ -91,7 +99,6 @@ class TestValidate(unit.TestCase):
token = token_data['token']
self.assertIsInstance(token['audit_ids'], list)
self.assertIsInstance(token['expires_at'], str)
- self.assertEqual({}, token['extras'])
self.assertIsInstance(token['issued_at'], str)
self.assertEqual(method_names, token['methods'])
exp_user_info = {
@@ -200,7 +207,7 @@ class TestValidate(unit.TestCase):
def test_validate_v3_token_validation_error_exc(self):
# When the token format isn't recognized, TokenNotFound is raised.
- # A uuid string isn't a valid fernet token.
+ # A uuid string isn't a valid Fernet token.
token_id = uuid.uuid4().hex
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v3_token, token_id)
@@ -214,10 +221,14 @@ class TestTokenFormatter(unit.TestCase):
def test_restore_padding(self):
# 'a' will result in '==' padding, 'aa' will result in '=' padding, and
# 'aaa' will result in no padding.
- strings_to_test = ['a', 'aa', 'aaa']
-
- for string in strings_to_test:
- encoded_string = base64.urlsafe_b64encode(string)
+ binary_to_test = [b'a', b'aa', b'aaa']
+
+ for binary in binary_to_test:
+ # base64.urlsafe_b64encode takes six.binary_type and returns
+ # six.binary_type.
+ encoded_string = base64.urlsafe_b64encode(binary)
+ encoded_string = encoded_string.decode('utf-8')
+ # encoded_string is now six.text_type.
encoded_str_without_padding = encoded_string.rstrip('=')
self.assertFalse(encoded_str_without_padding.endswith('='))
encoded_str_with_padding_restored = (
@@ -231,36 +242,57 @@ class TestTokenFormatter(unit.TestCase):
second_value = uuid.uuid4().hex
payload = (first_value, second_value)
msgpack_payload = msgpack.packb(payload)
+ # msgpack_payload is six.binary_type.
+
+ tf = token_formatters.TokenFormatter()
- # NOTE(lbragstad): This method perserves the way that keystone used to
+ # NOTE(lbragstad): This method preserves the way that keystone used to
# percent encode the tokens, prior to bug #1491926.
def legacy_pack(payload):
- tf = token_formatters.TokenFormatter()
+ # payload is six.binary_type.
encrypted_payload = tf.crypto.encrypt(payload)
+ # encrypted_payload is six.binary_type.
# the encrypted_payload is returned with padding appended
- self.assertTrue(encrypted_payload.endswith('='))
+ self.assertTrue(encrypted_payload.endswith(b'='))
# using urllib.parse.quote will percent encode the padding, like
# keystone did in Kilo.
percent_encoded_payload = urllib.parse.quote(encrypted_payload)
+ # percent_encoded_payload is six.text_type.
- # ensure that the padding was actaully percent encoded
+ # ensure that the padding was actually percent encoded
self.assertTrue(percent_encoded_payload.endswith('%3D'))
return percent_encoded_payload
token_with_legacy_padding = legacy_pack(msgpack_payload)
- tf = token_formatters.TokenFormatter()
+ # token_with_legacy_padding is six.text_type.
# demonstrate the we can validate a payload that has been percent
# encoded with the Fernet logic that existed in Kilo
serialized_payload = tf.unpack(token_with_legacy_padding)
+ # serialized_payload is six.binary_type.
returned_payload = msgpack.unpackb(serialized_payload)
- self.assertEqual(first_value, returned_payload[0])
- self.assertEqual(second_value, returned_payload[1])
+ # returned_payload contains six.binary_type.
+ self.assertEqual(first_value, returned_payload[0].decode('utf-8'))
+ self.assertEqual(second_value, returned_payload[1].decode('utf-8'))
class TestPayloads(unit.TestCase):
+ def assertTimestampsEqual(self, expected, actual):
+ # The timestamp that we get back when parsing the payload may not
+ # exactly match the timestamp that was put in the payload due to
+ # conversion to and from a float.
+
+ exp_time = timeutils.parse_isotime(expected)
+ actual_time = timeutils.parse_isotime(actual)
+
+ # the granularity of timestamp string is microseconds and it's only the
+ # last digit in the representation that's different, so use a delta
+ # just above nanoseconds.
+ return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time,
+ delta=1e-05)
+
def test_uuid_hex_to_byte_conversions(self):
payload_cls = token_formatters.BasePayload
@@ -274,249 +306,137 @@ class TestPayloads(unit.TestCase):
expected_uuid_in_bytes)
self.assertEqual(expected_hex_uuid, actual_hex_uuid)
- def test_time_string_to_int_conversions(self):
+ def test_time_string_to_float_conversions(self):
payload_cls = token_formatters.BasePayload
- expected_time_str = utils.isotime(subsecond=True)
- time_obj = timeutils.parse_isotime(expected_time_str)
- expected_time_int = (
+ original_time_str = utils.isotime(subsecond=True)
+ time_obj = timeutils.parse_isotime(original_time_str)
+ expected_time_float = (
(timeutils.normalize_time(time_obj) -
datetime.datetime.utcfromtimestamp(0)).total_seconds())
- actual_time_int = payload_cls._convert_time_string_to_int(
- expected_time_str)
- self.assertEqual(expected_time_int, actual_time_int)
-
- actual_time_str = payload_cls._convert_int_to_time_string(
- actual_time_int)
+ # NOTE(lbragstad): The token expiration time for Fernet tokens is
+ # passed in the payload of the token. This is different from the token
+ # creation time, which is handled by Fernet and doesn't support
+ # subsecond precision because it is a timestamp integer.
+ self.assertIsInstance(expected_time_float, float)
+
+ actual_time_float = payload_cls._convert_time_string_to_float(
+ original_time_str)
+ self.assertIsInstance(actual_time_float, float)
+ self.assertEqual(expected_time_float, actual_time_float)
+
+ # Generate expected_time_str using the same time float. Using
+ # original_time_str from utils.isotime will occasionally fail due to
+ # floating point rounding differences.
+ time_object = datetime.datetime.utcfromtimestamp(actual_time_float)
+ expected_time_str = utils.isotime(time_object, subsecond=True)
+
+ actual_time_str = payload_cls._convert_float_to_time_string(
+ actual_time_float)
self.assertEqual(expected_time_str, actual_time_str)
- def test_unscoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
+ def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None,
+ exp_project_id=None, exp_domain_id=None,
+ exp_trust_id=None, exp_federated_info=None,
+ exp_access_token_id=None):
+ exp_user_id = exp_user_id or uuid.uuid4().hex
+ exp_methods = exp_methods or ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
- payload = token_formatters.UnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids)
+ payload = payload_class.assemble(
+ exp_user_id, exp_methods, exp_project_id, exp_domain_id,
+ exp_expires_at, exp_audit_ids, exp_trust_id, exp_federated_info,
+ exp_access_token_id)
- (user_id, methods, expires_at, audit_ids) = (
- token_formatters.UnscopedPayload.disassemble(payload))
+ (user_id, methods, project_id,
+ domain_id, expires_at, audit_ids,
+ trust_id, federated_info,
+ access_token_id) = payload_class.disassemble(payload)
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
+ self.assertTimestampsEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
-
- def test_project_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self.assertEqual(exp_domain_id, domain_id)
+ self.assertEqual(exp_trust_id, trust_id)
+ self.assertEqual(exp_access_token_id, access_token_id)
- def test_domain_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
+ if exp_federated_info:
+ self.assertDictEqual(exp_federated_info, federated_info)
+ else:
+ self.assertIsNone(federated_info)
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
+ def test_unscoped_payload(self):
+ self._test_payload(token_formatters.UnscopedPayload)
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
+ def test_project_scoped_payload(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_project_id=uuid.uuid4().hex)
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_domain_scoped_payload(self):
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_domain_id=uuid.uuid4().hex)
def test_domain_scoped_payload_with_default_domain(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_domain_id = CONF.identity.default_domain_id
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_domain_id=CONF.identity.default_domain_id)
def test_trust_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
-
- def _test_unscoped_payload_with_user_id(self, exp_user_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.UnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids)
-
- (user_id, methods, expires_at, audit_ids) = (
- token_formatters.UnscopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_project_id=uuid.uuid4().hex,
+ exp_trust_id=uuid.uuid4().hex)
def test_unscoped_payload_with_non_uuid_user_id(self):
- self._test_unscoped_payload_with_user_id('someNonUuidUserId')
+ self._test_payload(token_formatters.UnscopedPayload,
+ exp_user_id='someNonUuidUserId')
def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_unscoped_payload_with_user_id('0123456789abcdef')
-
- def _test_project_scoped_payload_with_ids(self, exp_user_id,
- exp_project_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
+ self._test_payload(token_formatters.UnscopedPayload,
+ exp_user_id='0123456789abcdef')
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
+ def test_project_scoped_payload_with_non_uuid_ids(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_project_id='someNonUuidProjectId')
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
-
- def test_project_scoped_payload_with_non_uuid_user_id(self):
- self._test_project_scoped_payload_with_ids('someNonUuidUserId',
- 'someNonUuidProjectId')
-
- def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_project_scoped_payload_with_ids('0123456789abcdef',
- '0123456789abcdef')
-
- def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
- exp_methods = ['password']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_project_scoped_payload_with_16_char_non_uuid_ids(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_project_id='0123456789abcdef')
def test_domain_scoped_payload_with_non_uuid_user_id(self):
- self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_user_id='nonUuidUserId',
+ exp_domain_id=uuid.uuid4().hex)
def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
-
- def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
-
- def test_trust_scoped_payload_with_non_uuid_user_id(self):
- self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
- 'someNonUuidProjectId')
-
- def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_trust_scoped_payload_with_ids('0123456789abcdef',
- '0123456789abcdef')
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_domain_id=uuid.uuid4().hex)
+
+ def test_trust_scoped_payload_with_non_uuid_ids(self):
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_project_id='someNonUuidProjectId',
+ exp_trust_id=uuid.uuid4().hex)
+
+ def test_trust_scoped_payload_with_16_char_non_uuid_ids(self):
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_project_id='0123456789abcdef',
+ exp_trust_id=uuid.uuid4().hex)
def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': exp_group_id}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedUnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids,
- exp_federated_info)
-
- (user_id, methods, expires_at, audit_ids, federated_info) = (
- token_formatters.FederatedUnscopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_federated_info['group_ids'][0]['id'],
- federated_info['group_ids'][0]['id'])
- self.assertEqual(exp_federated_info['idp_id'],
- federated_info['idp_id'])
- self.assertEqual(exp_federated_info['protocol_id'],
- federated_info['protocol_id'])
+ self._test_payload(token_formatters.FederatedUnscopedPayload,
+ exp_user_id=exp_user_id,
+ exp_federated_info=exp_federated_info)
def test_federated_payload_with_non_uuid_ids(self):
self._test_federated_payload_with_ids('someNonUuidUserId',
@@ -527,56 +447,31 @@ class TestPayloads(unit.TestCase):
'0123456789abcdef')
def test_federated_project_scoped_payload(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['token']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_federated_info)
-
- (user_id, methods, project_id, expires_at, audit_ids,
- federated_info) = (
- token_formatters.FederatedProjectScopedPayload.disassemble(
- payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertDictEqual(exp_federated_info, federated_info)
+ self._test_payload(token_formatters.FederatedProjectScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_methods=['token'],
+ exp_project_id=uuid.uuid4().hex,
+ exp_federated_info=exp_federated_info)
def test_federated_domain_scoped_payload(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['token']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedDomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids, exp_federated_info)
+ self._test_payload(token_formatters.FederatedDomainScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_methods=['token'],
+ exp_domain_id=uuid.uuid4().hex,
+ exp_federated_info=exp_federated_info)
- (user_id, methods, domain_id, expires_at, audit_ids,
- federated_info) = (
- token_formatters.FederatedDomainScopedPayload.disassemble(
- payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertDictEqual(exp_federated_info, federated_info)
+ def test_oauth_scoped_payload(self):
+ self._test_payload(token_formatters.OauthScopedPayload,
+ exp_project_id=uuid.uuid4().hex,
+ exp_access_token_id=uuid.uuid4().hex)
class TestFernetKeyRotation(unit.TestCase):
@@ -610,7 +505,7 @@ class TestFernetKeyRotation(unit.TestCase):
static set of keys, and simply shuffling them, would fail such a test).
"""
- # Load the keys into a list.
+ # Load the keys into a list, keys is list of six.text_type.
keys = fernet_utils.load_keys()
# Sort the list of keys by the keys themselves (they were previously
@@ -620,7 +515,8 @@ class TestFernetKeyRotation(unit.TestCase):
# Create the thumbprint using all keys in the repository.
signature = hashlib.sha1()
for key in keys:
- signature.update(key)
+ # Need to convert key to six.binary_type for update.
+ signature.update(key.encode('utf-8'))
return signature.hexdigest()
def assertRepositoryState(self, expected_size):
diff --git a/keystone-moon/keystone/tests/unit/token/test_provider.py b/keystone-moon/keystone/tests/unit/token/test_provider.py
index be831484..7093f3ba 100644
--- a/keystone-moon/keystone/tests/unit/token/test_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_provider.py
@@ -24,7 +24,7 @@ class TestRandomStrings(unit.BaseTestCase):
def test_strings_can_be_converted_to_bytes(self):
s = provider.random_urlsafe_str()
- self.assertTrue(isinstance(s, six.string_types))
+ self.assertIsInstance(s, six.text_type)
b = provider.random_urlsafe_str_to_bytes(s)
- self.assertTrue(isinstance(b, bytes))
+ self.assertIsInstance(b, six.binary_type)
diff --git a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py
index 6114b723..9e8c3889 100644
--- a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py
+++ b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py
@@ -28,7 +28,8 @@ class TestTokenDataHelper(unit.TestCase):
def test_v3_token_data_helper_populate_audit_info_string(self):
token_data = {}
- audit_info = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
+ audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
+ audit_info = audit_info_bytes.decode('utf-8')
self.v3_data_helper._populate_audit_info(token_data, audit_info)
self.assertIn(audit_info, token_data['audit_ids'])
self.assertThat(token_data['audit_ids'], matchers.HasLength(2))
diff --git a/keystone-moon/keystone/tests/unit/token/test_token_model.py b/keystone-moon/keystone/tests/unit/token/test_token_model.py
index f1398491..1cb0ef55 100644
--- a/keystone-moon/keystone/tests/unit/token/test_token_model.py
+++ b/keystone-moon/keystone/tests/unit/token/test_token_model.py
@@ -17,8 +17,8 @@ from oslo_config import cfg
from oslo_utils import timeutils
from six.moves import range
-from keystone.contrib.federation import constants as federation_constants
from keystone import exception
+from keystone.federation import constants as federation_constants
from keystone.models import token_model
from keystone.tests.unit import core
from keystone.tests.unit import test_token_provider