From 7a5a0e4df646d46476ec7a9fcdedd638e8781f6e Mon Sep 17 00:00:00 2001
From: asteroide <thomas.duval@orange.com>
Date: Wed, 2 Dec 2015 09:49:33 +0100
Subject: Update keystone to the branch stable/liberty.

Change-Id: I7cce62ae4b4cbca525a7b9499285455bdd04993e
---
 .../tests/unit/token/test_fernet_provider.py       | 276 +++++++++++++++++----
 1 file changed, 225 insertions(+), 51 deletions(-)

(limited to 'keystone-moon/keystone/tests/unit/token')

diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
index 5f74b430..bfb590db 100644
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
@@ -16,13 +16,17 @@ import hashlib
 import os
 import uuid
 
+import msgpack
 from oslo_utils import timeutils
+from six.moves import urllib
 
 from keystone.common import config
 from keystone.common import utils
+from keystone.contrib.federation import constants as federation_constants
 from keystone import exception
 from keystone.tests import unit
 from keystone.tests.unit import ksfixtures
+from keystone.tests.unit.ksfixtures import database
 from keystone.token import provider
 from keystone.token.providers import fernet
 from keystone.token.providers.fernet import token_formatters
@@ -57,7 +61,156 @@ class TestFernetTokenProvider(unit.TestCase):
             uuid.uuid4().hex)
 
 
+class TestValidate(unit.TestCase):
+    def setUp(self):
+        super(TestValidate, self).setUp()
+        self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+        self.useFixture(database.Database())
+        self.load_backends()
+
+    def config_overrides(self):
+        super(TestValidate, self).config_overrides()
+        self.config_fixture.config(group='token', provider='fernet')
+
+    def test_validate_v3_token_simple(self):
+        # Check the fields in the token result when use validate_v3_token
+        # with a simple token.
+
+        domain_ref = unit.new_domain_ref()
+        domain_ref = self.resource_api.create_domain(domain_ref['id'],
+                                                     domain_ref)
+
+        user_ref = unit.new_user_ref(domain_ref['id'])
+        user_ref = self.identity_api.create_user(user_ref)
+
+        method_names = ['password']
+        token_id, token_data_ = self.token_provider_api.issue_v3_token(
+            user_ref['id'], method_names)
+
+        token_data = self.token_provider_api.validate_v3_token(token_id)
+        token = token_data['token']
+        self.assertIsInstance(token['audit_ids'], list)
+        self.assertIsInstance(token['expires_at'], str)
+        self.assertEqual({}, token['extras'])
+        self.assertIsInstance(token['issued_at'], str)
+        self.assertEqual(method_names, token['methods'])
+        exp_user_info = {
+            'id': user_ref['id'],
+            'name': user_ref['name'],
+            'domain': {
+                'id': domain_ref['id'],
+                'name': domain_ref['name'],
+            },
+        }
+        self.assertEqual(exp_user_info, token['user'])
+
+    def test_validate_v3_token_federated_info(self):
+        # Check the user fields in the token result when use validate_v3_token
+        # when the token has federated info.
+
+        domain_ref = unit.new_domain_ref()
+        domain_ref = self.resource_api.create_domain(domain_ref['id'],
+                                                     domain_ref)
+
+        user_ref = unit.new_user_ref(domain_ref['id'])
+        user_ref = self.identity_api.create_user(user_ref)
+
+        method_names = ['mapped']
+
+        group_ids = [uuid.uuid4().hex, ]
+        identity_provider = uuid.uuid4().hex
+        protocol = uuid.uuid4().hex
+        auth_context = {
+            'user_id': user_ref['id'],
+            'group_ids': group_ids,
+            federation_constants.IDENTITY_PROVIDER: identity_provider,
+            federation_constants.PROTOCOL: protocol,
+        }
+        token_id, token_data_ = self.token_provider_api.issue_v3_token(
+            user_ref['id'], method_names, auth_context=auth_context)
+
+        token_data = self.token_provider_api.validate_v3_token(token_id)
+        token = token_data['token']
+        exp_user_info = {
+            'id': user_ref['id'],
+            'name': user_ref['id'],
+            'domain': {'id': CONF.federation.federated_domain_name,
+                       'name': CONF.federation.federated_domain_name, },
+            federation_constants.FEDERATION: {
+                'groups': [{'id': group_id} for group_id in group_ids],
+                'identity_provider': {'id': identity_provider, },
+                'protocol': {'id': protocol, },
+            },
+        }
+        self.assertEqual(exp_user_info, token['user'])
+
+    def test_validate_v3_token_trust(self):
+        # Check the trust fields in the token result when use validate_v3_token
+        # when the token has trust info.
+
+        domain_ref = unit.new_domain_ref()
+        domain_ref = self.resource_api.create_domain(domain_ref['id'],
+                                                     domain_ref)
+
+        user_ref = unit.new_user_ref(domain_ref['id'])
+        user_ref = self.identity_api.create_user(user_ref)
+
+        trustor_user_ref = unit.new_user_ref(domain_ref['id'])
+        trustor_user_ref = self.identity_api.create_user(trustor_user_ref)
+
+        project_ref = unit.new_project_ref(domain_id=domain_ref['id'])
+        project_ref = self.resource_api.create_project(project_ref['id'],
+                                                       project_ref)
+
+        role_ref = unit.new_role_ref()
+        role_ref = self.role_api.create_role(role_ref['id'], role_ref)
+
+        self.assignment_api.create_grant(
+            role_ref['id'], user_id=user_ref['id'],
+            project_id=project_ref['id'])
+
+        self.assignment_api.create_grant(
+            role_ref['id'], user_id=trustor_user_ref['id'],
+            project_id=project_ref['id'])
+
+        trustor_user_id = trustor_user_ref['id']
+        trustee_user_id = user_ref['id']
+        trust_ref = unit.new_trust_ref(
+            trustor_user_id, trustee_user_id, project_id=project_ref['id'],
+            role_ids=[role_ref['id'], ])
+        trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref,
+                                                trust_ref['roles'])
+
+        method_names = ['password']
+
+        token_id, token_data_ = self.token_provider_api.issue_v3_token(
+            user_ref['id'], method_names, project_id=project_ref['id'],
+            trust=trust_ref)
+
+        token_data = self.token_provider_api.validate_v3_token(token_id)
+        token = token_data['token']
+        exp_trust_info = {
+            'id': trust_ref['id'],
+            'impersonation': False,
+            'trustee_user': {'id': user_ref['id'], },
+            'trustor_user': {'id': trustor_user_ref['id'], },
+        }
+        self.assertEqual(exp_trust_info, token['OS-TRUST:trust'])
+
+    def test_validate_v3_token_validation_error_exc(self):
+        # When the token format isn't recognized, TokenNotFound is raised.
+
+        # A uuid string isn't a valid fernet token.
+        token_id = uuid.uuid4().hex
+        self.assertRaises(exception.TokenNotFound,
+                          self.token_provider_api.validate_v3_token, token_id)
+
+
 class TestTokenFormatter(unit.TestCase):
+    def setUp(self):
+        super(TestTokenFormatter, self).setUp()
+        self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+
     def test_restore_padding(self):
         # 'a' will result in '==' padding, 'aa' will result in '=' padding, and
         # 'aaa' will result in no padding.
@@ -73,6 +226,39 @@ class TestTokenFormatter(unit.TestCase):
             )
             self.assertEqual(encoded_string, encoded_str_with_padding_restored)
 
+    def test_legacy_padding_validation(self):
+        first_value = uuid.uuid4().hex
+        second_value = uuid.uuid4().hex
+        payload = (first_value, second_value)
+        msgpack_payload = msgpack.packb(payload)
+
+        # NOTE(lbragstad): This method perserves the way that keystone used to
+        # percent encode the tokens, prior to bug #1491926.
+        def legacy_pack(payload):
+            tf = token_formatters.TokenFormatter()
+            encrypted_payload = tf.crypto.encrypt(payload)
+
+            # the encrypted_payload is returned with padding appended
+            self.assertTrue(encrypted_payload.endswith('='))
+
+            # using urllib.parse.quote will percent encode the padding, like
+            # keystone did in Kilo.
+            percent_encoded_payload = urllib.parse.quote(encrypted_payload)
+
+            # ensure that the padding was actaully percent encoded
+            self.assertTrue(percent_encoded_payload.endswith('%3D'))
+            return percent_encoded_payload
+
+        token_with_legacy_padding = legacy_pack(msgpack_payload)
+        tf = token_formatters.TokenFormatter()
+
+        # demonstrate the we can validate a payload that has been percent
+        # encoded with the Fernet logic that existed in Kilo
+        serialized_payload = tf.unpack(token_with_legacy_padding)
+        returned_payload = msgpack.unpackb(serialized_payload)
+        self.assertEqual(first_value, returned_payload[0])
+        self.assertEqual(second_value, returned_payload[1])
+
 
 class TestPayloads(unit.TestCase):
     def test_uuid_hex_to_byte_conversions(self):
@@ -204,8 +390,7 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_audit_ids, audit_ids)
         self.assertEqual(exp_trust_id, trust_id)
 
-    def test_unscoped_payload_with_non_uuid_user_id(self):
-        exp_user_id = 'someNonUuidUserId'
+    def _test_unscoped_payload_with_user_id(self, exp_user_id):
         exp_methods = ['password']
         exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
         exp_audit_ids = [provider.random_urlsafe_str()]
@@ -221,30 +406,15 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_expires_at, expires_at)
         self.assertEqual(exp_audit_ids, audit_ids)
 
-    def test_project_scoped_payload_with_non_uuid_user_id(self):
-        exp_user_id = 'someNonUuidUserId'
-        exp_methods = ['password']
-        exp_project_id = uuid.uuid4().hex
-        exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
-        exp_audit_ids = [provider.random_urlsafe_str()]
-
-        payload = token_formatters.ProjectScopedPayload.assemble(
-            exp_user_id, exp_methods, exp_project_id, exp_expires_at,
-            exp_audit_ids)
-
-        (user_id, methods, project_id, expires_at, audit_ids) = (
-            token_formatters.ProjectScopedPayload.disassemble(payload))
+    def test_unscoped_payload_with_non_uuid_user_id(self):
+        self._test_unscoped_payload_with_user_id('someNonUuidUserId')
 
-        self.assertEqual(exp_user_id, user_id)
-        self.assertEqual(exp_methods, methods)
-        self.assertEqual(exp_project_id, project_id)
-        self.assertEqual(exp_expires_at, expires_at)
-        self.assertEqual(exp_audit_ids, audit_ids)
+    def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
+        self._test_unscoped_payload_with_user_id('0123456789abcdef')
 
-    def test_project_scoped_payload_with_non_uuid_project_id(self):
-        exp_user_id = uuid.uuid4().hex
+    def _test_project_scoped_payload_with_ids(self, exp_user_id,
+                                              exp_project_id):
         exp_methods = ['password']
-        exp_project_id = 'someNonUuidProjectId'
         exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
         exp_audit_ids = [provider.random_urlsafe_str()]
 
@@ -261,8 +431,15 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_expires_at, expires_at)
         self.assertEqual(exp_audit_ids, audit_ids)
 
-    def test_domain_scoped_payload_with_non_uuid_user_id(self):
-        exp_user_id = 'someNonUuidUserId'
+    def test_project_scoped_payload_with_non_uuid_user_id(self):
+        self._test_project_scoped_payload_with_ids('someNonUuidUserId',
+                                                   'someNonUuidProjectId')
+
+    def test_project_scoped_payload_with_16_char_non_uuid_user_id(self):
+        self._test_project_scoped_payload_with_ids('0123456789abcdef',
+                                                   '0123456789abcdef')
+
+    def _test_domain_scoped_payload_with_user_id(self, exp_user_id):
         exp_methods = ['password']
         exp_domain_id = uuid.uuid4().hex
         exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
@@ -281,32 +458,14 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_expires_at, expires_at)
         self.assertEqual(exp_audit_ids, audit_ids)
 
-    def test_trust_scoped_payload_with_non_uuid_user_id(self):
-        exp_user_id = 'someNonUuidUserId'
-        exp_methods = ['password']
-        exp_project_id = uuid.uuid4().hex
-        exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
-        exp_audit_ids = [provider.random_urlsafe_str()]
-        exp_trust_id = uuid.uuid4().hex
-
-        payload = token_formatters.TrustScopedPayload.assemble(
-            exp_user_id, exp_methods, exp_project_id, exp_expires_at,
-            exp_audit_ids, exp_trust_id)
-
-        (user_id, methods, project_id, expires_at, audit_ids, trust_id) = (
-            token_formatters.TrustScopedPayload.disassemble(payload))
+    def test_domain_scoped_payload_with_non_uuid_user_id(self):
+        self._test_domain_scoped_payload_with_user_id('nonUuidUserId')
 
-        self.assertEqual(exp_user_id, user_id)
-        self.assertEqual(exp_methods, methods)
-        self.assertEqual(exp_project_id, project_id)
-        self.assertEqual(exp_expires_at, expires_at)
-        self.assertEqual(exp_audit_ids, audit_ids)
-        self.assertEqual(exp_trust_id, trust_id)
+    def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
+        self._test_domain_scoped_payload_with_user_id('0123456789abcdef')
 
-    def test_trust_scoped_payload_with_non_uuid_project_id(self):
-        exp_user_id = uuid.uuid4().hex
+    def _test_trust_scoped_payload_with_ids(self, exp_user_id, exp_project_id):
         exp_methods = ['password']
-        exp_project_id = 'someNonUuidProjectId'
         exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
         exp_audit_ids = [provider.random_urlsafe_str()]
         exp_trust_id = uuid.uuid4().hex
@@ -325,12 +484,19 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_audit_ids, audit_ids)
         self.assertEqual(exp_trust_id, trust_id)
 
-    def test_federated_payload_with_non_uuid_ids(self):
-        exp_user_id = 'someNonUuidUserId'
+    def test_trust_scoped_payload_with_non_uuid_user_id(self):
+        self._test_trust_scoped_payload_with_ids('someNonUuidUserId',
+                                                 'someNonUuidProjectId')
+
+    def test_trust_scoped_payload_with_16_char_non_uuid_user_id(self):
+        self._test_trust_scoped_payload_with_ids('0123456789abcdef',
+                                                 '0123456789abcdef')
+
+    def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
         exp_methods = ['password']
         exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
         exp_audit_ids = [provider.random_urlsafe_str()]
-        exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
+        exp_federated_info = {'group_ids': [{'id': exp_group_id}],
                               'idp_id': uuid.uuid4().hex,
                               'protocol_id': uuid.uuid4().hex}
 
@@ -352,6 +518,14 @@ class TestPayloads(unit.TestCase):
         self.assertEqual(exp_federated_info['protocol_id'],
                          federated_info['protocol_id'])
 
+    def test_federated_payload_with_non_uuid_ids(self):
+        self._test_federated_payload_with_ids('someNonUuidUserId',
+                                              'someNonUuidGroupId')
+
+    def test_federated_payload_with_16_char_non_uuid_ids(self):
+        self._test_federated_payload_with_ids('0123456789abcdef',
+                                              '0123456789abcdef')
+
     def test_federated_project_scoped_payload(self):
         exp_user_id = 'someNonUuidUserId'
         exp_methods = ['token']
-- 
cgit 1.2.3-korg