diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/token | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token')
9 files changed, 0 insertions, 1589 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/__init__.py b/keystone-moon/keystone/tests/unit/token/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/tests/unit/token/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/tests/unit/token/test_backends.py b/keystone-moon/keystone/tests/unit/token/test_backends.py deleted file mode 100644 index feb7e017..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_backends.py +++ /dev/null @@ -1,551 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import datetime -import hashlib -import uuid - -from keystoneclient.common import cms -from oslo_config import cfg -from oslo_utils import timeutils -import six -from six.moves import range - -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import utils as test_utils -from keystone.token import provider - - -CONF = cfg.CONF -NULL_OBJECT = object() - - -class TokenTests(object): - def _create_token_id(self): - # Use a token signed by the cms module - token_id = "" - for i in range(1, 20): - token_id += uuid.uuid4().hex - return cms.cms_sign_token(token_id, - CONF.signing.certfile, - CONF.signing.keyfile) - - def _assert_revoked_token_list_matches_token_persistence( - self, revoked_token_id_list): - # Assert that the list passed in matches the list returned by the - # token persistence service - persistence_list = [ - x['id'] - for x in self.token_provider_api.list_revoked_tokens() - ] - self.assertEqual(persistence_list, revoked_token_id_list) - - def test_token_crud(self): - token_id = self._create_token_id() - data = {'id': token_id, 'a': 'b', - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'access': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - expires = data_ref.pop('expires') - data_ref.pop('user_id') - self.assertIsInstance(expires, datetime.datetime) - data_ref.pop('id') - data.pop('id') - self.assertDictEqual(data, data_ref) - - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - expires = new_data_ref.pop('expires') - self.assertIsInstance(expires, datetime.datetime) - new_data_ref.pop('user_id') - new_data_ref.pop('id') - - self.assertEqual(data, new_data_ref) - - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, token_id) - - def create_token_sample_data(self, token_id=None, tenant_id=None, - trust_id=None, user_id=None, expires=None): - if token_id is None: - token_id = self._create_token_id() - if user_id is None: - user_id = 'testuserid' - # FIXME(morganfainberg): These tokens look nothing like "Real" tokens. - # This should be fixed when token issuance is cleaned up. - data = {'id': token_id, 'a': 'b', - 'user': {'id': user_id}, - 'access': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - if tenant_id is not None: - data['tenant'] = {'id': tenant_id, 'name': tenant_id} - if tenant_id is NULL_OBJECT: - data['tenant'] = None - if expires is not None: - data['expires'] = expires - if trust_id is not None: - data['trust_id'] = trust_id - data['access'].setdefault('trust', {}) - # Testuserid2 is used here since a trustee will be different in - # the cases of impersonation and therefore should not match the - # token's user_id. - data['access']['trust']['trustee_user_id'] = 'testuserid2' - data['token_version'] = provider.V2 - # Issue token stores a copy of all token data at token['token_data']. - # This emulates that assumption as part of the test. - data['token_data'] = copy.deepcopy(data) - new_token = self.token_provider_api._persistence.create_token(token_id, - data) - return new_token['id'], data - - def test_delete_tokens(self): - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id3, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id2) - - self.token_provider_api._persistence.get_token(token_id3) - - def test_delete_tokens_trust(self): - tokens = self.token_provider_api._persistence._list_tokens( - user_id='testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid', - trust_id='testtrustid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1', - trust_id='testtrustid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid', - trust_id='testtrustid') - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.token_provider_api._persistence.get_token(token_id2) - - def _test_token_list(self, token_list_fn): - tokens = token_list_fn('testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - token_id2, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id1) - tokens = token_list_fn('testuserid') - self.assertIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id2) - tokens = token_list_fn('testuserid') - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - - # tenant-specific tokens - tenant1 = uuid.uuid4().hex - tenant2 = uuid.uuid4().hex - token_id3, data = self.create_token_sample_data(tenant_id=tenant1) - token_id4, data = self.create_token_sample_data(tenant_id=tenant2) - # test for existing but empty tenant (LP:1078497) - token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT) - tokens = token_list_fn('testuserid') - self.assertEqual(3, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - self.assertIn(token_id5, tokens) - tokens = token_list_fn('testuserid', tenant2) - self.assertEqual(1, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - - def test_token_list(self): - self._test_token_list( - self.token_provider_api._persistence._list_tokens) - - def test_token_list_trust(self): - trust_id = uuid.uuid4().hex - token_id5, data = self.create_token_sample_data(trust_id=trust_id) - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid', trust_id=trust_id) - self.assertEqual(1, len(tokens)) - self.assertIn(token_id5, tokens) - - def test_get_token_returns_not_found(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - uuid.uuid4().hex) - - def test_delete_token_returns_not_found(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - uuid.uuid4().hex) - - def test_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id) - - def test_null_expires_token(self): - token_id = uuid.uuid4().hex - data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.assertIsNotNone(data_ref['expires']) - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - - # MySQL doesn't store microseconds, so discard them before testing - data_ref['expires'] = data_ref['expires'].replace(microsecond=0) - new_data_ref['expires'] = new_data_ref['expires'].replace( - microsecond=0) - - self.assertEqual(data_ref, new_data_ref) - - def check_list_revoked_tokens(self, token_infos): - revocation_list = self.token_provider_api.list_revoked_tokens() - revoked_ids = [x['id'] for x in revocation_list] - revoked_audit_ids = [x['audit_id'] for x in revocation_list] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - for token_id, audit_id in token_infos: - self.assertIn(token_id, revoked_ids) - self.assertIn(audit_id, revoked_audit_ids) - - def delete_token(self): - token_id = uuid.uuid4().hex - audit_id = uuid.uuid4().hex - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'user': {'id': 'testuserid'}, - 'token_data': {'token': {'audit_ids': [audit_id]}}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_ref['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - data_ref['id']) - return (token_id, audit_id) - - def test_list_revoked_tokens_returns_empty_list(self): - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertEqual([], revoked_ids) - - def test_list_revoked_tokens_for_single_token(self): - self.check_list_revoked_tokens([self.delete_token()]) - - def test_list_revoked_tokens_for_multiple_tokens(self): - self.check_list_revoked_tokens([self.delete_token() - for x in range(2)]) - - def test_flush_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - - self.token_provider_api._persistence.flush_expired_tokens() - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id, tokens) - - @unit.skip_if_cache_disabled('token') - def test_revocation_list_cache(self): - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10) - token_id = uuid.uuid4().hex - token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}} - token2_id = uuid.uuid4().hex - token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}} - # Create 2 Tokens. - self.token_provider_api._persistence.create_token(token_id, - token_data) - self.token_provider_api._persistence.create_token(token2_id, - token2_data) - # Verify the revocation list is empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Delete a token directly, bypassing the manager. - self.token_provider_api._persistence.driver.delete_token(token_id) - # Verify the revocation list is still empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Invalidate the revocation list. - self.token_provider_api._persistence.invalidate_revocation_list() - # Verify the deleted token is in the revocation list. - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - # Delete the second token, through the manager - self.token_provider_api._persistence.delete_token(token2_id) - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - # Verify both tokens are in the revocation list. - self.assertIn(token_id, revoked_ids) - self.assertIn(token2_id, revoked_ids) - - def _test_predictable_revoked_pki_token_id(self, hash_fn): - token_id = self._create_token_id() - token_id_hash = hash_fn(token_id.encode('utf-8')).hexdigest() - token = {'user': {'id': uuid.uuid4().hex}, - 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id_hash, revoked_ids) - self.assertNotIn(token_id, revoked_ids) - for t in self.token_provider_api._persistence.list_revoked_tokens(): - self.assertIn('expires', t) - - def test_predictable_revoked_pki_token_id_default(self): - self._test_predictable_revoked_pki_token_id(hashlib.md5) - - def test_predictable_revoked_pki_token_id_sha256(self): - self.config_fixture.config(group='token', hash_algorithm='sha256') - self._test_predictable_revoked_pki_token_id(hashlib.sha256) - - def test_predictable_revoked_uuid_token_id(self): - token_id = uuid.uuid4().hex - token = {'user': {'id': uuid.uuid4().hex}, - 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_tokens = self.token_provider_api.list_revoked_tokens() - revoked_ids = [x['id'] for x in revoked_tokens] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - for t in revoked_tokens: - self.assertIn('expires', t) - - def test_create_unicode_token_id(self): - token_id = six.text_type(self._create_token_id()) - self.create_token_sample_data(token_id=token_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_create_unicode_user_id(self): - user_id = six.text_type(uuid.uuid4().hex) - token_id, data = self.create_token_sample_data(user_id=user_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_token_expire_timezone(self): - - @test_utils.timezone - def _create_token(expire_time): - token_id = uuid.uuid4().hex - user_id = six.text_type(uuid.uuid4().hex) - return self.create_token_sample_data(token_id=token_id, - user_id=user_id, - expires=expire_time) - - for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: - test_utils.TZ = 'UTC' + d - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - token_id, data_in = _create_token(expire_time) - data_get = self.token_provider_api._persistence.get_token(token_id) - - self.assertEqual(data_in['id'], data_get['id'], - 'TZ=%s' % test_utils.TZ) - - expire_time_expired = ( - timeutils.utcnow() + datetime.timedelta(minutes=-1)) - token_id, data_in = _create_token(expire_time_expired) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_in['id']) - - -class TokenCacheInvalidation(object): - def _create_test_data(self): - self.user = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - self.tenant = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - - # Create an equivalent of a scoped token - token_dict = {'user': self.user, 'tenant': self.tenant, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.scoped_token_id = token_id - - # ..and an un-scoped one - token_dict = {'user': self.user, 'tenant': None, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.unscoped_token_id = token_id - - # Validate them, in the various ways possible - this will load the - # responses into the token cache. - self._check_scoped_tokens_are_valid() - self._check_unscoped_tokens_are_valid() - - def _check_unscoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.unscoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.unscoped_token_id) - - def _check_scoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id, - self.tenant['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id, - self.tenant['id']) - - def _check_scoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.scoped_token_id) - self.token_provider_api.validate_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - self.token_provider_api.validate_v2_token(self.scoped_token_id) - self.token_provider_api.validate_v2_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - - def _check_unscoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.unscoped_token_id) - self.token_provider_api.validate_v2_token(self.unscoped_token_id) - - def test_delete_unscoped_token(self): - self.token_provider_api._persistence.delete_token( - self.unscoped_token_id) - self._check_unscoped_tokens_are_invalid() - self._check_scoped_tokens_are_valid() - - def test_delete_scoped_token_by_id(self): - self.token_provider_api._persistence.delete_token(self.scoped_token_id) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() - - def test_delete_scoped_token_by_user(self): - self.token_provider_api._persistence.delete_tokens(self.user['id']) - # Since we are deleting all tokens for this user, they should all - # now be invalid. - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_invalid() - - def test_delete_scoped_token_by_user_and_tenant(self): - self.token_provider_api._persistence.delete_tokens( - self.user['id'], - tenant_id=self.tenant['id']) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py deleted file mode 100644 index 5f51d7b3..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py +++ /dev/null @@ -1,611 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import datetime -import hashlib -import os -import uuid - -import msgpack -from oslo_utils import timeutils -from six.moves import urllib - -from keystone.common import config -from keystone.common import utils -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.tests import unit -from keystone.tests.unit import ksfixtures -from keystone.tests.unit.ksfixtures import database -from keystone.token import provider -from keystone.token.providers import fernet -from keystone.token.providers.fernet import token_formatters -from keystone.token.providers.fernet import utils as fernet_utils - - -CONF = config.CONF - - -class TestFernetTokenProvider(unit.TestCase): - def setUp(self): - super(TestFernetTokenProvider, self).setUp() - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - self.provider = fernet.Provider() - - def test_supports_bind_authentication_returns_false(self): - self.assertFalse(self.provider._supports_bind_authentication) - - def test_needs_persistence_returns_false(self): - self.assertFalse(self.provider.needs_persistence()) - - def test_invalid_v3_token_raises_token_not_found(self): - # NOTE(lbragstad): Here we use the validate_non_persistent_token() - # methods because the validate_v3_token() method is strictly for - # validating UUID formatted tokens. It is written to assume cached - # tokens from a backend, where validate_non_persistent_token() is not. - token_id = uuid.uuid4().hex - e = self.assertRaises( - exception.TokenNotFound, - self.provider.validate_non_persistent_token, - token_id) - self.assertIn(token_id, u'%s' % e) - - def test_invalid_v2_token_raises_token_not_found(self): - token_id = uuid.uuid4().hex - e = self.assertRaises( - exception.TokenNotFound, - self.provider.validate_non_persistent_token, - token_id) - self.assertIn(token_id, u'%s' % e) - - -class TestValidate(unit.TestCase): - def setUp(self): - super(TestValidate, self).setUp() - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - self.useFixture(database.Database()) - self.load_backends() - - def config_overrides(self): - super(TestValidate, self).config_overrides() - self.config_fixture.config(group='token', provider='fernet') - - def test_validate_v3_token_simple(self): - # Check the fields in the token result when use validate_v3_token - # with a simple token. - - domain_ref = unit.new_domain_ref() - domain_ref = self.resource_api.create_domain(domain_ref['id'], - domain_ref) - - user_ref = unit.new_user_ref(domain_ref['id']) - user_ref = self.identity_api.create_user(user_ref) - - method_names = ['password'] - token_id, token_data_ = self.token_provider_api.issue_v3_token( - user_ref['id'], method_names) - - token_data = self.token_provider_api.validate_v3_token(token_id) - token = token_data['token'] - self.assertIsInstance(token['audit_ids'], list) - self.assertIsInstance(token['expires_at'], str) - self.assertIsInstance(token['issued_at'], str) - self.assertEqual(method_names, token['methods']) - exp_user_info = { - 'id': user_ref['id'], - 'name': user_ref['name'], - 'domain': { - 'id': domain_ref['id'], - 'name': domain_ref['name'], - }, - } - self.assertEqual(exp_user_info, token['user']) - - def test_validate_v3_token_federated_info(self): - # Check the user fields in the token result when use validate_v3_token - # when the token has federated info. - - domain_ref = unit.new_domain_ref() - domain_ref = self.resource_api.create_domain(domain_ref['id'], - domain_ref) - - user_ref = unit.new_user_ref(domain_ref['id']) - user_ref = self.identity_api.create_user(user_ref) - - method_names = ['mapped'] - - group_ids = [uuid.uuid4().hex, ] - identity_provider = uuid.uuid4().hex - protocol = uuid.uuid4().hex - auth_context = { - 'user_id': user_ref['id'], - 'group_ids': group_ids, - federation_constants.IDENTITY_PROVIDER: identity_provider, - federation_constants.PROTOCOL: protocol, - } - token_id, token_data_ = self.token_provider_api.issue_v3_token( - user_ref['id'], method_names, auth_context=auth_context) - - token_data = self.token_provider_api.validate_v3_token(token_id) - token = token_data['token'] - exp_user_info = { - 'id': user_ref['id'], - 'name': user_ref['id'], - 'domain': {'id': CONF.federation.federated_domain_name, - 'name': CONF.federation.federated_domain_name, }, - federation_constants.FEDERATION: { - 'groups': [{'id': group_id} for group_id in group_ids], - 'identity_provider': {'id': identity_provider, }, - 'protocol': {'id': protocol, }, - }, - } - self.assertEqual(exp_user_info, token['user']) - - def test_validate_v3_token_trust(self): - # Check the trust fields in the token result when use validate_v3_token - # when the token has trust info. - - domain_ref = unit.new_domain_ref() - domain_ref = self.resource_api.create_domain(domain_ref['id'], - domain_ref) - - user_ref = unit.new_user_ref(domain_ref['id']) - user_ref = self.identity_api.create_user(user_ref) - - trustor_user_ref = unit.new_user_ref(domain_ref['id']) - trustor_user_ref = self.identity_api.create_user(trustor_user_ref) - - project_ref = unit.new_project_ref(domain_id=domain_ref['id']) - project_ref = self.resource_api.create_project(project_ref['id'], - project_ref) - - role_ref = unit.new_role_ref() - role_ref = self.role_api.create_role(role_ref['id'], role_ref) - - self.assignment_api.create_grant( - role_ref['id'], user_id=user_ref['id'], - project_id=project_ref['id']) - - self.assignment_api.create_grant( - role_ref['id'], user_id=trustor_user_ref['id'], - project_id=project_ref['id']) - - trustor_user_id = trustor_user_ref['id'] - trustee_user_id = user_ref['id'] - trust_ref = unit.new_trust_ref( - trustor_user_id, trustee_user_id, project_id=project_ref['id'], - role_ids=[role_ref['id'], ]) - trust_ref = self.trust_api.create_trust(trust_ref['id'], trust_ref, - trust_ref['roles']) - - method_names = ['password'] - - token_id, token_data_ = self.token_provider_api.issue_v3_token( - user_ref['id'], method_names, project_id=project_ref['id'], - trust=trust_ref) - - token_data = self.token_provider_api.validate_v3_token(token_id) - token = token_data['token'] - exp_trust_info = { - 'id': trust_ref['id'], - 'impersonation': False, - 'trustee_user': {'id': user_ref['id'], }, - 'trustor_user': {'id': trustor_user_ref['id'], }, - } - self.assertEqual(exp_trust_info, token['OS-TRUST:trust']) - - def test_validate_v3_token_validation_error_exc(self): - # When the token format isn't recognized, TokenNotFound is raised. - - # A uuid string isn't a valid Fernet token. - token_id = uuid.uuid4().hex - self.assertRaises(exception.TokenNotFound, - self.token_provider_api.validate_v3_token, token_id) - - -class TestTokenFormatter(unit.TestCase): - def setUp(self): - super(TestTokenFormatter, self).setUp() - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - - def test_restore_padding(self): - # 'a' will result in '==' padding, 'aa' will result in '=' padding, and - # 'aaa' will result in no padding. - binary_to_test = [b'a', b'aa', b'aaa'] - - for binary in binary_to_test: - # base64.urlsafe_b64encode takes six.binary_type and returns - # six.binary_type. - encoded_string = base64.urlsafe_b64encode(binary) - encoded_string = encoded_string.decode('utf-8') - # encoded_string is now six.text_type. - encoded_str_without_padding = encoded_string.rstrip('=') - self.assertFalse(encoded_str_without_padding.endswith('=')) - encoded_str_with_padding_restored = ( - token_formatters.TokenFormatter.restore_padding( - encoded_str_without_padding) - ) - self.assertEqual(encoded_string, encoded_str_with_padding_restored) - - def test_legacy_padding_validation(self): - first_value = uuid.uuid4().hex - second_value = uuid.uuid4().hex - payload = (first_value, second_value) - msgpack_payload = msgpack.packb(payload) - # msgpack_payload is six.binary_type. - - tf = token_formatters.TokenFormatter() - - # NOTE(lbragstad): This method preserves the way that keystone used to - # percent encode the tokens, prior to bug #1491926. - def legacy_pack(payload): - # payload is six.binary_type. - encrypted_payload = tf.crypto.encrypt(payload) - # encrypted_payload is six.binary_type. - - # the encrypted_payload is returned with padding appended - self.assertTrue(encrypted_payload.endswith(b'=')) - - # using urllib.parse.quote will percent encode the padding, like - # keystone did in Kilo. - percent_encoded_payload = urllib.parse.quote(encrypted_payload) - # percent_encoded_payload is six.text_type. - - # ensure that the padding was actually percent encoded - self.assertTrue(percent_encoded_payload.endswith('%3D')) - return percent_encoded_payload - - token_with_legacy_padding = legacy_pack(msgpack_payload) - # token_with_legacy_padding is six.text_type. - - # demonstrate the we can validate a payload that has been percent - # encoded with the Fernet logic that existed in Kilo - serialized_payload = tf.unpack(token_with_legacy_padding) - # serialized_payload is six.binary_type. - returned_payload = msgpack.unpackb(serialized_payload) - # returned_payload contains six.binary_type. - self.assertEqual(first_value, returned_payload[0].decode('utf-8')) - self.assertEqual(second_value, returned_payload[1].decode('utf-8')) - - -class TestPayloads(unit.TestCase): - def assertTimestampsEqual(self, expected, actual): - # The timestamp that we get back when parsing the payload may not - # exactly match the timestamp that was put in the payload due to - # conversion to and from a float. - - exp_time = timeutils.parse_isotime(expected) - actual_time = timeutils.parse_isotime(actual) - - # the granularity of timestamp string is microseconds and it's only the - # last digit in the representation that's different, so use a delta - # just above nanoseconds. - return self.assertCloseEnoughForGovernmentWork(exp_time, actual_time, - delta=1e-05) - - def test_uuid_hex_to_byte_conversions(self): - payload_cls = token_formatters.BasePayload - - expected_hex_uuid = uuid.uuid4().hex - uuid_obj = uuid.UUID(expected_hex_uuid) - expected_uuid_in_bytes = uuid_obj.bytes - actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes( - expected_hex_uuid) - self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes) - actual_hex_uuid = payload_cls.convert_uuid_bytes_to_hex( - expected_uuid_in_bytes) - self.assertEqual(expected_hex_uuid, actual_hex_uuid) - - def test_time_string_to_float_conversions(self): - payload_cls = token_formatters.BasePayload - - original_time_str = utils.isotime(subsecond=True) - time_obj = timeutils.parse_isotime(original_time_str) - expected_time_float = ( - (timeutils.normalize_time(time_obj) - - datetime.datetime.utcfromtimestamp(0)).total_seconds()) - - # NOTE(lbragstad): The token expiration time for Fernet tokens is - # passed in the payload of the token. This is different from the token - # creation time, which is handled by Fernet and doesn't support - # subsecond precision because it is a timestamp integer. - self.assertIsInstance(expected_time_float, float) - - actual_time_float = payload_cls._convert_time_string_to_float( - original_time_str) - self.assertIsInstance(actual_time_float, float) - self.assertEqual(expected_time_float, actual_time_float) - - # Generate expected_time_str using the same time float. Using - # original_time_str from utils.isotime will occasionally fail due to - # floating point rounding differences. - time_object = datetime.datetime.utcfromtimestamp(actual_time_float) - expected_time_str = utils.isotime(time_object, subsecond=True) - - actual_time_str = payload_cls._convert_float_to_time_string( - actual_time_float) - self.assertEqual(expected_time_str, actual_time_str) - - def _test_payload(self, payload_class, exp_user_id=None, exp_methods=None, - exp_project_id=None, exp_domain_id=None, - exp_trust_id=None, exp_federated_info=None, - exp_access_token_id=None): - exp_user_id = exp_user_id or uuid.uuid4().hex - exp_methods = exp_methods or ['password'] - exp_expires_at = utils.isotime(timeutils.utcnow(), subsecond=True) - exp_audit_ids = [provider.random_urlsafe_str()] - - payload = payload_class.assemble( - exp_user_id, exp_methods, exp_project_id, exp_domain_id, - exp_expires_at, exp_audit_ids, exp_trust_id, exp_federated_info, - exp_access_token_id) - - (user_id, methods, project_id, - domain_id, expires_at, audit_ids, - trust_id, federated_info, - access_token_id) = payload_class.disassemble(payload) - - self.assertEqual(exp_user_id, user_id) - self.assertEqual(exp_methods, methods) - self.assertTimestampsEqual(exp_expires_at, expires_at) - self.assertEqual(exp_audit_ids, audit_ids) - self.assertEqual(exp_project_id, project_id) - self.assertEqual(exp_domain_id, domain_id) - self.assertEqual(exp_trust_id, trust_id) - self.assertEqual(exp_access_token_id, access_token_id) - - if exp_federated_info: - self.assertDictEqual(exp_federated_info, federated_info) - else: - self.assertIsNone(federated_info) - - def test_unscoped_payload(self): - self._test_payload(token_formatters.UnscopedPayload) - - def test_project_scoped_payload(self): - self._test_payload(token_formatters.ProjectScopedPayload, - exp_project_id=uuid.uuid4().hex) - - def test_domain_scoped_payload(self): - self._test_payload(token_formatters.DomainScopedPayload, - exp_domain_id=uuid.uuid4().hex) - - def test_domain_scoped_payload_with_default_domain(self): - self._test_payload(token_formatters.DomainScopedPayload, - exp_domain_id=CONF.identity.default_domain_id) - - def test_trust_scoped_payload(self): - self._test_payload(token_formatters.TrustScopedPayload, - exp_project_id=uuid.uuid4().hex, - exp_trust_id=uuid.uuid4().hex) - - def test_unscoped_payload_with_non_uuid_user_id(self): - self._test_payload(token_formatters.UnscopedPayload, - exp_user_id='someNonUuidUserId') - - def test_unscoped_payload_with_16_char_non_uuid_user_id(self): - self._test_payload(token_formatters.UnscopedPayload, - exp_user_id='0123456789abcdef') - - def test_project_scoped_payload_with_non_uuid_ids(self): - self._test_payload(token_formatters.ProjectScopedPayload, - exp_user_id='someNonUuidUserId', - exp_project_id='someNonUuidProjectId') - - def test_project_scoped_payload_with_16_char_non_uuid_ids(self): - self._test_payload(token_formatters.ProjectScopedPayload, - exp_user_id='0123456789abcdef', - exp_project_id='0123456789abcdef') - - def test_domain_scoped_payload_with_non_uuid_user_id(self): - self._test_payload(token_formatters.DomainScopedPayload, - exp_user_id='nonUuidUserId', - exp_domain_id=uuid.uuid4().hex) - - def test_domain_scoped_payload_with_16_char_non_uuid_user_id(self): - self._test_payload(token_formatters.DomainScopedPayload, - exp_user_id='0123456789abcdef', - exp_domain_id=uuid.uuid4().hex) - - def test_trust_scoped_payload_with_non_uuid_ids(self): - self._test_payload(token_formatters.TrustScopedPayload, - exp_user_id='someNonUuidUserId', - exp_project_id='someNonUuidProjectId', - exp_trust_id=uuid.uuid4().hex) - - def test_trust_scoped_payload_with_16_char_non_uuid_ids(self): - self._test_payload(token_formatters.TrustScopedPayload, - exp_user_id='0123456789abcdef', - exp_project_id='0123456789abcdef', - exp_trust_id=uuid.uuid4().hex) - - def _test_federated_payload_with_ids(self, exp_user_id, exp_group_id): - exp_federated_info = {'group_ids': [{'id': exp_group_id}], - 'idp_id': uuid.uuid4().hex, - 'protocol_id': uuid.uuid4().hex} - - self._test_payload(token_formatters.FederatedUnscopedPayload, - exp_user_id=exp_user_id, - exp_federated_info=exp_federated_info) - - def test_federated_payload_with_non_uuid_ids(self): - self._test_federated_payload_with_ids('someNonUuidUserId', - 'someNonUuidGroupId') - - def test_federated_payload_with_16_char_non_uuid_ids(self): - self._test_federated_payload_with_ids('0123456789abcdef', - '0123456789abcdef') - - def test_federated_project_scoped_payload(self): - exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], - 'idp_id': uuid.uuid4().hex, - 'protocol_id': uuid.uuid4().hex} - - self._test_payload(token_formatters.FederatedProjectScopedPayload, - exp_user_id='someNonUuidUserId', - exp_methods=['token'], - exp_project_id=uuid.uuid4().hex, - exp_federated_info=exp_federated_info) - - def test_federated_domain_scoped_payload(self): - exp_federated_info = {'group_ids': [{'id': 'someNonUuidGroupId'}], - 'idp_id': uuid.uuid4().hex, - 'protocol_id': uuid.uuid4().hex} - - self._test_payload(token_formatters.FederatedDomainScopedPayload, - exp_user_id='someNonUuidUserId', - exp_methods=['token'], - exp_domain_id=uuid.uuid4().hex, - exp_federated_info=exp_federated_info) - - def test_oauth_scoped_payload(self): - self._test_payload(token_formatters.OauthScopedPayload, - exp_project_id=uuid.uuid4().hex, - exp_access_token_id=uuid.uuid4().hex) - - -class TestFernetKeyRotation(unit.TestCase): - def setUp(self): - super(TestFernetKeyRotation, self).setUp() - - # A collection of all previously-seen signatures of the key - # repository's contents. - self.key_repo_signatures = set() - - @property - def keys(self): - """Key files converted to numbers.""" - return sorted( - int(x) for x in os.listdir(CONF.fernet_tokens.key_repository)) - - @property - def key_repository_size(self): - """The number of keys in the key repository.""" - return len(self.keys) - - @property - def key_repository_signature(self): - """Create a "thumbprint" of the current key repository. - - Because key files are renamed, this produces a hash of the contents of - the key files, ignoring their filenames. - - The resulting signature can be used, for example, to ensure that you - have a unique set of keys after you perform a key rotation (taking a - static set of keys, and simply shuffling them, would fail such a test). - - """ - # Load the keys into a list, keys is list of six.text_type. - keys = fernet_utils.load_keys() - - # Sort the list of keys by the keys themselves (they were previously - # sorted by filename). - keys.sort() - - # Create the thumbprint using all keys in the repository. - signature = hashlib.sha1() - for key in keys: - # Need to convert key to six.binary_type for update. - signature.update(key.encode('utf-8')) - return signature.hexdigest() - - def assertRepositoryState(self, expected_size): - """Validate the state of the key repository.""" - self.assertEqual(expected_size, self.key_repository_size) - self.assertUniqueRepositoryState() - - def assertUniqueRepositoryState(self): - """Ensures that the current key repo state has not been seen before.""" - # This is assigned to a variable because it takes some work to - # calculate. - signature = self.key_repository_signature - - # Ensure the signature is not in the set of previously seen signatures. - self.assertNotIn(signature, self.key_repo_signatures) - - # Add the signature to the set of repository signatures to validate - # that we don't see it again later. - self.key_repo_signatures.add(signature) - - def test_rotation(self): - # Initializing a key repository results in this many keys. We don't - # support max_active_keys being set any lower. - min_active_keys = 2 - - # Simulate every rotation strategy up to "rotating once a week while - # maintaining a year's worth of keys." - for max_active_keys in range(min_active_keys, 52 + 1): - self.config_fixture.config(group='fernet_tokens', - max_active_keys=max_active_keys) - - # Ensure that resetting the key repository always results in 2 - # active keys. - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - - # Validate the initial repository state. - self.assertRepositoryState(expected_size=min_active_keys) - - # The repository should be initialized with a staged key (0) and a - # primary key (1). The next key is just auto-incremented. - exp_keys = [0, 1] - next_key_number = exp_keys[-1] + 1 # keep track of next key - self.assertEqual(exp_keys, self.keys) - - # Rotate the keys just enough times to fully populate the key - # repository. - for rotation in range(max_active_keys - min_active_keys): - fernet_utils.rotate_keys() - self.assertRepositoryState(expected_size=rotation + 3) - - exp_keys.append(next_key_number) - next_key_number += 1 - self.assertEqual(exp_keys, self.keys) - - # We should have a fully populated key repository now. - self.assertEqual(max_active_keys, self.key_repository_size) - - # Rotate an additional number of times to ensure that we maintain - # the desired number of active keys. - for rotation in range(10): - fernet_utils.rotate_keys() - self.assertRepositoryState(expected_size=max_active_keys) - - exp_keys.pop(1) - exp_keys.append(next_key_number) - next_key_number += 1 - self.assertEqual(exp_keys, self.keys) - - def test_non_numeric_files(self): - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - evil_file = os.path.join(CONF.fernet_tokens.key_repository, '99.bak') - with open(evil_file, 'w'): - pass - fernet_utils.rotate_keys() - self.assertTrue(os.path.isfile(evil_file)) - keys = 0 - for x in os.listdir(CONF.fernet_tokens.key_repository): - if x == '99.bak': - continue - keys += 1 - self.assertEqual(3, keys) - - -class TestLoadKeys(unit.TestCase): - def test_non_numeric_files(self): - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - evil_file = os.path.join(CONF.fernet_tokens.key_repository, '~1') - with open(evil_file, 'w'): - pass - keys = fernet_utils.load_keys() - self.assertEqual(2, len(keys)) - self.assertTrue(len(keys[0])) diff --git a/keystone-moon/keystone/tests/unit/token/test_pki_provider.py b/keystone-moon/keystone/tests/unit/token/test_pki_provider.py deleted file mode 100644 index b3ad4c2b..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_pki_provider.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.tests import unit -from keystone.token.providers import pki - - -class TestPkiTokenProvider(unit.TestCase): - def setUp(self): - super(TestPkiTokenProvider, self).setUp() - self.provider = pki.Provider() - - def test_supports_bind_authentication_returns_true(self): - self.assertTrue(self.provider._supports_bind_authentication) - - def test_need_persistence_return_true(self): - self.assertIs(True, self.provider.needs_persistence()) diff --git a/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py b/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py deleted file mode 100644 index 1ffe7cfc..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_pkiz_provider.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.tests import unit -from keystone.token.providers import pkiz - - -class TestPkizTokenProvider(unit.TestCase): - def setUp(self): - super(TestPkizTokenProvider, self).setUp() - self.provider = pkiz.Provider() - - def test_supports_bind_authentication_returns_true(self): - self.assertTrue(self.provider._supports_bind_authentication) - - def test_need_persistence_return_true(self): - self.assertIs(True, self.provider.needs_persistence()) diff --git a/keystone-moon/keystone/tests/unit/token/test_provider.py b/keystone-moon/keystone/tests/unit/token/test_provider.py deleted file mode 100644 index 7093f3ba..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_provider.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import six -from six.moves import urllib - -from keystone.tests import unit -from keystone.token import provider - - -class TestRandomStrings(unit.BaseTestCase): - def test_strings_are_url_safe(self): - s = provider.random_urlsafe_str() - self.assertEqual(s, urllib.parse.quote_plus(s)) - - def test_strings_can_be_converted_to_bytes(self): - s = provider.random_urlsafe_str() - self.assertIsInstance(s, six.text_type) - - b = provider.random_urlsafe_str_to_bytes(s) - self.assertIsInstance(b, six.binary_type) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py deleted file mode 100644 index 9e8c3889..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py +++ /dev/null @@ -1,56 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import uuid - -from testtools import matchers - -from keystone import exception -from keystone.tests import unit -from keystone.token.providers import common - - -class TestTokenDataHelper(unit.TestCase): - def setUp(self): - super(TestTokenDataHelper, self).setUp() - self.load_backends() - self.v3_data_helper = common.V3TokenDataHelper() - - def test_v3_token_data_helper_populate_audit_info_string(self): - token_data = {} - audit_info_bytes = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2] - audit_info = audit_info_bytes.decode('utf-8') - self.v3_data_helper._populate_audit_info(token_data, audit_info) - self.assertIn(audit_info, token_data['audit_ids']) - self.assertThat(token_data['audit_ids'], matchers.HasLength(2)) - - def test_v3_token_data_helper_populate_audit_info_none(self): - token_data = {} - self.v3_data_helper._populate_audit_info(token_data, audit_info=None) - self.assertThat(token_data['audit_ids'], matchers.HasLength(1)) - self.assertNotIn(None, token_data['audit_ids']) - - def test_v3_token_data_helper_populate_audit_info_list(self): - token_data = {} - audit_info = [base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2], - base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]] - self.v3_data_helper._populate_audit_info(token_data, audit_info) - self.assertEqual(audit_info, token_data['audit_ids']) - - def test_v3_token_data_helper_populate_audit_info_invalid(self): - token_data = {} - audit_info = dict() - self.assertRaises(exception.UnexpectedError, - self.v3_data_helper._populate_audit_info, - token_data=token_data, - audit_info=audit_info) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_model.py b/keystone-moon/keystone/tests/unit/token/test_token_model.py deleted file mode 100644 index 1cb0ef55..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_token_model.py +++ /dev/null @@ -1,263 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import uuid - -from oslo_config import cfg -from oslo_utils import timeutils -from six.moves import range - -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.models import token_model -from keystone.tests.unit import core -from keystone.tests.unit import test_token_provider - - -CONF = cfg.CONF - - -class TestKeystoneTokenModel(core.TestCase): - def setUp(self): - super(TestKeystoneTokenModel, self).setUp() - self.v2_sample_token = copy.deepcopy( - test_token_provider.SAMPLE_V2_TOKEN) - self.v3_sample_token = copy.deepcopy( - test_token_provider.SAMPLE_V3_TOKEN) - - def test_token_model_v3(self): - token_data = token_model.KeystoneToken(uuid.uuid4().hex, - self.v3_sample_token) - self.assertIs(token_model.V3, token_data.version) - expires = timeutils.normalize_time(timeutils.parse_isotime( - self.v3_sample_token['token']['expires_at'])) - issued = timeutils.normalize_time(timeutils.parse_isotime( - self.v3_sample_token['token']['issued_at'])) - self.assertEqual(expires, token_data.expires) - self.assertEqual(issued, token_data.issued) - self.assertEqual(self.v3_sample_token['token']['user']['id'], - token_data.user_id) - self.assertEqual(self.v3_sample_token['token']['user']['name'], - token_data.user_name) - self.assertEqual(self.v3_sample_token['token']['user']['domain']['id'], - token_data.user_domain_id) - self.assertEqual( - self.v3_sample_token['token']['user']['domain']['name'], - token_data.user_domain_name) - self.assertEqual( - self.v3_sample_token['token']['project']['domain']['id'], - token_data.project_domain_id) - self.assertEqual( - self.v3_sample_token['token']['project']['domain']['name'], - token_data.project_domain_name) - self.assertEqual(self.v3_sample_token['token']['OS-TRUST:trust']['id'], - token_data.trust_id) - self.assertEqual( - self.v3_sample_token['token']['OS-TRUST:trust']['trustor_user_id'], - token_data.trustor_user_id) - self.assertEqual( - self.v3_sample_token['token']['OS-TRUST:trust']['trustee_user_id'], - token_data.trustee_user_id) - # Project Scoped Token - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'domain_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'domain_name') - self.assertFalse(token_data.domain_scoped) - self.assertEqual(self.v3_sample_token['token']['project']['id'], - token_data.project_id) - self.assertEqual(self.v3_sample_token['token']['project']['name'], - token_data.project_name) - self.assertTrue(token_data.project_scoped) - self.assertTrue(token_data.scoped) - self.assertTrue(token_data.trust_scoped) - self.assertEqual( - [r['id'] for r in self.v3_sample_token['token']['roles']], - token_data.role_ids) - self.assertEqual( - [r['name'] for r in self.v3_sample_token['token']['roles']], - token_data.role_names) - token_data.pop('project') - self.assertFalse(token_data.project_scoped) - self.assertFalse(token_data.scoped) - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_name') - self.assertFalse(token_data.project_scoped) - domain_id = uuid.uuid4().hex - domain_name = uuid.uuid4().hex - token_data['domain'] = {'id': domain_id, - 'name': domain_name} - self.assertEqual(domain_id, token_data.domain_id) - self.assertEqual(domain_name, token_data.domain_name) - self.assertTrue(token_data.domain_scoped) - - token_data['audit_ids'] = [uuid.uuid4().hex] - self.assertEqual(token_data.audit_id, - token_data['audit_ids'][0]) - self.assertEqual(token_data.audit_chain_id, - token_data['audit_ids'][0]) - token_data['audit_ids'].append(uuid.uuid4().hex) - self.assertEqual(token_data.audit_chain_id, - token_data['audit_ids'][1]) - del token_data['audit_ids'] - self.assertIsNone(token_data.audit_id) - self.assertIsNone(token_data.audit_chain_id) - - def test_token_model_v3_federated_user(self): - token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=self.v3_sample_token) - federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, - 'protocol': {'id': 'saml2'}, - 'groups': [{'id': uuid.uuid4().hex} - for x in range(1, 5)]} - - self.assertFalse(token_data.is_federated_user) - self.assertEqual([], token_data.federation_group_ids) - self.assertIsNone(token_data.federation_protocol_id) - self.assertIsNone(token_data.federation_idp_id) - - token_data['user'][federation_constants.FEDERATION] = federation_data - - self.assertTrue(token_data.is_federated_user) - self.assertEqual([x['id'] for x in federation_data['groups']], - token_data.federation_group_ids) - self.assertEqual(federation_data['protocol']['id'], - token_data.federation_protocol_id) - self.assertEqual(federation_data['identity_provider']['id'], - token_data.federation_idp_id) - - def test_token_model_v2_federated_user(self): - token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=self.v2_sample_token) - federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, - 'protocol': {'id': 'saml2'}, - 'groups': [{'id': uuid.uuid4().hex} - for x in range(1, 5)]} - self.assertFalse(token_data.is_federated_user) - self.assertEqual([], token_data.federation_group_ids) - self.assertIsNone(token_data.federation_protocol_id) - self.assertIsNone(token_data.federation_idp_id) - - token_data['user'][federation_constants.FEDERATION] = federation_data - - # Federated users should not exist in V2, the data should remain empty - self.assertFalse(token_data.is_federated_user) - self.assertEqual([], token_data.federation_group_ids) - self.assertIsNone(token_data.federation_protocol_id) - self.assertIsNone(token_data.federation_idp_id) - - def test_token_model_v2(self): - token_data = token_model.KeystoneToken(uuid.uuid4().hex, - self.v2_sample_token) - self.assertIs(token_model.V2, token_data.version) - expires = timeutils.normalize_time(timeutils.parse_isotime( - self.v2_sample_token['access']['token']['expires'])) - issued = timeutils.normalize_time(timeutils.parse_isotime( - self.v2_sample_token['access']['token']['issued_at'])) - self.assertEqual(expires, token_data.expires) - self.assertEqual(issued, token_data.issued) - self.assertEqual(self.v2_sample_token['access']['user']['id'], - token_data.user_id) - self.assertEqual(self.v2_sample_token['access']['user']['name'], - token_data.user_name) - self.assertEqual(CONF.identity.default_domain_id, - token_data.user_domain_id) - self.assertEqual('Default', token_data.user_domain_name) - self.assertEqual(CONF.identity.default_domain_id, - token_data.project_domain_id) - self.assertEqual('Default', - token_data.project_domain_name) - self.assertEqual(self.v2_sample_token['access']['trust']['id'], - token_data.trust_id) - self.assertEqual( - self.v2_sample_token['access']['trust']['trustor_user_id'], - token_data.trustor_user_id) - self.assertEqual( - self.v2_sample_token['access']['trust']['impersonation'], - token_data.trust_impersonation) - self.assertEqual( - self.v2_sample_token['access']['trust']['trustee_user_id'], - token_data.trustee_user_id) - # Project Scoped Token - self.assertEqual( - self.v2_sample_token['access']['token']['tenant']['id'], - token_data.project_id) - self.assertEqual( - self.v2_sample_token['access']['token']['tenant']['name'], - token_data.project_name) - self.assertTrue(token_data.project_scoped) - self.assertTrue(token_data.scoped) - self.assertTrue(token_data.trust_scoped) - self.assertEqual( - [r['name'] - for r in self.v2_sample_token['access']['user']['roles']], - token_data.role_names) - token_data['token'].pop('tenant') - self.assertFalse(token_data.scoped) - self.assertFalse(token_data.project_scoped) - self.assertFalse(token_data.domain_scoped) - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_name') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_domain_id') - self.assertRaises(exception.UnexpectedError, getattr, token_data, - 'project_domain_id') - # No Domain Scoped tokens in V2 - self.assertRaises(NotImplementedError, getattr, token_data, - 'domain_id') - self.assertRaises(NotImplementedError, getattr, token_data, - 'domain_name') - token_data['domain'] = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.assertRaises(NotImplementedError, getattr, token_data, - 'domain_id') - self.assertRaises(NotImplementedError, getattr, token_data, - 'domain_name') - self.assertFalse(token_data.domain_scoped) - - token_data['token']['audit_ids'] = [uuid.uuid4().hex] - self.assertEqual(token_data.audit_chain_id, - token_data['token']['audit_ids'][0]) - token_data['token']['audit_ids'].append(uuid.uuid4().hex) - self.assertEqual(token_data.audit_chain_id, - token_data['token']['audit_ids'][1]) - self.assertEqual(token_data.audit_id, - token_data['token']['audit_ids'][0]) - del token_data['token']['audit_ids'] - self.assertIsNone(token_data.audit_id) - self.assertIsNone(token_data.audit_chain_id) - - def test_token_model_unknown(self): - self.assertRaises(exception.UnsupportedTokenVersionException, - token_model.KeystoneToken, - token_id=uuid.uuid4().hex, - token_data={'bogus_data': uuid.uuid4().hex}) - - def test_token_model_dual_scoped_token(self): - domain = {'id': uuid.uuid4().hex, - 'name': uuid.uuid4().hex} - self.v2_sample_token['access']['domain'] = domain - self.v3_sample_token['token']['domain'] = domain - - # V2 Tokens Cannot be domain scoped, this should work - token_model.KeystoneToken(token_id=uuid.uuid4().hex, - token_data=self.v2_sample_token) - - self.assertRaises(exception.UnexpectedError, - token_model.KeystoneToken, - token_id=uuid.uuid4().hex, - token_data=self.v3_sample_token) diff --git a/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py b/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py deleted file mode 100644 index 5c364490..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_uuid_provider.py +++ /dev/null @@ -1,26 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.tests import unit -from keystone.token.providers import uuid - - -class TestUuidTokenProvider(unit.TestCase): - def setUp(self): - super(TestUuidTokenProvider, self).setUp() - self.provider = uuid.Provider() - - def test_supports_bind_authentication_returns_true(self): - self.assertTrue(self.provider._supports_bind_authentication) - - def test_need_persistence_return_true(self): - self.assertIs(True, self.provider.needs_persistence()) |