Diffstat (limited to 'keystone-moon/keystone/tests/unit/token')
-rw-r--r--  keystone-moon/keystone/tests/unit/token/__init__.py                    0
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_backends.py             551
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_fernet_provider.py      611
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_pki_provider.py          26
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py         26
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_provider.py              30
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_token_data_helper.py     56
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_token_model.py          263
-rw-r--r--  keystone-moon/keystone/tests/unit/token/test_uuid_provider.py         26
9 files changed, 0 insertions, 1589 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/__init__.py b/keystone-moon/keystone/tests/unit/token/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystone-moon/keystone/tests/unit/token/__init__.py
+++ /dev/null
diff --git a/keystone-moon/keystone/tests/unit/token/test_backends.py b/keystone-moon/keystone/tests/unit/token/test_backends.py
deleted file mode 100644
index feb7e017..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_backends.py
+++ /dev/null
@@ -1,551 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import datetime
-import hashlib
-import uuid
-
-from keystoneclient.common import cms
-from oslo_config import cfg
-from oslo_utils import timeutils
-import six
-from six.moves import range
-
-from keystone import exception
-from keystone.tests import unit
-from keystone.tests.unit import utils as test_utils
-from keystone.token import provider
-
-
-CONF = cfg.CONF
-NULL_OBJECT = object()
-
-
-class TokenTests(object):
- def _create_token_id(self):
- # Use a token signed by the cms module
- token_id = ""
- for i in range(1, 20):
- token_id += uuid.uuid4().hex
- return cms.cms_sign_token(token_id,
- CONF.signing.certfile,
- CONF.signing.keyfile)
-
- def _assert_revoked_token_list_matches_token_persistence(
- self, revoked_token_id_list):
- # Assert that the list passed in matches the list returned by the
- # token persistence service
- persistence_list = [
- x['id']
- for x in self.token_provider_api.list_revoked_tokens()
- ]
- self.assertEqual(persistence_list, revoked_token_id_list)
-
- def test_token_crud(self):
- token_id = self._create_token_id()
- data = {'id': token_id, 'a': 'b',
- 'trust_id': None,
- 'user': {'id': 'testuserid'},
- 'token_data': {'access': {'token': {
- 'audit_ids': [uuid.uuid4().hex]}}}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- expires = data_ref.pop('expires')
- data_ref.pop('user_id')
- self.assertIsInstance(expires, datetime.datetime)
- data_ref.pop('id')
- data.pop('id')
- self.assertDictEqual(data, data_ref)
-
- new_data_ref = self.token_provider_api._persistence.get_token(token_id)
- expires = new_data_ref.pop('expires')
- self.assertIsInstance(expires, datetime.datetime)
- new_data_ref.pop('user_id')
- new_data_ref.pop('id')
-
- self.assertEqual(data, new_data_ref)
-
- self.token_provider_api._persistence.delete_token(token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api._persistence.get_token, token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api._persistence.delete_token, token_id)
-
- def create_token_sample_data(self, token_id=None, tenant_id=None,
- trust_id=None, user_id=None, expires=None):
- if token_id is None:
- token_id = self._create_token_id()
- if user_id is None:
- user_id = 'testuserid'
- # FIXME(morganfainberg): These tokens look nothing like "Real" tokens.
- # This should be fixed when token issuance is cleaned up.
- data = {'id': token_id, 'a': 'b',
- 'user': {'id': user_id},
- 'access': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
- if tenant_id is not None:
- data['tenant'] = {'id': tenant_id, 'name': tenant_id}
- if tenant_id is NULL_OBJECT:
- data['tenant'] = None
- if expires is not None:
- data['expires'] = expires
- if trust_id is not None:
- data['trust_id'] = trust_id
- data['access'].setdefault('trust', {})
- # Testuserid2 is used here since a trustee will be different in
- # the cases of impersonation and therefore should not match the
- # token's user_id.
- data['access']['trust']['trustee_user_id'] = 'testuserid2'
- data['token_version'] = provider.V2
- # Issue token stores a copy of all token data at token['token_data'].
- # This emulates that assumption as part of the test.
- data['token_data'] = copy.deepcopy(data)
- new_token = self.token_provider_api._persistence.create_token(token_id,
- data)
- return new_token['id'], data
-
- def test_delete_tokens(self):
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid')
- self.assertEqual(0, len(tokens))
- token_id1, data = self.create_token_sample_data(
- tenant_id='testtenantid')
- token_id2, data = self.create_token_sample_data(
- tenant_id='testtenantid')
- token_id3, data = self.create_token_sample_data(
- tenant_id='testtenantid',
- user_id='testuserid1')
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid')
- self.assertEqual(2, len(tokens))
- self.assertIn(token_id2, tokens)
- self.assertIn(token_id1, tokens)
- self.token_provider_api._persistence.delete_tokens(
- user_id='testuserid',
- tenant_id='testtenantid')
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid')
- self.assertEqual(0, len(tokens))
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- token_id1)
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- token_id2)
-
- self.token_provider_api._persistence.get_token(token_id3)
-
- def test_delete_tokens_trust(self):
- tokens = self.token_provider_api._persistence._list_tokens(
- user_id='testuserid')
- self.assertEqual(0, len(tokens))
- token_id1, data = self.create_token_sample_data(
- tenant_id='testtenantid',
- trust_id='testtrustid')
- token_id2, data = self.create_token_sample_data(
- tenant_id='testtenantid',
- user_id='testuserid1',
- trust_id='testtrustid1')
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid')
- self.assertEqual(1, len(tokens))
- self.assertIn(token_id1, tokens)
- self.token_provider_api._persistence.delete_tokens(
- user_id='testuserid',
- tenant_id='testtenantid',
- trust_id='testtrustid')
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- token_id1)
- self.token_provider_api._persistence.get_token(token_id2)
-
- def _test_token_list(self, token_list_fn):
- tokens = token_list_fn('testuserid')
- self.assertEqual(0, len(tokens))
- token_id1, data = self.create_token_sample_data()
- tokens = token_list_fn('testuserid')
- self.assertEqual(1, len(tokens))
- self.assertIn(token_id1, tokens)
- token_id2, data = self.create_token_sample_data()
- tokens = token_list_fn('testuserid')
- self.assertEqual(2, len(tokens))
- self.assertIn(token_id2, tokens)
- self.assertIn(token_id1, tokens)
- self.token_provider_api._persistence.delete_token(token_id1)
- tokens = token_list_fn('testuserid')
- self.assertIn(token_id2, tokens)
- self.assertNotIn(token_id1, tokens)
- self.token_provider_api._persistence.delete_token(token_id2)
- tokens = token_list_fn('testuserid')
- self.assertNotIn(token_id2, tokens)
- self.assertNotIn(token_id1, tokens)
-
- # tenant-specific tokens
- tenant1 = uuid.uuid4().hex
- tenant2 = uuid.uuid4().hex
- token_id3, data = self.create_token_sample_data(tenant_id=tenant1)
- token_id4, data = self.create_token_sample_data(tenant_id=tenant2)
- # test for existing but empty tenant (LP:1078497)
- token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT)
- tokens = token_list_fn('testuserid')
- self.assertEqual(3, len(tokens))
- self.assertNotIn(token_id1, tokens)
- self.assertNotIn(token_id2, tokens)
- self.assertIn(token_id3, tokens)
- self.assertIn(token_id4, tokens)
- self.assertIn(token_id5, tokens)
- tokens = token_list_fn('testuserid', tenant2)
- self.assertEqual(1, len(tokens))
- self.assertNotIn(token_id1, tokens)
- self.assertNotIn(token_id2, tokens)
- self.assertNotIn(token_id3, tokens)
- self.assertIn(token_id4, tokens)
-
- def test_token_list(self):
- self._test_token_list(
- self.token_provider_api._persistence._list_tokens)
-
- def test_token_list_trust(self):
- trust_id = uuid.uuid4().hex
- token_id5, data = self.create_token_sample_data(trust_id=trust_id)
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid', trust_id=trust_id)
- self.assertEqual(1, len(tokens))
- self.assertIn(token_id5, tokens)
-
- def test_get_token_returns_not_found(self):
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- uuid.uuid4().hex)
-
- def test_delete_token_returns_not_found(self):
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.delete_token,
- uuid.uuid4().hex)
-
- def test_expired_token(self):
- token_id = uuid.uuid4().hex
- expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)
- data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
- 'expires': expire_time,
- 'trust_id': None,
- 'user': {'id': 'testuserid'}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- data_ref.pop('user_id')
- self.assertDictEqual(data, data_ref)
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- token_id)
-
- def test_null_expires_token(self):
- token_id = uuid.uuid4().hex
- data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None,
- 'user': {'id': 'testuserid'}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- self.assertIsNotNone(data_ref['expires'])
- new_data_ref = self.token_provider_api._persistence.get_token(token_id)
-
- # MySQL doesn't store microseconds, so discard them before testing
- data_ref['expires'] = data_ref['expires'].replace(microsecond=0)
- new_data_ref['expires'] = new_data_ref['expires'].replace(
- microsecond=0)
-
- self.assertEqual(data_ref, new_data_ref)
-
- def check_list_revoked_tokens(self, token_infos):
- revocation_list = self.token_provider_api.list_revoked_tokens()
- revoked_ids = [x['id'] for x in revocation_list]
- revoked_audit_ids = [x['audit_id'] for x in revocation_list]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- for token_id, audit_id in token_infos:
- self.assertIn(token_id, revoked_ids)
- self.assertIn(audit_id, revoked_audit_ids)
-
- def delete_token(self):
- token_id = uuid.uuid4().hex
- audit_id = uuid.uuid4().hex
- data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
- 'user': {'id': 'testuserid'},
- 'token_data': {'token': {'audit_ids': [audit_id]}}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- self.token_provider_api._persistence.delete_token(token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- data_ref['id'])
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api._persistence.delete_token,
- data_ref['id'])
- return (token_id, audit_id)
-
- def test_list_revoked_tokens_returns_empty_list(self):
- revoked_ids = [x['id']
- for x in self.token_provider_api.list_revoked_tokens()]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- self.assertEqual([], revoked_ids)
-
- def test_list_revoked_tokens_for_single_token(self):
- self.check_list_revoked_tokens([self.delete_token()])
-
- def test_list_revoked_tokens_for_multiple_tokens(self):
- self.check_list_revoked_tokens([self.delete_token()
- for x in range(2)])
-
- def test_flush_expired_token(self):
- token_id = uuid.uuid4().hex
- expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1)
- data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
- 'expires': expire_time,
- 'trust_id': None,
- 'user': {'id': 'testuserid'}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- data_ref.pop('user_id')
- self.assertDictEqual(data, data_ref)
-
- token_id = uuid.uuid4().hex
- expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)
- data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
- 'expires': expire_time,
- 'trust_id': None,
- 'user': {'id': 'testuserid'}}
- data_ref = self.token_provider_api._persistence.create_token(token_id,
- data)
- data_ref.pop('user_id')
- self.assertDictEqual(data, data_ref)
-
- self.token_provider_api._persistence.flush_expired_tokens()
- tokens = self.token_provider_api._persistence._list_tokens(
- 'testuserid')
- self.assertEqual(1, len(tokens))
- self.assertIn(token_id, tokens)
-
- @unit.skip_if_cache_disabled('token')
- def test_revocation_list_cache(self):
- expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10)
- token_id = uuid.uuid4().hex
- token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b',
- 'expires': expire_time,
- 'trust_id': None,
- 'user': {'id': 'testuserid'},
- 'token_data': {'token': {
- 'audit_ids': [uuid.uuid4().hex]}}}
- token2_id = uuid.uuid4().hex
- token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b',
- 'expires': expire_time,
- 'trust_id': None,
- 'user': {'id': 'testuserid'},
- 'token_data': {'token': {
- 'audit_ids': [uuid.uuid4().hex]}}}
- # Create 2 Tokens.
- self.token_provider_api._persistence.create_token(token_id,
- token_data)
- self.token_provider_api._persistence.create_token(token2_id,
- token2_data)
- # Verify the revocation list is empty.
- self.assertEqual(
- [], self.token_provider_api._persistence.list_revoked_tokens())
- self.assertEqual([], self.token_provider_api.list_revoked_tokens())
- # Delete a token directly, bypassing the manager.
- self.token_provider_api._persistence.driver.delete_token(token_id)
- # Verify the revocation list is still empty.
- self.assertEqual(
- [], self.token_provider_api._persistence.list_revoked_tokens())
- self.assertEqual([], self.token_provider_api.list_revoked_tokens())
- # Invalidate the revocation list.
- self.token_provider_api._persistence.invalidate_revocation_list()
- # Verify the deleted token is in the revocation list.
- revoked_ids = [x['id']
- for x in self.token_provider_api.list_revoked_tokens()]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- self.assertIn(token_id, revoked_ids)
- # Delete the second token, through the manager
- self.token_provider_api._persistence.delete_token(token2_id)
- revoked_ids = [x['id']
- for x in self.token_provider_api.list_revoked_tokens()]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- # Verify both tokens are in the revocation list.
- self.assertIn(token_id, revoked_ids)
- self.assertIn(token2_id, revoked_ids)
-
- def _test_predictable_revoked_pki_token_id(self, hash_fn):
- token_id = self._create_token_id()
- token_id_hash = hash_fn(token_id.encode('utf-8')).hexdigest()
- token = {'user': {'id': uuid.uuid4().hex},
- 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
-
- self.token_provider_api._persistence.create_token(token_id, token)
- self.token_provider_api._persistence.delete_token(token_id)
-
- revoked_ids = [x['id']
- for x in self.token_provider_api.list_revoked_tokens()]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- self.assertIn(token_id_hash, revoked_ids)
- self.assertNotIn(token_id, revoked_ids)
- for t in self.token_provider_api._persistence.list_revoked_tokens():
- self.assertIn('expires', t)
-
- def test_predictable_revoked_pki_token_id_default(self):
- self._test_predictable_revoked_pki_token_id(hashlib.md5)
-
- def test_predictable_revoked_pki_token_id_sha256(self):
- self.config_fixture.config(group='token', hash_algorithm='sha256')
- self._test_predictable_revoked_pki_token_id(hashlib.sha256)
-
- def test_predictable_revoked_uuid_token_id(self):
- token_id = uuid.uuid4().hex
- token = {'user': {'id': uuid.uuid4().hex},
- 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}}
-
- self.token_provider_api._persistence.create_token(token_id, token)
- self.token_provider_api._persistence.delete_token(token_id)
-
- revoked_tokens = self.token_provider_api.list_revoked_tokens()
- revoked_ids = [x['id'] for x in revoked_tokens]
- self._assert_revoked_token_list_matches_token_persistence(revoked_ids)
- self.assertIn(token_id, revoked_ids)
- for t in revoked_tokens:
- self.assertIn('expires', t)
-
- def test_create_unicode_token_id(self):
- token_id = six.text_type(self._create_token_id())
- self.create_token_sample_data(token_id=token_id)
- self.token_provider_api._persistence.get_token(token_id)
-
- def test_create_unicode_user_id(self):
- user_id = six.text_type(uuid.uuid4().hex)
- token_id, data = self.create_token_sample_data(user_id=user_id)
- self.token_provider_api._persistence.get_token(token_id)
-
- def test_token_expire_timezone(self):
-
- @test_utils.timezone
- def _create_token(expire_time):
- token_id = uuid.uuid4().hex
- user_id = six.text_type(uuid.uuid4().hex)
- return self.create_token_sample_data(token_id=token_id,
- user_id=user_id,
- expires=expire_time)
-
- for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']:
- test_utils.TZ = 'UTC' + d
- expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1)
- token_id, data_in = _create_token(expire_time)
- data_get = self.token_provider_api._persistence.get_token(token_id)
-
- self.assertEqual(data_in['id'], data_get['id'],
- 'TZ=%s' % test_utils.TZ)
-
- expire_time_expired = (
- timeutils.utcnow() + datetime.timedelta(minutes=-1))
- token_id, data_in = _create_token(expire_time_expired)
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api._persistence.get_token,
- data_in['id'])
-
-
-class TokenCacheInvalidation(object):
- def _create_test_data(self):
- self.user = unit.new_user_ref(
- domain_id=CONF.identity.default_domain_id)
- self.tenant = unit.new_project_ref(
- domain_id=CONF.identity.default_domain_id)
-
- # Create an equivalent of a scoped token
- token_dict = {'user': self.user, 'tenant': self.tenant,
- 'metadata': {}, 'id': 'placeholder'}
- token_id, data = self.token_provider_api.issue_v2_token(token_dict)
- self.scoped_token_id = token_id
-
- # ...and an unscoped one
- token_dict = {'user': self.user, 'tenant': None,
- 'metadata': {}, 'id': 'placeholder'}
- token_id, data = self.token_provider_api.issue_v2_token(token_dict)
- self.unscoped_token_id = token_id
-
- # Validate them, in the various ways possible - this will load the
- # responses into the token cache.
- self._check_scoped_tokens_are_valid()
- self._check_unscoped_tokens_are_valid()
-
- def _check_unscoped_tokens_are_invalid(self):
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_token,
- self.unscoped_token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_v2_token,
- self.unscoped_token_id)
-
- def _check_scoped_tokens_are_invalid(self):
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_token,
- self.scoped_token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_token,
- self.scoped_token_id,
- self.tenant['id'])
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_v2_token,
- self.scoped_token_id)
- self.assertRaises(
- exception.TokenNotFound,
- self.token_provider_api.validate_v2_token,
- self.scoped_token_id,
- self.tenant['id'])
-
- def _check_scoped_tokens_are_valid(self):
- self.token_provider_api.validate_token(self.scoped_token_id)
- self.token_provider_api.validate_token(
- self.scoped_token_id, belongs_to=self.tenant['id'])
- self.token_provider_api.validate_v2_token(self.scoped_token_id)
- self.token_provider_api.validate_v2_token(
- self.scoped_token_id, belongs_to=self.tenant['id'])
-
- def _check_unscoped_tokens_are_valid(self):
- self.token_provider_api.validate_token(self.unscoped_token_id)
- self.token_provider_api.validate_v2_token(self.unscoped_token_id)
-
- def test_delete_unscoped_token(self):
- self.token_provider_api._persistence.delete_token(
- self.unscoped_token_id)
- self._check_unscoped_tokens_are_invalid()
- self._check_scoped_tokens_are_valid()
-
- def test_delete_scoped_token_by_id(self):
- self.token_provider_api._persistence.delete_token(self.scoped_token_id)
- self._check_scoped_tokens_are_invalid()
- self._check_unscoped_tokens_are_valid()
-
- def test_delete_scoped_token_by_user(self):
- self.token_provider_api._persistence.delete_tokens(self.user['id'])
- # Since we are deleting all tokens for this user, they should all
- # now be invalid.
- self._check_scoped_tokens_are_invalid()
- self._check_unscoped_tokens_are_invalid()
-
- def test_delete_scoped_token_by_user_and_tenant(self):
- self.token_provider_api._persistence.delete_tokens(
- self.user['id'],
- tenant_id=self.tenant['id'])
- self._check_scoped_tokens_are_invalid()
- self._check_unscoped_tokens_are_valid()
diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
deleted file mode 100644
index 5f51d7b3..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py
+++ /dev/null
@@ -1,611 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import base64
-import datetime
-import hashlib
-import os
-import uuid
-
-import msgpack
-from oslo_utils import timeutils
-from six.moves import urllib
-
-from keystone.common import config
-from keystone.common import utils
-from keystone import exception
-from keystone.federation import constants as federation_constants
-from keystone.tests import unit
-from keystone.tests.unit import ksfixtures
-from keystone.tests.unit.ksfixtures import database
-from keystone.token import provider
-from keystone.token.providers import fernet
-from keystone.token.providers.fernet import token_formatters
-from keystone.token.providers.fernet import utils as fernet_utils
-
-
-CONF = config.CONF
-
-
-class TestFernetTokenProvider(unit.TestCase):
- def setUp(self):
- super(TestFernetTokenProvider, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
- self.provider = fernet.Provider()
-
- def test_supports_bind_authentication_returns_false(self):
- self.assertFalse(self.provider._supports_bind_authentication)
-
- def test_needs_persistence_returns_false(self):
- self.assertFalse(self.provider.needs_persistence())
-
- def test_invalid_v3_token_raises_token_not_found(self):
- # NOTE(lbragstad): Here we use the validate_non_persistent_token()
- # methods because the validate_v3_token() method is strictly for
- # validating UUID formatted tokens. It is written to assume cached
- # tokens from a backend, where validate_non_persistent_token() is not.
- token_id = uuid.uuid4().hex
- e = self.assertRaises(
- exception.TokenNotFound,
- self.provider.validate_non_persistent_token,
- token_id)
- self.assertIn(token_id, u'%s' % e)
-
- def test_invalid_v2_token_raises_token_not_found(self):
- token_id = uuid.uuid4().hex
- e = self.assertRaises(
- exception.TokenNotFound,
- self.provider.validate_non_persistent_token,
- token_id)
- self.assertIn(token_id, u'%s' % e)
-
-
-class TestValidate(unit.TestCase):
- def setUp(self):
- super(TestValidate, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
- self.useFixture(database.Database())
- self.load_backends()
-
- def config_overrides(self):
- super(TestValidate, self).config_overrides()
- self.config_fixture.config(group='token', provider='fernet')
-
- def test_validate_v3_token_simple(self):
- # Check the fields in the token result when using validate_v3_token
- # with a simple token.
-
- domain_ref = unit.new_domain_ref()
- domain_ref = self.resource_api.create_domain(domain_ref['id'],
- domain_ref)
-
- user_ref = unit.new_user_ref(domain_ref['id'])
- user_ref = self.identity_api.create_user(user_ref)
-
- method_names = ['password']
- token_id, token_data_ = self.token_provider_api.issue_v3_token(
- user_ref['id'], method_names)
-
- token_data = self.token_provider_api.validate_v3_token(token_id)
- token = token_data['token']
- self.assertIsInstance(token['audit_ids'], list)
- self.assertIsInstance(token['expires_at'], str)
- self.assertIsInstance(token['issued_at'], str)
- self.assertEqual(method_names, token['methods'])
- exp_user_info = {
- 'id': user_ref['id'],
- 'name': user_ref['name'],
- 'domain': {
- 'id': domain_ref['id'],
- 'name': domain_ref['name'],
- },
- }
- self.assertEqual(exp_user_info, token['user'])
-
- def test_validate_v3_token_federated_info(self):
- # Check the user fields in the token result when using validate_v3_token
- # on a token that has federated info.
-
- domain_ref = unit.new_domain_ref()
- domain_ref = self.resource_api.create_domain(domain_ref['id'],
- domain_ref)
-
- user_ref = unit.new_user_ref(domain_ref['id'])
- user_ref = self.identity_api.create_user(user_ref)
-
- method_names = ['mapped']
-
- group_ids = [uuid.uuid4().hex, ]
- identity_provider = uuid.uuid4().hex
- protocol = uuid.uuid4().hex
- auth_context = {
- 'user_id': user_ref['id'],
- 'group_ids': group_ids,
- federation_constants.IDENTITY_PROVIDER: identity_provider,
- federation_constants.PROTOCOL: protocol,
- }
- token_id, token_data_ = self.token_provider_api.issue_v3_token(
- user_ref['id'], method_names, auth_context=auth_context)
-
- token_data = self.token_provider_api.validate_v3_token(token_id)
- token = token_data['token']
- exp_user_info = {
- 'id': user_ref['id'],
- 'name': user_ref['id'],
- 'domain': {'id': CONF.federation.federated_domain_name,
- 'name': CONF.federation.federated_domain_name, },
- federation_constants.FEDERATION: {
- 'groups': [{'id': group_id} for group_id in group_ids],
- 'identity_provider': {'id': identity_provider, },
- 'protocol': {'id': protocol, },
- },
- }
- self.assertEqual(exp_user_info, token['user'])
-
- def test_validate_v3_token_trust(self):
- # Check the trust fields in the token result when using validate_v3_token
- # on a token that has trust info.
-
- domain_ref = unit.new_domain_ref()
- domain_ref = self.resource_api.create_domain(domain_ref['id'],
- domain_ref)
-
- user_ref = unit.new_user_ref(domain_ref['id'])
- user_ref = self.identity_api.create_user(user_ref)
-
- trustor_user_ref = unit.new_user_ref(domain_ref['id'])
- trustor_user_ref = self.identity_api.create_user(trustor_user_ref)
-
- project_ref = unit.new_project_ref(domain_id=domain_ref['id'])
- project_ref = self.resource_api.create_project(project_ref['id'],
- project_ref)
-
- role_ref = unit.new_role_ref()
- role_ref = self.role_api.create_role(role_ref['id'], role_ref)
-
- self.assignment_api.create_grant(
- role_ref['id'], user_id=user_ref['id'],
- project_id=project_ref['id'])
-
- self.assignment_api.create_grant(
- role_ref['id'], user_id=trustor_user_ref['id'],
- project_id=project_ref['id'])
-
- trustor_user_id = trustor_user_ref['id']
- trustee_user_id = user_ref['id']
- trust_ref = unit.new_trust_ref(
- trustor_user_id, trustee_user_id, project_id=project_ref['id'],
- role_ids=[role_ref['id'], ])
- trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref,
- trust_ref['roles'])
-
- method_names = ['password']
-
- token_id, token_data_ = self.token_provider_api.issue_v3_token(
- user_ref['id'], method_names, project_id=project_ref['id'],
- trust=trust_ref)
-
- token_data = self.token_provider_api.validate_v3_token(token_id)
- token = token_data['token']
- exp_trust_info = {
- 'id': trust_ref['id'],
- 'impersonation': False,
- 'trustee_user': {'id': user_ref['id'], },
- 'trustor_user': {'id': trustor_user_ref['id'], },
- }
- self.assertEqual(exp_trust_info, token['OS-TRUST:trust'])
-
- def test_validate_v3_token_validation_error_exc(self):
- # When the token format isn't recognized, TokenNotFound is raised.
-
- # A uuid string isn't a valid Fernet token.
- token_id = uuid.uuid4().hex
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_v3_token, token_id)
-
-
-class TestTokenFormatter(unit.TestCase):
- def setUp(self):
- super(TestTokenFormatter, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
-
- def test_restore_padding(self):
- # 'a' will result in '==' padding, 'aa' will result in '=' padding, and
- # 'aaa' will result in no padding.
- binary_to_test = [b'a', b'aa', b'aaa']
-
- for binary in binary_to_test:
- # base64.urlsafe_b64encode takes six.binary_type and returns
- # six.binary_type.
- encoded_string = base64.urlsafe_b64encode(binary)
- encoded_string = encoded_string.decode('utf-8')
- # encoded_string is now six.text_type.
- encoded_str_without_padding = encoded_string.rstrip('=')
- self.assertFalse(encoded_str_without_padding.endswith('='))
- encoded_str_with_padding_restored = (
- token_formatters.TokenFormatter.restore_padding(
- encoded_str_without_padding)
- )
- self.assertEqual(encoded_string, encoded_str_with_padding_restored)
-
- def test_legacy_padding_validation(self):
- first_value = uuid.uuid4().hex
- second_value = uuid.uuid4().hex
- payload = (first_value, second_value)
- msgpack_payload = msgpack.packb(payload)
- # msgpack_payload is six.binary_type.
-
- tf = token_formatters.TokenFormatter()
-
- # NOTE(lbragstad): This method preserves the way that keystone used to
- # percent encode the tokens, prior to bug #1491926.
- def legacy_pack(payload):
- # payload is six.binary_type.
- encrypted_payload = tf.crypto.encrypt(payload)
- # encrypted_payload is six.binary_type.
-
- # the encrypted_payload is returned with padding appended
- self.assertTrue(encrypted_payload.endswith(b'='))
-
- # using urllib.parse.quote will percent encode the padding, like
- # keystone did in Kilo.
- percent_encoded_payload = urllib.parse.quote(encrypted_payload)
- # percent_encoded_payload is six.text_type.
-
- # ensure that the padding was actually percent encoded
- self.assertTrue(percent_encoded_payload.endswith('%3D'))
- return percent_encoded_payload
-
- token_with_legacy_padding = legacy_pack(msgpack_payload)
- # token_with_legacy_padding is six.text_type.
-
- # demonstrate that we can validate a payload that has been percent
- # encoded with the Fernet logic that existed in Kilo
- serialized_payload = tf.unpack(token_with_legacy_padding)
- # serialized_payload is six.binary_type.
- returned_payload = msgpack.unpackb(serialized_payload)
- # returned_payload contains six.binary_type.
- self.assertEqual(first_value, returned_payload[0].decode('utf-8'))
- self.assertEqual(second_value, returned_payload[1].decode('utf-8'))
-
-
-class TestPayloads(unit.TestCase):
- def assertTimestampsEqual(self, expected, actual):
- # The timestamp that we get back when parsing the payload may not
- # exactly match the timestamp that was put in the payload due to
- # conversion to and from a float.
-
- exp_time = timeutils.parse_isotime(expected)
- actual_time = timeutils.parse_isotime(actual)
-
- # the granularity of the timestamp string is microseconds and it's only the
- # last digit in the representation that's different, so use a delta
- # just above nanoseconds.
- return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time,
- delta=1e-05)
-
- def test_uuid_hex_to_byte_conversions(self):
- payload_cls = token_formatters.BasePayload
-
- expected_hex_uuid = uuid.uuid4().hex
- uuid_obj = uuid.UUID(expected_hex_uuid)
- expected_uuid_in_bytes = uuid_obj.bytes
- actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes(
- expected_hex_uuid)
- self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes)
- actual_hex_uuid = payload_cls.convert_uuid_bytes_to_hex(
- expected_uuid_in_bytes)
- self.assertEqual(expected_hex_uuid, actual_hex_uuid)
-
- def test_time_string_to_float_conversions(self):
- payload_cls = token_formatters.BasePayload
-
- original_time_str = utils.isotime(subsecond=True)
- time_obj = timeutils.parse_isotime(original_time_str)
- expected_time_float = (
- (timeutils.normalize_time(time_obj) -
- datetime.datetime.utcfromtimestamp(0)).total_seconds())
-
- # NOTE(lbragstad): The token expiration time for Fernet tokens is
- # passed in the payload of the token. This is different from the token
- # creation time, which is handled by Fernet and doesn't support
- # subsecond precision because it is a timestamp integer.
- self.assertIsInstance(expected_time_float, float)
-
- actual_time_float = payload_cls._convert_time_string_to_float(
- original_time_str)
- self.assertIsInstance(actual_time_float, float)
- self.assertEqual(expected_time_float, actual_time_float)
-
- # Generate expected_time_str using the same time float. Using
- # original_time_str from utils.isotime will occasionally fail due to
- # floating point rounding differences.
- time_object = datetime.datetime.utcfromtimestamp(actual_time_float)
- expected_time_str = utils.isotime(time_object, subsecond=True)
-
- actual_time_str = payload_cls._convert_float_to_time_string(
- actual_time_float)
- self.assertEqual(expected_time_str, actual_time_str)
-
- def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None,
- exp_project_id=None, exp_domain_id=None,
- exp_trust_id=None, exp_federated_info=None,
- exp_access_token_id=None):
- exp_user_id = exp_user_id or uuid.uuid4().hex
- exp_methods = exp_methods or ['password']
- exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True)
- exp_audit_ids = [provider.random_urlsafe_str()]
-
- payload = payload_class.assemble(
- exp_user_id, exp_methods, exp_project_id, exp_domain_id,
- exp_expires_at, exp_audit_ids, exp_trust_id, exp_federated_info,
- exp_access_token_id)
-
- (user_id, methods, project_id,
- domain_id, expires_at, audit_ids,
- trust_id, federated_info,
- access_token_id) = payload_class.disassemble(payload)
-
- self.assertEqual(exp_user_id, user_id)
- self.assertEqual(exp_methods, methods)
- self.assertTimestampsEqual(exp_expires_at, expires_at)
- self.assertEqual(exp_audit_ids, audit_ids)
- self.assertEqual(exp_project_id, project_id)
- self.assertEqual(exp_domain_id, domain_id)
- self.assertEqual(exp_trust_id, trust_id)
- self.assertEqual(exp_access_token_id, access_token_id)
-
- if exp_federated_info:
- self.assertDictEqual(exp_federated_info, federated_info)
- else:
- self.assertIsNone(federated_info)
-
- def test_unscoped_payload(self):
- self._test_payload(token_formatters.UnscopedPayload)
-
- def test_project_scoped_payload(self):
- self._test_payload(token_formatters.ProjectScopedPayload,
- exp_project_id=uuid.uuid4().hex)
-
- def test_domain_scoped_payload(self):
- self._test_payload(token_formatters.DomainScopedPayload,
- exp_domain_id=uuid.uuid4().hex)
-
- def test_domain_scoped_payload_with_default_domain(self):
- self._test_payload(token_formatters.DomainScopedPayload,
- exp_domain_id=CONF.identity.default_domain_id)
-
- def test_trust_scoped_payload(self):
- self._test_payload(token_formatters.TrustScopedPayload,
- exp_project_id=uuid.uuid4().hex,
- exp_trust_id=uuid.uuid4().hex)
-
- def test_unscoped_payload_with_non_uuid_user_id(self):
- self._test_payload(token_formatters.UnscopedPayload,
- exp_user_id='someNonUuidUserId')
-
- def test_unscoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_payload(token_formatters.UnscopedPayload,
- exp_user_id='0123456789abcdef')
-
- def test_project_scoped_payload_with_non_uuid_ids(self):
- self._test_payload(token_formatters.ProjectScopedPayload,
- exp_user_id='someNonUuidUserId',
- exp_project_id='someNonUuidProjectId')
-
- def test_project_scoped_payload_with_16_char_non_uuid_ids(self):
- self._test_payload(token_formatters.ProjectScopedPayload,
- exp_user_id='0123456789abcdef',
- exp_project_id='0123456789abcdef')
-
- def test_domain_scoped_payload_with_non_uuid_user_id(self):
- self._test_payload(token_formatters.DomainScopedPayload,
- exp_user_id='nonUuidUserId',
- exp_domain_id=uuid.uuid4().hex)
-
- def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self):
- self._test_payload(token_formatters.DomainScopedPayload,
- exp_user_id='0123456789abcdef',
- exp_domain_id=uuid.uuid4().hex)
-
- def test_trust_scoped_payload_with_non_uuid_ids(self):
- self._test_payload(token_formatters.TrustScopedPayload,
- exp_user_id='someNonUuidUserId',
- exp_project_id='someNonUuidProjectId',
- exp_trust_id=uuid.uuid4().hex)
-
- def test_trust_scoped_payload_with_16_char_non_uuid_ids(self):
- self._test_payload(token_formatters.TrustScopedPayload,
- exp_user_id='0123456789abcdef',
- exp_project_id='0123456789abcdef',
- exp_trust_id=uuid.uuid4().hex)
-
- def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id):
- exp_federated_info = {'group_ids': [{'id': exp_group_id}],
- 'idp_id': uuid.uuid4().hex,
- 'protocol_id': uuid.uuid4().hex}
-
- self._test_payload(token_formatters.FederatedUnscopedPayload,
- exp_user_id=exp_user_id,
- exp_federated_info=exp_federated_info)
-
- def test_federated_payload_with_non_uuid_ids(self):
- self._test_federated_payload_with_ids('someNonUuidUserId',
- 'someNonUuidGroupId')
-
- def test_federated_payload_with_16_char_non_uuid_ids(self):
- self._test_federated_payload_with_ids('0123456789abcdef',
- '0123456789abcdef')
-
- def test_federated_project_scoped_payload(self):
- exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
- 'idp_id': uuid.uuid4().hex,
- 'protocol_id': uuid.uuid4().hex}
-
- self._test_payload(token_formatters.FederatedProjectScopedPayload,
- exp_user_id='someNonUuidUserId',
- exp_methods=['token'],
- exp_project_id=uuid.uuid4().hex,
- exp_federated_info=exp_federated_info)
-
- def test_federated_domain_scoped_payload(self):
- exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}],
- 'idp_id': uuid.uuid4().hex,
- 'protocol_id': uuid.uuid4().hex}
-
- self._test_payload(token_formatters.FederatedDomainScopedPayload,
- exp_user_id='someNonUuidUserId',
- exp_methods=['token'],
- exp_domain_id=uuid.uuid4().hex,
- exp_federated_info=exp_federated_info)
-
- def test_oauth_scoped_payload(self):
- self._test_payload(token_formatters.OauthScopedPayload,
- exp_project_id=uuid.uuid4().hex,
- exp_access_token_id=uuid.uuid4().hex)
-
-
-class TestFernetKeyRotation(unit.TestCase):
- def setUp(self):
- super(TestFernetKeyRotation, self).setUp()
-
- # A collection of all previously-seen signatures of the key
- # repository's contents.
- self.key_repo_signatures = set()
-
- @property
- def keys(self):
- """Key files converted to numbers."""
- return sorted(
- int(x) for x in os.listdir(CONF.fernet_tokens.key_repository))
-
- @property
- def key_repository_size(self):
- """The number of keys in the key repository."""
- return len(self.keys)
-
- @property
- def key_repository_signature(self):
- """Create a "thumbprint" of the current key repository.
-
- Because key files are renamed, this produces a hash of the contents of
- the key files, ignoring their filenames.
-
- The resulting signature can be used, for example, to ensure that you
- have a unique set of keys after you perform a key rotation (taking a
- static set of keys, and simply shuffling them, would fail such a test).
-
- """
- # Load the keys into a list, keys is list of six.text_type.
- keys = fernet_utils.load_keys()
-
- # Sort the list of keys by the keys themselves (they were previously
- # sorted by filename).
- keys.sort()
-
- # Create the thumbprint using all keys in the repository.
- signature = hashlib.sha1()
- for key in keys:
- # Need to convert key to six.binary_type for update.
- signature.update(key.encode('utf-8'))
- return signature.hexdigest()
-
- def assertRepositoryState(self, expected_size):
- """Validate the state of the key repository."""
- self.assertEqual(expected_size, self.key_repository_size)
- self.assertUniqueRepositoryState()
-
- def assertUniqueRepositoryState(self):
- """Ensures that the current key repo state has not been seen before."""
- # This is assigned to a variable because it takes some work to
- # calculate.
- signature = self.key_repository_signature
-
- # Ensure the signature is not in the set of previously seen signatures.
- self.assertNotIn(signature, self.key_repo_signatures)
-
- # Add the signature to the set of repository signatures to validate
- # that we don't see it again later.
- self.key_repo_signatures.add(signature)
-
- def test_rotation(self):
- # Initializing a key repository results in this many keys. We don't
- # support max_active_keys being set any lower.
- min_active_keys = 2
-
- # Simulate every rotation strategy up to "rotating once a week while
- # maintaining a year's worth of keys."
- for max_active_keys in range(min_active_keys, 52 + 1):
- self.config_fixture.config(group='fernet_tokens',
- max_active_keys=max_active_keys)
-
- # Ensure that resetting the key repository always results in 2
- # active keys.
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
-
- # Validate the initial repository state.
- self.assertRepositoryState(expected_size=min_active_keys)
-
- # The repository should be initialized with a staged key (0) and a
- # primary key (1). The next key is just auto-incremented.
- exp_keys = [0, 1]
- next_key_number = exp_keys[-1] + 1 # keep track of next key
- self.assertEqual(exp_keys, self.keys)
-
- # Rotate the keys just enough times to fully populate the key
- # repository.
- for rotation in range(max_active_keys - min_active_keys):
- fernet_utils.rotate_keys()
- self.assertRepositoryState(expected_size=rotation + 3)
-
- exp_keys.append(next_key_number)
- next_key_number += 1
- self.assertEqual(exp_keys, self.keys)
-
- # We should have a fully populated key repository now.
- self.assertEqual(max_active_keys, self.key_repository_size)
-
- # Rotate an additional number of times to ensure that we maintain
- # the desired number of active keys.
- for rotation in range(10):
- fernet_utils.rotate_keys()
- self.assertRepositoryState(expected_size=max_active_keys)
-
- exp_keys.pop(1)
- exp_keys.append(next_key_number)
- next_key_number += 1
- self.assertEqual(exp_keys, self.keys)
-
- def test_non_numeric_files(self):
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
- evil_file = os.path.join(CONF.fernet_tokens.key_repository, '99.bak')
- with open(evil_file, 'w'):
- pass
- fernet_utils.rotate_keys()
- self.assertTrue(os.path.isfile(evil_file))
- keys = 0
- for x in os.listdir(CONF.fernet_tokens.key_repository):
- if x == '99.bak':
- continue
- keys += 1
- self.assertEqual(3, keys)
-
-
-class TestLoadKeys(unit.TestCase):
- def test_non_numeric_files(self):
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
- evil_file = os.path.join(CONF.fernet_tokens.key_repository, '~1')
- with open(evil_file, 'w'):
- pass
- keys = fernet_utils.load_keys()
- self.assertEqual(2, len(keys))
- self.assertTrue(len(keys[0]))
diff --git a/keystone-moon/keystone/tests/unit/token/test_pki_provider.py b/keystone-moon/keystone/tests/unit/token/test_pki_provider.py
deleted file mode 100644
index b3ad4c2b..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_pki_provider.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from keystone.tests import unit
-from keystone.token.providers import pki
-
-
-class TestPkiTokenProvider(unit.TestCase):
- def setUp(self):
- super(TestPkiTokenProvider, self).setUp()
- self.provider = pki.Provider()
-
- def test_supports_bind_authentication_returns_true(self):
- self.assertTrue(self.provider._supports_bind_authentication)
-
- def test_need_persistence_return_true(self):
- self.assertIs(True, self.provider.needs_persistence())
diff --git a/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py b/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py
deleted file mode 100644
index 1ffe7cfc..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from keystone.tests import unit
-from keystone.token.providers import pkiz
-
-
-class TestPkizTokenProvider(unit.TestCase):
- def setUp(self):
- super(TestPkizTokenProvider, self).setUp()
- self.provider = pkiz.Provider()
-
- def test_supports_bind_authentication_returns_true(self):
- self.assertTrue(self.provider._supports_bind_authentication)
-
- def test_need_persistence_return_true(self):
- self.assertIs(True, self.provider.needs_persistence())
diff --git a/keystone-moon/keystone/tests/unit/token/test_provider.py b/keystone-moon/keystone/tests/unit/token/test_provider.py
deleted file mode 100644
index 7093f3ba..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_provider.py
+++ /dev/null
@@ -1,30 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-from six.moves import urllib
-
-from keystone.tests import unit
-from keystone.token import provider
-
-
-class TestRandomStrings(unit.BaseTestCase):
- def test_strings_are_url_safe(self):
- s = provider.random_urlsafe_str()
- self.assertEqual(s, urllib.parse.quote_plus(s))
-
- def test_strings_can_be_converted_to_bytes(self):
- s = provider.random_urlsafe_str()
- self.assertIsInstance(s, six.text_type)
-
- b = provider.random_urlsafe_str_to_bytes(s)
- self.assertIsInstance(b, six.binary_type)
diff --git a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py
deleted file mode 100644
index 9e8c3889..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import base64
-import uuid
-
-from testtools import matchers
-
-from keystone import exception
-from keystone.tests import unit
-from keystone.token.providers import common
-
-
-class TestTokenDataHelper(unit.TestCase):
- def setUp(self):
- super(TestTokenDataHelper, self).setUp()
- self.load_backends()
- self.v3_data_helper = common.V3TokenDataHelper()
-
- def test_v3_token_data_helper_populate_audit_info_string(self):
- token_data = {}
- audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]
- audit_info = audit_info_bytes.decode('utf-8')
- self.v3_data_helper._populate_audit_info(token_data, audit_info)
- self.assertIn(audit_info, token_data['audit_ids'])
- self.assertThat(token_data['audit_ids'], matchers.HasLength(2))
-
- def test_v3_token_data_helper_populate_audit_info_none(self):
- token_data = {}
- self.v3_data_helper._populate_audit_info(token_data, audit_info=None)
- self.assertThat(token_data['audit_ids'], matchers.HasLength(1))
- self.assertNotIn(None, token_data['audit_ids'])
-
- def test_v3_token_data_helper_populate_audit_info_list(self):
- token_data = {}
- audit_info = [base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2],
- base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]]
- self.v3_data_helper._populate_audit_info(token_data, audit_info)
- self.assertEqual(audit_info, token_data['audit_ids'])
-
- def test_v3_token_data_helper_populate_audit_info_invalid(self):
- token_data = {}
- audit_info = dict()
- self.assertRaises(exception.UnexpectedError,
- self.v3_data_helper._populate_audit_info,
- token_data=token_data,
- audit_info=audit_info)
diff --git a/keystone-moon/keystone/tests/unit/token/test_token_model.py b/keystone-moon/keystone/tests/unit/token/test_token_model.py
deleted file mode 100644
index 1cb0ef55..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_token_model.py
+++ /dev/null
@@ -1,263 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import uuid
-
-from oslo_config import cfg
-from oslo_utils import timeutils
-from six.moves import range
-
-from keystone import exception
-from keystone.federation import constants as federation_constants
-from keystone.models import token_model
-from keystone.tests.unit import core
-from keystone.tests.unit import test_token_provider
-
-
-CONF = cfg.CONF
-
-
-class TestKeystoneTokenModel(core.TestCase):
- def setUp(self):
- super(TestKeystoneTokenModel, self).setUp()
- self.v2_sample_token = copy.deepcopy(
- test_token_provider.SAMPLE_V2_TOKEN)
- self.v3_sample_token = copy.deepcopy(
- test_token_provider.SAMPLE_V3_TOKEN)
-
- def test_token_model_v3(self):
- token_data = token_model.KeystoneToken(uuid.uuid4().hex,
- self.v3_sample_token)
- self.assertIs(token_model.V3, token_data.version)
- expires = timeutils.normalize_time(timeutils.parse_isotime(
- self.v3_sample_token['token']['expires_at']))
- issued = timeutils.normalize_time(timeutils.parse_isotime(
- self.v3_sample_token['token']['issued_at']))
- self.assertEqual(expires, token_data.expires)
- self.assertEqual(issued, token_data.issued)
- self.assertEqual(self.v3_sample_token['token']['user']['id'],
- token_data.user_id)
- self.assertEqual(self.v3_sample_token['token']['user']['name'],
- token_data.user_name)
- self.assertEqual(self.v3_sample_token['token']['user']['domain']['id'],
- token_data.user_domain_id)
- self.assertEqual(
- self.v3_sample_token['token']['user']['domain']['name'],
- token_data.user_domain_name)
- self.assertEqual(
- self.v3_sample_token['token']['project']['domain']['id'],
- token_data.project_domain_id)
- self.assertEqual(
- self.v3_sample_token['token']['project']['domain']['name'],
- token_data.project_domain_name)
- self.assertEqual(self.v3_sample_token['token']['OS-TRUST:trust']['id'],
- token_data.trust_id)
- self.assertEqual(
- self.v3_sample_token['token']['OS-TRUST:trust']['trustor_user_id'],
- token_data.trustor_user_id)
- self.assertEqual(
- self.v3_sample_token['token']['OS-TRUST:trust']['trustee_user_id'],
- token_data.trustee_user_id)
- # Project Scoped Token
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'domain_id')
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'domain_name')
- self.assertFalse(token_data.domain_scoped)
- self.assertEqual(self.v3_sample_token['token']['project']['id'],
- token_data.project_id)
- self.assertEqual(self.v3_sample_token['token']['project']['name'],
- token_data.project_name)
- self.assertTrue(token_data.project_scoped)
- self.assertTrue(token_data.scoped)
- self.assertTrue(token_data.trust_scoped)
- self.assertEqual(
- [r['id'] for r in self.v3_sample_token['token']['roles']],
- token_data.role_ids)
- self.assertEqual(
- [r['name'] for r in self.v3_sample_token['token']['roles']],
- token_data.role_names)
- token_data.pop('project')
- self.assertFalse(token_data.project_scoped)
- self.assertFalse(token_data.scoped)
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_id')
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_name')
- self.assertFalse(token_data.project_scoped)
- domain_id = uuid.uuid4().hex
- domain_name = uuid.uuid4().hex
- token_data['domain'] = {'id': domain_id,
- 'name': domain_name}
- self.assertEqual(domain_id, token_data.domain_id)
- self.assertEqual(domain_name, token_data.domain_name)
- self.assertTrue(token_data.domain_scoped)
-
- token_data['audit_ids'] = [uuid.uuid4().hex]
- self.assertEqual(token_data.audit_id,
- token_data['audit_ids'][0])
- self.assertEqual(token_data.audit_chain_id,
- token_data['audit_ids'][0])
- token_data['audit_ids'].append(uuid.uuid4().hex)
- self.assertEqual(token_data.audit_chain_id,
- token_data['audit_ids'][1])
- del token_data['audit_ids']
- self.assertIsNone(token_data.audit_id)
- self.assertIsNone(token_data.audit_chain_id)
-
- def test_token_model_v3_federated_user(self):
- token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=self.v3_sample_token)
- federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
- 'protocol': {'id': 'saml2'},
- 'groups': [{'id': uuid.uuid4().hex}
- for x in range(1, 5)]}
-
- self.assertFalse(token_data.is_federated_user)
- self.assertEqual([], token_data.federation_group_ids)
- self.assertIsNone(token_data.federation_protocol_id)
- self.assertIsNone(token_data.federation_idp_id)
-
- token_data['user'][federation_constants.FEDERATION] = federation_data
-
- self.assertTrue(token_data.is_federated_user)
- self.assertEqual([x['id'] for x in federation_data['groups']],
- token_data.federation_group_ids)
- self.assertEqual(federation_data['protocol']['id'],
- token_data.federation_protocol_id)
- self.assertEqual(federation_data['identity_provider']['id'],
- token_data.federation_idp_id)
-
- def test_token_model_v2_federated_user(self):
- token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=self.v2_sample_token)
- federation_data = {'identity_provider': {'id': uuid.uuid4().hex},
- 'protocol': {'id': 'saml2'},
- 'groups': [{'id': uuid.uuid4().hex}
- for x in range(1, 5)]}
- self.assertFalse(token_data.is_federated_user)
- self.assertEqual([], token_data.federation_group_ids)
- self.assertIsNone(token_data.federation_protocol_id)
- self.assertIsNone(token_data.federation_idp_id)
-
- token_data['user'][federation_constants.FEDERATION] = federation_data
-
- # Federated users should not exist in V2, the data should remain empty
- self.assertFalse(token_data.is_federated_user)
- self.assertEqual([], token_data.federation_group_ids)
- self.assertIsNone(token_data.federation_protocol_id)
- self.assertIsNone(token_data.federation_idp_id)
-
- def test_token_model_v2(self):
- token_data = token_model.KeystoneToken(uuid.uuid4().hex,
- self.v2_sample_token)
- self.assertIs(token_model.V2, token_data.version)
- expires = timeutils.normalize_time(timeutils.parse_isotime(
- self.v2_sample_token['access']['token']['expires']))
- issued = timeutils.normalize_time(timeutils.parse_isotime(
- self.v2_sample_token['access']['token']['issued_at']))
- self.assertEqual(expires, token_data.expires)
- self.assertEqual(issued, token_data.issued)
- self.assertEqual(self.v2_sample_token['access']['user']['id'],
- token_data.user_id)
- self.assertEqual(self.v2_sample_token['access']['user']['name'],
- token_data.user_name)
- self.assertEqual(CONF.identity.default_domain_id,
- token_data.user_domain_id)
- self.assertEqual('Default', token_data.user_domain_name)
- self.assertEqual(CONF.identity.default_domain_id,
- token_data.project_domain_id)
- self.assertEqual('Default',
- token_data.project_domain_name)
- self.assertEqual(self.v2_sample_token['access']['trust']['id'],
- token_data.trust_id)
- self.assertEqual(
- self.v2_sample_token['access']['trust']['trustor_user_id'],
- token_data.trustor_user_id)
- self.assertEqual(
- self.v2_sample_token['access']['trust']['impersonation'],
- token_data.trust_impersonation)
- self.assertEqual(
- self.v2_sample_token['access']['trust']['trustee_user_id'],
- token_data.trustee_user_id)
- # Project Scoped Token
- self.assertEqual(
- self.v2_sample_token['access']['token']['tenant']['id'],
- token_data.project_id)
- self.assertEqual(
- self.v2_sample_token['access']['token']['tenant']['name'],
- token_data.project_name)
- self.assertTrue(token_data.project_scoped)
- self.assertTrue(token_data.scoped)
- self.assertTrue(token_data.trust_scoped)
- self.assertEqual(
- [r['name']
- for r in self.v2_sample_token['access']['user']['roles']],
- token_data.role_names)
- token_data['token'].pop('tenant')
- self.assertFalse(token_data.scoped)
- self.assertFalse(token_data.project_scoped)
- self.assertFalse(token_data.domain_scoped)
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_id')
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_name')
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_domain_id')
- self.assertRaises(exception.UnexpectedError, getattr, token_data,
- 'project_domain_id')
- # No Domain Scoped tokens in V2
- self.assertRaises(NotImplementedError, getattr, token_data,
- 'domain_id')
- self.assertRaises(NotImplementedError, getattr, token_data,
- 'domain_name')
- token_data['domain'] = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
- self.assertRaises(NotImplementedError, getattr, token_data,
- 'domain_id')
- self.assertRaises(NotImplementedError, getattr, token_data,
- 'domain_name')
- self.assertFalse(token_data.domain_scoped)
-
- token_data['token']['audit_ids'] = [uuid.uuid4().hex]
- self.assertEqual(token_data.audit_chain_id,
- token_data['token']['audit_ids'][0])
- token_data['token']['audit_ids'].append(uuid.uuid4().hex)
- self.assertEqual(token_data.audit_chain_id,
- token_data['token']['audit_ids'][1])
- self.assertEqual(token_data.audit_id,
- token_data['token']['audit_ids'][0])
- del token_data['token']['audit_ids']
- self.assertIsNone(token_data.audit_id)
- self.assertIsNone(token_data.audit_chain_id)
-
- def test_token_model_unknown(self):
- self.assertRaises(exception.UnsupportedTokenVersionException,
- token_model.KeystoneToken,
- token_id=uuid.uuid4().hex,
- token_data={'bogus_data': uuid.uuid4().hex})
-
- def test_token_model_dual_scoped_token(self):
- domain = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
- self.v2_sample_token['access']['domain'] = domain
- self.v3_sample_token['token']['domain'] = domain
-
- # V2 Tokens Cannot be domain scoped, this should work
- token_model.KeystoneToken(token_id=uuid.uuid4().hex,
- token_data=self.v2_sample_token)
-
- self.assertRaises(exception.UnexpectedError,
- token_model.KeystoneToken,
- token_id=uuid.uuid4().hex,
- token_data=self.v3_sample_token)
diff --git a/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py b/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py
deleted file mode 100644
index 5c364490..00000000
--- a/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py
+++ /dev/null
@@ -1,26 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from keystone.tests import unit
-from keystone.token.providers import uuid
-
-
-class TestUuidTokenProvider(unit.TestCase):
- def setUp(self):
- super(TestUuidTokenProvider, self).setUp()
- self.provider = uuid.Provider()
-
- def test_supports_bind_authentication_returns_true(self):
- self.assertTrue(self.provider._supports_bind_authentication)
-
- def test_need_persistence_return_true(self):
- self.assertIs(True, self.provider.needs_persistence())