aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token/test_fernet_provider.py')
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_fernet_provider.py428
1 files changed, 162 insertions, 266 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
index bfb590db..5f51d7b3 100644
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
@@ -22,8 +22,8 @@ from six.moves import urllib
from keystone.common import config
from keystone.common import utils
-from keystone.contrib.federation import constants as federation_constants
from keystone import exception
+from keystone.federation import constants as federation_constants
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import database
@@ -48,17 +48,25 @@ class TestFernetTokenProvider(unit.TestCase):
def test_needs_persistence_returns_false(self):
self.assertFalse(self.provider.needs_persistence())
- def test_invalid_v3_token_raises_404(self):
- self.assertRaises(
+ def test_invalid_v3_token_raises_token_not_found(self):
+ # NOTE(lbragstad): Here we use the validate_non_persistent_token()
+ # methods because the validate_v3_token() method is strictly for
+ # validating UUID formatted tokens. It is written to assume cached
+ # tokens from a backend, where validate_non_persistent_token() is not.
+ token_id = uuid.uuid4().hex
+ e = self.assertRaises(
exception.TokenNotFound,
- self.provider.validate_v3_token,
- uuid.uuid4().hex)
+ self.provider.validate_non_persistent_token,
+ token_id)
+ self.assertIn(token_id, u'%s' % e)
- def test_invalid_v2_token_raises_404(self):
- self.assertRaises(
+ def test_invalid_v2_token_raises_token_not_found(self):
+ token_id = uuid.uuid4().hex
+ e = self.assertRaises(
exception.TokenNotFound,
- self.provider.validate_v2_token,
- uuid.uuid4().hex)
+ self.provider.validate_non_persistent_token,
+ token_id)
+ self.assertIn(token_id, u'%s' % e)
class TestValidate(unit.TestCase):
@@ -91,7 +99,6 @@ class TestValidate(unit.TestCase):
token = token_data['token']
self.assertIsInstance(token['audit_ids'], list)
self.assertIsInstance(token['expires_at'], str)
- self.assertEqual({}, token['extras'])
self.assertIsInstance(token['issued_at'], str)
self.assertEqual(method_names, token['methods'])
exp_user_info = {
@@ -200,7 +207,7 @@ class TestValidate(unit.TestCase):
def test_validate_v3_token_validation_error_exc(self):
# When the token format isn't recognized, TokenNotFound is raised.
- # A uuid string isn't a valid fernet token.
+ # A uuid string isn't a valid Fernet token.
token_id = uuid.uuid4().hex
self.assertRaises(exception.TokenNotFound,
self.token_provider_api.validate_v3_token, token_id)
@@ -214,10 +221,14 @@ class TestTokenFormatter(unit.TestCase):
def test_restore_padding(self):
# 'a' will result in '==' padding, 'aa' will result in '=' padding, and
# 'aaa' will result in no padding.
- strings_to_test = ['a', 'aa', 'aaa']
-
- for string in strings_to_test:
- encoded_string = base64.urlsafe_b64encode(string)
+ binary_to_test = [b'a', b'aa', b'aaa']
+
+ for binary in binary_to_test:
+ # base64.urlsafe_b64encode takes six.binary_type and returns
+ # six.binary_type.
+ encoded_string = base64.urlsafe_b64encode(binary)
+ encoded_string = encoded_string.decode('utf-8')
+ # encoded_string is now six.text_type.
encoded_str_without_padding = encoded_string.rstrip('=')
self.assertFalse(encoded_str_without_padding.endswith('='))
encoded_str_with_padding_restored = (
@@ -231,36 +242,57 @@ class TestTokenFormatter(unit.TestCase):
second_value = uuid.uuid4().hex
payload = (first_value, second_value)
msgpack_payload = msgpack.packb(payload)
+ # msgpack_payload is six.binary_type.
+
+ tf = token_formatters.TokenFormatter()
- # NOTE(lbragstad): This method perserves the way that keystone used to
+ # NOTE(lbragstad): This method preserves the way that keystone used to
# percent encode the tokens, prior to bug #1491926.
def legacy_pack(payload):
- tf = token_formatters.TokenFormatter()
+ # payload is six.binary_type.
encrypted_payload = tf.crypto.encrypt(payload)
+ # encrypted_payload is six.binary_type.
# the encrypted_payload is returned with padding appended
- self.assertTrue(encrypted_payload.endswith('='))
+ self.assertTrue(encrypted_payload.endswith(b'='))
# using urllib.parse.quote will percent encode the padding, like
# keystone did in Kilo.
percent_encoded_payload = urllib.parse.quote(encrypted_payload)
+ # percent_encoded_payload is six.text_type.
- # ensure that the padding was actaully percent encoded
+ # ensure that the padding was actually percent encoded
self.assertTrue(percent_encoded_payload.endswith('%3D'))
return percent_encoded_payload
token_with_legacy_padding = legacy_pack(msgpack_payload)
- tf = token_formatters.TokenFormatter()
+ # token_with_legacy_padding is six.text_type.
# demonstrate the we can validate a payload that has been percent
# encoded with the Fernet logic that existed in Kilo
serialized_payload = tf.unpack(token_with_legacy_padding)
+ # serialized_payload is six.binary_type.
returned_payload = msgpack.unpackb(serialized_payload)
- self.assertEqual(first_value, returned_payload[0])
- self.assertEqual(second_value, returned_payload[1])
+ # returned_payload contains six.binary_type.
+ self.assertEqual(first_value, returned_payload[0].decode('utf-8'))
+ self.assertEqual(second_value, returned_payload[1].decode('utf-8'))
class TestPayloads(unit.TestCase):
+ def assertTimestampsEqual(self, expected, actual):
+ # The timestamp that we get back when parsing the payload may not
+ # exactly match the timestamp that was put in the payload due to
+ # conversion to and from a float.
+
+ exp_time = timeutils.parse_isotime(expected)
+ actual_time = timeutils.parse_isotime(actual)
+
+ # the granularity of timestamp string is microseconds and it's only the
+ # last digit in the representation that's different, so use a delta
+ # just above nanoseconds.
+ return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time,
+ delta=1e-05)
+
def test_uuid_hex_to_byte_conversions(self):
payload_cls = token_formatters.BasePayload
@@ -274,249 +306,137 @@ class TestPayloads(unit.TestCase):
expected_uuid_in_bytes)
self.assertEqual(expected_hex_uuid, actual_hex_uuid)
- def test_time_string_to_int_conversions(self):
+ def test_time_string_to_float_conversions(self):
payload_cls = token_formatters.BasePayload
- expected_time_str = utils.isotime(subsecond=True)
- time_obj = timeutils.parse_isotime(expected_time_str)
- expected_time_int = (
+ original_time_str = utils.isotime(subsecond=True)
+ time_obj = timeutils.parse_isotime(original_time_str)
+ expected_time_float = (
(timeutils.normalize_time(time_obj) -
datetime.datetime.utcfromtimestamp(0)).total_seconds())
- actual_time_int = payload_cls._convert_time_string_to_int(
- expected_time_str)
- self.assertEqual(expected_time_int, actual_time_int)
-
- actual_time_str = payload_cls._convert_int_to_time_string(
- actual_time_int)
+ # NOTE(lbragstad): The token expiration time for Fernet tokens is
+ # passed in the payload of the token. This is different from the token
+ # creation time, which is handled by Fernet and doesn't support
+ # subsecond precision because it is a timestamp integer.
+ self.assertIsInstance(expected_time_float, float)
+
+ actual_time_float = payload_cls._convert_time_string_to_float(
+ original_time_str)
+ self.assertIsInstance(actual_time_float, float)
+ self.assertEqual(expected_time_float, actual_time_float)
+
+ # Generate expected_time_str using the same time float. Using
+ # original_time_str from utils.isotime will occasionally fail due to
+ # floating point rounding differences.
+ time_object = datetime.datetime.utcfromtimestamp(actual_time_float)
+ expected_time_str = utils.isotime(time_object, subsecond=True)
+
+ actual_time_str = payload_cls._convert_float_to_time_string(
+ actual_time_float)
self.assertEqual(expected_time_str, actual_time_str)
- def test_unscoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
+ def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None,
+ exp_project_id=None, exp_domain_id=None,
+ exp_trust_id=None, exp_federated_info=None,
+ exp_access_token_id=None):
+ exp_user_id = exp_user_id or uuid.uuid4().hex
+ exp_methods = exp_methods or ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
- payload = token_formatters.UnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids)
+ payload = payload_class.assemble(
+ exp_user_id, exp_methods, exp_project_id, exp_domain_id,
+ exp_expires_at, exp_audit_ids, exp_trust_id, exp_federated_info,
+ exp_access_token_id)
- (user_id, methods, expires_at, audit_ids) = (
- token_formatters.UnscopedPayload.disassemble(payload))
+ (user_id, methods, project_id,
+ domain_id, expires_at, audit_ids,
+ trust_id, federated_info,
+ access_token_id) = payload_class.disassemble(payload)
self.assertEqual(exp_user_id, user_id)
self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
+ self.assertTimestampsEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
-
- def test_project_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self.assertEqual(exp_domain_id, domain_id)
+ self.assertEqual(exp_trust_id, trust_id)
+ self.assertEqual(exp_access_token_id, access_token_id)
- def test_domain_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
+ if exp_federated_info:
+ self.assertDictEqual(exp_federated_info, federated_info)
+ else:
+ self.assertIsNone(federated_info)
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
+ def test_unscoped_payload(self):
+ self._test_payload(token_formatters.UnscopedPayload)
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
+ def test_project_scoped_payload(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_project_id=uuid.uuid4().hex)
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_domain_scoped_payload(self):
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_domain_id=uuid.uuid4().hex)
def test_domain_scoped_payload_with_default_domain(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_domain_id = CONF.identity.default_domain_id
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_domain_id=CONF.identity.default_domain_id)
def test_trust_scoped_payload(self):
- exp_user_id = uuid.uuid4().hex
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
-
- def _test_unscoped_payload_with_user_id(self, exp_user_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.UnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids)
-
- (user_id, methods, expires_at, audit_ids) = (
- token_formatters.UnscopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_project_id=uuid.uuid4().hex,
+ exp_trust_id=uuid.uuid4().hex)
def test_unscoped_payload_with_non_uuid_user_id(self):
- self._test_unscoped_payload_with_user_id('someNonUuidUserId')
+ self._test_payload(token_formatters.UnscopedPayload,
+ exp_user_id='someNonUuidUserId')
def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_unscoped_payload_with_user_id('0123456789abcdef')
-
- def _test_project_scoped_payload_with_ids(self, exp_user_id,
- exp_project_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
+ self._test_payload(token_formatters.UnscopedPayload,
+ exp_user_id='0123456789abcdef')
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
+ def test_project_scoped_payload_with_non_uuid_ids(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_project_id='someNonUuidProjectId')
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
-
- def test_project_scoped_payload_with_non_uuid_user_id(self):
- self._test_project_scoped_payload_with_ids('someNonUuidUserId',
- 'someNonUuidProjectId')
-
- def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_project_scoped_payload_with_ids('0123456789abcdef',
- '0123456789abcdef')
-
- def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
- exp_methods = ['password']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.DomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, domain_id, expires_at, audit_ids) = (
- token_formatters.DomainScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_project_scoped_payload_with_16_char_non_uuid_ids(self):
+ self._test_payload(token_formatters.ProjectScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_project_id='0123456789abcdef')
def test_domain_scoped_payload_with_non_uuid_user_id(self):
- self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_user_id='nonUuidUserId',
+ exp_domain_id=uuid.uuid4().hex)
def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
-
- def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
-
- def test_trust_scoped_payload_with_non_uuid_user_id(self):
- self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
- 'someNonUuidProjectId')
-
- def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_trust_scoped_payload_with_ids('0123456789abcdef',
- '0123456789abcdef')
+ self._test_payload(token_formatters.DomainScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_domain_id=uuid.uuid4().hex)
+
+ def test_trust_scoped_payload_with_non_uuid_ids(self):
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_project_id='someNonUuidProjectId',
+ exp_trust_id=uuid.uuid4().hex)
+
+ def test_trust_scoped_payload_with_16_char_non_uuid_ids(self):
+ self._test_payload(token_formatters.TrustScopedPayload,
+ exp_user_id='0123456789abcdef',
+ exp_project_id='0123456789abcdef',
+ exp_trust_id=uuid.uuid4().hex)
def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
- exp_methods = ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': exp_group_id}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedUnscopedPayload.assemble(
- exp_user_id, exp_methods, exp_expires_at, exp_audit_ids,
- exp_federated_info)
-
- (user_id, methods, expires_at, audit_ids, federated_info) = (
- token_formatters.FederatedUnscopedPayload.disassemble(payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_federated_info['group_ids'][0]['id'],
- federated_info['group_ids'][0]['id'])
- self.assertEqual(exp_federated_info['idp_id'],
- federated_info['idp_id'])
- self.assertEqual(exp_federated_info['protocol_id'],
- federated_info['protocol_id'])
+ self._test_payload(token_formatters.FederatedUnscopedPayload,
+ exp_user_id=exp_user_id,
+ exp_federated_info=exp_federated_info)
def test_federated_payload_with_non_uuid_ids(self):
self._test_federated_payload_with_ids('someNonUuidUserId',
@@ -527,56 +447,31 @@ class TestPayloads(unit.TestCase):
'0123456789abcdef')
def test_federated_project_scoped_payload(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['token']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_federated_info)
-
- (user_id, methods, project_id, expires_at, audit_ids,
- federated_info) = (
- token_formatters.FederatedProjectScopedPayload.disassemble(
- payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertDictEqual(exp_federated_info, federated_info)
+ self._test_payload(token_formatters.FederatedProjectScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_methods=['token'],
+ exp_project_id=uuid.uuid4().hex,
+ exp_federated_info=exp_federated_info)
def test_federated_domain_scoped_payload(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['token']
- exp_domain_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
- payload = token_formatters.FederatedDomainScopedPayload.assemble(
- exp_user_id, exp_methods, exp_domain_id, exp_expires_at,
- exp_audit_ids, exp_federated_info)
+ self._test_payload(token_formatters.FederatedDomainScopedPayload,
+ exp_user_id='someNonUuidUserId',
+ exp_methods=['token'],
+ exp_domain_id=uuid.uuid4().hex,
+ exp_federated_info=exp_federated_info)
- (user_id, methods, domain_id, expires_at, audit_ids,
- federated_info) = (
- token_formatters.FederatedDomainScopedPayload.disassemble(
- payload))
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertDictEqual(exp_federated_info, federated_info)
+ def test_oauth_scoped_payload(self):
+ self._test_payload(token_formatters.OauthScopedPayload,
+ exp_project_id=uuid.uuid4().hex,
+ exp_access_token_id=uuid.uuid4().hex)
class TestFernetKeyRotation(unit.TestCase):
@@ -610,7 +505,7 @@ class TestFernetKeyRotation(unit.TestCase):
static set of keys, and simply shuffling them, would fail such a test).
"""
- # Load the keys into a list.
+ # Load the keys into a list, keys is list of six.text_type.
keys = fernet_utils.load_keys()
# Sort the list of keys by the keys themselves (they were previously
@@ -620,7 +515,8 @@ class TestFernetKeyRotation(unit.TestCase):
# Create the thumbprint using all keys in the repository.
signature = hashlib.sha1()
for key in keys:
- signature.update(key)
+ # Need to convert key to six.binary_type for update.
+ signature.update(key.encode('utf-8'))
return signature.hexdigest()
def assertRepositoryState(self, expected_size):