From b8c756ecdd7cced1db4300935484e8c83701c82e Mon Sep 17 00:00:00 2001 From: WuKong Date: Tue, 30 Jun 2015 18:47:29 +0200 Subject: migrate moon code from github to opnfv Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong --- .../keystone/tests/unit/token/__init__.py | 0 .../tests/unit/token/test_fernet_provider.py | 183 ++++++++++++++ .../keystone/tests/unit/token/test_provider.py | 29 +++ .../tests/unit/token/test_token_data_helper.py | 55 +++++ .../keystone/tests/unit/token/test_token_model.py | 262 +++++++++++++++++++++ 5 files changed, 529 insertions(+) create mode 100644 keystone-moon/keystone/tests/unit/token/__init__.py create mode 100644 keystone-moon/keystone/tests/unit/token/test_fernet_provider.py create mode 100644 keystone-moon/keystone/tests/unit/token/test_provider.py create mode 100644 keystone-moon/keystone/tests/unit/token/test_token_data_helper.py create mode 100644 keystone-moon/keystone/tests/unit/token/test_token_model.py (limited to 'keystone-moon/keystone/tests/unit/token') diff --git a/keystone-moon/keystone/tests/unit/token/__init__.py b/keystone-moon/keystone/tests/unit/token/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py new file mode 100644 index 00000000..23fc0214 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/token/test_fernet_provider.py @@ -0,0 +1,183 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from oslo_utils import timeutils + +from keystone.common import config +from keystone import exception +from keystone.tests import unit as tests +from keystone.tests.unit import ksfixtures +from keystone.token import provider +from keystone.token.providers import fernet +from keystone.token.providers.fernet import token_formatters + + +CONF = config.CONF + + +class TestFernetTokenProvider(tests.TestCase): + def setUp(self): + super(TestFernetTokenProvider, self).setUp() + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + self.provider = fernet.Provider() + + def test_get_token_id_raises_not_implemented(self): + """Test that an exception is raised when calling _get_token_id.""" + token_data = {} + self.assertRaises(exception.NotImplemented, + self.provider._get_token_id, token_data) + + def test_invalid_v3_token_raises_401(self): + self.assertRaises( + exception.Unauthorized, + self.provider.validate_v3_token, + uuid.uuid4().hex) + + def test_invalid_v2_token_raises_401(self): + self.assertRaises( + exception.Unauthorized, + self.provider.validate_v2_token, + uuid.uuid4().hex) + + +class TestPayloads(tests.TestCase): + def test_uuid_hex_to_byte_conversions(self): + payload_cls = token_formatters.BasePayload + + expected_hex_uuid = uuid.uuid4().hex + uuid_obj = uuid.UUID(expected_hex_uuid) + expected_uuid_in_bytes = uuid_obj.bytes + actual_uuid_in_bytes = payload_cls.convert_uuid_hex_to_bytes( + expected_hex_uuid) + self.assertEqual(expected_uuid_in_bytes, actual_uuid_in_bytes) + actual_hex_uuid = payload_cls.convert_uuid_bytes_to_hex( + expected_uuid_in_bytes) + self.assertEqual(expected_hex_uuid, actual_hex_uuid) + + def test_time_string_to_int_conversions(self): + payload_cls = token_formatters.BasePayload + + expected_time_str = timeutils.isotime() + time_obj = timeutils.parse_isotime(expected_time_str) + expected_time_int = ( + (timeutils.normalize_time(time_obj) - + datetime.datetime.utcfromtimestamp(0)).total_seconds()) + + actual_time_int = payload_cls._convert_time_string_to_int( + expected_time_str) + self.assertEqual(expected_time_int, actual_time_int) + + actual_time_str = payload_cls._convert_int_to_time_string( + actual_time_int) + self.assertEqual(expected_time_str, actual_time_str) + + def test_unscoped_payload(self): + exp_user_id = uuid.uuid4().hex + exp_methods = ['password'] + exp_expires_at = timeutils.isotime(timeutils.utcnow()) + exp_audit_ids = [provider.random_urlsafe_str()] + + payload = token_formatters.UnscopedPayload.assemble( + exp_user_id, exp_methods, exp_expires_at, exp_audit_ids) + + (user_id, methods, expires_at, audit_ids) = ( + token_formatters.UnscopedPayload.disassemble(payload)) + + self.assertEqual(exp_user_id, user_id) + self.assertEqual(exp_methods, methods) + self.assertEqual(exp_expires_at, expires_at) + self.assertEqual(exp_audit_ids, audit_ids) + + def test_project_scoped_payload(self): + exp_user_id = uuid.uuid4().hex + exp_methods = ['password'] + exp_project_id = uuid.uuid4().hex + exp_expires_at = timeutils.isotime(timeutils.utcnow()) + exp_audit_ids = [provider.random_urlsafe_str()] + + payload = token_formatters.ProjectScopedPayload.assemble( + exp_user_id, exp_methods, exp_project_id, exp_expires_at, + exp_audit_ids) + + (user_id, methods, project_id, expires_at, audit_ids) = ( + token_formatters.ProjectScopedPayload.disassemble(payload)) + + self.assertEqual(exp_user_id, user_id) + self.assertEqual(exp_methods, methods) + self.assertEqual(exp_project_id, project_id) + self.assertEqual(exp_expires_at, expires_at) + self.assertEqual(exp_audit_ids, audit_ids) + + def test_domain_scoped_payload(self): + exp_user_id = uuid.uuid4().hex + exp_methods = ['password'] + exp_domain_id = uuid.uuid4().hex + exp_expires_at = timeutils.isotime(timeutils.utcnow()) + exp_audit_ids = [provider.random_urlsafe_str()] + + payload = token_formatters.DomainScopedPayload.assemble( + exp_user_id, exp_methods, exp_domain_id, exp_expires_at, + exp_audit_ids) + + (user_id, methods, domain_id, expires_at, audit_ids) = ( + token_formatters.DomainScopedPayload.disassemble(payload)) + + self.assertEqual(exp_user_id, user_id) + self.assertEqual(exp_methods, methods) + self.assertEqual(exp_domain_id, domain_id) + self.assertEqual(exp_expires_at, expires_at) + self.assertEqual(exp_audit_ids, audit_ids) + + def test_domain_scoped_payload_with_default_domain(self): + exp_user_id = uuid.uuid4().hex + exp_methods = ['password'] + exp_domain_id = CONF.identity.default_domain_id + exp_expires_at = timeutils.isotime(timeutils.utcnow()) + exp_audit_ids = [provider.random_urlsafe_str()] + + payload = token_formatters.DomainScopedPayload.assemble( + exp_user_id, exp_methods, exp_domain_id, exp_expires_at, + exp_audit_ids) + + (user_id, methods, domain_id, expires_at, audit_ids) = ( + token_formatters.DomainScopedPayload.disassemble(payload)) + + self.assertEqual(exp_user_id, user_id) + self.assertEqual(exp_methods, methods) + self.assertEqual(exp_domain_id, domain_id) + self.assertEqual(exp_expires_at, expires_at) + self.assertEqual(exp_audit_ids, audit_ids) + + def test_trust_scoped_payload(self): + exp_user_id = uuid.uuid4().hex + exp_methods = ['password'] + exp_project_id = uuid.uuid4().hex + exp_expires_at = timeutils.isotime(timeutils.utcnow()) + exp_audit_ids = [provider.random_urlsafe_str()] + exp_trust_id = uuid.uuid4().hex + + payload = token_formatters.TrustScopedPayload.assemble( + exp_user_id, exp_methods, exp_project_id, exp_expires_at, + exp_audit_ids, exp_trust_id) + + (user_id, methods, project_id, expires_at, audit_ids, trust_id) = ( + token_formatters.TrustScopedPayload.disassemble(payload)) + + self.assertEqual(exp_user_id, user_id) + self.assertEqual(exp_methods, methods) + self.assertEqual(exp_project_id, project_id) + self.assertEqual(exp_expires_at, expires_at) + self.assertEqual(exp_audit_ids, audit_ids) + self.assertEqual(exp_trust_id, trust_id) diff --git a/keystone-moon/keystone/tests/unit/token/test_provider.py b/keystone-moon/keystone/tests/unit/token/test_provider.py new file mode 100644 index 00000000..e5910690 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/token/test_provider.py @@ -0,0 +1,29 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import urllib + +from keystone.tests import unit +from keystone.token import provider + + +class TestRandomStrings(unit.BaseTestCase): + def test_strings_are_url_safe(self): + s = provider.random_urlsafe_str() + self.assertEqual(s, urllib.quote_plus(s)) + + def test_strings_can_be_converted_to_bytes(self): + s = provider.random_urlsafe_str() + self.assertTrue(isinstance(s, basestring)) + + b = provider.random_urlsafe_str_to_bytes(s) + self.assertTrue(isinstance(b, bytes)) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py new file mode 100644 index 00000000..a12a22d4 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/token/test_token_data_helper.py @@ -0,0 +1,55 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import uuid + +from testtools import matchers + +from keystone import exception +from keystone.tests import unit as tests +from keystone.token.providers import common + + +class TestTokenDataHelper(tests.TestCase): + def setUp(self): + super(TestTokenDataHelper, self).setUp() + self.load_backends() + self.v3_data_helper = common.V3TokenDataHelper() + + def test_v3_token_data_helper_populate_audit_info_string(self): + token_data = {} + audit_info = base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2] + self.v3_data_helper._populate_audit_info(token_data, audit_info) + self.assertIn(audit_info, token_data['audit_ids']) + self.assertThat(token_data['audit_ids'], matchers.HasLength(2)) + + def test_v3_token_data_helper_populate_audit_info_none(self): + token_data = {} + self.v3_data_helper._populate_audit_info(token_data, audit_info=None) + self.assertThat(token_data['audit_ids'], matchers.HasLength(1)) + self.assertNotIn(None, token_data['audit_ids']) + + def test_v3_token_data_helper_populate_audit_info_list(self): + token_data = {} + audit_info = [base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2], + base64.urlsafe_b64encode(uuid.uuid4().bytes)[:-2]] + self.v3_data_helper._populate_audit_info(token_data, audit_info) + self.assertEqual(audit_info, token_data['audit_ids']) + + def test_v3_token_data_helper_populate_audit_info_invalid(self): + token_data = {} + audit_info = dict() + self.assertRaises(exception.UnexpectedError, + self.v3_data_helper._populate_audit_info, + token_data=token_data, + audit_info=audit_info) diff --git a/keystone-moon/keystone/tests/unit/token/test_token_model.py b/keystone-moon/keystone/tests/unit/token/test_token_model.py new file mode 100644 index 00000000..b2474289 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/token/test_token_model.py @@ -0,0 +1,262 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import uuid + +from oslo_config import cfg +from oslo_utils import timeutils + +from keystone import exception +from keystone.models import token_model +from keystone.tests.unit import core +from keystone.tests.unit import test_token_provider + + +CONF = cfg.CONF + + +class TestKeystoneTokenModel(core.TestCase): + def setUp(self): + super(TestKeystoneTokenModel, self).setUp() + self.load_backends() + self.v2_sample_token = copy.deepcopy( + test_token_provider.SAMPLE_V2_TOKEN) + self.v3_sample_token = copy.deepcopy( + test_token_provider.SAMPLE_V3_TOKEN) + + def test_token_model_v3(self): + token_data = token_model.KeystoneToken(uuid.uuid4().hex, + self.v3_sample_token) + self.assertIs(token_model.V3, token_data.version) + expires = timeutils.normalize_time(timeutils.parse_isotime( + self.v3_sample_token['token']['expires_at'])) + issued = timeutils.normalize_time(timeutils.parse_isotime( + self.v3_sample_token['token']['issued_at'])) + self.assertEqual(expires, token_data.expires) + self.assertEqual(issued, token_data.issued) + self.assertEqual(self.v3_sample_token['token']['user']['id'], + token_data.user_id) + self.assertEqual(self.v3_sample_token['token']['user']['name'], + token_data.user_name) + self.assertEqual(self.v3_sample_token['token']['user']['domain']['id'], + token_data.user_domain_id) + self.assertEqual( + self.v3_sample_token['token']['user']['domain']['name'], + token_data.user_domain_name) + self.assertEqual( + self.v3_sample_token['token']['project']['domain']['id'], + token_data.project_domain_id) + self.assertEqual( + self.v3_sample_token['token']['project']['domain']['name'], + token_data.project_domain_name) + self.assertEqual(self.v3_sample_token['token']['OS-TRUST:trust']['id'], + token_data.trust_id) + self.assertEqual( + self.v3_sample_token['token']['OS-TRUST:trust']['trustor_user_id'], + token_data.trustor_user_id) + self.assertEqual( + self.v3_sample_token['token']['OS-TRUST:trust']['trustee_user_id'], + token_data.trustee_user_id) + # Project Scoped Token + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'domain_id') + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'domain_name') + self.assertFalse(token_data.domain_scoped) + self.assertEqual(self.v3_sample_token['token']['project']['id'], + token_data.project_id) + self.assertEqual(self.v3_sample_token['token']['project']['name'], + token_data.project_name) + self.assertTrue(token_data.project_scoped) + self.assertTrue(token_data.scoped) + self.assertTrue(token_data.trust_scoped) + self.assertEqual( + [r['id'] for r in self.v3_sample_token['token']['roles']], + token_data.role_ids) + self.assertEqual( + [r['name'] for r in self.v3_sample_token['token']['roles']], + token_data.role_names) + token_data.pop('project') + self.assertFalse(token_data.project_scoped) + self.assertFalse(token_data.scoped) + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_id') + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_name') + self.assertFalse(token_data.project_scoped) + domain_id = uuid.uuid4().hex + domain_name = uuid.uuid4().hex + token_data['domain'] = {'id': domain_id, + 'name': domain_name} + self.assertEqual(domain_id, token_data.domain_id) + self.assertEqual(domain_name, token_data.domain_name) + self.assertTrue(token_data.domain_scoped) + + token_data['audit_ids'] = [uuid.uuid4().hex] + self.assertEqual(token_data.audit_id, + token_data['audit_ids'][0]) + self.assertEqual(token_data.audit_chain_id, + token_data['audit_ids'][0]) + token_data['audit_ids'].append(uuid.uuid4().hex) + self.assertEqual(token_data.audit_chain_id, + token_data['audit_ids'][1]) + del token_data['audit_ids'] + self.assertIsNone(token_data.audit_id) + self.assertIsNone(token_data.audit_chain_id) + + def test_token_model_v3_federated_user(self): + token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=self.v3_sample_token) + federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, + 'protocol': {'id': 'saml2'}, + 'groups': [{'id': uuid.uuid4().hex} + for x in range(1, 5)]} + + self.assertFalse(token_data.is_federated_user) + self.assertEqual([], token_data.federation_group_ids) + self.assertIsNone(token_data.federation_protocol_id) + self.assertIsNone(token_data.federation_idp_id) + + token_data['user'][token_model.federation.FEDERATION] = federation_data + + self.assertTrue(token_data.is_federated_user) + self.assertEqual([x['id'] for x in federation_data['groups']], + token_data.federation_group_ids) + self.assertEqual(federation_data['protocol']['id'], + token_data.federation_protocol_id) + self.assertEqual(federation_data['identity_provider']['id'], + token_data.federation_idp_id) + + def test_token_model_v2_federated_user(self): + token_data = token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=self.v2_sample_token) + federation_data = {'identity_provider': {'id': uuid.uuid4().hex}, + 'protocol': {'id': 'saml2'}, + 'groups': [{'id': uuid.uuid4().hex} + for x in range(1, 5)]} + self.assertFalse(token_data.is_federated_user) + self.assertEqual([], token_data.federation_group_ids) + self.assertIsNone(token_data.federation_protocol_id) + self.assertIsNone(token_data.federation_idp_id) + + token_data['user'][token_model.federation.FEDERATION] = federation_data + + # Federated users should not exist in V2, the data should remain empty + self.assertFalse(token_data.is_federated_user) + self.assertEqual([], token_data.federation_group_ids) + self.assertIsNone(token_data.federation_protocol_id) + self.assertIsNone(token_data.federation_idp_id) + + def test_token_model_v2(self): + token_data = token_model.KeystoneToken(uuid.uuid4().hex, + self.v2_sample_token) + self.assertIs(token_model.V2, token_data.version) + expires = timeutils.normalize_time(timeutils.parse_isotime( + self.v2_sample_token['access']['token']['expires'])) + issued = timeutils.normalize_time(timeutils.parse_isotime( + self.v2_sample_token['access']['token']['issued_at'])) + self.assertEqual(expires, token_data.expires) + self.assertEqual(issued, token_data.issued) + self.assertEqual(self.v2_sample_token['access']['user']['id'], + token_data.user_id) + self.assertEqual(self.v2_sample_token['access']['user']['name'], + token_data.user_name) + self.assertEqual(CONF.identity.default_domain_id, + token_data.user_domain_id) + self.assertEqual('Default', token_data.user_domain_name) + self.assertEqual(CONF.identity.default_domain_id, + token_data.project_domain_id) + self.assertEqual('Default', + token_data.project_domain_name) + self.assertEqual(self.v2_sample_token['access']['trust']['id'], + token_data.trust_id) + self.assertEqual( + self.v2_sample_token['access']['trust']['trustor_user_id'], + token_data.trustor_user_id) + self.assertEqual( + self.v2_sample_token['access']['trust']['impersonation'], + token_data.trust_impersonation) + self.assertEqual( + self.v2_sample_token['access']['trust']['trustee_user_id'], + token_data.trustee_user_id) + # Project Scoped Token + self.assertEqual( + self.v2_sample_token['access']['token']['tenant']['id'], + token_data.project_id) + self.assertEqual( + self.v2_sample_token['access']['token']['tenant']['name'], + token_data.project_name) + self.assertTrue(token_data.project_scoped) + self.assertTrue(token_data.scoped) + self.assertTrue(token_data.trust_scoped) + self.assertEqual( + [r['name'] + for r in self.v2_sample_token['access']['user']['roles']], + token_data.role_names) + token_data['token'].pop('tenant') + self.assertFalse(token_data.scoped) + self.assertFalse(token_data.project_scoped) + self.assertFalse(token_data.domain_scoped) + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_id') + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_name') + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_domain_id') + self.assertRaises(exception.UnexpectedError, getattr, token_data, + 'project_domain_id') + # No Domain Scoped tokens in V2 + self.assertRaises(NotImplementedError, getattr, token_data, + 'domain_id') + self.assertRaises(NotImplementedError, getattr, token_data, + 'domain_name') + token_data['domain'] = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex} + self.assertRaises(NotImplementedError, getattr, token_data, + 'domain_id') + self.assertRaises(NotImplementedError, getattr, token_data, + 'domain_name') + self.assertFalse(token_data.domain_scoped) + + token_data['token']['audit_ids'] = [uuid.uuid4().hex] + self.assertEqual(token_data.audit_chain_id, + token_data['token']['audit_ids'][0]) + token_data['token']['audit_ids'].append(uuid.uuid4().hex) + self.assertEqual(token_data.audit_chain_id, + token_data['token']['audit_ids'][1]) + self.assertEqual(token_data.audit_id, + token_data['token']['audit_ids'][0]) + del token_data['token']['audit_ids'] + self.assertIsNone(token_data.audit_id) + self.assertIsNone(token_data.audit_chain_id) + + def test_token_model_unknown(self): + self.assertRaises(exception.UnsupportedTokenVersionException, + token_model.KeystoneToken, + token_id=uuid.uuid4().hex, + token_data={'bogus_data': uuid.uuid4().hex}) + + def test_token_model_dual_scoped_token(self): + domain = {'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex} + self.v2_sample_token['access']['domain'] = domain + self.v3_sample_token['token']['domain'] = domain + + # V2 Tokens Cannot be domain scoped, this should work + token_model.KeystoneToken(token_id=uuid.uuid4().hex, + token_data=self.v2_sample_token) + + self.assertRaises(exception.UnexpectedError, + token_model.KeystoneToken, + token_id=uuid.uuid4().hex, + token_data=self.v3_sample_token) -- cgit 1.2.3-korg