diff options
author | 2015-06-30 18:47:29 +0200 | |
---|---|---|
committer | 2015-06-30 18:47:29 +0200 | |
commit | b8c756ecdd7cced1db4300935484e8c83701c82e (patch) | |
tree | 87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/test_auth.py | |
parent | c304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff) |
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604
Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_auth.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_auth.py | 1328 |
1 files changed, 1328 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_auth.py b/keystone-moon/keystone/tests/unit/test_auth.py new file mode 100644 index 00000000..295e028d --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_auth.py @@ -0,0 +1,1328 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import datetime +import uuid + +import mock +from oslo_config import cfg +from oslo_utils import timeutils +from testtools import matchers + +from keystone import assignment +from keystone import auth +from keystone.common import authorization +from keystone import config +from keystone import exception +from keystone.models import token_model +from keystone.tests import unit as tests +from keystone.tests.unit import default_fixtures +from keystone.tests.unit.ksfixtures import database +from keystone import token +from keystone.token import provider +from keystone import trust + + +CONF = cfg.CONF +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' +DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id + +HOST_URL = 'http://keystone:5001' + + +def _build_user_auth(token=None, user_id=None, username=None, + password=None, tenant_id=None, tenant_name=None, + trust_id=None): + """Build auth dictionary. + + It will create an auth dictionary based on all the arguments + that it receives. + """ + auth_json = {} + if token is not None: + auth_json['token'] = token + if username or password: + auth_json['passwordCredentials'] = {} + if username is not None: + auth_json['passwordCredentials']['username'] = username + if user_id is not None: + auth_json['passwordCredentials']['userId'] = user_id + if password is not None: + auth_json['passwordCredentials']['password'] = password + if tenant_name is not None: + auth_json['tenantName'] = tenant_name + if tenant_id is not None: + auth_json['tenantId'] = tenant_id + if trust_id is not None: + auth_json['trust_id'] = trust_id + return auth_json + + +class AuthTest(tests.TestCase): + def setUp(self): + self.useFixture(database.Database()) + super(AuthTest, self).setUp() + + self.load_backends() + self.load_fixtures(default_fixtures) + + self.context_with_remote_user = {'environment': + {'REMOTE_USER': 'FOO', + 'AUTH_TYPE': 'Negotiate'}} + self.empty_context = {'environment': {}} + + self.controller = token.controllers.Auth() + + def assertEqualTokens(self, a, b, enforce_audit_ids=True): + """Assert that two tokens are equal. + + Compare two tokens except for their ids. This also truncates + the time in the comparison. + """ + def normalize(token): + token['access']['token']['id'] = 'dummy' + del token['access']['token']['expires'] + del token['access']['token']['issued_at'] + del token['access']['token']['audit_ids'] + return token + + self.assertCloseEnoughForGovernmentWork( + timeutils.parse_isotime(a['access']['token']['expires']), + timeutils.parse_isotime(b['access']['token']['expires'])) + self.assertCloseEnoughForGovernmentWork( + timeutils.parse_isotime(a['access']['token']['issued_at']), + timeutils.parse_isotime(b['access']['token']['issued_at'])) + if enforce_audit_ids: + self.assertIn(a['access']['token']['audit_ids'][0], + b['access']['token']['audit_ids']) + self.assertThat(len(a['access']['token']['audit_ids']), + matchers.LessThan(3)) + self.assertThat(len(b['access']['token']['audit_ids']), + matchers.LessThan(3)) + + return self.assertDictEqual(normalize(a), normalize(b)) + + +class AuthBadRequests(AuthTest): + def test_no_external_auth(self): + """Verify that _authenticate_external() raises exception if N/A.""" + self.assertRaises( + token.controllers.ExternalAuthNotApplicable, + self.controller._authenticate_external, + context={}, auth={}) + + def test_empty_remote_user(self): + """Verify that _authenticate_external() raises exception if + REMOTE_USER is set as the empty string. + """ + context = {'environment': {'REMOTE_USER': ''}} + self.assertRaises( + token.controllers.ExternalAuthNotApplicable, + self.controller._authenticate_external, + context=context, auth={}) + + def test_no_token_in_auth(self): + """Verify that _authenticate_token() raises exception if no token.""" + self.assertRaises( + exception.ValidationError, + self.controller._authenticate_token, + None, {}) + + def test_no_credentials_in_auth(self): + """Verify that _authenticate_local() raises exception if no creds.""" + self.assertRaises( + exception.ValidationError, + self.controller._authenticate_local, + None, {}) + + def test_empty_username_and_userid_in_auth(self): + """Verify that empty username and userID raises ValidationError.""" + self.assertRaises( + exception.ValidationError, + self.controller._authenticate_local, + None, {'passwordCredentials': {'password': 'abc', + 'userId': '', 'username': ''}}) + + def test_authenticate_blank_request_body(self): + """Verify sending empty json dict raises the right exception.""" + self.assertRaises(exception.ValidationError, + self.controller.authenticate, + {}, {}) + + def test_authenticate_blank_auth(self): + """Verify sending blank 'auth' raises the right exception.""" + body_dict = _build_user_auth() + self.assertRaises(exception.ValidationError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_invalid_auth_content(self): + """Verify sending invalid 'auth' raises the right exception.""" + self.assertRaises(exception.ValidationError, + self.controller.authenticate, + {}, {'auth': 'abcd'}) + + def test_authenticate_user_id_too_large(self): + """Verify sending large 'userId' raises the right exception.""" + body_dict = _build_user_auth(user_id='0' * 65, username='FOO', + password='foo2') + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_username_too_large(self): + """Verify sending large 'username' raises the right exception.""" + body_dict = _build_user_auth(username='0' * 65, password='foo2') + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_tenant_id_too_large(self): + """Verify sending large 'tenantId' raises the right exception.""" + body_dict = _build_user_auth(username='FOO', password='foo2', + tenant_id='0' * 65) + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_tenant_name_too_large(self): + """Verify sending large 'tenantName' raises the right exception.""" + body_dict = _build_user_auth(username='FOO', password='foo2', + tenant_name='0' * 65) + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_token_too_large(self): + """Verify sending large 'token' raises the right exception.""" + body_dict = _build_user_auth(token={'id': '0' * 8193}) + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_password_too_large(self): + """Verify sending large 'password' raises the right exception.""" + length = CONF.identity.max_password_length + 1 + body_dict = _build_user_auth(username='FOO', password='0' * length) + self.assertRaises(exception.ValidationSizeError, + self.controller.authenticate, + {}, body_dict) + + +class AuthWithToken(AuthTest): + def test_unscoped_token(self): + """Verify getting an unscoped token with password creds.""" + body_dict = _build_user_auth(username='FOO', + password='foo2') + unscoped_token = self.controller.authenticate({}, body_dict) + self.assertNotIn('tenant', unscoped_token['access']['token']) + + def test_auth_invalid_token(self): + """Verify exception is raised if invalid token.""" + body_dict = _build_user_auth(token={"id": uuid.uuid4().hex}) + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + {}, body_dict) + + def test_auth_bad_formatted_token(self): + """Verify exception is raised if invalid token.""" + body_dict = _build_user_auth(token={}) + self.assertRaises( + exception.ValidationError, + self.controller.authenticate, + {}, body_dict) + + def test_auth_unscoped_token_no_project(self): + """Verify getting an unscoped token with an unscoped token.""" + body_dict = _build_user_auth( + username='FOO', + password='foo2') + unscoped_token = self.controller.authenticate({}, body_dict) + + body_dict = _build_user_auth( + token=unscoped_token["access"]["token"]) + unscoped_token_2 = self.controller.authenticate({}, body_dict) + + self.assertEqualTokens(unscoped_token, unscoped_token_2) + + def test_auth_unscoped_token_project(self): + """Verify getting a token in a tenant with an unscoped token.""" + # Add a role in so we can check we get this back + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_bar['id'], + self.role_member['id']) + # Get an unscoped tenant + body_dict = _build_user_auth( + username='FOO', + password='foo2') + unscoped_token = self.controller.authenticate({}, body_dict) + # Get a token on BAR tenant using the unscoped tenant + body_dict = _build_user_auth( + token=unscoped_token["access"]["token"], + tenant_name="BAR") + scoped_token = self.controller.authenticate({}, body_dict) + + tenant = scoped_token["access"]["token"]["tenant"] + roles = scoped_token["access"]["metadata"]["roles"] + self.assertEqual(self.tenant_bar['id'], tenant["id"]) + self.assertThat(roles, matchers.Contains(self.role_member['id'])) + + def test_auth_token_project_group_role(self): + """Verify getting a token in a tenant with group roles.""" + # Add a v2 style role in so we can check we get this back + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], + self.tenant_bar['id'], + self.role_member['id']) + # Now create a group role for this user as well + domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + self.resource_api.create_domain(domain1['id'], domain1) + new_group = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex} + new_group = self.identity_api.create_group(new_group) + self.identity_api.add_user_to_group(self.user_foo['id'], + new_group['id']) + self.assignment_api.create_grant( + group_id=new_group['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_admin['id']) + + # Get a scoped token for the tenant + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + scoped_token = self.controller.authenticate({}, body_dict) + + tenant = scoped_token["access"]["token"]["tenant"] + roles = scoped_token["access"]["metadata"]["roles"] + self.assertEqual(self.tenant_bar['id'], tenant["id"]) + self.assertIn(self.role_member['id'], roles) + self.assertIn(self.role_admin['id'], roles) + + def test_belongs_to_no_tenant(self): + r = self.controller.authenticate( + {}, + auth={ + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'] + } + }) + unscoped_token_id = r['access']['token']['id'] + self.assertRaises( + exception.Unauthorized, + self.controller.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'BAR'}), + token_id=unscoped_token_id) + + def test_belongs_to(self): + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + scoped_token = self.controller.authenticate({}, body_dict) + scoped_token_id = scoped_token['access']['token']['id'] + + self.assertRaises( + exception.Unauthorized, + self.controller.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'me'}), + token_id=scoped_token_id) + + self.assertRaises( + exception.Unauthorized, + self.controller.validate_token, + dict(is_admin=True, query_string={'belongsTo': 'BAR'}), + token_id=scoped_token_id) + + def test_token_auth_with_binding(self): + self.config_fixture.config(group='token', bind=['kerberos']) + body_dict = _build_user_auth() + unscoped_token = self.controller.authenticate( + self.context_with_remote_user, body_dict) + + # the token should have bind information in it + bind = unscoped_token['access']['token']['bind'] + self.assertEqual('FOO', bind['kerberos']) + + body_dict = _build_user_auth( + token=unscoped_token['access']['token'], + tenant_name='BAR') + + # using unscoped token without remote user context fails + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + self.empty_context, body_dict) + + # using token with remote user context succeeds + scoped_token = self.controller.authenticate( + self.context_with_remote_user, body_dict) + + # the bind information should be carried over from the original token + bind = scoped_token['access']['token']['bind'] + self.assertEqual('FOO', bind['kerberos']) + + def test_deleting_role_revokes_token(self): + role_controller = assignment.controllers.Role() + project1 = {'id': 'Project1', 'name': uuid.uuid4().hex, + 'domain_id': DEFAULT_DOMAIN_ID} + self.resource_api.create_project(project1['id'], project1) + role_one = {'id': 'role_one', 'name': uuid.uuid4().hex} + self.role_api.create_role(role_one['id'], role_one) + self.assignment_api.add_role_to_user_and_project( + self.user_foo['id'], project1['id'], role_one['id']) + no_context = {} + + # Get a scoped token for the tenant + body_dict = _build_user_auth( + username=self.user_foo['name'], + password=self.user_foo['password'], + tenant_name=project1['name']) + token = self.controller.authenticate(no_context, body_dict) + # Ensure it is valid + token_id = token['access']['token']['id'] + self.controller.validate_token( + dict(is_admin=True, query_string={}), + token_id=token_id) + + # Delete the role, which should invalidate the token + role_controller.delete_role( + dict(is_admin=True, query_string={}), role_one['id']) + + # Check the token is now invalid + self.assertRaises( + exception.TokenNotFound, + self.controller.validate_token, + dict(is_admin=True, query_string={}), + token_id=token_id) + + def test_only_original_audit_id_is_kept(self): + context = {} + + def get_audit_ids(token): + return token['access']['token']['audit_ids'] + + # get a token + body_dict = _build_user_auth(username='FOO', password='foo2') + unscoped_token = self.controller.authenticate(context, body_dict) + starting_audit_id = get_audit_ids(unscoped_token)[0] + self.assertIsNotNone(starting_audit_id) + + # get another token to ensure the correct parent audit_id is set + body_dict = _build_user_auth(token=unscoped_token["access"]["token"]) + unscoped_token_2 = self.controller.authenticate(context, body_dict) + audit_ids = get_audit_ids(unscoped_token_2) + self.assertThat(audit_ids, matchers.HasLength(2)) + self.assertThat(audit_ids[-1], matchers.Equals(starting_audit_id)) + + # get another token from token 2 and ensure the correct parent + # audit_id is set + body_dict = _build_user_auth(token=unscoped_token_2["access"]["token"]) + unscoped_token_3 = self.controller.authenticate(context, body_dict) + audit_ids = get_audit_ids(unscoped_token_3) + self.assertThat(audit_ids, matchers.HasLength(2)) + self.assertThat(audit_ids[-1], matchers.Equals(starting_audit_id)) + + def test_revoke_by_audit_chain_id_original_token(self): + self.config_fixture.config(group='token', revoke_by_id=False) + context = {} + + # get a token + body_dict = _build_user_auth(username='FOO', password='foo2') + unscoped_token = self.controller.authenticate(context, body_dict) + token_id = unscoped_token['access']['token']['id'] + # get a second token + body_dict = _build_user_auth(token=unscoped_token["access"]["token"]) + unscoped_token_2 = self.controller.authenticate(context, body_dict) + token_2_id = unscoped_token_2['access']['token']['id'] + + self.token_provider_api.revoke_token(token_id, revoke_chain=True) + + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_id) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_2_id) + + def test_revoke_by_audit_chain_id_chained_token(self): + self.config_fixture.config(group='token', revoke_by_id=False) + context = {} + + # get a token + body_dict = _build_user_auth(username='FOO', password='foo2') + unscoped_token = self.controller.authenticate(context, body_dict) + token_id = unscoped_token['access']['token']['id'] + # get a second token + body_dict = _build_user_auth(token=unscoped_token["access"]["token"]) + unscoped_token_2 = self.controller.authenticate(context, body_dict) + token_2_id = unscoped_token_2['access']['token']['id'] + + self.token_provider_api.revoke_token(token_2_id, revoke_chain=True) + + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_id) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_2_id) + + def _mock_audit_info(self, parent_audit_id): + # NOTE(morgainfainberg): The token model and other cases that are + # extracting the audit id expect 'None' if the audit id doesn't + # exist. This ensures that the audit_id is None and the + # audit_chain_id will also return None. + return [None, None] + + def test_revoke_with_no_audit_info(self): + self.config_fixture.config(group='token', revoke_by_id=False) + context = {} + + with mock.patch.object(provider, 'audit_info', self._mock_audit_info): + # get a token + body_dict = _build_user_auth(username='FOO', password='foo2') + unscoped_token = self.controller.authenticate(context, body_dict) + token_id = unscoped_token['access']['token']['id'] + # get a second token + body_dict = _build_user_auth( + token=unscoped_token['access']['token']) + unscoped_token_2 = self.controller.authenticate(context, body_dict) + token_2_id = unscoped_token_2['access']['token']['id'] + + self.token_provider_api.revoke_token(token_id, revoke_chain=True) + + revoke_events = self.revoke_api.list_events() + self.assertThat(revoke_events, matchers.HasLength(1)) + revoke_event = revoke_events[0].to_dict() + self.assertIn('expires_at', revoke_event) + self.assertEqual(unscoped_token_2['access']['token']['expires'], + revoke_event['expires_at']) + + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_id) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_2_id) + + # get a new token, with no audit info + body_dict = _build_user_auth(username='FOO', password='foo2') + unscoped_token = self.controller.authenticate(context, body_dict) + token_id = unscoped_token['access']['token']['id'] + # get a second token + body_dict = _build_user_auth( + token=unscoped_token['access']['token']) + unscoped_token_2 = self.controller.authenticate(context, body_dict) + token_2_id = unscoped_token_2['access']['token']['id'] + + # Revoke by audit_id, no audit_info means both parent and child + # token are revoked. + self.token_provider_api.revoke_token(token_id) + + revoke_events = self.revoke_api.list_events() + self.assertThat(revoke_events, matchers.HasLength(2)) + revoke_event = revoke_events[1].to_dict() + self.assertIn('expires_at', revoke_event) + self.assertEqual(unscoped_token_2['access']['token']['expires'], + revoke_event['expires_at']) + + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_id) + self.assertRaises(exception.TokenNotFound, + self.token_provider_api.validate_v2_token, + token_id=token_2_id) + + +class AuthWithPasswordCredentials(AuthTest): + def test_auth_invalid_user(self): + """Verify exception is raised if invalid user.""" + body_dict = _build_user_auth( + username=uuid.uuid4().hex, + password=uuid.uuid4().hex) + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + {}, body_dict) + + def test_auth_valid_user_invalid_password(self): + """Verify exception is raised if invalid password.""" + body_dict = _build_user_auth( + username="FOO", + password=uuid.uuid4().hex) + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + {}, body_dict) + + def test_auth_empty_password(self): + """Verify exception is raised if empty password.""" + body_dict = _build_user_auth( + username="FOO", + password="") + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + {}, body_dict) + + def test_auth_no_password(self): + """Verify exception is raised if empty password.""" + body_dict = _build_user_auth(username="FOO") + self.assertRaises( + exception.ValidationError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_blank_password_credentials(self): + """Sending empty dict as passwordCredentials raises a 400 error.""" + body_dict = {'passwordCredentials': {}, 'tenantName': 'demo'} + self.assertRaises(exception.ValidationError, + self.controller.authenticate, + {}, body_dict) + + def test_authenticate_no_username(self): + """Verify skipping username raises the right exception.""" + body_dict = _build_user_auth(password="pass", + tenant_name="demo") + self.assertRaises(exception.ValidationError, + self.controller.authenticate, + {}, body_dict) + + def test_bind_without_remote_user(self): + self.config_fixture.config(group='token', bind=['kerberos']) + body_dict = _build_user_auth(username='FOO', password='foo2', + tenant_name='BAR') + token = self.controller.authenticate({}, body_dict) + self.assertNotIn('bind', token['access']['token']) + + def test_change_default_domain_id(self): + # If the default_domain_id config option is not the default then the + # user in auth data is from the new default domain. + + # 1) Create a new domain. + new_domain_id = uuid.uuid4().hex + new_domain = { + 'description': uuid.uuid4().hex, + 'enabled': True, + 'id': new_domain_id, + 'name': uuid.uuid4().hex, + } + + self.resource_api.create_domain(new_domain_id, new_domain) + + # 2) Create user "foo" in new domain with different password than + # default-domain foo. + new_user_password = uuid.uuid4().hex + new_user = { + 'name': self.user_foo['name'], + 'domain_id': new_domain_id, + 'password': new_user_password, + 'email': 'foo@bar2.com', + } + + new_user = self.identity_api.create_user(new_user) + + # 3) Update the default_domain_id config option to the new domain + + self.config_fixture.config(group='identity', + default_domain_id=new_domain_id) + + # 4) Authenticate as "foo" using the password in the new domain. + + body_dict = _build_user_auth( + username=self.user_foo['name'], + password=new_user_password) + + # The test is successful if this doesn't raise, so no need to assert. + self.controller.authenticate({}, body_dict) + + +class AuthWithRemoteUser(AuthTest): + def test_unscoped_remote_authn(self): + """Verify getting an unscoped token with external authn.""" + body_dict = _build_user_auth( + username='FOO', + password='foo2') + local_token = self.controller.authenticate( + {}, body_dict) + + body_dict = _build_user_auth() + remote_token = self.controller.authenticate( + self.context_with_remote_user, body_dict) + + self.assertEqualTokens(local_token, remote_token, + enforce_audit_ids=False) + + def test_unscoped_remote_authn_jsonless(self): + """Verify that external auth with invalid request fails.""" + self.assertRaises( + exception.ValidationError, + self.controller.authenticate, + {'REMOTE_USER': 'FOO'}, + None) + + def test_scoped_remote_authn(self): + """Verify getting a token with external authn.""" + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name='BAR') + local_token = self.controller.authenticate( + {}, body_dict) + + body_dict = _build_user_auth( + tenant_name='BAR') + remote_token = self.controller.authenticate( + self.context_with_remote_user, body_dict) + + self.assertEqualTokens(local_token, remote_token, + enforce_audit_ids=False) + + def test_scoped_nometa_remote_authn(self): + """Verify getting a token with external authn and no metadata.""" + body_dict = _build_user_auth( + username='TWO', + password='two2', + tenant_name='BAZ') + local_token = self.controller.authenticate( + {}, body_dict) + + body_dict = _build_user_auth(tenant_name='BAZ') + remote_token = self.controller.authenticate( + {'environment': {'REMOTE_USER': 'TWO'}}, body_dict) + + self.assertEqualTokens(local_token, remote_token, + enforce_audit_ids=False) + + def test_scoped_remote_authn_invalid_user(self): + """Verify that external auth with invalid user fails.""" + body_dict = _build_user_auth(tenant_name="BAR") + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, + {'environment': {'REMOTE_USER': uuid.uuid4().hex}}, + body_dict) + + def test_bind_with_kerberos(self): + self.config_fixture.config(group='token', bind=['kerberos']) + body_dict = _build_user_auth(tenant_name="BAR") + token = self.controller.authenticate(self.context_with_remote_user, + body_dict) + self.assertEqual('FOO', token['access']['token']['bind']['kerberos']) + + def test_bind_without_config_opt(self): + self.config_fixture.config(group='token', bind=['x509']) + body_dict = _build_user_auth(tenant_name='BAR') + token = self.controller.authenticate(self.context_with_remote_user, + body_dict) + self.assertNotIn('bind', token['access']['token']) + + +class AuthWithTrust(AuthTest): + def setUp(self): + super(AuthWithTrust, self).setUp() + + self.trust_controller = trust.controllers.TrustV3() + self.auth_v3_controller = auth.controllers.Auth() + self.trustor = self.user_foo + self.trustee = self.user_two + self.assigned_roles = [self.role_member['id'], + self.role_browser['id']] + for assigned_role in self.assigned_roles: + self.assignment_api.add_role_to_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + + self.sample_data = {'trustor_user_id': self.trustor['id'], + 'trustee_user_id': self.trustee['id'], + 'project_id': self.tenant_bar['id'], + 'impersonation': True, + 'roles': [{'id': self.role_browser['id']}, + {'name': self.role_member['name']}]} + + def config_overrides(self): + super(AuthWithTrust, self).config_overrides() + self.config_fixture.config(group='trust', enabled=True) + + def _create_auth_context(self, token_id): + token_ref = token_model.KeystoneToken( + token_id=token_id, + token_data=self.token_provider_api.validate_token(token_id)) + auth_context = authorization.token_to_auth_context(token_ref) + return {'environment': {authorization.AUTH_CONTEXT_ENV: auth_context}, + 'token_id': token_id, + 'host_url': HOST_URL} + + def create_trust(self, trust_data, trustor_name, expires_at=None, + impersonation=True): + username = trustor_name + password = 'foo2' + unscoped_token = self.get_unscoped_token(username, password) + context = self._create_auth_context( + unscoped_token['access']['token']['id']) + trust_data_copy = copy.deepcopy(trust_data) + trust_data_copy['expires_at'] = expires_at + trust_data_copy['impersonation'] = impersonation + + return self.trust_controller.create_trust( + context, trust=trust_data_copy)['trust'] + + def get_unscoped_token(self, username, password='foo2'): + body_dict = _build_user_auth(username=username, password=password) + return self.controller.authenticate({}, body_dict) + + def build_v2_token_request(self, username, password, trust, + tenant_id=None): + if not tenant_id: + tenant_id = self.tenant_bar['id'] + unscoped_token = self.get_unscoped_token(username, password) + unscoped_token_id = unscoped_token['access']['token']['id'] + request_body = _build_user_auth(token={'id': unscoped_token_id}, + trust_id=trust['id'], + tenant_id=tenant_id) + return request_body + + def test_create_trust_bad_data_fails(self): + unscoped_token = self.get_unscoped_token(self.trustor['name']) + context = self._create_auth_context( + unscoped_token['access']['token']['id']) + bad_sample_data = {'trustor_user_id': self.trustor['id'], + 'project_id': self.tenant_bar['id'], + 'roles': [{'id': self.role_browser['id']}]} + + self.assertRaises(exception.ValidationError, + self.trust_controller.create_trust, + context, trust=bad_sample_data) + + def test_create_trust_no_roles(self): + unscoped_token = self.get_unscoped_token(self.trustor['name']) + context = {'token_id': unscoped_token['access']['token']['id']} + self.sample_data['roles'] = [] + self.assertRaises(exception.Forbidden, + self.trust_controller.create_trust, + context, trust=self.sample_data) + + def test_create_trust(self): + expires_at = timeutils.strtime(timeutils.utcnow() + + datetime.timedelta(minutes=10), + fmt=TIME_FORMAT) + new_trust = self.create_trust(self.sample_data, self.trustor['name'], + expires_at=expires_at) + self.assertEqual(self.trustor['id'], new_trust['trustor_user_id']) + self.assertEqual(self.trustee['id'], new_trust['trustee_user_id']) + role_ids = [self.role_browser['id'], self.role_member['id']] + self.assertTrue(timeutils.parse_strtime(new_trust['expires_at'], + fmt=TIME_FORMAT)) + self.assertIn('%s/v3/OS-TRUST/' % HOST_URL, + new_trust['links']['self']) + self.assertIn('%s/v3/OS-TRUST/' % HOST_URL, + new_trust['roles_links']['self']) + + for role in new_trust['roles']: + self.assertIn(role['id'], role_ids) + + def test_create_trust_expires_bad(self): + self.assertRaises(exception.ValidationTimeStampError, + self.create_trust, self.sample_data, + self.trustor['name'], expires_at="bad") + self.assertRaises(exception.ValidationTimeStampError, + self.create_trust, self.sample_data, + self.trustor['name'], expires_at="") + self.assertRaises(exception.ValidationTimeStampError, + self.create_trust, self.sample_data, + self.trustor['name'], expires_at="Z") + + def test_create_trust_without_project_id(self): + """Verify that trust can be created without project id and + token can be generated with that trust. + """ + unscoped_token = self.get_unscoped_token(self.trustor['name']) + context = self._create_auth_context( + unscoped_token['access']['token']['id']) + self.sample_data['project_id'] = None + self.sample_data['roles'] = [] + new_trust = self.trust_controller.create_trust( + context, trust=self.sample_data)['trust'] + self.assertEqual(self.trustor['id'], new_trust['trustor_user_id']) + self.assertEqual(self.trustee['id'], new_trust['trustee_user_id']) + self.assertIs(new_trust['impersonation'], True) + auth_response = self.fetch_v2_token_from_trust(new_trust) + token_user = auth_response['access']['user'] + self.assertEqual(token_user['id'], new_trust['trustor_user_id']) + + def test_get_trust(self): + unscoped_token = self.get_unscoped_token(self.trustor['name']) + context = {'token_id': unscoped_token['access']['token']['id'], + 'host_url': HOST_URL} + new_trust = self.trust_controller.create_trust( + context, trust=self.sample_data)['trust'] + trust = self.trust_controller.get_trust(context, + new_trust['id'])['trust'] + self.assertEqual(self.trustor['id'], trust['trustor_user_id']) + self.assertEqual(self.trustee['id'], trust['trustee_user_id']) + role_ids = [self.role_browser['id'], self.role_member['id']] + for role in new_trust['roles']: + self.assertIn(role['id'], role_ids) + + def test_create_trust_no_impersonation(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name'], + expires_at=None, impersonation=False) + self.assertEqual(self.trustor['id'], new_trust['trustor_user_id']) + self.assertEqual(self.trustee['id'], new_trust['trustee_user_id']) + self.assertIs(new_trust['impersonation'], False) + auth_response = self.fetch_v2_token_from_trust(new_trust) + token_user = auth_response['access']['user'] + self.assertEqual(token_user['id'], new_trust['trustee_user_id']) + + # TODO(ayoung): Endpoints + + def test_create_trust_impersonation(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + self.assertEqual(self.trustor['id'], new_trust['trustor_user_id']) + self.assertEqual(self.trustee['id'], new_trust['trustee_user_id']) + self.assertIs(new_trust['impersonation'], True) + auth_response = self.fetch_v2_token_from_trust(new_trust) + token_user = auth_response['access']['user'] + self.assertEqual(token_user['id'], new_trust['trustor_user_id']) + + def test_token_from_trust_wrong_user_fails(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + request_body = self.build_v2_token_request('FOO', 'foo2', new_trust) + self.assertRaises(exception.Forbidden, self.controller.authenticate, + {}, request_body) + + def test_token_from_trust_wrong_project_fails(self): + for assigned_role in self.assigned_roles: + self.assignment_api.add_role_to_user_and_project( + self.trustor['id'], self.tenant_baz['id'], assigned_role) + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + request_body = self.build_v2_token_request('TWO', 'two2', new_trust, + self.tenant_baz['id']) + self.assertRaises(exception.Forbidden, self.controller.authenticate, + {}, request_body) + + def fetch_v2_token_from_trust(self, trust): + request_body = self.build_v2_token_request('TWO', 'two2', trust) + auth_response = self.controller.authenticate({}, request_body) + return auth_response + + def fetch_v3_token_from_trust(self, trust, trustee): + v3_password_data = { + 'identity': { + "methods": ["password"], + "password": { + "user": { + "id": trustee["id"], + "password": trustee["password"] + } + } + }, + 'scope': { + 'project': { + 'id': self.tenant_baz['id'] + } + } + } + auth_response = (self.auth_v3_controller.authenticate_for_token + ({'environment': {}, + 'query_string': {}}, + v3_password_data)) + token = auth_response.headers['X-Subject-Token'] + + v3_req_with_trust = { + "identity": { + "methods": ["token"], + "token": {"id": token}}, + "scope": { + "OS-TRUST:trust": {"id": trust['id']}}} + token_auth_response = (self.auth_v3_controller.authenticate_for_token + ({'environment': {}, + 'query_string': {}}, + v3_req_with_trust)) + return token_auth_response + + def test_create_v3_token_from_trust(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee) + + trust_token_user = auth_response.json['token']['user'] + self.assertEqual(self.trustor['id'], trust_token_user['id']) + + trust_token_trust = auth_response.json['token']['OS-TRUST:trust'] + self.assertEqual(trust_token_trust['id'], new_trust['id']) + self.assertEqual(self.trustor['id'], + trust_token_trust['trustor_user']['id']) + self.assertEqual(self.trustee['id'], + trust_token_trust['trustee_user']['id']) + + trust_token_roles = auth_response.json['token']['roles'] + self.assertEqual(2, len(trust_token_roles)) + + def test_v3_trust_token_get_token_fails(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee) + trust_token = auth_response.headers['X-Subject-Token'] + v3_token_data = {'identity': { + 'methods': ['token'], + 'token': {'id': trust_token} + }} + self.assertRaises( + exception.Forbidden, + self.auth_v3_controller.authenticate_for_token, + {'environment': {}, + 'query_string': {}}, v3_token_data) + + def test_token_from_trust(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + auth_response = self.fetch_v2_token_from_trust(new_trust) + + self.assertIsNotNone(auth_response) + self.assertEqual(2, + len(auth_response['access']['metadata']['roles']), + "user_foo has three roles, but the token should" + " only get the two roles specified in the trust.") + + def assert_token_count_for_trust(self, trust, expected_value): + tokens = self.token_provider_api._persistence._list_tokens( + self.trustee['id'], trust_id=trust['id']) + token_count = len(tokens) + self.assertEqual(expected_value, token_count) + + def test_delete_tokens_for_user_invalidates_tokens_from_trust(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + self.assert_token_count_for_trust(new_trust, 0) + self.fetch_v2_token_from_trust(new_trust) + self.assert_token_count_for_trust(new_trust, 1) + self.token_provider_api._persistence.delete_tokens_for_user( + self.trustee['id']) + self.assert_token_count_for_trust(new_trust, 0) + + def test_token_from_trust_cant_get_another_token(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + auth_response = self.fetch_v2_token_from_trust(new_trust) + trust_token_id = auth_response['access']['token']['id'] + request_body = _build_user_auth(token={'id': trust_token_id}, + tenant_id=self.tenant_bar['id']) + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_delete_trust_revokes_token(self): + unscoped_token = self.get_unscoped_token(self.trustor['name']) + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + context = self._create_auth_context( + unscoped_token['access']['token']['id']) + self.fetch_v2_token_from_trust(new_trust) + trust_id = new_trust['id'] + tokens = self.token_provider_api._persistence._list_tokens( + self.trustor['id'], + trust_id=trust_id) + self.assertEqual(1, len(tokens)) + self.trust_controller.delete_trust(context, trust_id=trust_id) + tokens = self.token_provider_api._persistence._list_tokens( + self.trustor['id'], + trust_id=trust_id) + self.assertEqual(0, len(tokens)) + + def test_token_from_trust_with_no_role_fails(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + for assigned_role in self.assigned_roles: + self.assignment_api.remove_role_from_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + request_body = self.build_v2_token_request('TWO', 'two2', new_trust) + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_expired_trust_get_token_fails(self): + expiry = "1999-02-18T10:10:00Z" + new_trust = self.create_trust(self.sample_data, self.trustor['name'], + expiry) + request_body = self.build_v2_token_request('TWO', 'two2', new_trust) + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_token_from_trust_with_wrong_role_fails(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + self.assignment_api.add_role_to_user_and_project( + self.trustor['id'], + self.tenant_bar['id'], + self.role_other['id']) + for assigned_role in self.assigned_roles: + self.assignment_api.remove_role_from_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + + request_body = self.build_v2_token_request('TWO', 'two2', new_trust) + + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_do_not_consume_remaining_uses_when_get_token_fails(self): + trust_data = copy.deepcopy(self.sample_data) + trust_data['remaining_uses'] = 3 + new_trust = self.create_trust(trust_data, self.trustor['name']) + + for assigned_role in self.assigned_roles: + self.assignment_api.remove_role_from_user_and_project( + self.trustor['id'], self.tenant_bar['id'], assigned_role) + + request_body = self.build_v2_token_request('TWO', 'two2', new_trust) + self.assertRaises(exception.Forbidden, + self.controller.authenticate, {}, request_body) + + unscoped_token = self.get_unscoped_token(self.trustor['name']) + context = self._create_auth_context( + unscoped_token['access']['token']['id']) + trust = self.trust_controller.get_trust(context, + new_trust['id'])['trust'] + self.assertEqual(3, trust['remaining_uses']) + + def test_v2_trust_token_contains_trustor_user_id_and_impersonation(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + auth_response = self.fetch_v2_token_from_trust(new_trust) + + self.assertEqual(new_trust['trustee_user_id'], + auth_response['access']['trust']['trustee_user_id']) + self.assertEqual(new_trust['trustor_user_id'], + auth_response['access']['trust']['trustor_user_id']) + self.assertEqual(new_trust['impersonation'], + auth_response['access']['trust']['impersonation']) + self.assertEqual(new_trust['id'], + auth_response['access']['trust']['id']) + + validate_response = self.controller.validate_token( + context=dict(is_admin=True, query_string={}), + token_id=auth_response['access']['token']['id']) + self.assertEqual( + new_trust['trustee_user_id'], + validate_response['access']['trust']['trustee_user_id']) + self.assertEqual( + new_trust['trustor_user_id'], + validate_response['access']['trust']['trustor_user_id']) + self.assertEqual( + new_trust['impersonation'], + validate_response['access']['trust']['impersonation']) + self.assertEqual( + new_trust['id'], + validate_response['access']['trust']['id']) + + def disable_user(self, user): + user['enabled'] = False + self.identity_api.update_user(user['id'], user) + + def test_trust_get_token_fails_if_trustor_disabled(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + request_body = self.build_v2_token_request(self.trustee['name'], + self.trustee['password'], + new_trust) + self.disable_user(self.trustor) + self.assertRaises( + exception.Forbidden, + self.controller.authenticate, {}, request_body) + + def test_trust_get_token_fails_if_trustee_disabled(self): + new_trust = self.create_trust(self.sample_data, self.trustor['name']) + request_body = self.build_v2_token_request(self.trustee['name'], + self.trustee['password'], + new_trust) + self.disable_user(self.trustee) + self.assertRaises( + exception.Unauthorized, + self.controller.authenticate, {}, request_body) + + +class TokenExpirationTest(AuthTest): + + @mock.patch.object(timeutils, 'utcnow') + def _maintain_token_expiration(self, mock_utcnow): + """Token expiration should be maintained after re-auth & validation.""" + now = datetime.datetime.utcnow() + mock_utcnow.return_value = now + + r = self.controller.authenticate( + {}, + auth={ + 'passwordCredentials': { + 'username': self.user_foo['name'], + 'password': self.user_foo['password'] + } + }) + unscoped_token_id = r['access']['token']['id'] + original_expiration = r['access']['token']['expires'] + + mock_utcnow.return_value = now + datetime.timedelta(seconds=1) + + r = self.controller.validate_token( + dict(is_admin=True, query_string={}), + token_id=unscoped_token_id) + self.assertEqual(original_expiration, r['access']['token']['expires']) + + mock_utcnow.return_value = now + datetime.timedelta(seconds=2) + + r = self.controller.authenticate( + {}, + auth={ + 'token': { + 'id': unscoped_token_id, + }, + 'tenantId': self.tenant_bar['id'], + }) + scoped_token_id = r['access']['token']['id'] + self.assertEqual(original_expiration, r['access']['token']['expires']) + + mock_utcnow.return_value = now + datetime.timedelta(seconds=3) + + r = self.controller.validate_token( + dict(is_admin=True, query_string={}), + token_id=scoped_token_id) + self.assertEqual(original_expiration, r['access']['token']['expires']) + + def test_maintain_uuid_token_expiration(self): + self.config_fixture.config( + group='token', + provider='keystone.token.providers.uuid.Provider') + self._maintain_token_expiration() + + +class AuthCatalog(tests.SQLDriverOverrides, AuthTest): + """Tests for the catalog provided in the auth response.""" + + def config_files(self): + config_files = super(AuthCatalog, self).config_files() + # We need to use a backend that supports disabled endpoints, like the + # SQL backend. + config_files.append(tests.dirs.tests_conf('backend_sql.conf')) + return config_files + + def _create_endpoints(self): + def create_region(**kwargs): + ref = {'id': uuid.uuid4().hex} + ref.update(kwargs) + self.catalog_api.create_region(ref) + return ref + + def create_endpoint(service_id, region, **kwargs): + id_ = uuid.uuid4().hex + ref = { + 'id': id_, + 'interface': 'public', + 'region_id': region, + 'service_id': service_id, + 'url': 'http://localhost/%s' % uuid.uuid4().hex, + } + ref.update(kwargs) + self.catalog_api.create_endpoint(id_, ref) + return ref + + # Create a service for use with the endpoints. + def create_service(**kwargs): + id_ = uuid.uuid4().hex + ref = { + 'id': id_, + 'name': uuid.uuid4().hex, + 'type': uuid.uuid4().hex, + } + ref.update(kwargs) + self.catalog_api.create_service(id_, ref) + return ref + + enabled_service_ref = create_service(enabled=True) + disabled_service_ref = create_service(enabled=False) + + region = create_region() + + # Create endpoints + enabled_endpoint_ref = create_endpoint( + enabled_service_ref['id'], region['id']) + create_endpoint( + enabled_service_ref['id'], region['id'], enabled=False, + interface='internal') + create_endpoint( + disabled_service_ref['id'], region['id']) + + return enabled_endpoint_ref + + def test_auth_catalog_disabled_endpoint(self): + """On authenticate, get a catalog that excludes disabled endpoints.""" + endpoint_ref = self._create_endpoints() + + # Authenticate + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + token = self.controller.authenticate({}, body_dict) + + # Check the catalog + self.assertEqual(1, len(token['access']['serviceCatalog'])) + endpoint = token['access']['serviceCatalog'][0]['endpoints'][0] + self.assertEqual( + 1, len(token['access']['serviceCatalog'][0]['endpoints'])) + + exp_endpoint = { + 'id': endpoint_ref['id'], + 'publicURL': endpoint_ref['url'], + 'region': endpoint_ref['region_id'], + } + + self.assertEqual(exp_endpoint, endpoint) + + def test_validate_catalog_disabled_endpoint(self): + """On validate, get back a catalog that excludes disabled endpoints.""" + endpoint_ref = self._create_endpoints() + + # Authenticate + body_dict = _build_user_auth( + username='FOO', + password='foo2', + tenant_name="BAR") + + token = self.controller.authenticate({}, body_dict) + + # Validate + token_id = token['access']['token']['id'] + validate_ref = self.controller.validate_token( + dict(is_admin=True, query_string={}), + token_id=token_id) + + # Check the catalog + self.assertEqual(1, len(token['access']['serviceCatalog'])) + endpoint = validate_ref['access']['serviceCatalog'][0]['endpoints'][0] + self.assertEqual( + 1, len(token['access']['serviceCatalog'][0]['endpoints'])) + + exp_endpoint = { + 'id': endpoint_ref['id'], + 'publicURL': endpoint_ref['url'], + 'region': endpoint_ref['region_id'], + } + + self.assertEqual(exp_endpoint, endpoint) + + +class NonDefaultAuthTest(tests.TestCase): + + def test_add_non_default_auth_method(self): + self.config_fixture.config(group='auth', + methods=['password', 'token', 'custom']) + config.setup_authentication() + self.assertTrue(hasattr(CONF.auth, 'custom')) |