diff options
author | asteroide <thomas.duval@orange.com> | 2015-12-02 09:49:33 +0100 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-12-02 10:25:15 +0100 |
commit | 7a5a0e4df646d46476ec7a9fcdedd638e8781f6e (patch) | |
tree | 54eecd1210e4fb5db2b14edeac1df601da7698e2 /keystone-moon/keystone/tests/unit/token/test_fernet_provider.py | |
parent | 8d7b0ffa8e7a7bb09686d8f25176c364d5b6aa0e (diff) |
Update keystone to the branch stable/liberty.
Change-Id: I7cce62ae4b4cbca525a7b9499285455bdd04993e
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token/test_fernet_provider.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/token/test_fernet_provider.py | 276 |
1 files changed, 225 insertions, 51 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py index 5f74b430..bfb590db 100644 --- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py +++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py @@ -16,13 +16,17 @@ import hashlib import os import uuid +import msgpack from oslo_utils import timeutils +from six.moves import urllib from keystone.common import config from keystone.common import utils +from keystone.contrib.federation import constants as federation_constants from keystone import exception from keystone.tests import unit from keystone.tests.unit import ksfixtures +from keystone.tests.unit.ksfixtures import database from keystone.token import provider from keystone.token.providers import fernet from keystone.token.providers.fernet import token_formatters @@ -57,7 +61,156 @@ class TestFernetTokenProvider(unit.TestCase): uuid.uuid4().hex) +class TestValidate(unit.TestCase): + def setUp(self): + super(TestValidate, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + self.useFixture(database.Database()) + self.load_backends() + + def config_overrides(self): + super(TestValidate, self).config_overrides() + self.config_fixture.config(group='token', provider='fernet') + + def test_validate_v3_token_simple(self): + # Check the fields in the token result when use validate_v3_token + # with a simple token. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + method_names = ['password'] + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + self.assertIsInstance(token['audit_ids'], list) + self.assertIsInstance(token['expires_at'], str) + self.assertEqual({}, token['extras']) + self.assertIsInstance(token['issued_at'], str) + self.assertEqual(method_names, token['methods']) + exp_user_info = { + 'id': user_ref['id'], + 'name': user_ref['name'], + 'domain': { + 'id': domain_ref['id'], + 'name': domain_ref['name'], + }, + } + self.assertEqual(exp_user_info, token['user']) + + def test_validate_v3_token_federated_info(self): + # Check the user fields in the token result when use validate_v3_token + # when the token has federated info. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + method_names = ['mapped'] + + group_ids = [uuid.uuid4().hex, ] + identity_provider = uuid.uuid4().hex + protocol = uuid.uuid4().hex + auth_context = { + 'user_id': user_ref['id'], + 'group_ids': group_ids, + federation_constants.IDENTITY_PROVIDER: identity_provider, + federation_constants.PROTOCOL: protocol, + } + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names, auth_context=auth_context) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + exp_user_info = { + 'id': user_ref['id'], + 'name': user_ref['id'], + 'domain': {'id': CONF.federation.federated_domain_name, + 'name': CONF.federation.federated_domain_name, }, + federation_constants.FEDERATION: { + 'groups': [{'id': group_id} for group_id in group_ids], + 'identity_provider': {'id': identity_provider, }, + 'protocol': {'id': protocol, }, + }, + } + self.assertEqual(exp_user_info, token['user']) + + def test_validate_v3_token_trust(self): + # Check the trust fields in the token result when use validate_v3_token + # when the token has trust info. + + domain_ref = unit.new_domain_ref() + domain_ref = self.resource_api.create_domain(domain_ref['id'], + domain_ref) + + user_ref = unit.new_user_ref(domain_ref['id']) + user_ref = self.identity_api.create_user(user_ref) + + trustor_user_ref = unit.new_user_ref(domain_ref['id']) + trustor_user_ref = self.identity_api.create_user(trustor_user_ref) + + project_ref = unit.new_project_ref(domain_id=domain_ref['id']) + project_ref = self.resource_api.create_project(project_ref['id'], + project_ref) + + role_ref = unit.new_role_ref() + role_ref = self.role_api.create_role(role_ref['id'], role_ref) + + self.assignment_api.create_grant( + role_ref['id'], user_id=user_ref['id'], + project_id=project_ref['id']) + + self.assignment_api.create_grant( + role_ref['id'], user_id=trustor_user_ref['id'], + project_id=project_ref['id']) + + trustor_user_id = trustor_user_ref['id'] + trustee_user_id = user_ref['id'] + trust_ref = unit.new_trust_ref( + trustor_user_id, trustee_user_id, project_id=project_ref['id'], + role_ids=[role_ref['id'], ]) + trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref, + trust_ref['roles']) + + method_names = ['password'] + + token_id, token_data_ = self.token_provider_api.issue_v3_token( + user_ref['id'], method_names, project_id=project_ref['id'], + trust=trust_ref) + + token_data = self.token_provider_api.validate_v3_token(token_id) + token = token_data['token'] + exp_trust_info = { + 'id': trust_ref['id'], + 'impersonation': False, + 'trustee_user': {'id': user_ref['id'], }, + 'trustor_user': {'id': trustor_user_ref['id'], }, + } + self.assertEqual(exp_trust_info, token['OS-TRUST:trust']) + + def test_validate_v3_token_validation_error_exc(self): + # When the token format isn't recognized, TokenNotFound is raised. + + # A uuid string isn't a valid fernet token. + token_id = uuid.uuid4().hex + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v3_token, token_id) + + class TestTokenFormatter(unit.TestCase): + def setUp(self): + super(TestTokenFormatter, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + def test_restore_padding(self): # 'a' will result in '==' padding, 'aa' will result in '=' padding, and # 'aaa' will result in no padding. @@ -73,6 +226,39 @@ class TestTokenFormatter(unit.TestCase): ) self.assertEqual(encoded_string, encoded_str_with_padding_restored) + def test_legacy_padding_validation(self): + first_value = uuid.uuid4().hex + second_value = uuid.uuid4().hex + payload = (first_value, second_value) + msgpack_payload = msgpack.packb(payload) + + # NOTE(lbragstad): This method perserves the way that keystone used to + # percent encode the tokens, prior to bug #1491926. + def legacy_pack(payload): + tf = token_formatters.TokenFormatter() + encrypted_payload = tf.crypto.encrypt(payload) + + # the encrypted_payload is returned with padding appended + self.assertTrue(encrypted_payload.endswith('=')) + + # using urllib.parse.quote will percent encode the padding, like + # keystone did in Kilo. + percent_encoded_payload = urllib.parse.quote(encrypted_payload) + + # ensure that the padding was actaully percent encoded + self.assertTrue(percent_encoded_payload.endswith('%3D')) + return percent_encoded_payload + + token_with_legacy_padding = legacy_pack(msgpack_payload) + tf = token_formatters.TokenFormatter() + + # demonstrate the we can validate a payload that has been percent + # encoded with the Fernet logic that existed in Kilo + serialized_payload = tf.unpack(token_with_legacy_padding) + returned_payload = msgpack.unpackb(serialized_payload) + self.assertEqual(first_value, returned_payload[0]) + self.assertEqual(second_value, returned_payload[1]) + class TestPayloads(unit.TestCase): def test_uuid_hex_to_byte_conversions(self): @@ -204,8 +390,7 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_trust_id, trust_id) - def test_unscoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' + def _test_unscoped_payload_with_user_id(self, exp_user_id): exp_methods = ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] @@ -221,30 +406,15 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_project_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = token_formatters.ProjectScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids) - - (user_id, methods, project_id, expires_at, audit_ids) = ( - token_formatters.ProjectScopedPayload.disassemble(payload)) + def test_unscoped_payload_with_non_uuid_user_id(self): + self._test_unscoped_payload_with_user_id('someNonUuidUserId') - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) + def test_unscoped_payload_with_16_char_non_uuid_user_id(self): + self._test_unscoped_payload_with_user_id('0123456789abcdef') - def test_project_scoped_payload_with_non_uuid_project_id(self): - exp_user_id = uuid.uuid4().hex + def _test_project_scoped_payload_with_ids(self, exp_user_id, + exp_project_id): exp_methods = ['password'] - exp_project_id = 'someNonUuidProjectId' exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] @@ -261,8 +431,15 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_domain_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' + def test_project_scoped_payload_with_non_uuid_user_id(self): + self._test_project_scoped_payload_with_ids('someNonUuidUserId', + 'someNonUuidProjectId') + + def test_project_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_project_scoped_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + + def _test_domain_scoped_payload_with_user_id(self, exp_user_id): exp_methods = ['password'] exp_domain_id = uuid.uuid4().hex exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) @@ -281,32 +458,14 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_expires_at, expires_at) self.assertEqual(exp_audit_ids, audit_ids) - def test_trust_scoped_payload_with_non_uuid_user_id(self): - exp_user_id = 'someNonUuidUserId' - exp_methods = ['password'] - exp_project_id = uuid.uuid4().hex - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - exp_trust_id = uuid.uuid4().hex - - payload = token_formatters.TrustScopedPayload.assemble( - exp_user_id, exp_methods, exp_project_id, exp_expires_at, - exp_audit_ids, exp_trust_id) - - (user_id, methods, project_id, expires_at, audit_ids, trust_id) = ( - token_formatters.TrustScopedPayload.disassemble(payload)) + def test_domain_scoped_payload_with_non_uuid_user_id(self): + self._test_domain_scoped_payload_with_user_id('nonUuidUserId') - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_trust_id, trust_id) + def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_domain_scoped_payload_with_user_id('0123456789abcdef') - def test_trust_scoped_payload_with_non_uuid_project_id(self): - exp_user_id = uuid.uuid4().hex + def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id): exp_methods = ['password'] - exp_project_id = 'someNonUuidProjectId' exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] exp_trust_id = uuid.uuid4().hex @@ -325,12 +484,19 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_audit_ids, audit_ids) self.assertEqual(exp_trust_id, trust_id) - def test_federated_payload_with_non_uuid_ids(self): - exp_user_id = 'someNonUuidUserId' + def test_trust_scoped_payload_with_non_uuid_user_id(self): + self._test_trust_scoped_payload_with_ids('someNonUuidUserId', + 'someNonUuidProjectId') + + def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self): + self._test_trust_scoped_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + + def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id): exp_methods = ['password'] exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) exp_audit_ids = [provider.random_urlsafe_str()] - exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], + exp_federated_info = {'group_ids': [{'id': exp_group_id}], 'idp_id': uuid.uuid4().hex, 'protocol_id': uuid.uuid4().hex} @@ -352,6 +518,14 @@ class TestPayloads(unit.TestCase): self.assertEqual(exp_federated_info['protocol_id'], federated_info['protocol_id']) + def test_federated_payload_with_non_uuid_ids(self): + self._test_federated_payload_with_ids('someNonUuidUserId', + 'someNonUuidGroupId') + + def test_federated_payload_with_16_char_non_uuid_ids(self): + self._test_federated_payload_with_ids('0123456789abcdef', + '0123456789abcdef') + def test_federated_project_scoped_payload(self): exp_user_id = 'someNonUuidUserId' exp_methods = ['token'] |