summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_auth.py
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
committerWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
commitb8c756ecdd7cced1db4300935484e8c83701c82e (patch)
tree87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/test_auth.py
parentc304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff)
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_auth.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_auth.py1328
1 files changed, 1328 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_auth.py b/keystone-moon/keystone/tests/unit/test_auth.py
new file mode 100644
index 00000000..295e028d
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_auth.py
@@ -0,0 +1,1328 @@
+# Copyright 2012 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import datetime
+import uuid
+
+import mock
+from oslo_config import cfg
+from oslo_utils import timeutils
+from testtools import matchers
+
+from keystone import assignment
+from keystone import auth
+from keystone.common import authorization
+from keystone import config
+from keystone import exception
+from keystone.models import token_model
+from keystone.tests import unit as tests
+from keystone.tests.unit import default_fixtures
+from keystone.tests.unit.ksfixtures import database
+from keystone import token
+from keystone.token import provider
+from keystone import trust
+
+
+CONF = cfg.CONF
+TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+
+HOST_URL = 'http://keystone:5001'
+
+
+def _build_user_auth(token=None, user_id=None, username=None,
+ password=None, tenant_id=None, tenant_name=None,
+ trust_id=None):
+ """Build auth dictionary.
+
+ It will create an auth dictionary based on all the arguments
+ that it receives.
+ """
+ auth_json = {}
+ if token is not None:
+ auth_json['token'] = token
+ if username or password:
+ auth_json['passwordCredentials'] = {}
+ if username is not None:
+ auth_json['passwordCredentials']['username'] = username
+ if user_id is not None:
+ auth_json['passwordCredentials']['userId'] = user_id
+ if password is not None:
+ auth_json['passwordCredentials']['password'] = password
+ if tenant_name is not None:
+ auth_json['tenantName'] = tenant_name
+ if tenant_id is not None:
+ auth_json['tenantId'] = tenant_id
+ if trust_id is not None:
+ auth_json['trust_id'] = trust_id
+ return auth_json
+
+
+class AuthTest(tests.TestCase):
+ def setUp(self):
+ self.useFixture(database.Database())
+ super(AuthTest, self).setUp()
+
+ self.load_backends()
+ self.load_fixtures(default_fixtures)
+
+ self.context_with_remote_user = {'environment':
+ {'REMOTE_USER': 'FOO',
+ 'AUTH_TYPE': 'Negotiate'}}
+ self.empty_context = {'environment': {}}
+
+ self.controller = token.controllers.Auth()
+
+ def assertEqualTokens(self, a, b, enforce_audit_ids=True):
+ """Assert that two tokens are equal.
+
+ Compare two tokens except for their ids. This also truncates
+ the time in the comparison.
+ """
+ def normalize(token):
+ token['access']['token']['id'] = 'dummy'
+ del token['access']['token']['expires']
+ del token['access']['token']['issued_at']
+ del token['access']['token']['audit_ids']
+ return token
+
+ self.assertCloseEnoughForGovernmentWork(
+ timeutils.parse_isotime(a['access']['token']['expires']),
+ timeutils.parse_isotime(b['access']['token']['expires']))
+ self.assertCloseEnoughForGovernmentWork(
+ timeutils.parse_isotime(a['access']['token']['issued_at']),
+ timeutils.parse_isotime(b['access']['token']['issued_at']))
+ if enforce_audit_ids:
+ self.assertIn(a['access']['token']['audit_ids'][0],
+ b['access']['token']['audit_ids'])
+ self.assertThat(len(a['access']['token']['audit_ids']),
+ matchers.LessThan(3))
+ self.assertThat(len(b['access']['token']['audit_ids']),
+ matchers.LessThan(3))
+
+ return self.assertDictEqual(normalize(a), normalize(b))
+
+
+class AuthBadRequests(AuthTest):
+ def test_no_external_auth(self):
+ """Verify that _authenticate_external() raises exception if N/A."""
+ self.assertRaises(
+ token.controllers.ExternalAuthNotApplicable,
+ self.controller._authenticate_external,
+ context={}, auth={})
+
+ def test_empty_remote_user(self):
+ """Verify that _authenticate_external() raises exception if
+ REMOTE_USER is set as the empty string.
+ """
+ context = {'environment': {'REMOTE_USER': ''}}
+ self.assertRaises(
+ token.controllers.ExternalAuthNotApplicable,
+ self.controller._authenticate_external,
+ context=context, auth={})
+
+ def test_no_token_in_auth(self):
+ """Verify that _authenticate_token() raises exception if no token."""
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller._authenticate_token,
+ None, {})
+
+ def test_no_credentials_in_auth(self):
+ """Verify that _authenticate_local() raises exception if no creds."""
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller._authenticate_local,
+ None, {})
+
+ def test_empty_username_and_userid_in_auth(self):
+ """Verify that empty username and userID raises ValidationError."""
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller._authenticate_local,
+ None, {'passwordCredentials': {'password': 'abc',
+ 'userId': '', 'username': ''}})
+
+ def test_authenticate_blank_request_body(self):
+ """Verify sending empty json dict raises the right exception."""
+ self.assertRaises(exception.ValidationError,
+ self.controller.authenticate,
+ {}, {})
+
+ def test_authenticate_blank_auth(self):
+ """Verify sending blank 'auth' raises the right exception."""
+ body_dict = _build_user_auth()
+ self.assertRaises(exception.ValidationError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_invalid_auth_content(self):
+ """Verify sending invalid 'auth' raises the right exception."""
+ self.assertRaises(exception.ValidationError,
+ self.controller.authenticate,
+ {}, {'auth': 'abcd'})
+
+ def test_authenticate_user_id_too_large(self):
+ """Verify sending large 'userId' raises the right exception."""
+ body_dict = _build_user_auth(user_id='0' * 65, username='FOO',
+ password='foo2')
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_username_too_large(self):
+ """Verify sending large 'username' raises the right exception."""
+ body_dict = _build_user_auth(username='0' * 65, password='foo2')
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_tenant_id_too_large(self):
+ """Verify sending large 'tenantId' raises the right exception."""
+ body_dict = _build_user_auth(username='FOO', password='foo2',
+ tenant_id='0' * 65)
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_tenant_name_too_large(self):
+ """Verify sending large 'tenantName' raises the right exception."""
+ body_dict = _build_user_auth(username='FOO', password='foo2',
+ tenant_name='0' * 65)
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_token_too_large(self):
+ """Verify sending large 'token' raises the right exception."""
+ body_dict = _build_user_auth(token={'id': '0' * 8193})
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_password_too_large(self):
+ """Verify sending large 'password' raises the right exception."""
+ length = CONF.identity.max_password_length + 1
+ body_dict = _build_user_auth(username='FOO', password='0' * length)
+ self.assertRaises(exception.ValidationSizeError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+
+class AuthWithToken(AuthTest):
+ def test_unscoped_token(self):
+ """Verify getting an unscoped token with password creds."""
+ body_dict = _build_user_auth(username='FOO',
+ password='foo2')
+ unscoped_token = self.controller.authenticate({}, body_dict)
+ self.assertNotIn('tenant', unscoped_token['access']['token'])
+
+ def test_auth_invalid_token(self):
+ """Verify exception is raised if invalid token."""
+ body_dict = _build_user_auth(token={"id": uuid.uuid4().hex})
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_auth_bad_formatted_token(self):
+ """Verify exception is raised if invalid token."""
+ body_dict = _build_user_auth(token={})
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_auth_unscoped_token_no_project(self):
+ """Verify getting an unscoped token with an unscoped token."""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
+ unscoped_token = self.controller.authenticate({}, body_dict)
+
+ body_dict = _build_user_auth(
+ token=unscoped_token["access"]["token"])
+ unscoped_token_2 = self.controller.authenticate({}, body_dict)
+
+ self.assertEqualTokens(unscoped_token, unscoped_token_2)
+
+ def test_auth_unscoped_token_project(self):
+ """Verify getting a token in a tenant with an unscoped token."""
+ # Add a role in so we can check we get this back
+ self.assignment_api.add_role_to_user_and_project(
+ self.user_foo['id'],
+ self.tenant_bar['id'],
+ self.role_member['id'])
+ # Get an unscoped tenant
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
+ unscoped_token = self.controller.authenticate({}, body_dict)
+ # Get a token on BAR tenant using the unscoped tenant
+ body_dict = _build_user_auth(
+ token=unscoped_token["access"]["token"],
+ tenant_name="BAR")
+ scoped_token = self.controller.authenticate({}, body_dict)
+
+ tenant = scoped_token["access"]["token"]["tenant"]
+ roles = scoped_token["access"]["metadata"]["roles"]
+ self.assertEqual(self.tenant_bar['id'], tenant["id"])
+ self.assertThat(roles, matchers.Contains(self.role_member['id']))
+
+ def test_auth_token_project_group_role(self):
+ """Verify getting a token in a tenant with group roles."""
+ # Add a v2 style role in so we can check we get this back
+ self.assignment_api.add_role_to_user_and_project(
+ self.user_foo['id'],
+ self.tenant_bar['id'],
+ self.role_member['id'])
+ # Now create a group role for this user as well
+ domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.resource_api.create_domain(domain1['id'], domain1)
+ new_group = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex}
+ new_group = self.identity_api.create_group(new_group)
+ self.identity_api.add_user_to_group(self.user_foo['id'],
+ new_group['id'])
+ self.assignment_api.create_grant(
+ group_id=new_group['id'],
+ project_id=self.tenant_bar['id'],
+ role_id=self.role_admin['id'])
+
+ # Get a scoped token for the tenant
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name="BAR")
+
+ scoped_token = self.controller.authenticate({}, body_dict)
+
+ tenant = scoped_token["access"]["token"]["tenant"]
+ roles = scoped_token["access"]["metadata"]["roles"]
+ self.assertEqual(self.tenant_bar['id'], tenant["id"])
+ self.assertIn(self.role_member['id'], roles)
+ self.assertIn(self.role_admin['id'], roles)
+
+ def test_belongs_to_no_tenant(self):
+ r = self.controller.authenticate(
+ {},
+ auth={
+ 'passwordCredentials': {
+ 'username': self.user_foo['name'],
+ 'password': self.user_foo['password']
+ }
+ })
+ unscoped_token_id = r['access']['token']['id']
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.validate_token,
+ dict(is_admin=True, query_string={'belongsTo': 'BAR'}),
+ token_id=unscoped_token_id)
+
+ def test_belongs_to(self):
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name="BAR")
+
+ scoped_token = self.controller.authenticate({}, body_dict)
+ scoped_token_id = scoped_token['access']['token']['id']
+
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.validate_token,
+ dict(is_admin=True, query_string={'belongsTo': 'me'}),
+ token_id=scoped_token_id)
+
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.validate_token,
+ dict(is_admin=True, query_string={'belongsTo': 'BAR'}),
+ token_id=scoped_token_id)
+
+ def test_token_auth_with_binding(self):
+ self.config_fixture.config(group='token', bind=['kerberos'])
+ body_dict = _build_user_auth()
+ unscoped_token = self.controller.authenticate(
+ self.context_with_remote_user, body_dict)
+
+ # the token should have bind information in it
+ bind = unscoped_token['access']['token']['bind']
+ self.assertEqual('FOO', bind['kerberos'])
+
+ body_dict = _build_user_auth(
+ token=unscoped_token['access']['token'],
+ tenant_name='BAR')
+
+ # using unscoped token without remote user context fails
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ self.empty_context, body_dict)
+
+ # using token with remote user context succeeds
+ scoped_token = self.controller.authenticate(
+ self.context_with_remote_user, body_dict)
+
+ # the bind information should be carried over from the original token
+ bind = scoped_token['access']['token']['bind']
+ self.assertEqual('FOO', bind['kerberos'])
+
+ def test_deleting_role_revokes_token(self):
+ role_controller = assignment.controllers.Role()
+ project1 = {'id': 'Project1', 'name': uuid.uuid4().hex,
+ 'domain_id': DEFAULT_DOMAIN_ID}
+ self.resource_api.create_project(project1['id'], project1)
+ role_one = {'id': 'role_one', 'name': uuid.uuid4().hex}
+ self.role_api.create_role(role_one['id'], role_one)
+ self.assignment_api.add_role_to_user_and_project(
+ self.user_foo['id'], project1['id'], role_one['id'])
+ no_context = {}
+
+ # Get a scoped token for the tenant
+ body_dict = _build_user_auth(
+ username=self.user_foo['name'],
+ password=self.user_foo['password'],
+ tenant_name=project1['name'])
+ token = self.controller.authenticate(no_context, body_dict)
+ # Ensure it is valid
+ token_id = token['access']['token']['id']
+ self.controller.validate_token(
+ dict(is_admin=True, query_string={}),
+ token_id=token_id)
+
+ # Delete the role, which should invalidate the token
+ role_controller.delete_role(
+ dict(is_admin=True, query_string={}), role_one['id'])
+
+ # Check the token is now invalid
+ self.assertRaises(
+ exception.TokenNotFound,
+ self.controller.validate_token,
+ dict(is_admin=True, query_string={}),
+ token_id=token_id)
+
+ def test_only_original_audit_id_is_kept(self):
+ context = {}
+
+ def get_audit_ids(token):
+ return token['access']['token']['audit_ids']
+
+ # get a token
+ body_dict = _build_user_auth(username='FOO', password='foo2')
+ unscoped_token = self.controller.authenticate(context, body_dict)
+ starting_audit_id = get_audit_ids(unscoped_token)[0]
+ self.assertIsNotNone(starting_audit_id)
+
+ # get another token to ensure the correct parent audit_id is set
+ body_dict = _build_user_auth(token=unscoped_token["access"]["token"])
+ unscoped_token_2 = self.controller.authenticate(context, body_dict)
+ audit_ids = get_audit_ids(unscoped_token_2)
+ self.assertThat(audit_ids, matchers.HasLength(2))
+ self.assertThat(audit_ids[-1], matchers.Equals(starting_audit_id))
+
+ # get another token from token 2 and ensure the correct parent
+ # audit_id is set
+ body_dict = _build_user_auth(token=unscoped_token_2["access"]["token"])
+ unscoped_token_3 = self.controller.authenticate(context, body_dict)
+ audit_ids = get_audit_ids(unscoped_token_3)
+ self.assertThat(audit_ids, matchers.HasLength(2))
+ self.assertThat(audit_ids[-1], matchers.Equals(starting_audit_id))
+
+ def test_revoke_by_audit_chain_id_original_token(self):
+ self.config_fixture.config(group='token', revoke_by_id=False)
+ context = {}
+
+ # get a token
+ body_dict = _build_user_auth(username='FOO', password='foo2')
+ unscoped_token = self.controller.authenticate(context, body_dict)
+ token_id = unscoped_token['access']['token']['id']
+ # get a second token
+ body_dict = _build_user_auth(token=unscoped_token["access"]["token"])
+ unscoped_token_2 = self.controller.authenticate(context, body_dict)
+ token_2_id = unscoped_token_2['access']['token']['id']
+
+ self.token_provider_api.revoke_token(token_id, revoke_chain=True)
+
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_id)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_2_id)
+
+ def test_revoke_by_audit_chain_id_chained_token(self):
+ self.config_fixture.config(group='token', revoke_by_id=False)
+ context = {}
+
+ # get a token
+ body_dict = _build_user_auth(username='FOO', password='foo2')
+ unscoped_token = self.controller.authenticate(context, body_dict)
+ token_id = unscoped_token['access']['token']['id']
+ # get a second token
+ body_dict = _build_user_auth(token=unscoped_token["access"]["token"])
+ unscoped_token_2 = self.controller.authenticate(context, body_dict)
+ token_2_id = unscoped_token_2['access']['token']['id']
+
+ self.token_provider_api.revoke_token(token_2_id, revoke_chain=True)
+
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_id)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_2_id)
+
+ def _mock_audit_info(self, parent_audit_id):
+ # NOTE(morgainfainberg): The token model and other cases that are
+ # extracting the audit id expect 'None' if the audit id doesn't
+ # exist. This ensures that the audit_id is None and the
+ # audit_chain_id will also return None.
+ return [None, None]
+
+ def test_revoke_with_no_audit_info(self):
+ self.config_fixture.config(group='token', revoke_by_id=False)
+ context = {}
+
+ with mock.patch.object(provider, 'audit_info', self._mock_audit_info):
+ # get a token
+ body_dict = _build_user_auth(username='FOO', password='foo2')
+ unscoped_token = self.controller.authenticate(context, body_dict)
+ token_id = unscoped_token['access']['token']['id']
+ # get a second token
+ body_dict = _build_user_auth(
+ token=unscoped_token['access']['token'])
+ unscoped_token_2 = self.controller.authenticate(context, body_dict)
+ token_2_id = unscoped_token_2['access']['token']['id']
+
+ self.token_provider_api.revoke_token(token_id, revoke_chain=True)
+
+ revoke_events = self.revoke_api.list_events()
+ self.assertThat(revoke_events, matchers.HasLength(1))
+ revoke_event = revoke_events[0].to_dict()
+ self.assertIn('expires_at', revoke_event)
+ self.assertEqual(unscoped_token_2['access']['token']['expires'],
+ revoke_event['expires_at'])
+
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_id)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_2_id)
+
+ # get a new token, with no audit info
+ body_dict = _build_user_auth(username='FOO', password='foo2')
+ unscoped_token = self.controller.authenticate(context, body_dict)
+ token_id = unscoped_token['access']['token']['id']
+ # get a second token
+ body_dict = _build_user_auth(
+ token=unscoped_token['access']['token'])
+ unscoped_token_2 = self.controller.authenticate(context, body_dict)
+ token_2_id = unscoped_token_2['access']['token']['id']
+
+ # Revoke by audit_id, no audit_info means both parent and child
+ # token are revoked.
+ self.token_provider_api.revoke_token(token_id)
+
+ revoke_events = self.revoke_api.list_events()
+ self.assertThat(revoke_events, matchers.HasLength(2))
+ revoke_event = revoke_events[1].to_dict()
+ self.assertIn('expires_at', revoke_event)
+ self.assertEqual(unscoped_token_2['access']['token']['expires'],
+ revoke_event['expires_at'])
+
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_id)
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_2_id)
+
+
+class AuthWithPasswordCredentials(AuthTest):
+ def test_auth_invalid_user(self):
+ """Verify exception is raised if invalid user."""
+ body_dict = _build_user_auth(
+ username=uuid.uuid4().hex,
+ password=uuid.uuid4().hex)
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_auth_valid_user_invalid_password(self):
+ """Verify exception is raised if invalid password."""
+ body_dict = _build_user_auth(
+ username="FOO",
+ password=uuid.uuid4().hex)
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_auth_empty_password(self):
+ """Verify exception is raised if empty password."""
+ body_dict = _build_user_auth(
+ username="FOO",
+ password="")
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_auth_no_password(self):
+ """Verify exception is raised if empty password."""
+ body_dict = _build_user_auth(username="FOO")
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_blank_password_credentials(self):
+ """Sending empty dict as passwordCredentials raises a 400 error."""
+ body_dict = {'passwordCredentials': {}, 'tenantName': 'demo'}
+ self.assertRaises(exception.ValidationError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_authenticate_no_username(self):
+ """Verify skipping username raises the right exception."""
+ body_dict = _build_user_auth(password="pass",
+ tenant_name="demo")
+ self.assertRaises(exception.ValidationError,
+ self.controller.authenticate,
+ {}, body_dict)
+
+ def test_bind_without_remote_user(self):
+ self.config_fixture.config(group='token', bind=['kerberos'])
+ body_dict = _build_user_auth(username='FOO', password='foo2',
+ tenant_name='BAR')
+ token = self.controller.authenticate({}, body_dict)
+ self.assertNotIn('bind', token['access']['token'])
+
+ def test_change_default_domain_id(self):
+ # If the default_domain_id config option is not the default then the
+ # user in auth data is from the new default domain.
+
+ # 1) Create a new domain.
+ new_domain_id = uuid.uuid4().hex
+ new_domain = {
+ 'description': uuid.uuid4().hex,
+ 'enabled': True,
+ 'id': new_domain_id,
+ 'name': uuid.uuid4().hex,
+ }
+
+ self.resource_api.create_domain(new_domain_id, new_domain)
+
+ # 2) Create user "foo" in new domain with different password than
+ # default-domain foo.
+ new_user_password = uuid.uuid4().hex
+ new_user = {
+ 'name': self.user_foo['name'],
+ 'domain_id': new_domain_id,
+ 'password': new_user_password,
+ 'email': 'foo@bar2.com',
+ }
+
+ new_user = self.identity_api.create_user(new_user)
+
+ # 3) Update the default_domain_id config option to the new domain
+
+ self.config_fixture.config(group='identity',
+ default_domain_id=new_domain_id)
+
+ # 4) Authenticate as "foo" using the password in the new domain.
+
+ body_dict = _build_user_auth(
+ username=self.user_foo['name'],
+ password=new_user_password)
+
+ # The test is successful if this doesn't raise, so no need to assert.
+ self.controller.authenticate({}, body_dict)
+
+
+class AuthWithRemoteUser(AuthTest):
+ def test_unscoped_remote_authn(self):
+ """Verify getting an unscoped token with external authn."""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2')
+ local_token = self.controller.authenticate(
+ {}, body_dict)
+
+ body_dict = _build_user_auth()
+ remote_token = self.controller.authenticate(
+ self.context_with_remote_user, body_dict)
+
+ self.assertEqualTokens(local_token, remote_token,
+ enforce_audit_ids=False)
+
+ def test_unscoped_remote_authn_jsonless(self):
+ """Verify that external auth with invalid request fails."""
+ self.assertRaises(
+ exception.ValidationError,
+ self.controller.authenticate,
+ {'REMOTE_USER': 'FOO'},
+ None)
+
+ def test_scoped_remote_authn(self):
+ """Verify getting a token with external authn."""
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name='BAR')
+ local_token = self.controller.authenticate(
+ {}, body_dict)
+
+ body_dict = _build_user_auth(
+ tenant_name='BAR')
+ remote_token = self.controller.authenticate(
+ self.context_with_remote_user, body_dict)
+
+ self.assertEqualTokens(local_token, remote_token,
+ enforce_audit_ids=False)
+
+ def test_scoped_nometa_remote_authn(self):
+ """Verify getting a token with external authn and no metadata."""
+ body_dict = _build_user_auth(
+ username='TWO',
+ password='two2',
+ tenant_name='BAZ')
+ local_token = self.controller.authenticate(
+ {}, body_dict)
+
+ body_dict = _build_user_auth(tenant_name='BAZ')
+ remote_token = self.controller.authenticate(
+ {'environment': {'REMOTE_USER': 'TWO'}}, body_dict)
+
+ self.assertEqualTokens(local_token, remote_token,
+ enforce_audit_ids=False)
+
+ def test_scoped_remote_authn_invalid_user(self):
+ """Verify that external auth with invalid user fails."""
+ body_dict = _build_user_auth(tenant_name="BAR")
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate,
+ {'environment': {'REMOTE_USER': uuid.uuid4().hex}},
+ body_dict)
+
+ def test_bind_with_kerberos(self):
+ self.config_fixture.config(group='token', bind=['kerberos'])
+ body_dict = _build_user_auth(tenant_name="BAR")
+ token = self.controller.authenticate(self.context_with_remote_user,
+ body_dict)
+ self.assertEqual('FOO', token['access']['token']['bind']['kerberos'])
+
+ def test_bind_without_config_opt(self):
+ self.config_fixture.config(group='token', bind=['x509'])
+ body_dict = _build_user_auth(tenant_name='BAR')
+ token = self.controller.authenticate(self.context_with_remote_user,
+ body_dict)
+ self.assertNotIn('bind', token['access']['token'])
+
+
+class AuthWithTrust(AuthTest):
+ def setUp(self):
+ super(AuthWithTrust, self).setUp()
+
+ self.trust_controller = trust.controllers.TrustV3()
+ self.auth_v3_controller = auth.controllers.Auth()
+ self.trustor = self.user_foo
+ self.trustee = self.user_two
+ self.assigned_roles = [self.role_member['id'],
+ self.role_browser['id']]
+ for assigned_role in self.assigned_roles:
+ self.assignment_api.add_role_to_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+
+ self.sample_data = {'trustor_user_id': self.trustor['id'],
+ 'trustee_user_id': self.trustee['id'],
+ 'project_id': self.tenant_bar['id'],
+ 'impersonation': True,
+ 'roles': [{'id': self.role_browser['id']},
+ {'name': self.role_member['name']}]}
+
+ def config_overrides(self):
+ super(AuthWithTrust, self).config_overrides()
+ self.config_fixture.config(group='trust', enabled=True)
+
+ def _create_auth_context(self, token_id):
+ token_ref = token_model.KeystoneToken(
+ token_id=token_id,
+ token_data=self.token_provider_api.validate_token(token_id))
+ auth_context = authorization.token_to_auth_context(token_ref)
+ return {'environment': {authorization.AUTH_CONTEXT_ENV: auth_context},
+ 'token_id': token_id,
+ 'host_url': HOST_URL}
+
+ def create_trust(self, trust_data, trustor_name, expires_at=None,
+ impersonation=True):
+ username = trustor_name
+ password = 'foo2'
+ unscoped_token = self.get_unscoped_token(username, password)
+ context = self._create_auth_context(
+ unscoped_token['access']['token']['id'])
+ trust_data_copy = copy.deepcopy(trust_data)
+ trust_data_copy['expires_at'] = expires_at
+ trust_data_copy['impersonation'] = impersonation
+
+ return self.trust_controller.create_trust(
+ context, trust=trust_data_copy)['trust']
+
+ def get_unscoped_token(self, username, password='foo2'):
+ body_dict = _build_user_auth(username=username, password=password)
+ return self.controller.authenticate({}, body_dict)
+
+ def build_v2_token_request(self, username, password, trust,
+ tenant_id=None):
+ if not tenant_id:
+ tenant_id = self.tenant_bar['id']
+ unscoped_token = self.get_unscoped_token(username, password)
+ unscoped_token_id = unscoped_token['access']['token']['id']
+ request_body = _build_user_auth(token={'id': unscoped_token_id},
+ trust_id=trust['id'],
+ tenant_id=tenant_id)
+ return request_body
+
+ def test_create_trust_bad_data_fails(self):
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ context = self._create_auth_context(
+ unscoped_token['access']['token']['id'])
+ bad_sample_data = {'trustor_user_id': self.trustor['id'],
+ 'project_id': self.tenant_bar['id'],
+ 'roles': [{'id': self.role_browser['id']}]}
+
+ self.assertRaises(exception.ValidationError,
+ self.trust_controller.create_trust,
+ context, trust=bad_sample_data)
+
+ def test_create_trust_no_roles(self):
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ context = {'token_id': unscoped_token['access']['token']['id']}
+ self.sample_data['roles'] = []
+ self.assertRaises(exception.Forbidden,
+ self.trust_controller.create_trust,
+ context, trust=self.sample_data)
+
+ def test_create_trust(self):
+ expires_at = timeutils.strtime(timeutils.utcnow() +
+ datetime.timedelta(minutes=10),
+ fmt=TIME_FORMAT)
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'],
+ expires_at=expires_at)
+ self.assertEqual(self.trustor['id'], new_trust['trustor_user_id'])
+ self.assertEqual(self.trustee['id'], new_trust['trustee_user_id'])
+ role_ids = [self.role_browser['id'], self.role_member['id']]
+ self.assertTrue(timeutils.parse_strtime(new_trust['expires_at'],
+ fmt=TIME_FORMAT))
+ self.assertIn('%s/v3/OS-TRUST/' % HOST_URL,
+ new_trust['links']['self'])
+ self.assertIn('%s/v3/OS-TRUST/' % HOST_URL,
+ new_trust['roles_links']['self'])
+
+ for role in new_trust['roles']:
+ self.assertIn(role['id'], role_ids)
+
+ def test_create_trust_expires_bad(self):
+ self.assertRaises(exception.ValidationTimeStampError,
+ self.create_trust, self.sample_data,
+ self.trustor['name'], expires_at="bad")
+ self.assertRaises(exception.ValidationTimeStampError,
+ self.create_trust, self.sample_data,
+ self.trustor['name'], expires_at="")
+ self.assertRaises(exception.ValidationTimeStampError,
+ self.create_trust, self.sample_data,
+ self.trustor['name'], expires_at="Z")
+
+ def test_create_trust_without_project_id(self):
+ """Verify that trust can be created without project id and
+ token can be generated with that trust.
+ """
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ context = self._create_auth_context(
+ unscoped_token['access']['token']['id'])
+ self.sample_data['project_id'] = None
+ self.sample_data['roles'] = []
+ new_trust = self.trust_controller.create_trust(
+ context, trust=self.sample_data)['trust']
+ self.assertEqual(self.trustor['id'], new_trust['trustor_user_id'])
+ self.assertEqual(self.trustee['id'], new_trust['trustee_user_id'])
+ self.assertIs(new_trust['impersonation'], True)
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+ token_user = auth_response['access']['user']
+ self.assertEqual(token_user['id'], new_trust['trustor_user_id'])
+
+ def test_get_trust(self):
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ context = {'token_id': unscoped_token['access']['token']['id'],
+ 'host_url': HOST_URL}
+ new_trust = self.trust_controller.create_trust(
+ context, trust=self.sample_data)['trust']
+ trust = self.trust_controller.get_trust(context,
+ new_trust['id'])['trust']
+ self.assertEqual(self.trustor['id'], trust['trustor_user_id'])
+ self.assertEqual(self.trustee['id'], trust['trustee_user_id'])
+ role_ids = [self.role_browser['id'], self.role_member['id']]
+ for role in new_trust['roles']:
+ self.assertIn(role['id'], role_ids)
+
+ def test_create_trust_no_impersonation(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'],
+ expires_at=None, impersonation=False)
+ self.assertEqual(self.trustor['id'], new_trust['trustor_user_id'])
+ self.assertEqual(self.trustee['id'], new_trust['trustee_user_id'])
+ self.assertIs(new_trust['impersonation'], False)
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+ token_user = auth_response['access']['user']
+ self.assertEqual(token_user['id'], new_trust['trustee_user_id'])
+
+ # TODO(ayoung): Endpoints
+
+ def test_create_trust_impersonation(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ self.assertEqual(self.trustor['id'], new_trust['trustor_user_id'])
+ self.assertEqual(self.trustee['id'], new_trust['trustee_user_id'])
+ self.assertIs(new_trust['impersonation'], True)
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+ token_user = auth_response['access']['user']
+ self.assertEqual(token_user['id'], new_trust['trustor_user_id'])
+
+ def test_token_from_trust_wrong_user_fails(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ request_body = self.build_v2_token_request('FOO', 'foo2', new_trust)
+ self.assertRaises(exception.Forbidden, self.controller.authenticate,
+ {}, request_body)
+
+ def test_token_from_trust_wrong_project_fails(self):
+ for assigned_role in self.assigned_roles:
+ self.assignment_api.add_role_to_user_and_project(
+ self.trustor['id'], self.tenant_baz['id'], assigned_role)
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ request_body = self.build_v2_token_request('TWO', 'two2', new_trust,
+ self.tenant_baz['id'])
+ self.assertRaises(exception.Forbidden, self.controller.authenticate,
+ {}, request_body)
+
+ def fetch_v2_token_from_trust(self, trust):
+ request_body = self.build_v2_token_request('TWO', 'two2', trust)
+ auth_response = self.controller.authenticate({}, request_body)
+ return auth_response
+
+ def fetch_v3_token_from_trust(self, trust, trustee):
+ v3_password_data = {
+ 'identity': {
+ "methods": ["password"],
+ "password": {
+ "user": {
+ "id": trustee["id"],
+ "password": trustee["password"]
+ }
+ }
+ },
+ 'scope': {
+ 'project': {
+ 'id': self.tenant_baz['id']
+ }
+ }
+ }
+ auth_response = (self.auth_v3_controller.authenticate_for_token
+ ({'environment': {},
+ 'query_string': {}},
+ v3_password_data))
+ token = auth_response.headers['X-Subject-Token']
+
+ v3_req_with_trust = {
+ "identity": {
+ "methods": ["token"],
+ "token": {"id": token}},
+ "scope": {
+ "OS-TRUST:trust": {"id": trust['id']}}}
+ token_auth_response = (self.auth_v3_controller.authenticate_for_token
+ ({'environment': {},
+ 'query_string': {}},
+ v3_req_with_trust))
+ return token_auth_response
+
+ def test_create_v3_token_from_trust(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
+
+ trust_token_user = auth_response.json['token']['user']
+ self.assertEqual(self.trustor['id'], trust_token_user['id'])
+
+ trust_token_trust = auth_response.json['token']['OS-TRUST:trust']
+ self.assertEqual(trust_token_trust['id'], new_trust['id'])
+ self.assertEqual(self.trustor['id'],
+ trust_token_trust['trustor_user']['id'])
+ self.assertEqual(self.trustee['id'],
+ trust_token_trust['trustee_user']['id'])
+
+ trust_token_roles = auth_response.json['token']['roles']
+ self.assertEqual(2, len(trust_token_roles))
+
+ def test_v3_trust_token_get_token_fails(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ auth_response = self.fetch_v3_token_from_trust(new_trust, self.trustee)
+ trust_token = auth_response.headers['X-Subject-Token']
+ v3_token_data = {'identity': {
+ 'methods': ['token'],
+ 'token': {'id': trust_token}
+ }}
+ self.assertRaises(
+ exception.Forbidden,
+ self.auth_v3_controller.authenticate_for_token,
+ {'environment': {},
+ 'query_string': {}}, v3_token_data)
+
+ def test_token_from_trust(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+
+ self.assertIsNotNone(auth_response)
+ self.assertEqual(2,
+ len(auth_response['access']['metadata']['roles']),
+ "user_foo has three roles, but the token should"
+ " only get the two roles specified in the trust.")
+
+ def assert_token_count_for_trust(self, trust, expected_value):
+ tokens = self.token_provider_api._persistence._list_tokens(
+ self.trustee['id'], trust_id=trust['id'])
+ token_count = len(tokens)
+ self.assertEqual(expected_value, token_count)
+
+ def test_delete_tokens_for_user_invalidates_tokens_from_trust(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ self.assert_token_count_for_trust(new_trust, 0)
+ self.fetch_v2_token_from_trust(new_trust)
+ self.assert_token_count_for_trust(new_trust, 1)
+ self.token_provider_api._persistence.delete_tokens_for_user(
+ self.trustee['id'])
+ self.assert_token_count_for_trust(new_trust, 0)
+
+ def test_token_from_trust_cant_get_another_token(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+ trust_token_id = auth_response['access']['token']['id']
+ request_body = _build_user_auth(token={'id': trust_token_id},
+ tenant_id=self.tenant_bar['id'])
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_delete_trust_revokes_token(self):
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ context = self._create_auth_context(
+ unscoped_token['access']['token']['id'])
+ self.fetch_v2_token_from_trust(new_trust)
+ trust_id = new_trust['id']
+ tokens = self.token_provider_api._persistence._list_tokens(
+ self.trustor['id'],
+ trust_id=trust_id)
+ self.assertEqual(1, len(tokens))
+ self.trust_controller.delete_trust(context, trust_id=trust_id)
+ tokens = self.token_provider_api._persistence._list_tokens(
+ self.trustor['id'],
+ trust_id=trust_id)
+ self.assertEqual(0, len(tokens))
+
+ def test_token_from_trust_with_no_role_fails(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ for assigned_role in self.assigned_roles:
+ self.assignment_api.remove_role_from_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+ request_body = self.build_v2_token_request('TWO', 'two2', new_trust)
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_expired_trust_get_token_fails(self):
+ expiry = "1999-02-18T10:10:00Z"
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'],
+ expiry)
+ request_body = self.build_v2_token_request('TWO', 'two2', new_trust)
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_token_from_trust_with_wrong_role_fails(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ self.assignment_api.add_role_to_user_and_project(
+ self.trustor['id'],
+ self.tenant_bar['id'],
+ self.role_other['id'])
+ for assigned_role in self.assigned_roles:
+ self.assignment_api.remove_role_from_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+
+ request_body = self.build_v2_token_request('TWO', 'two2', new_trust)
+
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_do_not_consume_remaining_uses_when_get_token_fails(self):
+ trust_data = copy.deepcopy(self.sample_data)
+ trust_data['remaining_uses'] = 3
+ new_trust = self.create_trust(trust_data, self.trustor['name'])
+
+ for assigned_role in self.assigned_roles:
+ self.assignment_api.remove_role_from_user_and_project(
+ self.trustor['id'], self.tenant_bar['id'], assigned_role)
+
+ request_body = self.build_v2_token_request('TWO', 'two2', new_trust)
+ self.assertRaises(exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ unscoped_token = self.get_unscoped_token(self.trustor['name'])
+ context = self._create_auth_context(
+ unscoped_token['access']['token']['id'])
+ trust = self.trust_controller.get_trust(context,
+ new_trust['id'])['trust']
+ self.assertEqual(3, trust['remaining_uses'])
+
+ def test_v2_trust_token_contains_trustor_user_id_and_impersonation(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ auth_response = self.fetch_v2_token_from_trust(new_trust)
+
+ self.assertEqual(new_trust['trustee_user_id'],
+ auth_response['access']['trust']['trustee_user_id'])
+ self.assertEqual(new_trust['trustor_user_id'],
+ auth_response['access']['trust']['trustor_user_id'])
+ self.assertEqual(new_trust['impersonation'],
+ auth_response['access']['trust']['impersonation'])
+ self.assertEqual(new_trust['id'],
+ auth_response['access']['trust']['id'])
+
+ validate_response = self.controller.validate_token(
+ context=dict(is_admin=True, query_string={}),
+ token_id=auth_response['access']['token']['id'])
+ self.assertEqual(
+ new_trust['trustee_user_id'],
+ validate_response['access']['trust']['trustee_user_id'])
+ self.assertEqual(
+ new_trust['trustor_user_id'],
+ validate_response['access']['trust']['trustor_user_id'])
+ self.assertEqual(
+ new_trust['impersonation'],
+ validate_response['access']['trust']['impersonation'])
+ self.assertEqual(
+ new_trust['id'],
+ validate_response['access']['trust']['id'])
+
+ def disable_user(self, user):
+ user['enabled'] = False
+ self.identity_api.update_user(user['id'], user)
+
+ def test_trust_get_token_fails_if_trustor_disabled(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ request_body = self.build_v2_token_request(self.trustee['name'],
+ self.trustee['password'],
+ new_trust)
+ self.disable_user(self.trustor)
+ self.assertRaises(
+ exception.Forbidden,
+ self.controller.authenticate, {}, request_body)
+
+ def test_trust_get_token_fails_if_trustee_disabled(self):
+ new_trust = self.create_trust(self.sample_data, self.trustor['name'])
+ request_body = self.build_v2_token_request(self.trustee['name'],
+ self.trustee['password'],
+ new_trust)
+ self.disable_user(self.trustee)
+ self.assertRaises(
+ exception.Unauthorized,
+ self.controller.authenticate, {}, request_body)
+
+
+class TokenExpirationTest(AuthTest):
+
+ @mock.patch.object(timeutils, 'utcnow')
+ def _maintain_token_expiration(self, mock_utcnow):
+ """Token expiration should be maintained after re-auth & validation."""
+ now = datetime.datetime.utcnow()
+ mock_utcnow.return_value = now
+
+ r = self.controller.authenticate(
+ {},
+ auth={
+ 'passwordCredentials': {
+ 'username': self.user_foo['name'],
+ 'password': self.user_foo['password']
+ }
+ })
+ unscoped_token_id = r['access']['token']['id']
+ original_expiration = r['access']['token']['expires']
+
+ mock_utcnow.return_value = now + datetime.timedelta(seconds=1)
+
+ r = self.controller.validate_token(
+ dict(is_admin=True, query_string={}),
+ token_id=unscoped_token_id)
+ self.assertEqual(original_expiration, r['access']['token']['expires'])
+
+ mock_utcnow.return_value = now + datetime.timedelta(seconds=2)
+
+ r = self.controller.authenticate(
+ {},
+ auth={
+ 'token': {
+ 'id': unscoped_token_id,
+ },
+ 'tenantId': self.tenant_bar['id'],
+ })
+ scoped_token_id = r['access']['token']['id']
+ self.assertEqual(original_expiration, r['access']['token']['expires'])
+
+ mock_utcnow.return_value = now + datetime.timedelta(seconds=3)
+
+ r = self.controller.validate_token(
+ dict(is_admin=True, query_string={}),
+ token_id=scoped_token_id)
+ self.assertEqual(original_expiration, r['access']['token']['expires'])
+
+ def test_maintain_uuid_token_expiration(self):
+ self.config_fixture.config(
+ group='token',
+ provider='keystone.token.providers.uuid.Provider')
+ self._maintain_token_expiration()
+
+
+class AuthCatalog(tests.SQLDriverOverrides, AuthTest):
+ """Tests for the catalog provided in the auth response."""
+
+ def config_files(self):
+ config_files = super(AuthCatalog, self).config_files()
+ # We need to use a backend that supports disabled endpoints, like the
+ # SQL backend.
+ config_files.append(tests.dirs.tests_conf('backend_sql.conf'))
+ return config_files
+
+ def _create_endpoints(self):
+ def create_region(**kwargs):
+ ref = {'id': uuid.uuid4().hex}
+ ref.update(kwargs)
+ self.catalog_api.create_region(ref)
+ return ref
+
+ def create_endpoint(service_id, region, **kwargs):
+ id_ = uuid.uuid4().hex
+ ref = {
+ 'id': id_,
+ 'interface': 'public',
+ 'region_id': region,
+ 'service_id': service_id,
+ 'url': 'http://localhost/%s' % uuid.uuid4().hex,
+ }
+ ref.update(kwargs)
+ self.catalog_api.create_endpoint(id_, ref)
+ return ref
+
+ # Create a service for use with the endpoints.
+ def create_service(**kwargs):
+ id_ = uuid.uuid4().hex
+ ref = {
+ 'id': id_,
+ 'name': uuid.uuid4().hex,
+ 'type': uuid.uuid4().hex,
+ }
+ ref.update(kwargs)
+ self.catalog_api.create_service(id_, ref)
+ return ref
+
+ enabled_service_ref = create_service(enabled=True)
+ disabled_service_ref = create_service(enabled=False)
+
+ region = create_region()
+
+ # Create endpoints
+ enabled_endpoint_ref = create_endpoint(
+ enabled_service_ref['id'], region['id'])
+ create_endpoint(
+ enabled_service_ref['id'], region['id'], enabled=False,
+ interface='internal')
+ create_endpoint(
+ disabled_service_ref['id'], region['id'])
+
+ return enabled_endpoint_ref
+
+ def test_auth_catalog_disabled_endpoint(self):
+ """On authenticate, get a catalog that excludes disabled endpoints."""
+ endpoint_ref = self._create_endpoints()
+
+ # Authenticate
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name="BAR")
+
+ token = self.controller.authenticate({}, body_dict)
+
+ # Check the catalog
+ self.assertEqual(1, len(token['access']['serviceCatalog']))
+ endpoint = token['access']['serviceCatalog'][0]['endpoints'][0]
+ self.assertEqual(
+ 1, len(token['access']['serviceCatalog'][0]['endpoints']))
+
+ exp_endpoint = {
+ 'id': endpoint_ref['id'],
+ 'publicURL': endpoint_ref['url'],
+ 'region': endpoint_ref['region_id'],
+ }
+
+ self.assertEqual(exp_endpoint, endpoint)
+
+ def test_validate_catalog_disabled_endpoint(self):
+ """On validate, get back a catalog that excludes disabled endpoints."""
+ endpoint_ref = self._create_endpoints()
+
+ # Authenticate
+ body_dict = _build_user_auth(
+ username='FOO',
+ password='foo2',
+ tenant_name="BAR")
+
+ token = self.controller.authenticate({}, body_dict)
+
+ # Validate
+ token_id = token['access']['token']['id']
+ validate_ref = self.controller.validate_token(
+ dict(is_admin=True, query_string={}),
+ token_id=token_id)
+
+ # Check the catalog
+ self.assertEqual(1, len(token['access']['serviceCatalog']))
+ endpoint = validate_ref['access']['serviceCatalog'][0]['endpoints'][0]
+ self.assertEqual(
+ 1, len(token['access']['serviceCatalog'][0]['endpoints']))
+
+ exp_endpoint = {
+ 'id': endpoint_ref['id'],
+ 'publicURL': endpoint_ref['url'],
+ 'region': endpoint_ref['region_id'],
+ }
+
+ self.assertEqual(exp_endpoint, endpoint)
+
+
+class NonDefaultAuthTest(tests.TestCase):
+
+ def test_add_non_default_auth_method(self):
+ self.config_fixture.config(group='auth',
+ methods=['password', 'token', 'custom'])
+ config.setup_authentication()
+ self.assertTrue(hasattr(CONF.auth, 'custom'))