summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token/test_fernet_provider.py')
-rw-r--r--keystone-moon/keystone/tests/unit/token/test_fernet_provider.py276
1 files changed, 225 insertions, 51 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
index 5f74b430..bfb590db 100644
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
@@ -16,13 +16,17 @@ import hashlib
import os
import uuid
+import msgpack
from oslo_utils import timeutils
+from six.moves import urllib
from keystone.common import config
from keystone.common import utils
+from keystone.contrib.federation import constants as federation_constants
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
+from keystone.tests.unit.ksfixtures import database
from keystone.token import provider
from keystone.token.providers import fernet
from keystone.token.providers.fernet import token_formatters
@@ -57,7 +61,156 @@ class TestFernetTokenProvider(unit.TestCase):
uuid.uuid4().hex)
+class TestValidate(unit.TestCase):
+ def setUp(self):
+ super(TestValidate, self).setUp()
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+ self.useFixture(database.Database())
+ self.load_backends()
+
+ def config_overrides(self):
+ super(TestValidate, self).config_overrides()
+ self.config_fixture.config(group='token', provider='fernet')
+
+ def test_validate_v3_token_simple(self):
+ # Check the fields in the token result when use validate_v3_token
+ # with a simple token.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ method_names = ['password']
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ self.assertIsInstance(token['audit_ids'], list)
+ self.assertIsInstance(token['expires_at'], str)
+ self.assertEqual({}, token['extras'])
+ self.assertIsInstance(token['issued_at'], str)
+ self.assertEqual(method_names, token['methods'])
+ exp_user_info = {
+ 'id': user_ref['id'],
+ 'name': user_ref['name'],
+ 'domain': {
+ 'id': domain_ref['id'],
+ 'name': domain_ref['name'],
+ },
+ }
+ self.assertEqual(exp_user_info, token['user'])
+
+ def test_validate_v3_token_federated_info(self):
+ # Check the user fields in the token result when use validate_v3_token
+ # when the token has federated info.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ method_names = ['mapped']
+
+ group_ids = [uuid.uuid4().hex, ]
+ identity_provider = uuid.uuid4().hex
+ protocol = uuid.uuid4().hex
+ auth_context = {
+ 'user_id': user_ref['id'],
+ 'group_ids': group_ids,
+ federation_constants.IDENTITY_PROVIDER: identity_provider,
+ federation_constants.PROTOCOL: protocol,
+ }
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names, auth_context=auth_context)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ exp_user_info = {
+ 'id': user_ref['id'],
+ 'name': user_ref['id'],
+ 'domain': {'id': CONF.federation.federated_domain_name,
+ 'name': CONF.federation.federated_domain_name, },
+ federation_constants.FEDERATION: {
+ 'groups': [{'id': group_id} for group_id in group_ids],
+ 'identity_provider': {'id': identity_provider, },
+ 'protocol': {'id': protocol, },
+ },
+ }
+ self.assertEqual(exp_user_info, token['user'])
+
+ def test_validate_v3_token_trust(self):
+ # Check the trust fields in the token result when use validate_v3_token
+ # when the token has trust info.
+
+ domain_ref = unit.new_domain_ref()
+ domain_ref = self.resource_api.create_domain(domain_ref['id'],
+ domain_ref)
+
+ user_ref = unit.new_user_ref(domain_ref['id'])
+ user_ref = self.identity_api.create_user(user_ref)
+
+ trustor_user_ref = unit.new_user_ref(domain_ref['id'])
+ trustor_user_ref = self.identity_api.create_user(trustor_user_ref)
+
+ project_ref = unit.new_project_ref(domain_id=domain_ref['id'])
+ project_ref = self.resource_api.create_project(project_ref['id'],
+ project_ref)
+
+ role_ref = unit.new_role_ref()
+ role_ref = self.role_api.create_role(role_ref['id'], role_ref)
+
+ self.assignment_api.create_grant(
+ role_ref['id'], user_id=user_ref['id'],
+ project_id=project_ref['id'])
+
+ self.assignment_api.create_grant(
+ role_ref['id'], user_id=trustor_user_ref['id'],
+ project_id=project_ref['id'])
+
+ trustor_user_id = trustor_user_ref['id']
+ trustee_user_id = user_ref['id']
+ trust_ref = unit.new_trust_ref(
+ trustor_user_id, trustee_user_id, project_id=project_ref['id'],
+ role_ids=[role_ref['id'], ])
+ trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref,
+ trust_ref['roles'])
+
+ method_names = ['password']
+
+ token_id, token_data_ = self.token_provider_api.issue_v3_token(
+ user_ref['id'], method_names, project_id=project_ref['id'],
+ trust=trust_ref)
+
+ token_data = self.token_provider_api.validate_v3_token(token_id)
+ token = token_data['token']
+ exp_trust_info = {
+ 'id': trust_ref['id'],
+ 'impersonation': False,
+ 'trustee_user': {'id': user_ref['id'], },
+ 'trustor_user': {'id': trustor_user_ref['id'], },
+ }
+ self.assertEqual(exp_trust_info, token['OS-TRUST:trust'])
+
+ def test_validate_v3_token_validation_error_exc(self):
+ # When the token format isn't recognized, TokenNotFound is raised.
+
+ # A uuid string isn't a valid fernet token.
+ token_id = uuid.uuid4().hex
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v3_token, token_id)
+
+
class TestTokenFormatter(unit.TestCase):
+ def setUp(self):
+ super(TestTokenFormatter, self).setUp()
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+
def test_restore_padding(self):
# 'a' will result in '==' padding, 'aa' will result in '=' padding, and
# 'aaa' will result in no padding.
@@ -73,6 +226,39 @@ class TestTokenFormatter(unit.TestCase):
)
self.assertEqual(encoded_string, encoded_str_with_padding_restored)
+ def test_legacy_padding_validation(self):
+ first_value = uuid.uuid4().hex
+ second_value = uuid.uuid4().hex
+ payload = (first_value, second_value)
+ msgpack_payload = msgpack.packb(payload)
+
+ # NOTE(lbragstad): This method perserves the way that keystone used to
+ # percent encode the tokens, prior to bug #1491926.
+ def legacy_pack(payload):
+ tf = token_formatters.TokenFormatter()
+ encrypted_payload = tf.crypto.encrypt(payload)
+
+ # the encrypted_payload is returned with padding appended
+ self.assertTrue(encrypted_payload.endswith('='))
+
+ # using urllib.parse.quote will percent encode the padding, like
+ # keystone did in Kilo.
+ percent_encoded_payload = urllib.parse.quote(encrypted_payload)
+
+ # ensure that the padding was actaully percent encoded
+ self.assertTrue(percent_encoded_payload.endswith('%3D'))
+ return percent_encoded_payload
+
+ token_with_legacy_padding = legacy_pack(msgpack_payload)
+ tf = token_formatters.TokenFormatter()
+
+ # demonstrate the we can validate a payload that has been percent
+ # encoded with the Fernet logic that existed in Kilo
+ serialized_payload = tf.unpack(token_with_legacy_padding)
+ returned_payload = msgpack.unpackb(serialized_payload)
+ self.assertEqual(first_value, returned_payload[0])
+ self.assertEqual(second_value, returned_payload[1])
+
class TestPayloads(unit.TestCase):
def test_uuid_hex_to_byte_conversions(self):
@@ -204,8 +390,7 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
- def test_unscoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
+ def _test_unscoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
@@ -221,30 +406,15 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_project_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = token_formatters.ProjectScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids)
-
- (user_id, methods, project_id, expires_at, audit_ids) = (
- token_formatters.ProjectScopedPayload.disassemble(payload))
+ def test_unscoped_payload_with_non_uuid_user_id(self):
+ self._test_unscoped_payload_with_user_id('someNonUuidUserId')
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
+ def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_unscoped_payload_with_user_id('0123456789abcdef')
- def test_project_scoped_payload_with_non_uuid_project_id(self):
- exp_user_id = uuid.uuid4().hex
+ def _test_project_scoped_payload_with_ids(self, exp_user_id,
+ exp_project_id):
exp_methods = ['password']
- exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
@@ -261,8 +431,15 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_domain_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
+ def test_project_scoped_payload_with_non_uuid_user_id(self):
+ self._test_project_scoped_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidProjectId')
+
+ def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_project_scoped_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
+ def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
exp_methods = ['password']
exp_domain_id = uuid.uuid4().hex
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
@@ -281,32 +458,14 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_expires_at, expires_at)
self.assertEqual(exp_audit_ids, audit_ids)
- def test_trust_scoped_payload_with_non_uuid_user_id(self):
- exp_user_id = 'someNonUuidUserId'
- exp_methods = ['password']
- exp_project_id = uuid.uuid4().hex
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
- exp_trust_id = uuid.uuid4().hex
-
- payload = token_formatters.TrustScopedPayload.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_expires_at,
- exp_audit_ids, exp_trust_id)
-
- (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
- token_formatters.TrustScopedPayload.disassemble(payload))
+ def test_domain_scoped_payload_with_non_uuid_user_id(self):
+ self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_trust_id, trust_id)
+ def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
- def test_trust_scoped_payload_with_non_uuid_project_id(self):
- exp_user_id = uuid.uuid4().hex
+ def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
exp_methods = ['password']
- exp_project_id = 'someNonUuidProjectId'
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
exp_trust_id = uuid.uuid4().hex
@@ -325,12 +484,19 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_audit_ids, audit_ids)
self.assertEqual(exp_trust_id, trust_id)
- def test_federated_payload_with_non_uuid_ids(self):
- exp_user_id = 'someNonUuidUserId'
+ def test_trust_scoped_payload_with_non_uuid_user_id(self):
+ self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidProjectId')
+
+ def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
+ self._test_trust_scoped_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
+ def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
exp_methods = ['password']
exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
exp_audit_ids = [provider.random_urlsafe_str()]
- exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
+ exp_federated_info = {'group_ids': [{'id': exp_group_id}],
'idp_id': uuid.uuid4().hex,
'protocol_id': uuid.uuid4().hex}
@@ -352,6 +518,14 @@ class TestPayloads(unit.TestCase):
self.assertEqual(exp_federated_info['protocol_id'],
federated_info['protocol_id'])
+ def test_federated_payload_with_non_uuid_ids(self):
+ self._test_federated_payload_with_ids('someNonUuidUserId',
+ 'someNonUuidGroupId')
+
+ def test_federated_payload_with_16_char_non_uuid_ids(self):
+ self._test_federated_payload_with_ids('0123456789abcdef',
+ '0123456789abcdef')
+
def test_federated_project_scoped_payload(self):
exp_user_id = 'someNonUuidUserId'
exp_methods = ['token']