diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/token/test_backends.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/token/test_backends.py | 551 |
1 files changed, 0 insertions, 551 deletions
diff --git a/keystone-moon/keystone/tests/unit/token/test_backends.py b/keystone-moon/keystone/tests/unit/token/test_backends.py deleted file mode 100644 index feb7e017..00000000 --- a/keystone-moon/keystone/tests/unit/token/test_backends.py +++ /dev/null @@ -1,551 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import datetime -import hashlib -import uuid - -from keystoneclient.common import cms -from oslo_config import cfg -from oslo_utils import timeutils -import six -from six.moves import range - -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import utils as test_utils -from keystone.token import provider - - -CONF = cfg.CONF -NULL_OBJECT = object() - - -class TokenTests(object): - def _create_token_id(self): - # Use a token signed by the cms module - token_id = "" - for i in range(1, 20): - token_id += uuid.uuid4().hex - return cms.cms_sign_token(token_id, - CONF.signing.certfile, - CONF.signing.keyfile) - - def _assert_revoked_token_list_matches_token_persistence( - self, revoked_token_id_list): - # Assert that the list passed in matches the list returned by the - # token persistence service - persistence_list = [ - x['id'] - for x in self.token_provider_api.list_revoked_tokens() - ] - self.assertEqual(persistence_list, revoked_token_id_list) - - def test_token_crud(self): - token_id = self._create_token_id() - data = {'id': token_id, 'a': 'b', - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'access': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - expires = data_ref.pop('expires') - data_ref.pop('user_id') - self.assertIsInstance(expires, datetime.datetime) - data_ref.pop('id') - data.pop('id') - self.assertDictEqual(data, data_ref) - - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - expires = new_data_ref.pop('expires') - self.assertIsInstance(expires, datetime.datetime) - new_data_ref.pop('user_id') - new_data_ref.pop('id') - - self.assertEqual(data, new_data_ref) - - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, token_id) - - def create_token_sample_data(self, token_id=None, tenant_id=None, - trust_id=None, user_id=None, expires=None): - if token_id is None: - token_id = self._create_token_id() - if user_id is None: - user_id = 'testuserid' - # FIXME(morganfainberg): These tokens look nothing like "Real" tokens. - # This should be fixed when token issuance is cleaned up. - data = {'id': token_id, 'a': 'b', - 'user': {'id': user_id}, - 'access': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - if tenant_id is not None: - data['tenant'] = {'id': tenant_id, 'name': tenant_id} - if tenant_id is NULL_OBJECT: - data['tenant'] = None - if expires is not None: - data['expires'] = expires - if trust_id is not None: - data['trust_id'] = trust_id - data['access'].setdefault('trust', {}) - # Testuserid2 is used here since a trustee will be different in - # the cases of impersonation and therefore should not match the - # token's user_id. - data['access']['trust']['trustee_user_id'] = 'testuserid2' - data['token_version'] = provider.V2 - # Issue token stores a copy of all token data at token['token_data']. - # This emulates that assumption as part of the test. - data['token_data'] = copy.deepcopy(data) - new_token = self.token_provider_api._persistence.create_token(token_id, - data) - return new_token['id'], data - - def test_delete_tokens(self): - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid') - token_id3, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(0, len(tokens)) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id2) - - self.token_provider_api._persistence.get_token(token_id3) - - def test_delete_tokens_trust(self): - tokens = self.token_provider_api._persistence._list_tokens( - user_id='testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data( - tenant_id='testtenantid', - trust_id='testtrustid') - token_id2, data = self.create_token_sample_data( - tenant_id='testtenantid', - user_id='testuserid1', - trust_id='testtrustid1') - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_tokens( - user_id='testuserid', - tenant_id='testtenantid', - trust_id='testtrustid') - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id1) - self.token_provider_api._persistence.get_token(token_id2) - - def _test_token_list(self, token_list_fn): - tokens = token_list_fn('testuserid') - self.assertEqual(0, len(tokens)) - token_id1, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id1, tokens) - token_id2, data = self.create_token_sample_data() - tokens = token_list_fn('testuserid') - self.assertEqual(2, len(tokens)) - self.assertIn(token_id2, tokens) - self.assertIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id1) - tokens = token_list_fn('testuserid') - self.assertIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - self.token_provider_api._persistence.delete_token(token_id2) - tokens = token_list_fn('testuserid') - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id1, tokens) - - # tenant-specific tokens - tenant1 = uuid.uuid4().hex - tenant2 = uuid.uuid4().hex - token_id3, data = self.create_token_sample_data(tenant_id=tenant1) - token_id4, data = self.create_token_sample_data(tenant_id=tenant2) - # test for existing but empty tenant (LP:1078497) - token_id5, data = self.create_token_sample_data(tenant_id=NULL_OBJECT) - tokens = token_list_fn('testuserid') - self.assertEqual(3, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - self.assertIn(token_id5, tokens) - tokens = token_list_fn('testuserid', tenant2) - self.assertEqual(1, len(tokens)) - self.assertNotIn(token_id1, tokens) - self.assertNotIn(token_id2, tokens) - self.assertNotIn(token_id3, tokens) - self.assertIn(token_id4, tokens) - - def test_token_list(self): - self._test_token_list( - self.token_provider_api._persistence._list_tokens) - - def test_token_list_trust(self): - trust_id = uuid.uuid4().hex - token_id5, data = self.create_token_sample_data(trust_id=trust_id) - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid', trust_id=trust_id) - self.assertEqual(1, len(tokens)) - self.assertIn(token_id5, tokens) - - def test_get_token_returns_not_found(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - uuid.uuid4().hex) - - def test_delete_token_returns_not_found(self): - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - uuid.uuid4().hex) - - def test_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - token_id) - - def test_null_expires_token(self): - token_id = uuid.uuid4().hex - data = {'id': token_id, 'id_hash': token_id, 'a': 'b', 'expires': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.assertIsNotNone(data_ref['expires']) - new_data_ref = self.token_provider_api._persistence.get_token(token_id) - - # MySQL doesn't store microseconds, so discard them before testing - data_ref['expires'] = data_ref['expires'].replace(microsecond=0) - new_data_ref['expires'] = new_data_ref['expires'].replace( - microsecond=0) - - self.assertEqual(data_ref, new_data_ref) - - def check_list_revoked_tokens(self, token_infos): - revocation_list = self.token_provider_api.list_revoked_tokens() - revoked_ids = [x['id'] for x in revocation_list] - revoked_audit_ids = [x['audit_id'] for x in revocation_list] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - for token_id, audit_id in token_infos: - self.assertIn(token_id, revoked_ids) - self.assertIn(audit_id, revoked_audit_ids) - - def delete_token(self): - token_id = uuid.uuid4().hex - audit_id = uuid.uuid4().hex - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'user': {'id': 'testuserid'}, - 'token_data': {'token': {'audit_ids': [audit_id]}}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - self.token_provider_api._persistence.delete_token(token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_ref['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api._persistence.delete_token, - data_ref['id']) - return (token_id, audit_id) - - def test_list_revoked_tokens_returns_empty_list(self): - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertEqual([], revoked_ids) - - def test_list_revoked_tokens_for_single_token(self): - self.check_list_revoked_tokens([self.delete_token()]) - - def test_list_revoked_tokens_for_multiple_tokens(self): - self.check_list_revoked_tokens([self.delete_token() - for x in range(2)]) - - def test_flush_expired_token(self): - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() - datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - - token_id = uuid.uuid4().hex - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}} - data_ref = self.token_provider_api._persistence.create_token(token_id, - data) - data_ref.pop('user_id') - self.assertDictEqual(data, data_ref) - - self.token_provider_api._persistence.flush_expired_tokens() - tokens = self.token_provider_api._persistence._list_tokens( - 'testuserid') - self.assertEqual(1, len(tokens)) - self.assertIn(token_id, tokens) - - @unit.skip_if_cache_disabled('token') - def test_revocation_list_cache(self): - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=10) - token_id = uuid.uuid4().hex - token_data = {'id_hash': token_id, 'id': token_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}} - token2_id = uuid.uuid4().hex - token2_data = {'id_hash': token2_id, 'id': token2_id, 'a': 'b', - 'expires': expire_time, - 'trust_id': None, - 'user': {'id': 'testuserid'}, - 'token_data': {'token': { - 'audit_ids': [uuid.uuid4().hex]}}} - # Create 2 Tokens. - self.token_provider_api._persistence.create_token(token_id, - token_data) - self.token_provider_api._persistence.create_token(token2_id, - token2_data) - # Verify the revocation list is empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Delete a token directly, bypassing the manager. - self.token_provider_api._persistence.driver.delete_token(token_id) - # Verify the revocation list is still empty. - self.assertEqual( - [], self.token_provider_api._persistence.list_revoked_tokens()) - self.assertEqual([], self.token_provider_api.list_revoked_tokens()) - # Invalidate the revocation list. - self.token_provider_api._persistence.invalidate_revocation_list() - # Verify the deleted token is in the revocation list. - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - # Delete the second token, through the manager - self.token_provider_api._persistence.delete_token(token2_id) - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - # Verify both tokens are in the revocation list. - self.assertIn(token_id, revoked_ids) - self.assertIn(token2_id, revoked_ids) - - def _test_predictable_revoked_pki_token_id(self, hash_fn): - token_id = self._create_token_id() - token_id_hash = hash_fn(token_id.encode('utf-8')).hexdigest() - token = {'user': {'id': uuid.uuid4().hex}, - 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_ids = [x['id'] - for x in self.token_provider_api.list_revoked_tokens()] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id_hash, revoked_ids) - self.assertNotIn(token_id, revoked_ids) - for t in self.token_provider_api._persistence.list_revoked_tokens(): - self.assertIn('expires', t) - - def test_predictable_revoked_pki_token_id_default(self): - self._test_predictable_revoked_pki_token_id(hashlib.md5) - - def test_predictable_revoked_pki_token_id_sha256(self): - self.config_fixture.config(group='token', hash_algorithm='sha256') - self._test_predictable_revoked_pki_token_id(hashlib.sha256) - - def test_predictable_revoked_uuid_token_id(self): - token_id = uuid.uuid4().hex - token = {'user': {'id': uuid.uuid4().hex}, - 'token_data': {'token': {'audit_ids': [uuid.uuid4().hex]}}} - - self.token_provider_api._persistence.create_token(token_id, token) - self.token_provider_api._persistence.delete_token(token_id) - - revoked_tokens = self.token_provider_api.list_revoked_tokens() - revoked_ids = [x['id'] for x in revoked_tokens] - self._assert_revoked_token_list_matches_token_persistence(revoked_ids) - self.assertIn(token_id, revoked_ids) - for t in revoked_tokens: - self.assertIn('expires', t) - - def test_create_unicode_token_id(self): - token_id = six.text_type(self._create_token_id()) - self.create_token_sample_data(token_id=token_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_create_unicode_user_id(self): - user_id = six.text_type(uuid.uuid4().hex) - token_id, data = self.create_token_sample_data(user_id=user_id) - self.token_provider_api._persistence.get_token(token_id) - - def test_token_expire_timezone(self): - - @test_utils.timezone - def _create_token(expire_time): - token_id = uuid.uuid4().hex - user_id = six.text_type(uuid.uuid4().hex) - return self.create_token_sample_data(token_id=token_id, - user_id=user_id, - expires=expire_time) - - for d in ['+0', '-11', '-8', '-5', '+5', '+8', '+14']: - test_utils.TZ = 'UTC' + d - expire_time = timeutils.utcnow() + datetime.timedelta(minutes=1) - token_id, data_in = _create_token(expire_time) - data_get = self.token_provider_api._persistence.get_token(token_id) - - self.assertEqual(data_in['id'], data_get['id'], - 'TZ=%s' % test_utils.TZ) - - expire_time_expired = ( - timeutils.utcnow() + datetime.timedelta(minutes=-1)) - token_id, data_in = _create_token(expire_time_expired) - self.assertRaises(exception.TokenNotFound, - self.token_provider_api._persistence.get_token, - data_in['id']) - - -class TokenCacheInvalidation(object): - def _create_test_data(self): - self.user = unit.new_user_ref( - domain_id=CONF.identity.default_domain_id) - self.tenant = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - - # Create an equivalent of a scoped token - token_dict = {'user': self.user, 'tenant': self.tenant, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.scoped_token_id = token_id - - # ..and an un-scoped one - token_dict = {'user': self.user, 'tenant': None, - 'metadata': {}, 'id': 'placeholder'} - token_id, data = self.token_provider_api.issue_v2_token(token_dict) - self.unscoped_token_id = token_id - - # Validate them, in the various ways possible - this will load the - # responses into the token cache. - self._check_scoped_tokens_are_valid() - self._check_unscoped_tokens_are_valid() - - def _check_unscoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.unscoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.unscoped_token_id) - - def _check_scoped_tokens_are_invalid(self): - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_token, - self.scoped_token_id, - self.tenant['id']) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id) - self.assertRaises( - exception.TokenNotFound, - self.token_provider_api.validate_v2_token, - self.scoped_token_id, - self.tenant['id']) - - def _check_scoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.scoped_token_id) - self.token_provider_api.validate_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - self.token_provider_api.validate_v2_token(self.scoped_token_id) - self.token_provider_api.validate_v2_token( - self.scoped_token_id, belongs_to=self.tenant['id']) - - def _check_unscoped_tokens_are_valid(self): - self.token_provider_api.validate_token(self.unscoped_token_id) - self.token_provider_api.validate_v2_token(self.unscoped_token_id) - - def test_delete_unscoped_token(self): - self.token_provider_api._persistence.delete_token( - self.unscoped_token_id) - self._check_unscoped_tokens_are_invalid() - self._check_scoped_tokens_are_valid() - - def test_delete_scoped_token_by_id(self): - self.token_provider_api._persistence.delete_token(self.scoped_token_id) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() - - def test_delete_scoped_token_by_user(self): - self.token_provider_api._persistence.delete_tokens(self.user['id']) - # Since we are deleting all tokens for this user, they should all - # now be invalid. - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_invalid() - - def test_delete_scoped_token_by_user_and_tenant(self): - self.token_provider_api._persistence.delete_tokens( - self.user['id'], - tenant_id=self.tenant['id']) - self._check_scoped_tokens_are_invalid() - self._check_unscoped_tokens_are_valid() |