summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_auth.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_auth.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_auth.py3237
1 files changed, 1796 insertions, 1441 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_auth.py b/keystone-moon/keystone/tests/unit/test_v3_auth.py
index d53a85df..698feeb8 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_auth.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_auth.py
@@ -14,6 +14,7 @@
import copy
import datetime
+import itertools
import json
import operator
import uuid
@@ -21,6 +22,8 @@ import uuid
from keystoneclient.common import cms
import mock
from oslo_config import cfg
+from oslo_log import versionutils
+from oslo_utils import fixture
from oslo_utils import timeutils
from six.moves import http_client
from six.moves import range
@@ -28,9 +31,12 @@ from testtools import matchers
from testtools import testcase
from keystone import auth
+from keystone.auth.plugins import totp
from keystone.common import utils
+from keystone.contrib.revoke import routers
from keystone import exception
from keystone.policy.backends import rules
+from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import ksfixtures
from keystone.tests.unit import test_v3
@@ -38,7 +44,7 @@ from keystone.tests.unit import test_v3
CONF = cfg.CONF
-class TestAuthInfo(test_v3.AuthTestMixin, testcase.TestCase):
+class TestAuthInfo(common_auth.AuthTestMixin, testcase.TestCase):
def setUp(self):
super(TestAuthInfo, self).setUp()
auth.controllers.load_auth_methods()
@@ -121,7 +127,7 @@ class TokenAPITests(object):
# resolved in Python for multiple inheritance means that a setUp in this
# would get skipped by the testrunner.
def doSetUp(self):
- r = self.v3_authenticate_token(self.build_authentication_request(
+ r = self.v3_create_token(self.build_authentication_request(
username=self.user['name'],
user_domain_id=self.domain_id,
password=self.user['password']))
@@ -129,43 +135,372 @@ class TokenAPITests(object):
self.v3_token = r.headers.get('X-Subject-Token')
self.headers = {'X-Subject-Token': r.headers.get('X-Subject-Token')}
- def test_default_fixture_scope_token(self):
- self.assertIsNotNone(self.get_scoped_token())
+ def _make_auth_request(self, auth_data):
+ resp = self.post('/auth/tokens', body=auth_data)
+ token = resp.headers.get('X-Subject-Token')
+ return token
- def test_v3_v2_intermix_non_default_domain_failed(self):
- v3_token = self.get_requested_token(self.build_authentication_request(
+ def _get_unscoped_token(self):
+ auth_data = self.build_authentication_request(
user_id=self.user['id'],
- password=self.user['password']))
+ password=self.user['password'])
+ return self._make_auth_request(auth_data)
- # now validate the v3 token with v2 API
- self.admin_request(
- path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
- method='GET',
- expected_status=http_client.UNAUTHORIZED)
+ def _get_domain_scoped_token(self):
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ domain_id=self.domain_id)
+ return self._make_auth_request(auth_data)
+
+ def _get_project_scoped_token(self):
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project_id)
+ return self._make_auth_request(auth_data)
+
+ def _get_trust_scoped_token(self, trustee_user, trust):
+ auth_data = self.build_authentication_request(
+ user_id=trustee_user['id'],
+ password=trustee_user['password'],
+ trust_id=trust['id'])
+ return self._make_auth_request(auth_data)
+
+ def _create_trust(self, impersonation=False):
+ # Create a trustee user
+ trustee_user = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
+ ref = unit.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=trustee_user['id'],
+ project_id=self.project_id,
+ impersonation=impersonation,
+ role_ids=[self.role_id])
+
+ # Create a trust
+ r = self.post('/OS-TRUST/trusts', body={'trust': ref})
+ trust = self.assertValidTrustResponse(r)
+ return (trustee_user, trust)
+
+ def _validate_token(self, token, expected_status=http_client.OK):
+ return self.get(
+ '/auth/tokens',
+ headers={'X-Subject-Token': token},
+ expected_status=expected_status)
+
+ def _revoke_token(self, token, expected_status=http_client.NO_CONTENT):
+ return self.delete(
+ '/auth/tokens',
+ headers={'x-subject-token': token},
+ expected_status=expected_status)
+
+ def _set_user_enabled(self, user, enabled=True):
+ user['enabled'] = enabled
+ self.identity_api.update_user(user['id'], user)
+
+ def test_validate_unscoped_token(self):
+ unscoped_token = self._get_unscoped_token()
+ self._validate_token(unscoped_token)
+
+ def test_revoke_unscoped_token(self):
+ unscoped_token = self._get_unscoped_token()
+ self._validate_token(unscoped_token)
+ self._revoke_token(unscoped_token)
+ self._validate_token(unscoped_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_unscoped_token_is_invalid_after_disabling_user(self):
+ unscoped_token = self._get_unscoped_token()
+ # Make sure the token is valid
+ self._validate_token(unscoped_token)
+ # Disable the user
+ self._set_user_enabled(self.user, enabled=False)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ unscoped_token)
+
+ def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
+ unscoped_token = self._get_unscoped_token()
+ # Make sure the token is valid
+ self._validate_token(unscoped_token)
+ # Disable the user
+ self._set_user_enabled(self.user, enabled=False)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ unscoped_token)
+ # Enable the user
+ self._set_user_enabled(self.user)
+ # Ensure validating a token for a re-enabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ unscoped_token)
+
+ def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
+ unscoped_token = self._get_unscoped_token()
+ # Make sure the token is valid
+ self._validate_token(unscoped_token)
+ # Disable the user's domain
+ self.domain['enabled'] = False
+ self.resource_api.update_domain(self.domain['id'], self.domain)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ unscoped_token)
+
+ def test_unscoped_token_is_invalid_after_changing_user_password(self):
+ unscoped_token = self._get_unscoped_token()
+ # Make sure the token is valid
+ self._validate_token(unscoped_token)
+ # Change user's password
+ self.user['password'] = 'Password1'
+ self.identity_api.update_user(self.user['id'], self.user)
+ # Ensure updating user's password revokes existing user's tokens
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ unscoped_token)
+
+ def test_validate_domain_scoped_token(self):
+ # Grant user access to domain
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+ domain_scoped_token = self._get_domain_scoped_token()
+ resp = self._validate_token(domain_scoped_token)
+ resp_json = json.loads(resp.body)
+ self.assertIsNotNone(resp_json['token']['catalog'])
+ self.assertIsNotNone(resp_json['token']['roles'])
+ self.assertIsNotNone(resp_json['token']['domain'])
+
+ def test_domain_scoped_token_is_invalid_after_disabling_user(self):
+ # Grant user access to domain
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+ domain_scoped_token = self._get_domain_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(domain_scoped_token)
+ # Disable user
+ self._set_user_enabled(self.user, enabled=False)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ domain_scoped_token)
+
+ def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
+ # Grant user access to domain
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+ domain_scoped_token = self._get_domain_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(domain_scoped_token)
+ # Delete access to domain
+ self.assignment_api.delete_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ domain_scoped_token)
+
+ def test_domain_scoped_token_invalid_after_disabling_domain(self):
+ # Grant user access to domain
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+ domain_scoped_token = self._get_domain_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(domain_scoped_token)
+ # Disable domain
+ self.domain['enabled'] = False
+ self.resource_api.update_domain(self.domain['id'], self.domain)
+ # Ensure validating a token for a disabled domain fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ domain_scoped_token)
+
+ def test_v2_validate_domain_scoped_token_returns_unauthorized(self):
+ # Test that validating a domain scoped token in v2.0 returns
+ # unauthorized.
+ # Grant user access to domain
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+
+ scoped_token = self._get_domain_scoped_token()
+ self.assertRaises(exception.Unauthorized,
+ self.token_provider_api.validate_v2_token,
+ scoped_token)
+
+ def test_validate_project_scoped_token(self):
+ project_scoped_token = self._get_project_scoped_token()
+ self._validate_token(project_scoped_token)
+
+ def test_revoke_project_scoped_token(self):
+ project_scoped_token = self._get_project_scoped_token()
+ self._validate_token(project_scoped_token)
+ self._revoke_token(project_scoped_token)
+ self._validate_token(project_scoped_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_project_scoped_token_is_invalid_after_disabling_user(self):
+ project_scoped_token = self._get_project_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(project_scoped_token)
+ # Disable the user
+ self._set_user_enabled(self.user, enabled=False)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ project_scoped_token)
+
+ def test_project_scoped_token_invalid_after_changing_user_password(self):
+ project_scoped_token = self._get_project_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(project_scoped_token)
+ # Update user's password
+ self.user['password'] = 'Password1'
+ self.identity_api.update_user(self.user['id'], self.user)
+ # Ensure updating user's password revokes existing tokens
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ project_scoped_token)
+
+ def test_project_scoped_token_invalid_after_disabling_project(self):
+ project_scoped_token = self._get_project_scoped_token()
+ # Make sure the token is valid
+ self._validate_token(project_scoped_token)
+ # Disable project
+ self.project['enabled'] = False
+ self.resource_api.update_project(self.project['id'], self.project)
+ # Ensure validating a token for a disabled project fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ project_scoped_token)
+
+ def test_rescope_unscoped_token_with_trust(self):
+ trustee_user, trust = self._create_trust()
+ self._get_trust_scoped_token(trustee_user, trust)
+
+ def test_validate_a_trust_scoped_token(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ def test_validate_a_trust_scoped_token_impersonated(self):
+ trustee_user, trust = self._create_trust(impersonation=True)
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ def test_revoke_trust_scoped_token(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+ self._revoke_token(trust_scoped_token)
+ self._validate_token(trust_scoped_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ # Disable trustee
+ trustee_update_ref = dict(enabled=False)
+ self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ trust_scoped_token)
+
+ def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+ # Change trustee's password
+ trustee_update_ref = dict(password='Password1')
+ self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
+ # Ensure updating trustee's password revokes existing tokens
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ trust_scoped_token)
+
+ def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ # Disable the trustor
+ trustor_update_ref = dict(enabled=False)
+ self.identity_api.update_user(self.user['id'], trustor_update_ref)
+ # Ensure validating a token for a disabled user fails
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ trust_scoped_token)
+
+ def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ # Change trustor's password
+ trustor_update_ref = dict(password='Password1')
+ self.identity_api.update_user(self.user['id'], trustor_update_ref)
+ # Ensure updating trustor's password revokes existing user's tokens
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ trust_scoped_token)
+
+ def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Validate a trust scoped token
+ self._validate_token(trust_scoped_token)
+
+ # Disable trustor's domain
+ self.domain['enabled'] = False
+ self.resource_api.update_domain(self.domain['id'], self.domain)
+
+ trustor_update_ref = dict(password='Password1')
+ self.identity_api.update_user(self.user['id'], trustor_update_ref)
+ # Ensure updating trustor's password revokes existing user's tokens
+ self.assertRaises(exception.TokenNotFound,
+ self.token_provider_api.validate_token,
+ trust_scoped_token)
+
+ def test_v2_validate_trust_scoped_token(self):
+ # Test that validating an trust scoped token in v2.0 returns
+ # unauthorized.
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ self.assertRaises(exception.Unauthorized,
+ self.token_provider_api.validate_v2_token,
+ trust_scoped_token)
+
+ def test_default_fixture_scope_token(self):
+ self.assertIsNotNone(self.get_scoped_token())
def test_v3_v2_intermix_new_default_domain(self):
# If the default_domain_id config option is changed, then should be
# able to validate a v3 token with user in the new domain.
# 1) Create a new domain for the user.
- new_domain = {
- 'description': uuid.uuid4().hex,
- 'enabled': True,
- 'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex,
- }
+ new_domain = unit.new_domain_ref()
self.resource_api.create_domain(new_domain['id'], new_domain)
# 2) Create user in new domain.
- new_user_password = uuid.uuid4().hex
- new_user = {
- 'name': uuid.uuid4().hex,
- 'domain_id': new_domain['id'],
- 'password': new_user_password,
- 'email': uuid.uuid4().hex,
- }
- new_user = self.identity_api.create_user(new_user)
+ new_user = unit.create_user(self.identity_api,
+ domain_id=new_domain['id'])
# 3) Update the default_domain_id config option to the new domain
self.config_fixture.config(
@@ -175,12 +510,12 @@ class TokenAPITests(object):
# 4) Get a token using v3 API.
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=new_user['id'],
- password=new_user_password))
+ password=new_user['password']))
# 5) Validate token using v2 API.
self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
+ token=self.get_admin_token(),
method='GET')
def test_v3_v2_intermix_domain_scoped_token_failed(self):
@@ -199,10 +534,10 @@ class TokenAPITests(object):
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
+ token=self.get_admin_token(),
expected_status=http_client.UNAUTHORIZED)
- def test_v3_v2_intermix_non_default_project_failed(self):
+ def test_v3_v2_intermix_non_default_project_succeed(self):
# self.project is in a non-default domain
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
@@ -213,10 +548,9 @@ class TokenAPITests(object):
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
- expected_status=http_client.UNAUTHORIZED)
+ token=self.get_admin_token())
- def test_v3_v2_intermix_non_default_user_failed(self):
+ def test_v3_v2_intermix_non_default_user_succeed(self):
self.assignment_api.create_grant(
self.role['id'],
user_id=self.user['id'],
@@ -232,8 +566,7 @@ class TokenAPITests(object):
self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
- expected_status=http_client.UNAUTHORIZED)
+ token=self.get_admin_token())
def test_v3_v2_intermix_domain_scope_failed(self):
self.assignment_api.create_grant(
@@ -249,12 +582,12 @@ class TokenAPITests(object):
# v2 cannot reference projects outside the default domain
self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
+ token=self.get_admin_token(),
method='GET',
expected_status=http_client.UNAUTHORIZED)
def test_v3_v2_unscoped_token_intermix(self):
- r = self.v3_authenticate_token(self.build_authentication_request(
+ r = self.v3_create_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password']))
self.assertValidUnscopedTokenResponse(r)
@@ -264,7 +597,7 @@ class TokenAPITests(object):
# now validate the v3 token with v2 API
r = self.admin_request(
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token,
+ token=self.get_admin_token(),
method='GET')
v2_token_data = r.result
@@ -278,7 +611,7 @@ class TokenAPITests(object):
def test_v3_v2_token_intermix(self):
# FIXME(gyee): PKI tokens are not interchangeable because token
# data is baked into the token itself.
- r = self.v3_authenticate_token(self.build_authentication_request(
+ r = self.v3_create_token(self.build_authentication_request(
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id']))
@@ -290,7 +623,7 @@ class TokenAPITests(object):
r = self.admin_request(
method='GET',
path='/v2.0/tokens/%s' % v3_token,
- token=CONF.admin_token)
+ token=self.get_admin_token())
v2_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
@@ -318,9 +651,7 @@ class TokenAPITests(object):
v2_token = v2_token_data['access']['token']['id']
r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token})
- # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are
- # missing timezones, so they will not pass this assertion.
- # self.assertValidUnscopedTokenResponse(r)
+ self.assertValidUnscopedTokenResponse(r)
v3_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
@@ -347,9 +678,7 @@ class TokenAPITests(object):
v2_token = v2_token_data['access']['token']['id']
r = self.get('/auth/tokens', headers={'X-Subject-Token': v2_token})
- # FIXME(dolph): Due to bug 1476329, v2 tokens validated on v3 are
- # missing timezones, so they will not pass this assertion.
- # self.assertValidProjectScopedTokenResponse(r)
+ self.assertValidProjectScopedTokenResponse(r)
v3_token_data = r.result
self.assertEqual(v2_token_data['access']['user']['id'],
@@ -384,9 +713,8 @@ class TokenAPITests(object):
v2_token = r.result['access']['token']['id']
# Delete the v2 token using v3.
- resp = self.delete(
+ self.delete(
'/auth/tokens', headers={'X-Subject-Token': v2_token})
- self.assertEqual(resp.status_code, 204)
# Attempting to use the deleted token on v2 should fail.
self.admin_request(
@@ -397,7 +725,7 @@ class TokenAPITests(object):
expires = self.v3_token_data['token']['expires_at']
# rescope the token
- r = self.v3_authenticate_token(self.build_authentication_request(
+ r = self.v3_create_token(self.build_authentication_request(
token=self.v3_token,
project_id=self.project_id))
self.assertValidProjectScopedTokenResponse(r)
@@ -406,12 +734,24 @@ class TokenAPITests(object):
self.assertEqual(expires, r.result['token']['expires_at'])
def test_check_token(self):
- self.head('/auth/tokens', headers=self.headers, expected_status=200)
+ self.head('/auth/tokens', headers=self.headers,
+ expected_status=http_client.OK)
def test_validate_token(self):
r = self.get('/auth/tokens', headers=self.headers)
self.assertValidUnscopedTokenResponse(r)
+ def test_validate_missing_subject_token(self):
+ self.get('/auth/tokens',
+ expected_status=http_client.NOT_FOUND)
+
+ def test_validate_missing_auth_token(self):
+ self.admin_request(
+ method='GET',
+ path='/v3/projects',
+ token=None,
+ expected_status=http_client.UNAUTHORIZED)
+
def test_validate_token_nocatalog(self):
v3_token = self.get_requested_token(self.build_authentication_request(
user_id=self.user['id'],
@@ -422,6 +762,399 @@ class TokenAPITests(object):
headers={'X-Subject-Token': v3_token})
self.assertValidProjectScopedTokenResponse(r, require_catalog=False)
+ def test_is_admin_token_by_ids(self):
+ self.config_fixture.config(
+ group='resource',
+ admin_project_domain_name=self.domain['name'],
+ admin_project_name=self.project['name'])
+ r = self.v3_create_token(self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
+ v3_token = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
+
+ def test_is_admin_token_by_names(self):
+ self.config_fixture.config(
+ group='resource',
+ admin_project_domain_name=self.domain['name'],
+ admin_project_name=self.project['name'])
+ r = self.v3_create_token(self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_domain_name=self.domain['name'],
+ project_name=self.project['name']))
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
+ v3_token = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=True)
+
+ def test_token_for_non_admin_project_is_not_admin(self):
+ self.config_fixture.config(
+ group='resource',
+ admin_project_domain_name=self.domain['name'],
+ admin_project_name=uuid.uuid4().hex)
+ r = self.v3_create_token(self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+ v3_token = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+
+ def test_token_for_non_admin_domain_same_project_name_is_not_admin(self):
+ self.config_fixture.config(
+ group='resource',
+ admin_project_domain_name=uuid.uuid4().hex,
+ admin_project_name=self.project['name'])
+ r = self.v3_create_token(self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+ v3_token = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+
+ def test_only_admin_project_set_acts_as_non_admin(self):
+ self.config_fixture.config(
+ group='resource',
+ admin_project_name=self.project['name'])
+ r = self.v3_create_token(self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+ v3_token = r.headers.get('X-Subject-Token')
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ self.assertValidProjectScopedTokenResponse(r, is_admin_project=False)
+
+ def _create_role(self, domain_id=None):
+ """Call ``POST /roles``."""
+ ref = unit.new_role_ref(domain_id=domain_id)
+ r = self.post('/roles', body={'role': ref})
+ return self.assertValidRoleResponse(r, ref)
+
+ def _create_implied_role(self, prior_id):
+ implied = self._create_role()
+ url = '/roles/%s/implies/%s' % (prior_id, implied['id'])
+ self.put(url, expected_status=http_client.CREATED)
+ return implied
+
+ def _delete_implied_role(self, prior_role_id, implied_role_id):
+ url = '/roles/%s/implies/%s' % (prior_role_id, implied_role_id)
+ self.delete(url)
+
+ def _get_scoped_token_roles(self, is_domain=False):
+ if is_domain:
+ v3_token = self.get_domain_scoped_token()
+ else:
+ v3_token = self.get_scoped_token()
+
+ r = self.get('/auth/tokens', headers={'X-Subject-Token': v3_token})
+ v3_token_data = r.result
+ token_roles = v3_token_data['token']['roles']
+ return token_roles
+
+ def _create_implied_role_shows_in_v3_token(self, is_domain):
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(1, len(token_roles))
+
+ prior = token_roles[0]['id']
+ implied1 = self._create_implied_role(prior)
+
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(2, len(token_roles))
+
+ implied2 = self._create_implied_role(prior)
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(3, len(token_roles))
+
+ token_role_ids = [role['id'] for role in token_roles]
+ self.assertIn(prior, token_role_ids)
+ self.assertIn(implied1['id'], token_role_ids)
+ self.assertIn(implied2['id'], token_role_ids)
+
+ def test_create_implied_role_shows_in_v3_project_token(self):
+ # regardless of the default chosen, this should always
+ # test with the option set.
+ self.config_fixture.config(group='token', infer_roles=True)
+ self._create_implied_role_shows_in_v3_token(False)
+
+ def test_create_implied_role_shows_in_v3_domain_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user['id'],
+ domain_id=self.domain['id'])
+
+ self._create_implied_role_shows_in_v3_token(True)
+
+ def test_group_assigned_implied_role_shows_in_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ is_domain = False
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(1, len(token_roles))
+
+ new_role = self._create_role()
+ prior = new_role['id']
+
+ new_group_ref = unit.new_group_ref(domain_id=self.domain['id'])
+ new_group = self.identity_api.create_group(new_group_ref)
+ self.assignment_api.create_grant(prior,
+ group_id=new_group['id'],
+ project_id=self.project['id'])
+
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(1, len(token_roles))
+
+ self.identity_api.add_user_to_group(self.user['id'],
+ new_group['id'])
+
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(2, len(token_roles))
+
+ implied1 = self._create_implied_role(prior)
+
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(3, len(token_roles))
+
+ implied2 = self._create_implied_role(prior)
+ token_roles = self._get_scoped_token_roles(is_domain)
+ self.assertEqual(4, len(token_roles))
+
+ token_role_ids = [role['id'] for role in token_roles]
+ self.assertIn(prior, token_role_ids)
+ self.assertIn(implied1['id'], token_role_ids)
+ self.assertIn(implied2['id'], token_role_ids)
+
+ def test_multiple_implied_roles_show_in_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(1, len(token_roles))
+
+ prior = token_roles[0]['id']
+ implied1 = self._create_implied_role(prior)
+ implied2 = self._create_implied_role(prior)
+ implied3 = self._create_implied_role(prior)
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(4, len(token_roles))
+
+ token_role_ids = [role['id'] for role in token_roles]
+ self.assertIn(prior, token_role_ids)
+ self.assertIn(implied1['id'], token_role_ids)
+ self.assertIn(implied2['id'], token_role_ids)
+ self.assertIn(implied3['id'], token_role_ids)
+
+ def test_chained_implied_role_shows_in_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(1, len(token_roles))
+
+ prior = token_roles[0]['id']
+ implied1 = self._create_implied_role(prior)
+ implied2 = self._create_implied_role(implied1['id'])
+ implied3 = self._create_implied_role(implied2['id'])
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(4, len(token_roles))
+
+ token_role_ids = [role['id'] for role in token_roles]
+
+ self.assertIn(prior, token_role_ids)
+ self.assertIn(implied1['id'], token_role_ids)
+ self.assertIn(implied2['id'], token_role_ids)
+ self.assertIn(implied3['id'], token_role_ids)
+
+ def test_implied_role_disabled_by_config(self):
+ self.config_fixture.config(group='token', infer_roles=False)
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(1, len(token_roles))
+
+ prior = token_roles[0]['id']
+ implied1 = self._create_implied_role(prior)
+ implied2 = self._create_implied_role(implied1['id'])
+ self._create_implied_role(implied2['id'])
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(1, len(token_roles))
+ token_role_ids = [role['id'] for role in token_roles]
+ self.assertIn(prior, token_role_ids)
+
+ def test_delete_implied_role_do_not_show_in_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ token_roles = self._get_scoped_token_roles()
+ prior = token_roles[0]['id']
+ implied = self._create_implied_role(prior)
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(2, len(token_roles))
+ self._delete_implied_role(prior, implied['id'])
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(1, len(token_roles))
+
+ def test_unrelated_implied_roles_do_not_change_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ token_roles = self._get_scoped_token_roles()
+ prior = token_roles[0]['id']
+ implied = self._create_implied_role(prior)
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(2, len(token_roles))
+
+ unrelated = self._create_role()
+ url = '/roles/%s/implies/%s' % (unrelated['id'], implied['id'])
+ self.put(url, expected_status=http_client.CREATED)
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(2, len(token_roles))
+
+ self._delete_implied_role(unrelated['id'], implied['id'])
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(2, len(token_roles))
+
+ def test_domain_scpecific_roles_do_not_show_v3_token(self):
+ self.config_fixture.config(group='token', infer_roles=True)
+ initial_token_roles = self._get_scoped_token_roles()
+
+ new_role = self._create_role(domain_id=self.domain_id)
+ self.assignment_api.create_grant(new_role['id'],
+ user_id=self.user['id'],
+ project_id=self.project['id'])
+ implied = self._create_implied_role(new_role['id'])
+
+ token_roles = self._get_scoped_token_roles()
+ self.assertEqual(len(initial_token_roles) + 1, len(token_roles))
+
+ # The implied role from the domain specific role should be in the
+ # token, but not the domain specific role itself.
+ token_role_ids = [role['id'] for role in token_roles]
+ self.assertIn(implied['id'], token_role_ids)
+ self.assertNotIn(new_role['id'], token_role_ids)
+
+ def test_remove_all_roles_from_scope_result_in_404(self):
+ # create a new user
+ new_user = unit.create_user(self.identity_api,
+ domain_id=self.domain['id'])
+
+ # give the new user a role on a project
+ path = '/projects/%s/users/%s/roles/%s' % (
+ self.project['id'], new_user['id'], self.role['id'])
+ self.put(path=path)
+
+ # authenticate as the new user and get a project-scoped token
+ auth_data = self.build_authentication_request(
+ user_id=new_user['id'],
+ password=new_user['password'],
+ project_id=self.project['id'])
+ subject_token_id = self.v3_create_token(auth_data).headers.get(
+ 'X-Subject-Token')
+
+ # make sure the project-scoped token is valid
+ headers = {'X-Subject-Token': subject_token_id}
+ r = self.get('/auth/tokens', headers=headers)
+ self.assertValidProjectScopedTokenResponse(r)
+
+ # remove the roles from the user for the given scope
+ path = '/projects/%s/users/%s/roles/%s' % (
+ self.project['id'], new_user['id'], self.role['id'])
+ self.delete(path=path)
+
+ # token validation should now result in 404
+ self.get('/auth/tokens', headers=headers,
+ expected_status=http_client.NOT_FOUND)
+
+
+class TokenDataTests(object):
+ """Test the data in specific token types."""
+
+ def test_unscoped_token_format(self):
+ # ensure the unscoped token response contains the appropriate data
+ r = self.get('/auth/tokens', headers=self.headers)
+ self.assertValidUnscopedTokenResponse(r)
+
+ def test_domain_scoped_token_format(self):
+ # ensure the domain scoped token response contains the appropriate data
+ self.assignment_api.create_grant(
+ self.role['id'],
+ user_id=self.default_domain_user['id'],
+ domain_id=self.domain['id'])
+
+ domain_scoped_token = self.get_requested_token(
+ self.build_authentication_request(
+ user_id=self.default_domain_user['id'],
+ password=self.default_domain_user['password'],
+ domain_id=self.domain['id'])
+ )
+ self.headers['X-Subject-Token'] = domain_scoped_token
+ r = self.get('/auth/tokens', headers=self.headers)
+ self.assertValidDomainScopedTokenResponse(r)
+
+ def test_project_scoped_token_format(self):
+ # ensure project scoped token responses contains the appropriate data
+ project_scoped_token = self.get_requested_token(
+ self.build_authentication_request(
+ user_id=self.default_domain_user['id'],
+ password=self.default_domain_user['password'],
+ project_id=self.default_domain_project['id'])
+ )
+ self.headers['X-Subject-Token'] = project_scoped_token
+ r = self.get('/auth/tokens', headers=self.headers)
+ self.assertValidProjectScopedTokenResponse(r)
+
+ def test_extra_data_in_unscoped_token_fails_validation(self):
+ # ensure unscoped token response contains the appropriate data
+ r = self.get('/auth/tokens', headers=self.headers)
+
+ # populate the response result with some extra data
+ r.result['token'][u'extra'] = unicode(uuid.uuid4().hex)
+ self.assertRaises(exception.SchemaValidationError,
+ self.assertValidUnscopedTokenResponse,
+ r)
+
+ def test_extra_data_in_domain_scoped_token_fails_validation(self):
+ # ensure domain scoped token response contains the appropriate data
+ self.assignment_api.create_grant(
+ self.role['id'],
+ user_id=self.default_domain_user['id'],
+ domain_id=self.domain['id'])
+
+ domain_scoped_token = self.get_requested_token(
+ self.build_authentication_request(
+ user_id=self.default_domain_user['id'],
+ password=self.default_domain_user['password'],
+ domain_id=self.domain['id'])
+ )
+ self.headers['X-Subject-Token'] = domain_scoped_token
+ r = self.get('/auth/tokens', headers=self.headers)
+
+ # populate the response result with some extra data
+ r.result['token'][u'extra'] = unicode(uuid.uuid4().hex)
+ self.assertRaises(exception.SchemaValidationError,
+ self.assertValidDomainScopedTokenResponse,
+ r)
+
+ def test_extra_data_in_project_scoped_token_fails_validation(self):
+ # ensure project scoped token responses contains the appropriate data
+ project_scoped_token = self.get_requested_token(
+ self.build_authentication_request(
+ user_id=self.default_domain_user['id'],
+ password=self.default_domain_user['password'],
+ project_id=self.default_domain_project['id'])
+ )
+ self.headers['X-Subject-Token'] = project_scoped_token
+ resp = self.get('/auth/tokens', headers=self.headers)
+
+ # populate the response result with some extra data
+ resp.result['token'][u'extra'] = unicode(uuid.uuid4().hex)
+ self.assertRaises(exception.SchemaValidationError,
+ self.assertValidProjectScopedTokenResponse,
+ resp)
+
class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
def config_overrides(self):
@@ -431,7 +1164,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
allow_rescope_scoped_token=False)
def test_rescoping_v3_to_v3_disabled(self):
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
token=self.get_scoped_token(),
project_id=self.project_id),
@@ -465,7 +1198,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
def test_rescoping_v2_to_v3_disabled(self):
token = self._v2_token()
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
token=token['access']['token']['id'],
project_id=self.project_id),
@@ -481,7 +1214,7 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
def test_rescoped_domain_token_disabled(self):
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user['id'],
@@ -495,14 +1228,14 @@ class AllowRescopeScopedTokenDisabledTests(test_v3.RestfulTestCase):
self.build_authentication_request(
token=unscoped_token,
domain_id=self.domainA['id']))
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
token=domain_scoped_token,
project_id=self.project_id),
expected_status=http_client.FORBIDDEN)
-class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
+class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests, TokenDataTests):
def config_overrides(self):
super(TestPKITokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='pki')
@@ -518,7 +1251,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- resp = self.v3_authenticate_token(auth_data)
+ resp = self.v3_create_token(auth_data)
token_data = resp.result
token_id = resp.headers.get('X-Subject-Token')
self.assertIn('expires_at', token_data['token'])
@@ -542,7 +1275,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
project_id=self.default_domain_project['id'])
- resp = self.v3_authenticate_token(auth_data)
+ resp = self.v3_create_token(auth_data)
token_data = resp.result
token = resp.headers.get('X-Subject-Token')
@@ -550,7 +1283,7 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
token = cms.cms_hash_token(token)
path = '/v2.0/tokens/%s' % (token)
resp = self.admin_request(path=path,
- token=CONF.admin_token,
+ token=self.get_admin_token(),
method='GET')
v2_token = resp.result
self.assertEqual(v2_token['access']['user']['id'],
@@ -559,8 +1292,8 @@ class TestPKITokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
# just need to make sure the non fraction part agrees
self.assertIn(v2_token['access']['token']['expires'][:-1],
token_data['token']['expires_at'])
- self.assertEqual(v2_token['access']['user']['roles'][0]['id'],
- token_data['token']['roles'][0]['id'])
+ self.assertEqual(v2_token['access']['user']['roles'][0]['name'],
+ token_data['token']['roles'][0]['name'])
class TestPKIZTokenAPIs(TestPKITokenAPIs):
@@ -572,7 +1305,8 @@ class TestPKIZTokenAPIs(TestPKITokenAPIs):
return cms.pkiz_verify(*args, **kwargs)
-class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
+class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
+ TokenDataTests):
def config_overrides(self):
super(TestUUIDTokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='uuid')
@@ -585,14 +1319,15 @@ class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- resp = self.v3_authenticate_token(auth_data)
+ resp = self.v3_create_token(auth_data)
token_data = resp.result
token_id = resp.headers.get('X-Subject-Token')
self.assertIn('expires_at', token_data['token'])
self.assertFalse(cms.is_asn1_token(token_id))
-class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
+class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests,
+ TokenDataTests):
def config_overrides(self):
super(TestFernetTokenAPIs, self).config_overrides()
self.config_fixture.config(group='token', provider='fernet')
@@ -602,6 +1337,34 @@ class TestFernetTokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
super(TestFernetTokenAPIs, self).setUp()
self.doSetUp()
+ def _make_auth_request(self, auth_data):
+ token = super(TestFernetTokenAPIs, self)._make_auth_request(auth_data)
+ self.assertLess(len(token), 255)
+ return token
+
+ def test_validate_tampered_unscoped_token_fails(self):
+ unscoped_token = self._get_unscoped_token()
+ tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
+ unscoped_token[50 + 32:])
+ self._validate_token(tampered_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_validate_tampered_project_scoped_token_fails(self):
+ project_scoped_token = self._get_project_scoped_token()
+ tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
+ project_scoped_token[50 + 32:])
+ self._validate_token(tampered_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_validate_tampered_trust_scoped_token_fails(self):
+ trustee_user, trust = self._create_trust()
+ trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
+ # Get a trust scoped token
+ tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
+ trust_scoped_token[50 + 32:])
+ self._validate_token(tampered_token,
+ expected_status=http_client.NOT_FOUND)
+
class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
"""Test token revoke using v3 Identity API by token owner and admin."""
@@ -616,29 +1379,22 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
"""
super(TestTokenRevokeSelfAndAdmin, self).load_sample_data()
# DomainA setup
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
- self.userAdminA = self.new_user_ref(domain_id=self.domainA['id'])
- password = self.userAdminA['password']
- self.userAdminA = self.identity_api.create_user(self.userAdminA)
- self.userAdminA['password'] = password
+ self.userAdminA = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
- self.userNormalA = self.new_user_ref(
- domain_id=self.domainA['id'])
- password = self.userNormalA['password']
- self.userNormalA = self.identity_api.create_user(self.userNormalA)
- self.userNormalA['password'] = password
+ self.userNormalA = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.userAdminA['id'],
domain_id=self.domainA['id'])
- def config_overrides(self):
- super(TestTokenRevokeSelfAndAdmin, self).config_overrides()
- self.config_fixture.config(
- group='oslo_policy',
- policy_file=unit.dirs.etc('policy.v3cloudsample.json'))
+ def _policy_fixture(self):
+ return ksfixtures.Policy(unit.dirs.etc('policy.v3cloudsample.json'),
+ self.config_fixture)
def test_user_revokes_own_token(self):
user_token = self.get_requested_token(
@@ -655,11 +1411,13 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
- self.head('/auth/tokens', headers=headers, expected_status=200,
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.OK,
token=adminA_token)
- self.head('/auth/tokens', headers=headers, expected_status=200,
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.OK,
token=user_token)
- self.delete('/auth/tokens', headers=headers, expected_status=204,
+ self.delete('/auth/tokens', headers=headers,
token=user_token)
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
@@ -693,11 +1451,13 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
password=self.userAdminA['password'],
domain_name=self.domainA['name']))
- self.head('/auth/tokens', headers=headers, expected_status=200,
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.OK,
token=adminA_token)
- self.head('/auth/tokens', headers=headers, expected_status=200,
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.OK,
token=user_token)
- self.delete('/auth/tokens', headers=headers, expected_status=204,
+ self.delete('/auth/tokens', headers=headers,
token=adminA_token)
# invalid X-Auth-Token and invalid X-Subject-Token
self.head('/auth/tokens', headers=headers,
@@ -714,14 +1474,12 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
def test_adminB_fails_revoking_userA_token(self):
# DomainB setup
- self.domainB = self.new_domain_ref()
+ self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
- self.userAdminB = self.new_user_ref(domain_id=self.domainB['id'])
- password = self.userAdminB['password']
- self.userAdminB = self.identity_api.create_user(self.userAdminB)
- self.userAdminB['password'] = password
+ userAdminB = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
self.assignment_api.create_grant(self.role['id'],
- user_id=self.userAdminB['id'],
+ user_id=userAdminB['id'],
domain_id=self.domainB['id'])
user_token = self.get_requested_token(
@@ -733,8 +1491,8 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
adminB_token = self.get_requested_token(
self.build_authentication_request(
- user_id=self.userAdminB['id'],
- password=self.userAdminB['password'],
+ user_id=userAdminB['id'],
+ password=userAdminB['password'],
domain_name=self.domainB['name']))
self.head('/auth/tokens', headers=headers,
@@ -750,7 +1508,6 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
def config_overrides(self):
super(TestTokenRevokeById, self).config_overrides()
- self.config_fixture.config(group='revoke', driver='kvs')
self.config_fixture.config(
group='token',
provider='pki',
@@ -782,44 +1539,32 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
super(TestTokenRevokeById, self).setUp()
# Start by creating a couple of domains and projects
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
- self.domainB = self.new_domain_ref()
+ self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
- self.projectA = self.new_project_ref(domain_id=self.domainA['id'])
+ self.projectA = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.projectA['id'], self.projectA)
- self.projectB = self.new_project_ref(domain_id=self.domainA['id'])
+ self.projectB = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.projectB['id'], self.projectB)
# Now create some users
- self.user1 = self.new_user_ref(
- domain_id=self.domainA['id'])
- password = self.user1['password']
- self.user1 = self.identity_api.create_user(self.user1)
- self.user1['password'] = password
-
- self.user2 = self.new_user_ref(
- domain_id=self.domainB['id'])
- password = self.user2['password']
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
-
- self.user3 = self.new_user_ref(
- domain_id=self.domainB['id'])
- password = self.user3['password']
- self.user3 = self.identity_api.create_user(self.user3)
- self.user3['password'] = password
-
- self.group1 = self.new_group_ref(
- domain_id=self.domainA['id'])
+ self.user1 = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
+
+ self.user2 = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
+
+ self.user3 = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
+
+ self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group1 = self.identity_api.create_group(self.group1)
- self.group2 = self.new_group_ref(
- domain_id=self.domainA['id'])
+ self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group2 = self.identity_api.create_group(self.group2)
- self.group3 = self.new_group_ref(
- domain_id=self.domainB['id'])
+ self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
self.group3 = self.identity_api.create_group(self.group3)
self.identity_api.add_user_to_group(self.user1['id'],
@@ -829,9 +1574,9 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.identity_api.add_user_to_group(self.user3['id'],
self.group2['id'])
- self.role1 = self.new_role_ref()
+ self.role1 = unit.new_role_ref()
self.role_api.create_role(self.role1['id'], self.role1)
- self.role2 = self.new_role_ref()
+ self.role2 = unit.new_role_ref()
self.role_api.create_role(self.role2['id'], self.role2)
self.assignment_api.create_grant(self.role2['id'],
@@ -864,13 +1609,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm both tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
# create a new role
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# assign a new role
@@ -883,10 +1628,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# both tokens should remain valid
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': scoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
def test_deleting_user_grant_revokes_token(self):
"""Test deleting a user grant revokes token.
@@ -906,7 +1651,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# Delete the grant, which should invalidate the token
grant_url = (
'/projects/%(project_id)s/users/%(user_id)s/'
@@ -920,22 +1665,14 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
expected_status=http_client.NOT_FOUND)
def role_data_fixtures(self):
- self.projectC = self.new_project_ref(domain_id=self.domainA['id'])
+ self.projectC = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.projectC['id'], self.projectC)
- self.user4 = self.new_user_ref(domain_id=self.domainB['id'])
- password = self.user4['password']
- self.user4 = self.identity_api.create_user(self.user4)
- self.user4['password'] = password
- self.user5 = self.new_user_ref(
- domain_id=self.domainA['id'])
- password = self.user5['password']
- self.user5 = self.identity_api.create_user(self.user5)
- self.user5['password'] = password
- self.user6 = self.new_user_ref(
- domain_id=self.domainA['id'])
- password = self.user6['password']
- self.user6 = self.identity_api.create_user(self.user6)
- self.user6['password'] = password
+ self.user4 = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
+ self.user5 = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
+ self.user6 = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
self.identity_api.add_user_to_group(self.user5['id'],
self.group1['id'])
self.assignment_api.create_grant(self.role1['id'],
@@ -954,29 +1691,29 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
def test_deleting_role_revokes_token(self):
"""Test deleting a role revokes token.
- Add some additional test data, namely:
- - A third project (project C)
- - Three additional users - user4 owned by domainB and user5 and 6
- owned by domainA (different domain ownership should not affect
- the test results, just provided to broaden test coverage)
- - User5 is a member of group1
- - Group1 gets an additional assignment - role1 on projectB as
- well as its existing role1 on projectA
- - User4 has role2 on Project C
- - User6 has role1 on projectA and domainA
- - This allows us to create 5 tokens by virtue of different types
- of role assignment:
- - user1, scoped to ProjectA by virtue of user role1 assignment
- - user5, scoped to ProjectB by virtue of group role1 assignment
- - user4, scoped to ProjectC by virtue of user role2 assignment
- - user6, scoped to ProjectA by virtue of user role1 assignment
- - user6, scoped to DomainA by virtue of user role1 assignment
- - role1 is then deleted
- - Check the tokens on Project A and B, and DomainA are revoked,
- but not the one for Project C
+ Add some additional test data, namely:
+
+ - A third project (project C)
+ - Three additional users - user4 owned by domainB and user5 and 6 owned
+ by domainA (different domain ownership should not affect the test
+ results, just provided to broaden test coverage)
+ - User5 is a member of group1
+ - Group1 gets an additional assignment - role1 on projectB as well as
+ its existing role1 on projectA
+ - User4 has role2 on Project C
+ - User6 has role1 on projectA and domainA
+ - This allows us to create 5 tokens by virtue of different types of
+ role assignment:
+ - user1, scoped to ProjectA by virtue of user role1 assignment
+ - user5, scoped to ProjectB by virtue of group role1 assignment
+ - user4, scoped to ProjectC by virtue of user role2 assignment
+ - user6, scoped to ProjectA by virtue of user role1 assignment
+ - user6, scoped to DomainA by virtue of user role1 assignment
+ - role1 is then deleted
+ - Check the tokens on Project A and B, and DomainA are revoked, but not
+ the one for Project C
"""
-
self.role_data_fixtures()
# Now we are ready to start issuing requests
@@ -1008,19 +1745,19 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenA},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenB},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenD},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenE},
- expected_status=200)
+ expected_status=http_client.OK)
# Delete the role, which should invalidate the tokens
role_url = '/roles/%s' % self.role1['id']
@@ -1043,7 +1780,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# ...but the one using role2 is still valid
self.head('/auth/tokens',
headers={'X-Subject-Token': tokenC},
- expected_status=200)
+ expected_status=http_client.OK)
def test_domain_user_role_assignment_maintains_token(self):
"""Test user-domain role assignment maintains existing token.
@@ -1063,7 +1800,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# Assign a role, which should not affect the token
grant_url = (
'/domains/%(domain_id)s/users/%(user_id)s/'
@@ -1074,7 +1811,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.put(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
def test_disabling_project_revokes_token(self):
token = self.get_requested_token(
@@ -1086,7 +1823,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# disable the project, which should invalidate the token
self.patch(
@@ -1097,7 +1834,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=http_client.NOT_FOUND)
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
@@ -1114,7 +1851,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# delete the project, which should invalidate the token
self.delete(
@@ -1124,7 +1861,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
expected_status=http_client.NOT_FOUND)
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
@@ -1163,13 +1900,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': token3},
- expected_status=200)
+ expected_status=http_client.OK)
# Delete the group grant, which should invalidate the
# tokens for user1 and user2
grant_url = (
@@ -1209,7 +1946,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm token is valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
# Delete the grant, which should invalidate the token
grant_url = (
'/domains/%(domain_id)s/groups/%(group_id)s/'
@@ -1220,7 +1957,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.put(grant_url)
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
- expected_status=200)
+ expected_status=http_client.OK)
def test_group_membership_changes_revokes_token(self):
"""Test add/removal to/from group revokes token.
@@ -1250,10 +1987,10 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# Confirm tokens are valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token1},
- expected_status=200)
+ expected_status=http_client.OK)
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=200)
+ expected_status=http_client.OK)
# Remove user1 from group1, which should invalidate
# the token
self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
@@ -1265,18 +2002,17 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# But user2's token should still be valid
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=200)
+ expected_status=http_client.OK)
# Adding user2 to a group should not invalidate token
self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group2['id'],
'user_id': self.user2['id']})
self.head('/auth/tokens',
headers={'X-Subject-Token': token2},
- expected_status=200)
+ expected_status=http_client.OK)
def test_removing_role_assignment_does_not_affect_other_users(self):
"""Revoking a role from one user should not affect other users."""
-
# This group grant is not needed for the test
self.delete(
'/projects/%(project_id)s/groups/%(group_id)s/roles/%(role_id)s' %
@@ -1306,7 +2042,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.head('/auth/tokens',
headers={'X-Subject-Token': user1_token},
expected_status=http_client.NOT_FOUND)
- self.v3_authenticate_token(
+ self.v3_create_token(
self.build_authentication_request(
user_id=self.user1['id'],
password=self.user1['password'],
@@ -1316,8 +2052,8 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# authorization for the second user should still succeed
self.head('/auth/tokens',
headers={'X-Subject-Token': user3_token},
- expected_status=200)
- self.v3_authenticate_token(
+ expected_status=http_client.OK)
+ self.v3_create_token(
self.build_authentication_request(
user_id=self.user3['id'],
password=self.user3['password'],
@@ -1338,7 +2074,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']})
- # Make sure that we get a NotFound(404) when heading that role.
+ # Make sure that we get a 404 Not Found when heading that role.
self.head(role_path, expected_status=http_client.NOT_FOUND)
def get_v2_token(self, token=None, project_id=None):
@@ -1366,8 +2102,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
token = self.get_v2_token()
self.delete('/auth/tokens',
- headers={'X-Subject-Token': token},
- expected_status=204)
+ headers={'X-Subject-Token': token})
self.head('/auth/tokens',
headers={'X-Subject-Token': token},
@@ -1397,8 +2132,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# revoke the project-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': project_scoped_token},
- expected_status=204)
+ headers={'X-Subject-Token': project_scoped_token})
# The project-scoped token is invalidated.
self.head('/auth/tokens',
@@ -1408,17 +2142,16 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
# The domain-scoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': domain_scoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
# revoke the domain-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': domain_scoped_token},
- expected_status=204)
+ headers={'X-Subject-Token': domain_scoped_token})
# The domain-scoped token is invalid.
self.head('/auth/tokens',
@@ -1428,16 +2161,13 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
def test_revoke_token_from_token_v2(self):
# Test that a scoped token can be requested from an unscoped token,
# the scoped token can be revoked, and the unscoped token remains
# valid.
- # FIXME(blk-u): This isn't working correctly. The scoped token should
- # be revoked. See bug 1347318.
-
unscoped_token = self.get_v2_token()
# Get a project-scoped token from the unscoped token
@@ -1446,8 +2176,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# revoke the project-scoped token.
self.delete('/auth/tokens',
- headers={'X-Subject-Token': project_scoped_token},
- expected_status=204)
+ headers={'X-Subject-Token': project_scoped_token})
# The project-scoped token is invalidated.
self.head('/auth/tokens',
@@ -1457,7 +2186,7 @@ class TestTokenRevokeById(test_v3.RestfulTestCase):
# The unscoped token should still be valid.
self.head('/auth/tokens',
headers={'X-Subject-Token': unscoped_token},
- expected_status=200)
+ expected_status=http_client.OK)
class TestTokenRevokeByAssignment(TestTokenRevokeById):
@@ -1465,9 +2194,6 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById):
def config_overrides(self):
super(TestTokenRevokeById, self).config_overrides()
self.config_fixture.config(
- group='revoke',
- driver='kvs')
- self.config_fixture.config(
group='token',
provider='uuid',
revoke_by_id=True)
@@ -1501,7 +2227,7 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById):
# authorization for the projectA should still succeed
self.head('/auth/tokens',
headers={'X-Subject-Token': other_project_token},
- expected_status=200)
+ expected_status=http_client.OK)
# while token for the projectB should not
self.head('/auth/tokens',
headers={'X-Subject-Token': project_token},
@@ -1512,14 +2238,21 @@ class TestTokenRevokeByAssignment(TestTokenRevokeById):
self.assertIn(project_token, revoked_tokens)
-class TestTokenRevokeApi(TestTokenRevokeById):
- EXTENSION_NAME = 'revoke'
- EXTENSION_TO_ADD = 'revoke_extension'
+class RevokeContribTests(test_v3.RestfulTestCase):
+ @mock.patch.object(versionutils, 'report_deprecated_feature')
+ def test_exception_happens(self, mock_deprecator):
+ routers.RevokeExtension(mock.ANY)
+ mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY)
+ args, _kwargs = mock_deprecator.call_args
+ self.assertIn("Remove revoke_extension from", args[1])
+
+
+class TestTokenRevokeApi(TestTokenRevokeById):
"""Test token revocation on the v3 Identity API."""
+
def config_overrides(self):
super(TestTokenRevokeApi, self).config_overrides()
- self.config_fixture.config(group='revoke', driver='kvs')
self.config_fixture.config(
group='token',
provider='pki',
@@ -1536,15 +2269,19 @@ class TestTokenRevokeApi(TestTokenRevokeById):
expected_response = {'events': [{'project_id': project_id}]}
self.assertEqual(expected_response, events_response)
- def assertDomainInList(self, events_response, domain_id):
+ def assertDomainAndProjectInList(self, events_response, domain_id):
events = events_response['events']
- self.assertEqual(1, len(events))
- self.assertEqual(domain_id, events[0]['domain_id'])
+ self.assertEqual(2, len(events))
+ self.assertEqual(domain_id, events[0]['project_id'])
+ self.assertEqual(domain_id, events[1]['domain_id'])
self.assertIsNotNone(events[0]['issued_before'])
+ self.assertIsNotNone(events[1]['issued_before'])
self.assertIsNotNone(events_response['links'])
del (events_response['events'][0]['issued_before'])
+ del (events_response['events'][1]['issued_before'])
del (events_response['links'])
- expected_response = {'events': [{'domain_id': domain_id}]}
+ expected_response = {'events': [{'project_id': domain_id},
+ {'domain_id': domain_id}]}
self.assertEqual(expected_response, events_response)
def assertValidRevokedTokenResponse(self, events_response, **kwargs):
@@ -1563,62 +2300,55 @@ class TestTokenRevokeApi(TestTokenRevokeById):
def test_revoke_token(self):
scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token}
- response = self.get('/auth/tokens', headers=headers,
- expected_status=200).json_body['token']
+ response = self.get('/auth/tokens', headers=headers).json_body['token']
- self.delete('/auth/tokens', headers=headers, expected_status=204)
+ self.delete('/auth/tokens', headers=headers)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- events_response = self.get('/OS-REVOKE/events',
- expected_status=200).json_body
+ events_response = self.get('/OS-REVOKE/events').json_body
self.assertValidRevokedTokenResponse(events_response,
audit_id=response['audit_ids'][0])
def test_revoke_v2_token(self):
token = self.get_v2_token()
headers = {'X-Subject-Token': token}
- response = self.get('/auth/tokens', headers=headers,
- expected_status=200).json_body['token']
- self.delete('/auth/tokens', headers=headers, expected_status=204)
+ response = self.get('/auth/tokens',
+ headers=headers).json_body['token']
+ self.delete('/auth/tokens', headers=headers)
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- events_response = self.get('/OS-REVOKE/events',
- expected_status=200).json_body
+ events_response = self.get('/OS-REVOKE/events').json_body
self.assertValidRevokedTokenResponse(
events_response,
audit_id=response['audit_ids'][0])
- def test_revoke_by_id_false_410(self):
+ def test_revoke_by_id_false_returns_gone(self):
self.get('/auth/tokens/OS-PKI/revoked',
expected_status=http_client.GONE)
def test_list_delete_project_shows_in_event_list(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual([], events)
self.delete(
'/projects/%(project_id)s' % {'project_id': self.projectA['id']})
- events_response = self.get('/OS-REVOKE/events',
- expected_status=200).json_body
+ events_response = self.get('/OS-REVOKE/events').json_body
self.assertValidDeletedProjectResponse(events_response,
self.projectA['id'])
def test_disable_domain_shows_in_event_list(self):
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual([], events)
disable_body = {'domain': {'enabled': False}}
self.patch(
'/domains/%(project_id)s' % {'project_id': self.domainA['id']},
body=disable_body)
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body
+ events = self.get('/OS-REVOKE/events').json_body
- self.assertDomainInList(events, self.domainA['id'])
+ self.assertDomainAndProjectInList(events, self.domainA['id'])
def assertEventDataInList(self, events, **kwargs):
found = False
@@ -1646,30 +2376,31 @@ class TestTokenRevokeApi(TestTokenRevokeById):
def test_list_delete_token_shows_in_event_list(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual([], events)
scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token}
auth_req = self.build_authentication_request(token=scoped_token)
- response = self.v3_authenticate_token(auth_req)
+ response = self.v3_create_token(auth_req)
token2 = response.json_body['token']
headers2 = {'X-Subject-Token': response.headers['X-Subject-Token']}
- response = self.v3_authenticate_token(auth_req)
+ response = self.v3_create_token(auth_req)
response.json_body['token']
headers3 = {'X-Subject-Token': response.headers['X-Subject-Token']}
- self.head('/auth/tokens', headers=headers, expected_status=200)
- self.head('/auth/tokens', headers=headers2, expected_status=200)
- self.head('/auth/tokens', headers=headers3, expected_status=200)
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=headers2,
+ expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=headers3,
+ expected_status=http_client.OK)
- self.delete('/auth/tokens', headers=headers, expected_status=204)
+ self.delete('/auth/tokens', headers=headers)
# NOTE(ayoung): not deleting token3, as it should be deleted
# by previous
- events_response = self.get('/OS-REVOKE/events',
- expected_status=200).json_body
+ events_response = self.get('/OS-REVOKE/events').json_body
events = events_response['events']
self.assertEqual(1, len(events))
self.assertEventDataInList(
@@ -1677,32 +2408,32 @@ class TestTokenRevokeApi(TestTokenRevokeById):
audit_id=token2['audit_ids'][1])
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
- self.head('/auth/tokens', headers=headers2, expected_status=200)
- self.head('/auth/tokens', headers=headers3, expected_status=200)
+ self.head('/auth/tokens', headers=headers2,
+ expected_status=http_client.OK)
+ self.head('/auth/tokens', headers=headers3,
+ expected_status=http_client.OK)
def test_list_with_filter(self):
self.role_data_fixtures()
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual(0, len(events))
scoped_token = self.get_scoped_token()
headers = {'X-Subject-Token': scoped_token}
auth = self.build_authentication_request(token=scoped_token)
headers2 = {'X-Subject-Token': self.get_requested_token(auth)}
- self.delete('/auth/tokens', headers=headers, expected_status=204)
- self.delete('/auth/tokens', headers=headers2, expected_status=204)
+ self.delete('/auth/tokens', headers=headers)
+ self.delete('/auth/tokens', headers=headers2)
- events = self.get('/OS-REVOKE/events',
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events').json_body['events']
self.assertEqual(2, len(events))
future = utils.isotime(timeutils.utcnow() +
datetime.timedelta(seconds=1000))
- events = self.get('/OS-REVOKE/events?since=%s' % (future),
- expected_status=200).json_body['events']
+ events = self.get('/OS-REVOKE/events?since=%s' % (future)
+ ).json_body['events']
self.assertEqual(0, len(events))
@@ -1764,7 +2495,7 @@ class TestAuthExternalDomain(test_v3.RestfulTestCase):
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'REMOTE_DOMAIN': remote_domain,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(self.user['name'], token['bind']['kerberos'])
@@ -1776,7 +2507,7 @@ class TestAuthExternalDomain(test_v3.RestfulTestCase):
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'REMOTE_DOMAIN': remote_domain,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidUnscopedTokenResponse(r)
self.assertEqual(self.user['name'], token['bind']['kerberos'])
@@ -1820,7 +2551,7 @@ class TestAuthExternalDefaultDomain(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(self.default_domain_user['name'],
token['bind']['kerberos'])
@@ -1831,7 +2562,7 @@ class TestAuthExternalDefaultDomain(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidUnscopedTokenResponse(r)
self.assertEqual(self.default_domain_user['name'],
token['bind']['kerberos'])
@@ -1852,7 +2583,7 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_id(self):
@@ -1860,7 +2591,7 @@ class TestAuth(test_v3.RestfulTestCase):
username=self.user['name'],
user_domain_id=self.domain['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_unscoped_token_with_user_domain_name(self):
@@ -1868,7 +2599,7 @@ class TestAuth(test_v3.RestfulTestCase):
username=self.user['name'],
user_domain_name=self.domain['name'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_project_id_scoped_token_with_user_id(self):
@@ -1876,11 +2607,11 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def _second_project_as_default(self):
- ref = self.new_project_ref(domain_id=self.domain_id)
+ ref = unit.new_project_ref(domain_id=self.domain_id)
r = self.post('/projects', body={'project': ref})
project = self.assertValidProjectResponse(r, ref)
@@ -1907,7 +2638,7 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
self.assertEqual(project['id'], r.result['token']['project']['id'])
@@ -1952,7 +2683,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
catalog = r.result['token']['catalog']
self.assertEqual(1, len(catalog))
@@ -1989,13 +2720,12 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertEqual([], r.result['token']['catalog'])
def test_auth_catalog_disabled_endpoint(self):
"""On authenticate, get a catalog that excludes disabled endpoints."""
-
# Create a disabled endpoint that's like the enabled one.
disabled_endpoint_ref = copy.copy(self.endpoint)
disabled_endpoint_id = uuid.uuid4().hex
@@ -2011,21 +2741,21 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self._check_disabled_endpoint_result(r.result['token']['catalog'],
disabled_endpoint_id)
def test_project_id_scoped_token_with_user_id_unauthorized(self):
- project = self.new_project_ref(domain_id=self.domain_id)
+ project = unit.new_project_ref(domain_id=self.domain_id)
self.resource_api.create_project(project['id'], project)
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_user_and_group_roles_scoped_token(self):
"""Test correct roles are returned in scoped token.
@@ -2049,30 +2779,19 @@ class TestAuth(test_v3.RestfulTestCase):
tokens
"""
-
- domainA = self.new_domain_ref()
+ domainA = unit.new_domain_ref()
self.resource_api.create_domain(domainA['id'], domainA)
- projectA = self.new_project_ref(domain_id=domainA['id'])
+ projectA = unit.new_project_ref(domain_id=domainA['id'])
self.resource_api.create_project(projectA['id'], projectA)
- user1 = self.new_user_ref(
- domain_id=domainA['id'])
- password = user1['password']
- user1 = self.identity_api.create_user(user1)
- user1['password'] = password
+ user1 = unit.create_user(self.identity_api, domain_id=domainA['id'])
- user2 = self.new_user_ref(
- domain_id=domainA['id'])
- password = user2['password']
- user2 = self.identity_api.create_user(user2)
- user2['password'] = password
+ user2 = unit.create_user(self.identity_api, domain_id=domainA['id'])
- group1 = self.new_group_ref(
- domain_id=domainA['id'])
+ group1 = unit.new_group_ref(domain_id=domainA['id'])
group1 = self.identity_api.create_group(group1)
- group2 = self.new_group_ref(
- domain_id=domainA['id'])
+ group2 = unit.new_group_ref(domain_id=domainA['id'])
group2 = self.identity_api.create_group(group2)
self.identity_api.add_user_to_group(user1['id'],
@@ -2083,7 +2802,7 @@ class TestAuth(test_v3.RestfulTestCase):
# Now create all the roles and assign them
role_list = []
for _ in range(8):
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
role_list.append(role)
@@ -2119,7 +2838,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=user1['id'],
password=user1['password'],
project_id=projectA['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
@@ -2133,7 +2852,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=user1['id'],
password=user1['password'],
domain_id=domainA['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
@@ -2151,7 +2870,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=user1['id'],
password=user1['password'],
project_id=projectA['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidScopedTokenResponse(r)
roles_ids = []
for ref in token['roles']:
@@ -2164,30 +2883,23 @@ class TestAuth(test_v3.RestfulTestCase):
def test_auth_token_cross_domain_group_and_project(self):
"""Verify getting a token in cross domain group/project roles."""
# create domain, project and group and grant roles to user
- domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ domain1 = unit.new_domain_ref()
self.resource_api.create_domain(domain1['id'], domain1)
- project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
- 'domain_id': domain1['id']}
+ project1 = unit.new_project_ref(domain_id=domain1['id'])
self.resource_api.create_project(project1['id'], project1)
- user_foo = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID)
- password = user_foo['password']
- user_foo = self.identity_api.create_user(user_foo)
- user_foo['password'] = password
- role_member = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
+ user_foo = unit.create_user(self.identity_api,
+ domain_id=test_v3.DEFAULT_DOMAIN_ID)
+ role_member = unit.new_role_ref()
self.role_api.create_role(role_member['id'], role_member)
- role_admin = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
+ role_admin = unit.new_role_ref()
self.role_api.create_role(role_admin['id'], role_admin)
- role_foo_domain1 = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
+ role_foo_domain1 = unit.new_role_ref()
self.role_api.create_role(role_foo_domain1['id'], role_foo_domain1)
- role_group_domain1 = {'id': uuid.uuid4().hex,
- 'name': uuid.uuid4().hex}
+ role_group_domain1 = unit.new_role_ref()
self.role_api.create_role(role_group_domain1['id'], role_group_domain1)
self.assignment_api.add_user_to_project(project1['id'],
user_foo['id'])
- new_group = {'domain_id': domain1['id'], 'name': uuid.uuid4().hex}
+ new_group = unit.new_group_ref(domain_id=domain1['id'])
new_group = self.identity_api.create_group(new_group)
self.identity_api.add_user_to_group(user_foo['id'],
new_group['id'])
@@ -2216,7 +2928,7 @@ class TestAuth(test_v3.RestfulTestCase):
project_name=project1['name'],
project_domain_id=domain1['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
scoped_token = self.assertValidScopedTokenResponse(r)
project = scoped_token["project"]
roles_ids = []
@@ -2234,7 +2946,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_id=self.domain['id'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_project_id_scoped_token_with_user_domain_name(self):
@@ -2243,7 +2955,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_name=self.domain['name'],
password=self.user['password'],
project_id=self.project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidProjectScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_id(self):
@@ -2255,7 +2967,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_domain_id(self):
@@ -2268,7 +2980,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_id=self.domain['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_id_scoped_token_with_user_domain_name(self):
@@ -2281,7 +2993,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_id=self.domain['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_id(self):
@@ -2293,7 +3005,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_domain_id(self):
@@ -2306,7 +3018,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_id=self.domain['id'],
password=self.user['password'],
domain_name=self.domain['name'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_name_scoped_token_with_user_domain_name(self):
@@ -2319,12 +3031,11 @@ class TestAuth(test_v3.RestfulTestCase):
user_domain_name=self.domain['name'],
password=self.user['password'],
domain_name=self.domain['name'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_token_with_group_role(self):
- group = self.new_group_ref(
- domain_id=self.domain_id)
+ group = unit.new_group_ref(domain_id=self.domain_id)
group = self.identity_api.create_group(group)
# add user to group
@@ -2340,7 +3051,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_token_with_name(self):
@@ -2353,7 +3064,7 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_name=self.domain['name'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidDomainScopedTokenResponse(r)
def test_domain_scope_failed(self):
@@ -2361,21 +3072,21 @@ class TestAuth(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
domain_id=self.domain['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_auth_with_id(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
token = r.headers.get('X-Subject-Token')
# test token auth
auth_data = self.build_authentication_request(token=token)
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def get_v2_token(self, tenant_id=None):
@@ -2393,7 +3104,7 @@ class TestAuth(test_v3.RestfulTestCase):
def test_validate_v2_unscoped_token_with_v3_api(self):
v2_token = self.get_v2_token().result['access']['token']['id']
auth_data = self.build_authentication_request(token=v2_token)
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_validate_v2_scoped_token_with_v3_api(self):
@@ -2404,46 +3115,46 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
token=v2_token,
project_id=self.default_domain_project['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidScopedTokenResponse(r)
def test_invalid_user_id(self):
auth_data = self.build_authentication_request(
user_id=uuid.uuid4().hex,
password=self.user['password'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_invalid_user_name(self):
auth_data = self.build_authentication_request(
username=uuid.uuid4().hex,
user_domain_id=self.domain['id'],
password=self.user['password'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_invalid_domain_id(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_id=uuid.uuid4().hex,
password=self.user['password'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_invalid_domain_name(self):
auth_data = self.build_authentication_request(
username=self.user['name'],
user_domain_name=uuid.uuid4().hex,
password=self.user['password'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_invalid_password(self):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=uuid.uuid4().hex)
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_remote_user_no_realm(self):
api = auth.controllers.Auth()
@@ -2524,7 +3235,7 @@ class TestAuth(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidUnscopedTokenResponse(r)
self.assertNotIn('bind', token)
@@ -2551,7 +3262,7 @@ class TestAuth(test_v3.RestfulTestCase):
remote_user = self.default_domain_user['name']
self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
'AUTH_TYPE': 'Negotiate'})
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
# the unscoped token should have bind information in it
token = self.assertValidUnscopedTokenResponse(r)
@@ -2562,7 +3273,7 @@ class TestAuth(test_v3.RestfulTestCase):
# using unscoped token with remote user succeeds
auth_params = {'token': token, 'project_id': self.project_id}
auth_data = self.build_authentication_request(**auth_params)
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = self.assertValidProjectScopedTokenResponse(r)
# the bind information should be carried over from the original token
@@ -2601,16 +3312,16 @@ class TestAuth(test_v3.RestfulTestCase):
token_data['token']['bind'])
def test_authenticating_a_user_with_no_password(self):
- user = self.new_user_ref(domain_id=self.domain['id'])
- user.pop('password', None) # can't have a password for this test
+ user = unit.new_user_ref(domain_id=self.domain['id'])
+ del user['password'] # can't have a password for this test
user = self.identity_api.create_user(user)
auth_data = self.build_authentication_request(
user_id=user['id'],
password='password')
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_disabled_default_project_result_in_unscoped_token(self):
# create a disabled project to work with
@@ -2626,11 +3337,11 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_default_project_domain_result_in_unscoped_token(self):
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
r = self.post('/domains', body={'domain': domain_ref})
domain = self.assertValidDomainResponse(r, domain_ref)
@@ -2652,7 +3363,7 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_no_access_to_default_project_result_in_unscoped_token(self):
@@ -2664,32 +3375,35 @@ class TestAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
self.assertValidUnscopedTokenResponse(r)
def test_disabled_scope_project_domain_result_in_401(self):
# create a disabled domain
- domain = self.new_domain_ref()
- domain['enabled'] = False
- self.resource_api.create_domain(domain['id'], domain)
+ domain = unit.new_domain_ref()
+ domain = self.resource_api.create_domain(domain['id'], domain)
- # create a project in the disabled domain
- project = self.new_project_ref(domain_id=domain['id'])
+ # create a project in the domain
+ project = unit.new_project_ref(domain_id=domain['id'])
self.resource_api.create_project(project['id'], project)
- # assign some role to self.user for the project in the disabled domain
+ # assign some role to self.user for the project in the domain
self.assignment_api.add_role_to_user_and_project(
self.user['id'],
project['id'],
self.role_id)
+ # Disable the domain
+ domain['enabled'] = False
+ self.resource_api.update_domain(domain['id'], domain)
+
# user should not be able to auth with project_id
auth_data = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=project['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
# user should not be able to auth with project_name & domain
auth_data = self.build_authentication_request(
@@ -2697,8 +3411,8 @@ class TestAuth(test_v3.RestfulTestCase):
password=self.user['password'],
project_name=project['name'],
project_domain_id=domain['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_auth_methods_with_different_identities_fails(self):
# get the token for a user. This is self.user which is different from
@@ -2710,8 +3424,124 @@ class TestAuth(test_v3.RestfulTestCase):
token=token,
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_authenticate_fails_if_project_unsafe(self):
+ """Verify authenticate to a project with unsafe name fails."""
+ # Start with url name restrictions off, so we can create the unsafe
+ # named project
+ self.config_fixture.config(group='resource',
+ project_name_url_safe='off')
+ unsafe_name = 'i am not / safe'
+ project = unit.new_project_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID,
+ name=unsafe_name)
+ self.resource_api.create_project(project['id'], project)
+ role_member = unit.new_role_ref()
+ self.role_api.create_role(role_member['id'], role_member)
+ self.assignment_api.add_role_to_user_and_project(
+ self.user['id'], project['id'], role_member['id'])
+
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_name=project['name'],
+ project_domain_id=test_v3.DEFAULT_DOMAIN_ID)
+
+ # Since name url restriction is off, we should be able to autenticate
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to new, which should still allow us to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ project_name_url_safe='new')
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to strict and we should fail to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ project_name_url_safe='strict')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_authenticate_fails_if_domain_unsafe(self):
+ """Verify authenticate to a domain with unsafe name fails."""
+ # Start with url name restrictions off, so we can create the unsafe
+ # named domain
+ self.config_fixture.config(group='resource',
+ domain_name_url_safe='off')
+ unsafe_name = 'i am not / safe'
+ domain = unit.new_domain_ref(name=unsafe_name)
+ self.resource_api.create_domain(domain['id'], domain)
+ role_member = unit.new_role_ref()
+ self.role_api.create_role(role_member['id'], role_member)
+ self.assignment_api.create_grant(
+ role_member['id'],
+ user_id=self.user['id'],
+ domain_id=domain['id'])
+
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ domain_name=domain['name'])
+
+ # Since name url restriction is off, we should be able to autenticate
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to new, which should still allow us to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ project_name_url_safe='new')
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to strict and we should fail to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ domain_name_url_safe='strict')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_authenticate_fails_to_project_if_domain_unsafe(self):
+ """Verify authenticate to a project using unsafe domain name fails."""
+ # Start with url name restrictions off, so we can create the unsafe
+ # named domain
+ self.config_fixture.config(group='resource',
+ domain_name_url_safe='off')
+ unsafe_name = 'i am not / safe'
+ domain = unit.new_domain_ref(name=unsafe_name)
+ self.resource_api.create_domain(domain['id'], domain)
+ # Add a (safely named) project to that domain
+ project = unit.new_project_ref(domain_id=domain['id'])
+ self.resource_api.create_project(project['id'], project)
+ role_member = unit.new_role_ref()
+ self.role_api.create_role(role_member['id'], role_member)
+ self.assignment_api.create_grant(
+ role_member['id'],
+ user_id=self.user['id'],
+ project_id=project['id'])
+
+ # An auth request via project ID, but specifying domain by name
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_name=project['name'],
+ project_domain_name=domain['name'])
+
+ # Since name url restriction is off, we should be able to autenticate
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to new, which should still allow us to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ project_name_url_safe='new')
+ self.v3_create_token(auth_data)
+
+ # Set the name url restriction to strict and we should fail to
+ # authenticate
+ self.config_fixture.config(group='resource',
+ domain_name_url_safe='strict')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
class TestAuthJSONExternal(test_v3.RestfulTestCase):
@@ -2736,7 +3566,7 @@ class TestTrustOptional(test_v3.RestfulTestCase):
super(TestTrustOptional, self).config_overrides()
self.config_fixture.config(group='trust', enabled=False)
- def test_trusts_404(self):
+ def test_trusts_returns_not_found(self):
self.get('/OS-TRUST/trusts', body={'trust': {}},
expected_status=http_client.NOT_FOUND)
self.post('/OS-TRUST/trusts', body={'trust': {}},
@@ -2747,11 +3577,11 @@ class TestTrustOptional(test_v3.RestfulTestCase):
user_id=self.user['id'],
password=self.user['password'],
trust_id=uuid.uuid4().hex)
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.FORBIDDEN)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
-class TestTrustRedelegation(test_v3.RestfulTestCase):
+class TrustAPIBehavior(test_v3.RestfulTestCase):
"""Redelegation valid and secure
Redelegation is a hierarchical structure of trusts between initial trustor
@@ -2778,7 +3608,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
"""
def config_overrides(self):
- super(TestTrustRedelegation, self).config_overrides()
+ super(TrustAPIBehavior, self).config_overrides()
self.config_fixture.config(
group='trust',
enabled=True,
@@ -2787,14 +3617,13 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
)
def setUp(self):
- super(TestTrustRedelegation, self).setUp()
+ super(TrustAPIBehavior, self).setUp()
# Create a trustee to delegate stuff to
- trustee_user_ref = self.new_user_ref(domain_id=self.domain_id)
- self.trustee_user = self.identity_api.create_user(trustee_user_ref)
- self.trustee_user['password'] = trustee_user_ref['password']
+ self.trustee_user = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
# trustor->trustee
- self.redelegated_trust_ref = self.new_trust_ref(
+ self.redelegated_trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
@@ -2804,7 +3633,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
allow_redelegation=True)
# trustor->trustee (no redelegation)
- self.chained_trust_ref = self.new_trust_ref(
+ self.chained_trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
@@ -2865,7 +3694,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
# Attempt to create a redelegated trust supposed to last longer
# than the parent trust: let's give it 10 minutes (>1 minute).
- too_long_live_chained_trust_ref = self.new_trust_ref(
+ too_long_live_chained_trust_ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
@@ -2894,7 +3723,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
def test_roles_subset(self):
# Build second role
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# assign a new role to the user
self.assignment_api.create_grant(role_id=role['id'],
@@ -2903,6 +3732,8 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
# Create first trust with extended set of roles
ref = self.redelegated_trust_ref
+ ref['expires_at'] = datetime.datetime.utcnow().replace(
+ year=2032).strftime(unit.TIME_FORMAT)
ref['roles'].append({'id': role['id']})
r = self.post('/OS-TRUST/trusts',
body={'trust': ref})
@@ -2915,6 +3746,9 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
trust_token = self._get_trust_token(trust)
# Chain second trust with roles subset
+ self.chained_trust_ref['expires_at'] = (
+ datetime.datetime.utcnow().replace(year=2028).strftime(
+ unit.TIME_FORMAT))
r = self.post('/OS-TRUST/trusts',
body={'trust': self.chained_trust_ref},
token=trust_token)
@@ -2927,7 +3761,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
def test_redelegate_with_role_by_name(self):
# For role by name testing
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
@@ -2935,19 +3769,23 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
expires=dict(minutes=1),
role_names=[self.role['name']],
allow_redelegation=True)
+ ref['expires_at'] = datetime.datetime.utcnow().replace(
+ year=2032).strftime(unit.TIME_FORMAT)
r = self.post('/OS-TRUST/trusts',
body={'trust': ref})
trust = self.assertValidTrustResponse(r)
# Ensure we can get a token with this trust
trust_token = self._get_trust_token(trust)
# Chain second trust with roles subset
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=True,
role_names=[self.role['name']],
allow_redelegation=True)
+ ref['expires_at'] = datetime.datetime.utcnow().replace(
+ year=2028).strftime(unit.TIME_FORMAT)
r = self.post('/OS-TRUST/trusts',
body={'trust': ref},
token=trust_token)
@@ -2962,7 +3800,7 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
trust_token = self._get_trust_token(trust)
# Build second trust with a role not in parent's roles
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# assign a new role to the user
self.assignment_api.create_grant(role_id=role['id'],
@@ -2980,12 +3818,18 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
expected_status=http_client.FORBIDDEN)
def test_redelegation_terminator(self):
+ self.redelegated_trust_ref['expires_at'] = (
+ datetime.datetime.utcnow().replace(year=2032).strftime(
+ unit.TIME_FORMAT))
r = self.post('/OS-TRUST/trusts',
body={'trust': self.redelegated_trust_ref})
trust = self.assertValidTrustResponse(r)
trust_token = self._get_trust_token(trust)
# Build second trust - the terminator
+ self.chained_trust_ref['expires_at'] = (
+ datetime.datetime.utcnow().replace(year=2028).strftime(
+ unit.TIME_FORMAT))
ref = dict(self.chained_trust_ref,
redelegation_count=1,
allow_redelegation=False)
@@ -3007,215 +3851,64 @@ class TestTrustRedelegation(test_v3.RestfulTestCase):
token=trust_token,
expected_status=http_client.FORBIDDEN)
+ def test_redelegation_without_impersonation(self):
+ # Update trust to not allow impersonation
+ self.redelegated_trust_ref['impersonation'] = False
-class TestTrustChain(test_v3.RestfulTestCase):
-
- def config_overrides(self):
- super(TestTrustChain, self).config_overrides()
- self.config_fixture.config(
- group='trust',
- enabled=True,
- allow_redelegation=True,
- max_redelegation_count=10
- )
-
- def setUp(self):
- super(TestTrustChain, self).setUp()
- # Create trust chain
- self.user_chain = list()
- self.trust_chain = list()
- for _ in range(3):
- user_ref = self.new_user_ref(domain_id=self.domain_id)
- user = self.identity_api.create_user(user_ref)
- user['password'] = user_ref['password']
- self.user_chain.append(user)
-
- # trustor->trustee
- trustee = self.user_chain[0]
- trust_ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=trustee['id'],
- project_id=self.project_id,
- impersonation=True,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
- trust_ref.update(
- allow_redelegation=True,
- redelegation_count=3)
-
- r = self.post('/OS-TRUST/trusts',
- body={'trust': trust_ref})
+ # Create trust
+ resp = self.post('/OS-TRUST/trusts',
+ body={'trust': self.redelegated_trust_ref},
+ expected_status=http_client.CREATED)
+ trust = self.assertValidTrustResponse(resp)
- trust = self.assertValidTrustResponse(r)
+ # Get trusted token without impersonation
auth_data = self.build_authentication_request(
- user_id=trustee['id'],
- password=trustee['password'],
+ user_id=self.trustee_user['id'],
+ password=self.trustee_user['password'],
trust_id=trust['id'])
trust_token = self.get_requested_token(auth_data)
- self.trust_chain.append(trust)
-
- for trustee in self.user_chain[1:]:
- trust_ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=trustee['id'],
- project_id=self.project_id,
- impersonation=True,
- role_ids=[self.role_id])
- trust_ref.update(
- allow_redelegation=True)
- r = self.post('/OS-TRUST/trusts',
- body={'trust': trust_ref},
- token=trust_token)
- trust = self.assertValidTrustResponse(r)
- auth_data = self.build_authentication_request(
- user_id=trustee['id'],
- password=trustee['password'],
- trust_id=trust['id'])
- trust_token = self.get_requested_token(auth_data)
- self.trust_chain.append(trust)
-
- trustee = self.user_chain[-1]
- trust = self.trust_chain[-1]
- auth_data = self.build_authentication_request(
- user_id=trustee['id'],
- password=trustee['password'],
- trust_id=trust['id'])
-
- self.last_token = self.get_requested_token(auth_data)
-
- def assert_user_authenticate(self, user):
- auth_data = self.build_authentication_request(
- user_id=user['id'],
- password=user['password']
- )
- r = self.v3_authenticate_token(auth_data)
- self.assertValidTokenResponse(r)
-
- def assert_trust_tokens_revoked(self, trust_id):
- trustee = self.user_chain[0]
- auth_data = self.build_authentication_request(
- user_id=trustee['id'],
- password=trustee['password']
- )
- r = self.v3_authenticate_token(auth_data)
- self.assertValidTokenResponse(r)
-
- revocation_response = self.get('/OS-REVOKE/events')
- revocation_events = revocation_response.json_body['events']
- found = False
- for event in revocation_events:
- if event.get('OS-TRUST:trust_id') == trust_id:
- found = True
- self.assertTrue(found, 'event with trust_id %s not found in list' %
- trust_id)
-
- def test_delete_trust_cascade(self):
- self.assert_user_authenticate(self.user_chain[0])
- self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[0]['id']},
- expected_status=204)
- headers = {'X-Subject-Token': self.last_token}
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.NOT_FOUND)
- self.assert_trust_tokens_revoked(self.trust_chain[0]['id'])
-
- def test_delete_broken_chain(self):
- self.assert_user_authenticate(self.user_chain[0])
- self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[1]['id']},
- expected_status=204)
-
- self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': self.trust_chain[0]['id']},
- expected_status=204)
-
- def test_trustor_roles_revoked(self):
- self.assert_user_authenticate(self.user_chain[0])
-
- self.assignment_api.remove_role_from_user_and_project(
- self.user_id, self.project_id, self.role_id
- )
-
- auth_data = self.build_authentication_request(
- token=self.last_token,
- trust_id=self.trust_chain[-1]['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.NOT_FOUND)
-
- def test_intermediate_user_disabled(self):
- self.assert_user_authenticate(self.user_chain[0])
-
- disabled = self.user_chain[0]
- disabled['enabled'] = False
- self.identity_api.update_user(disabled['id'], disabled)
-
- # Bypass policy enforcement
- with mock.patch.object(rules, 'enforce', return_value=True):
- headers = {'X-Subject-Token': self.last_token}
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.FORBIDDEN)
-
- def test_intermediate_user_deleted(self):
- self.assert_user_authenticate(self.user_chain[0])
-
- self.identity_api.delete_user(self.user_chain[0]['id'])
-
- # Bypass policy enforcement
- with mock.patch.object(rules, 'enforce', return_value=True):
- headers = {'X-Subject-Token': self.last_token}
- self.head('/auth/tokens', headers=headers,
- expected_status=http_client.FORBIDDEN)
-
-
-class TestTrustAuth(test_v3.RestfulTestCase):
- EXTENSION_NAME = 'revoke'
- EXTENSION_TO_ADD = 'revoke_extension'
-
- def config_overrides(self):
- super(TestTrustAuth, self).config_overrides()
- self.config_fixture.config(group='revoke', driver='kvs')
- self.config_fixture.config(
- group='token',
- provider='pki',
- revoke_by_id=False)
- self.config_fixture.config(group='trust', enabled=True)
+ # Create second user for redelegation
+ trustee_user_2 = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
- def setUp(self):
- super(TestTrustAuth, self).setUp()
-
- # create a trustee to delegate stuff to
- self.trustee_user = self.new_user_ref(domain_id=self.domain_id)
- password = self.trustee_user['password']
- self.trustee_user = self.identity_api.create_user(self.trustee_user)
- self.trustee_user['password'] = password
- self.trustee_user_id = self.trustee_user['id']
+ # Trust for redelegation
+ trust_ref_2 = unit.new_trust_ref(
+ trustor_user_id=self.trustee_user['id'],
+ trustee_user_id=trustee_user_2['id'],
+ project_id=self.project_id,
+ impersonation=False,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id],
+ allow_redelegation=False)
- def test_create_trust_bad_request(self):
- # The server returns a 403 Forbidden rather than a 400, see bug 1133435
- self.post('/OS-TRUST/trusts', body={'trust': {}},
- expected_status=http_client.FORBIDDEN)
+ # Creating a second trust should not be allowed since trustor does not
+ # have the role to delegate thus returning 404 NOT FOUND.
+ resp = self.post('/OS-TRUST/trusts',
+ body={'trust': trust_ref_2},
+ token=trust_token,
+ expected_status=http_client.NOT_FOUND)
def test_create_unscoped_trust(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id)
+ trustee_user_id=self.trustee_user['id'])
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
self.assertValidTrustResponse(r, ref)
def test_create_trust_no_roles(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id)
self.post('/OS-TRUST/trusts', body={'trust': ref},
expected_status=http_client.FORBIDDEN)
def _initialize_test_consume_trust(self, count):
# Make sure remaining_uses is decremented as we consume the trust
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
remaining_uses=count,
role_ids=[self.role_id])
@@ -3223,30 +3916,29 @@ class TestTrustAuth(test_v3.RestfulTestCase):
# make sure the trust exists
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=200)
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
# get a token for the trustee
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = r.headers.get('X-Subject-Token')
# get a trust token, consume one use
auth_data = self.build_authentication_request(
token=token,
trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
return trust
def test_consume_trust_once(self):
trust = self._initialize_test_consume_trust(2)
# check decremented value
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=200)
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
trust = r.result.get('trust')
self.assertIsNotNone(trust)
self.assertEqual(1, trust['remaining_uses'])
+ # FIXME(lbragstad): Assert the role that is returned is the right role.
def test_create_one_time_use_trust(self):
trust = self._initialize_test_consume_trust(1)
@@ -3259,61 +3951,15 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_create_trust_with_bad_values_for_remaining_uses(self):
- # negative values for the remaining_uses parameter are forbidden
- self._create_trust_with_bad_remaining_use(bad_value=-1)
- # 0 is a forbidden value as well
- self._create_trust_with_bad_remaining_use(bad_value=0)
- # as are non integer values
- self._create_trust_with_bad_remaining_use(bad_value="a bad value")
- self._create_trust_with_bad_remaining_use(bad_value=7.2)
-
- def _create_trust_with_bad_remaining_use(self, bad_value):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- remaining_uses=bad_value,
- role_ids=[self.role_id])
- self.post('/OS-TRUST/trusts',
- body={'trust': ref},
- expected_status=http_client.BAD_REQUEST)
-
- def test_invalid_trust_request_without_impersonation(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- role_ids=[self.role_id])
-
- del ref['impersonation']
-
- self.post('/OS-TRUST/trusts',
- body={'trust': ref},
- expected_status=http_client.BAD_REQUEST)
-
- def test_invalid_trust_request_without_trustee(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- role_ids=[self.role_id])
-
- del ref['trustee_user_id']
-
- self.post('/OS-TRUST/trusts',
- body={'trust': ref},
- expected_status=http_client.BAD_REQUEST)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_create_unlimited_use_trust(self):
# by default trusts are unlimited in terms of tokens that can be
# generated from them, this test creates such a trust explicitly
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
remaining_uses=None,
role_ids=[self.role_id])
@@ -3321,322 +3967,25 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self.assertValidTrustResponse(r, ref)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=200)
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
token = r.headers.get('X-Subject-Token')
auth_data = self.build_authentication_request(
token=token,
trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=200)
+ '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']})
trust = r.result.get('trust')
self.assertIsNone(trust['remaining_uses'])
- def test_trust_crud(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- role_ids=[self.role_id])
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r, ref)
-
- r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=200)
- self.assertValidTrustResponse(r, ref)
-
- # validate roles on the trust
- r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s/roles' % {
- 'trust_id': trust['id']},
- expected_status=200)
- roles = self.assertValidRoleListResponse(r, self.role)
- self.assertIn(self.role['id'], [x['id'] for x in roles])
- self.head(
- '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
- 'trust_id': trust['id'],
- 'role_id': self.role['id']},
- expected_status=200)
- r = self.get(
- '/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
- 'trust_id': trust['id'],
- 'role_id': self.role['id']},
- expected_status=200)
- self.assertValidRoleResponse(r, self.role)
-
- r = self.get('/OS-TRUST/trusts', expected_status=200)
- self.assertValidTrustListResponse(r, trust)
-
- # trusts are immutable
- self.patch(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- body={'trust': ref},
- expected_status=http_client.NOT_FOUND)
-
- self.delete(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=204)
-
- self.get(
- '/OS-TRUST/trusts/%(trust_id)s' % {'trust_id': trust['id']},
- expected_status=http_client.NOT_FOUND)
-
- def test_create_trust_trustee_404(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=uuid.uuid4().hex,
- project_id=self.project_id,
- role_ids=[self.role_id])
- self.post('/OS-TRUST/trusts', body={'trust': ref},
- expected_status=http_client.NOT_FOUND)
-
- def test_create_trust_trustor_trustee_backwards(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.trustee_user_id,
- trustee_user_id=self.user_id,
- project_id=self.project_id,
- role_ids=[self.role_id])
- self.post('/OS-TRUST/trusts', body={'trust': ref},
- expected_status=http_client.FORBIDDEN)
-
- def test_create_trust_project_404(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=uuid.uuid4().hex,
- role_ids=[self.role_id])
- self.post('/OS-TRUST/trusts', body={'trust': ref},
- expected_status=http_client.NOT_FOUND)
-
- def test_create_trust_role_id_404(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- role_ids=[uuid.uuid4().hex])
- self.post('/OS-TRUST/trusts', body={'trust': ref},
- expected_status=http_client.NOT_FOUND)
-
- def test_create_trust_role_name_404(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- role_names=[uuid.uuid4().hex])
- self.post('/OS-TRUST/trusts', body={'trust': ref},
- expected_status=http_client.NOT_FOUND)
-
- def test_v3_v2_intermix_trustor_not_in_default_domain_failed(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.default_domain_user_id,
- project_id=self.project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.default_domain_user['id'],
- password=self.default_domain_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(
- r, self.default_domain_user)
-
- token = r.headers.get('X-Subject-Token')
-
- # now validate the v3 token with v2 API
- path = '/v2.0/tokens/%s' % (token)
- self.admin_request(
- path=path, token=CONF.admin_token,
- method='GET', expected_status=http_client.UNAUTHORIZED)
-
- def test_v3_v2_intermix_trustor_not_in_default_domaini_failed(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.default_domain_user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.default_domain_project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- auth_data = self.build_authentication_request(
- user_id=self.default_domain_user['id'],
- password=self.default_domain_user['password'],
- project_id=self.default_domain_project_id)
- token = self.get_requested_token(auth_data)
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.trustee_user['id'],
- password=self.trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(
- r, self.trustee_user)
- token = r.headers.get('X-Subject-Token')
-
- # now validate the v3 token with v2 API
- path = '/v2.0/tokens/%s' % (token)
- self.admin_request(
- path=path, token=CONF.admin_token,
- method='GET', expected_status=http_client.UNAUTHORIZED)
-
- def test_v3_v2_intermix_project_not_in_default_domaini_failed(self):
- # create a trustee in default domain to delegate stuff to
- trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID)
- password = trustee_user['password']
- trustee_user = self.identity_api.create_user(trustee_user)
- trustee_user['password'] = password
- trustee_user_id = trustee_user['id']
-
- ref = self.new_trust_ref(
- trustor_user_id=self.default_domain_user_id,
- trustee_user_id=trustee_user_id,
- project_id=self.project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- auth_data = self.build_authentication_request(
- user_id=self.default_domain_user['id'],
- password=self.default_domain_user['password'],
- project_id=self.default_domain_project_id)
- token = self.get_requested_token(auth_data)
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=trustee_user['id'],
- password=trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(
- r, trustee_user)
- token = r.headers.get('X-Subject-Token')
-
- # now validate the v3 token with v2 API
- path = '/v2.0/tokens/%s' % (token)
- self.admin_request(
- path=path, token=CONF.admin_token,
- method='GET', expected_status=http_client.UNAUTHORIZED)
-
- def test_v3_v2_intermix(self):
- # create a trustee in default domain to delegate stuff to
- trustee_user = self.new_user_ref(domain_id=test_v3.DEFAULT_DOMAIN_ID)
- password = trustee_user['password']
- trustee_user = self.identity_api.create_user(trustee_user)
- trustee_user['password'] = password
- trustee_user_id = trustee_user['id']
-
- ref = self.new_trust_ref(
- trustor_user_id=self.default_domain_user_id,
- trustee_user_id=trustee_user_id,
- project_id=self.default_domain_project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
- auth_data = self.build_authentication_request(
- user_id=self.default_domain_user['id'],
- password=self.default_domain_user['password'],
- project_id=self.default_domain_project_id)
- token = self.get_requested_token(auth_data)
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref}, token=token)
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=trustee_user['id'],
- password=trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(
- r, trustee_user)
- token = r.headers.get('X-Subject-Token')
-
- # now validate the v3 token with v2 API
- path = '/v2.0/tokens/%s' % (token)
- self.admin_request(
- path=path, token=CONF.admin_token,
- method='GET', expected_status=200)
-
- def test_exercise_trust_scoped_token_without_impersonation(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.trustee_user['id'],
- password=self.trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(r, self.trustee_user)
- self.assertEqual(self.trustee_user['id'],
- r.result['token']['user']['id'])
- self.assertEqual(self.trustee_user['name'],
- r.result['token']['user']['name'])
- self.assertEqual(self.domain['id'],
- r.result['token']['user']['domain']['id'])
- self.assertEqual(self.domain['name'],
- r.result['token']['user']['domain']['name'])
- self.assertEqual(self.project['id'],
- r.result['token']['project']['id'])
- self.assertEqual(self.project['name'],
- r.result['token']['project']['name'])
-
- def test_exercise_trust_scoped_token_with_impersonation(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- impersonation=True,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.trustee_user['id'],
- password=self.trustee_user['password'],
- trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(r, self.user)
- self.assertEqual(self.user['id'], r.result['token']['user']['id'])
- self.assertEqual(self.user['name'], r.result['token']['user']['name'])
- self.assertEqual(self.domain['id'],
- r.result['token']['user']['domain']['id'])
- self.assertEqual(self.domain['name'],
- r.result['token']['user']['domain']['name'])
- self.assertEqual(self.project['id'],
- r.result['token']['project']['id'])
- self.assertEqual(self.project['name'],
- r.result['token']['project']['name'])
-
def test_impersonation_token_cannot_create_new_trust(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
@@ -3653,9 +4002,9 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust_token = self.get_requested_token(auth_data)
# Build second trust
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
@@ -3668,7 +4017,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
def test_trust_deleted_grant(self):
# create a new role
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
grant_url = (
@@ -3682,9 +4031,9 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.put(grant_url)
# create a trust that delegates the new role
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -3702,8 +4051,8 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data,
- expected_status=http_client.FORBIDDEN)
+ r = self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
def test_trust_chained(self):
"""Test that a trust token can't be used to execute another trust.
@@ -3713,28 +4062,26 @@ class TestTrustAuth(test_v3.RestfulTestCase):
"""
# create a sub-trustee user
- sub_trustee_user = self.new_user_ref(
+ sub_trustee_user = unit.create_user(
+ self.identity_api,
domain_id=test_v3.DEFAULT_DOMAIN_ID)
- password = sub_trustee_user['password']
- sub_trustee_user = self.identity_api.create_user(sub_trustee_user)
- sub_trustee_user['password'] = password
sub_trustee_user_id = sub_trustee_user['id']
# create a new role
- role = self.new_role_ref()
+ role = unit.new_role_ref()
self.role_api.create_role(role['id'], role)
# assign the new role to trustee
self.put(
'/projects/%(project_id)s/users/%(user_id)s/roles/%(role_id)s' % {
'project_id': self.project_id,
- 'user_id': self.trustee_user_id,
+ 'user_id': self.trustee_user['id'],
'role_id': role['id']})
# create a trust from trustor -> trustee
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
@@ -3744,14 +4091,14 @@ class TestTrustAuth(test_v3.RestfulTestCase):
# authenticate as trustee so we can create a second trust
auth_data = self.build_authentication_request(
- user_id=self.trustee_user_id,
+ user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
project_id=self.project_id)
token = self.get_requested_token(auth_data)
# create a trust from trustee -> sub-trustee
- ref = self.new_trust_ref(
- trustor_user_id=self.trustee_user_id,
+ ref = unit.new_trust_ref(
+ trustor_user_id=self.trustee_user['id'],
trustee_user_id=sub_trustee_user_id,
project_id=self.project_id,
impersonation=True,
@@ -3771,12 +4118,11 @@ class TestTrustAuth(test_v3.RestfulTestCase):
auth_data = self.build_authentication_request(
token=trust_token,
trust_id=trust1['id'])
- r = self.v3_authenticate_token(auth_data,
- expected_status=http_client.FORBIDDEN)
+ r = self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
def assertTrustTokensRevoked(self, trust_id):
- revocation_response = self.get('/OS-REVOKE/events',
- expected_status=200)
+ revocation_response = self.get('/OS-REVOKE/events')
revocation_events = revocation_response.json_body['events']
found = False
for event in revocation_events:
@@ -3786,9 +4132,9 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust_id)
def test_delete_trust_revokes_tokens(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -3800,13 +4146,12 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust_id)
- r = self.v3_authenticate_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(
+ r = self.v3_create_token(auth_data)
+ self.assertValidProjectScopedTokenResponse(
r, self.trustee_user)
trust_token = r.headers['X-Subject-Token']
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust_id},
- expected_status=204)
+ 'trust_id': trust_id})
headers = {'X-Subject-Token': trust_token}
self.head('/auth/tokens', headers=headers,
expected_status=http_client.NOT_FOUND)
@@ -3817,9 +4162,9 @@ class TestTrustAuth(test_v3.RestfulTestCase):
self.identity_api.update_user(user['id'], user)
def test_trust_get_token_fails_if_trustor_disabled(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -3833,7 +4178,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data, expected_status=201)
+ self.v3_create_token(auth_data)
self.disable_user(self.user)
@@ -3841,13 +4186,13 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.FORBIDDEN)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
def test_trust_get_token_fails_if_trustee_disabled(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -3861,7 +4206,7 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data, expected_status=201)
+ self.v3_create_token(auth_data)
self.disable_user(self.trustee_user)
@@ -3869,13 +4214,13 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_delete_trust(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -3886,57 +4231,19 @@ class TestTrustAuth(test_v3.RestfulTestCase):
trust = self.assertValidTrustResponse(r, ref)
self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust['id']},
- expected_status=204)
-
- self.get('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust['id']},
- expected_status=http_client.NOT_FOUND)
-
- self.get('/OS-TRUST/trusts/%(trust_id)s' % {
- 'trust_id': trust['id']},
- expected_status=http_client.NOT_FOUND)
+ 'trust_id': trust['id']})
auth_data = self.build_authentication_request(
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.UNAUTHORIZED)
-
- def test_list_trusts(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- impersonation=False,
- expires=dict(minutes=1),
- role_ids=[self.role_id])
-
- for i in range(3):
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- self.assertValidTrustResponse(r, ref)
-
- r = self.get('/OS-TRUST/trusts', expected_status=200)
- trusts = r.result['trusts']
- self.assertEqual(3, len(trusts))
- self.assertValidTrustListResponse(r)
-
- r = self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
- self.user_id, expected_status=200)
- trusts = r.result['trusts']
- self.assertEqual(3, len(trusts))
- self.assertValidTrustListResponse(r)
-
- r = self.get('/OS-TRUST/trusts?trustee_user_id=%s' %
- self.user_id, expected_status=200)
- trusts = r.result['trusts']
- self.assertEqual(0, len(trusts))
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
def test_change_password_invalidates_trust_tokens(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=True,
expires=dict(minutes=1),
@@ -3949,64 +4256,52 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.trustee_user['id'],
password=self.trustee_user['password'],
trust_id=trust['id'])
- r = self.v3_authenticate_token(auth_data)
+ r = self.v3_create_token(auth_data)
- self.assertValidProjectTrustScopedTokenResponse(r, self.user)
+ self.assertValidProjectScopedTokenResponse(r, self.user)
trust_token = r.headers.get('X-Subject-Token')
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
- self.user_id, expected_status=200,
- token=trust_token)
+ self.user_id, token=trust_token)
self.assertValidUserResponse(
self.patch('/users/%s' % self.trustee_user['id'],
- body={'user': {'password': uuid.uuid4().hex}},
- expected_status=200))
+ body={'user': {'password': uuid.uuid4().hex}}))
self.get('/OS-TRUST/trusts?trustor_user_id=%s' %
self.user_id, expected_status=http_client.UNAUTHORIZED,
token=trust_token)
def test_trustee_can_do_role_ops(self):
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
- project_id=self.project_id,
- impersonation=True,
- role_ids=[self.role_id])
-
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
-
- auth_data = self.build_authentication_request(
- user_id=self.trustee_user['id'],
- password=self.trustee_user['password'])
+ resp = self.post('/OS-TRUST/trusts',
+ body={'trust': self.redelegated_trust_ref})
+ trust = self.assertValidTrustResponse(resp)
+ trust_token = self._get_trust_token(trust)
- r = self.get(
+ resp = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles' % {
'trust_id': trust['id']},
- auth=auth_data)
- self.assertValidRoleListResponse(r, self.role)
+ token=trust_token)
+ self.assertValidRoleListResponse(resp, self.role)
self.head(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'],
'role_id': self.role['id']},
- auth=auth_data,
- expected_status=200)
+ token=trust_token,
+ expected_status=http_client.OK)
- r = self.get(
+ resp = self.get(
'/OS-TRUST/trusts/%(trust_id)s/roles/%(role_id)s' % {
'trust_id': trust['id'],
'role_id': self.role['id']},
- auth=auth_data,
- expected_status=200)
- self.assertValidRoleResponse(r, self.role)
+ token=trust_token)
+ self.assertValidRoleResponse(resp, self.role)
def test_do_not_consume_remaining_uses_when_get_token_fails(self):
- ref = self.new_trust_ref(
+ ref = unit.new_trust_ref(
trustor_user_id=self.user_id,
- trustee_user_id=self.trustee_user_id,
+ trustee_user_id=self.trustee_user['id'],
project_id=self.project_id,
impersonation=False,
expires=dict(minutes=1),
@@ -4023,13 +4318,209 @@ class TestTrustAuth(test_v3.RestfulTestCase):
user_id=self.default_domain_user['id'],
password=self.default_domain_user['password'],
trust_id=trust_id)
- self.v3_authenticate_token(auth_data,
- expected_status=http_client.FORBIDDEN)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
r = self.get('/OS-TRUST/trusts/%s' % trust_id)
self.assertEqual(3, r.result.get('trust').get('remaining_uses'))
+class TestTrustChain(test_v3.RestfulTestCase):
+
+ def config_overrides(self):
+ super(TestTrustChain, self).config_overrides()
+ self.config_fixture.config(
+ group='trust',
+ enabled=True,
+ allow_redelegation=True,
+ max_redelegation_count=10
+ )
+
+ def setUp(self):
+ super(TestTrustChain, self).setUp()
+ """Create a trust chain using redelegation.
+
+ A trust chain is a series of trusts that are redelegated. For example,
+ self.user_list consists of userA, userB, and userC. The first trust in
+ the trust chain is going to be established between self.user and userA,
+ call it trustA. Then, userA is going to obtain a trust scoped token
+ using trustA, and with that token create a trust between userA and
+ userB called trustB. This pattern will continue with userB creating a
+ trust with userC.
+ So the trust chain should look something like:
+ trustA -> trustB -> trustC
+ Where:
+ self.user is trusting userA with trustA
+ userA is trusting userB with trustB
+ userB is trusting userC with trustC
+
+ """
+ self.user_list = list()
+ self.trust_chain = list()
+ for _ in range(3):
+ user = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
+ self.user_list.append(user)
+
+ # trustor->trustee redelegation with impersonation
+ trustee = self.user_list[0]
+ trust_ref = unit.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=trustee['id'],
+ project_id=self.project_id,
+ impersonation=True,
+ expires=dict(minutes=1),
+ role_ids=[self.role_id],
+ allow_redelegation=True,
+ redelegation_count=3)
+
+ # Create a trust between self.user and the first user in the list
+ r = self.post('/OS-TRUST/trusts',
+ body={'trust': trust_ref})
+
+ trust = self.assertValidTrustResponse(r)
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password'],
+ trust_id=trust['id'])
+
+ # Generate a trusted token for the first user
+ trust_token = self.get_requested_token(auth_data)
+ self.trust_chain.append(trust)
+
+ # Loop through the user to create a chain of redelegated trust.
+ for next_trustee in self.user_list[1:]:
+ trust_ref = unit.new_trust_ref(
+ trustor_user_id=self.user_id,
+ trustee_user_id=next_trustee['id'],
+ project_id=self.project_id,
+ impersonation=True,
+ role_ids=[self.role_id],
+ allow_redelegation=True)
+ r = self.post('/OS-TRUST/trusts',
+ body={'trust': trust_ref},
+ token=trust_token)
+ trust = self.assertValidTrustResponse(r)
+ auth_data = self.build_authentication_request(
+ user_id=next_trustee['id'],
+ password=next_trustee['password'],
+ trust_id=trust['id'])
+ trust_token = self.get_requested_token(auth_data)
+ self.trust_chain.append(trust)
+
+ trustee = self.user_list[-1]
+ trust = self.trust_chain[-1]
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password'],
+ trust_id=trust['id'])
+
+ self.last_token = self.get_requested_token(auth_data)
+
+ def assert_user_authenticate(self, user):
+ auth_data = self.build_authentication_request(
+ user_id=user['id'],
+ password=user['password']
+ )
+ r = self.v3_create_token(auth_data)
+ self.assertValidTokenResponse(r)
+
+ def assert_trust_tokens_revoked(self, trust_id):
+ trustee = self.user_list[0]
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password']
+ )
+ r = self.v3_create_token(auth_data)
+ self.assertValidTokenResponse(r)
+
+ revocation_response = self.get('/OS-REVOKE/events')
+ revocation_events = revocation_response.json_body['events']
+ found = False
+ for event in revocation_events:
+ if event.get('OS-TRUST:trust_id') == trust_id:
+ found = True
+ self.assertTrue(found, 'event with trust_id %s not found in list' %
+ trust_id)
+
+ def test_delete_trust_cascade(self):
+ self.assert_user_authenticate(self.user_list[0])
+ self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
+ 'trust_id': self.trust_chain[0]['id']})
+
+ headers = {'X-Subject-Token': self.last_token}
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.NOT_FOUND)
+ self.assert_trust_tokens_revoked(self.trust_chain[0]['id'])
+
+ def test_delete_broken_chain(self):
+ self.assert_user_authenticate(self.user_list[0])
+ self.delete('/OS-TRUST/trusts/%(trust_id)s' % {
+ 'trust_id': self.trust_chain[0]['id']})
+
+ # Verify the two remaining trust have been deleted
+ for i in range(len(self.user_list) - 1):
+ auth_data = self.build_authentication_request(
+ user_id=self.user_list[i]['id'],
+ password=self.user_list[i]['password'])
+
+ auth_token = self.get_requested_token(auth_data)
+
+ # Assert chained trust have been deleted
+ self.get('/OS-TRUST/trusts/%(trust_id)s' % {
+ 'trust_id': self.trust_chain[i + 1]['id']},
+ token=auth_token,
+ expected_status=http_client.NOT_FOUND)
+
+ def test_trustor_roles_revoked(self):
+ self.assert_user_authenticate(self.user_list[0])
+
+ self.assignment_api.remove_role_from_user_and_project(
+ self.user_id, self.project_id, self.role_id
+ )
+
+ # Verify that users are not allowed to authenticate with trust
+ for i in range(len(self.user_list[1:])):
+ trustee = self.user_list[i]
+ auth_data = self.build_authentication_request(
+ user_id=trustee['id'],
+ password=trustee['password'])
+
+ # Attempt to authenticate with trust
+ token = self.get_requested_token(auth_data)
+ auth_data = self.build_authentication_request(
+ token=token,
+ trust_id=self.trust_chain[i - 1]['id'])
+
+ # Trustee has no delegated roles
+ self.v3_create_token(auth_data,
+ expected_status=http_client.FORBIDDEN)
+
+ def test_intermediate_user_disabled(self):
+ self.assert_user_authenticate(self.user_list[0])
+
+ disabled = self.user_list[0]
+ disabled['enabled'] = False
+ self.identity_api.update_user(disabled['id'], disabled)
+
+ # Bypass policy enforcement
+ with mock.patch.object(rules, 'enforce', return_value=True):
+ headers = {'X-Subject-Token': self.last_token}
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.FORBIDDEN)
+
+ def test_intermediate_user_deleted(self):
+ self.assert_user_authenticate(self.user_list[0])
+
+ self.identity_api.delete_user(self.user_list[0]['id'])
+
+ # Bypass policy enforcement
+ with mock.patch.object(rules, 'enforce', return_value=True):
+ headers = {'X-Subject-Token': self.last_token}
+ self.head('/auth/tokens', headers=headers,
+ expected_status=http_client.FORBIDDEN)
+
+
class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase):
def test_api_protection_with_no_auth_context_in_env(self):
auth_data = self.build_authentication_request(
@@ -4045,7 +4536,7 @@ class TestAPIProtectionWithoutAuthContextMiddleware(test_v3.RestfulTestCase):
'query_string': {},
'environment': {}}
r = auth_controller.validate_token(context)
- self.assertEqual(200, r.status_code)
+ self.assertEqual(http_client.OK, r.status_code)
class TestAuthContext(unit.TestCase):
@@ -4105,9 +4596,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
def test_get_catalog_project_scoped_token(self):
"""Call ``GET /auth/catalog`` with a project-scoped token."""
- r = self.get(
- '/auth/catalog',
- expected_status=200)
+ r = self.get('/auth/catalog')
self.assertValidCatalogResponse(r)
def test_get_catalog_domain_scoped_token(self):
@@ -4141,7 +4630,7 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
expected_status=http_client.UNAUTHORIZED)
def test_get_projects_project_scoped_token(self):
- r = self.get('/auth/projects', expected_status=200)
+ r = self.get('/auth/projects')
self.assertThat(r.json['projects'], matchers.HasLength(1))
self.assertValidProjectListResponse(r)
@@ -4149,452 +4638,318 @@ class TestAuthSpecificData(test_v3.RestfulTestCase):
self.put(path='/domains/%s/users/%s/roles/%s' % (
self.domain['id'], self.user['id'], self.role['id']))
- r = self.get('/auth/domains', expected_status=200)
+ r = self.get('/auth/domains')
self.assertThat(r.json['domains'], matchers.HasLength(1))
self.assertValidDomainListResponse(r)
-class TestFernetTokenProvider(test_v3.RestfulTestCase):
- def setUp(self):
- super(TestFernetTokenProvider, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
-
- def _make_auth_request(self, auth_data):
- resp = self.post('/auth/tokens', body=auth_data, expected_status=201)
- token = resp.headers.get('X-Subject-Token')
- self.assertLess(len(token), 255)
- return token
-
- def _get_unscoped_token(self):
- auth_data = self.build_authentication_request(
- user_id=self.user['id'],
- password=self.user['password'])
- return self._make_auth_request(auth_data)
-
- def _get_project_scoped_token(self):
- auth_data = self.build_authentication_request(
- user_id=self.user['id'],
- password=self.user['password'],
- project_id=self.project_id)
- return self._make_auth_request(auth_data)
+class TestTrustAuthPKITokenProvider(TrustAPIBehavior, TestTrustChain):
+ def config_overrides(self):
+ super(TestTrustAuthPKITokenProvider, self).config_overrides()
+ self.config_fixture.config(group='token',
+ provider='pki',
+ revoke_by_id=False)
+ self.config_fixture.config(group='trust',
+ enabled=True)
- def _get_domain_scoped_token(self):
- auth_data = self.build_authentication_request(
- user_id=self.user['id'],
- password=self.user['password'],
- domain_id=self.domain_id)
- return self._make_auth_request(auth_data)
- def _get_trust_scoped_token(self, trustee_user, trust):
- auth_data = self.build_authentication_request(
- user_id=trustee_user['id'],
- password=trustee_user['password'],
- trust_id=trust['id'])
- return self._make_auth_request(auth_data)
-
- def _validate_token(self, token, expected_status=200):
- return self.get(
- '/auth/tokens',
- headers={'X-Subject-Token': token},
- expected_status=expected_status)
+class TestTrustAuthPKIZTokenProvider(TrustAPIBehavior, TestTrustChain):
+ def config_overrides(self):
+ super(TestTrustAuthPKIZTokenProvider, self).config_overrides()
+ self.config_fixture.config(group='token',
+ provider='pkiz',
+ revoke_by_id=False)
+ self.config_fixture.config(group='trust',
+ enabled=True)
- def _revoke_token(self, token, expected_status=204):
- return self.delete(
- '/auth/tokens',
- headers={'X-Subject-Token': token},
- expected_status=expected_status)
- def _set_user_enabled(self, user, enabled=True):
- user['enabled'] = enabled
- self.identity_api.update_user(user['id'], user)
+class TestTrustAuthFernetTokenProvider(TrustAPIBehavior, TestTrustChain):
+ def config_overrides(self):
+ super(TestTrustAuthFernetTokenProvider, self).config_overrides()
+ self.config_fixture.config(group='token',
+ provider='fernet',
+ revoke_by_id=False)
+ self.config_fixture.config(group='trust',
+ enabled=True)
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
- def _create_trust(self):
- # Create a trustee user
- trustee_user_ref = self.new_user_ref(domain_id=self.domain_id)
- trustee_user = self.identity_api.create_user(trustee_user_ref)
- trustee_user['password'] = trustee_user_ref['password']
- ref = self.new_trust_ref(
- trustor_user_id=self.user_id,
- trustee_user_id=trustee_user['id'],
- project_id=self.project_id,
- impersonation=False,
- role_ids=[self.role_id])
- # Create a trust
- r = self.post('/OS-TRUST/trusts', body={'trust': ref})
- trust = self.assertValidTrustResponse(r)
- return (trustee_user, trust)
+class TestAuthFernetTokenProvider(TestAuth):
+ def setUp(self):
+ super(TestAuthFernetTokenProvider, self).setUp()
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
def config_overrides(self):
- super(TestFernetTokenProvider, self).config_overrides()
+ super(TestAuthFernetTokenProvider, self).config_overrides()
self.config_fixture.config(group='token', provider='fernet')
- def test_validate_unscoped_token(self):
- unscoped_token = self._get_unscoped_token()
- self._validate_token(unscoped_token)
+ def test_verify_with_bound_token(self):
+ self.config_fixture.config(group='token', bind='kerberos')
+ auth_data = self.build_authentication_request(
+ project_id=self.project['id'])
+ remote_user = self.default_domain_user['name']
+ self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
+ 'AUTH_TYPE': 'Negotiate'})
+ # Bind not current supported by Fernet, see bug 1433311.
+ self.v3_create_token(auth_data,
+ expected_status=http_client.NOT_IMPLEMENTED)
- def test_validate_tampered_unscoped_token_fails(self):
- unscoped_token = self._get_unscoped_token()
- tampered_token = (unscoped_token[:50] + uuid.uuid4().hex +
- unscoped_token[50 + 32:])
- self._validate_token(tampered_token,
- expected_status=http_client.NOT_FOUND)
+ def test_v2_v3_bind_token_intermix(self):
+ self.config_fixture.config(group='token', bind='kerberos')
- def test_revoke_unscoped_token(self):
- unscoped_token = self._get_unscoped_token()
- self._validate_token(unscoped_token)
- self._revoke_token(unscoped_token)
- self._validate_token(unscoped_token,
- expected_status=http_client.NOT_FOUND)
+ # we need our own user registered to the default domain because of
+ # the way external auth works.
+ remote_user = self.default_domain_user['name']
+ self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
+ 'AUTH_TYPE': 'Negotiate'})
+ body = {'auth': {}}
+ # Bind not current supported by Fernet, see bug 1433311.
+ self.admin_request(path='/v2.0/tokens',
+ method='POST',
+ body=body,
+ expected_status=http_client.NOT_IMPLEMENTED)
- def test_unscoped_token_is_invalid_after_disabling_user(self):
- unscoped_token = self._get_unscoped_token()
- # Make sure the token is valid
- self._validate_token(unscoped_token)
- # Disable the user
- self._set_user_enabled(self.user, enabled=False)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- unscoped_token)
+ def test_auth_with_bind_token(self):
+ self.config_fixture.config(group='token', bind=['kerberos'])
- def test_unscoped_token_is_invalid_after_enabling_disabled_user(self):
- unscoped_token = self._get_unscoped_token()
- # Make sure the token is valid
- self._validate_token(unscoped_token)
- # Disable the user
- self._set_user_enabled(self.user, enabled=False)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- unscoped_token)
- # Enable the user
- self._set_user_enabled(self.user)
- # Ensure validating a token for a re-enabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- unscoped_token)
+ auth_data = self.build_authentication_request()
+ remote_user = self.default_domain_user['name']
+ self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
+ 'AUTH_TYPE': 'Negotiate'})
+ # Bind not current supported by Fernet, see bug 1433311.
+ self.v3_create_token(auth_data,
+ expected_status=http_client.NOT_IMPLEMENTED)
- def test_unscoped_token_is_invalid_after_disabling_user_domain(self):
- unscoped_token = self._get_unscoped_token()
- # Make sure the token is valid
- self._validate_token(unscoped_token)
- # Disable the user's domain
- self.domain['enabled'] = False
- self.resource_api.update_domain(self.domain['id'], self.domain)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- unscoped_token)
- def test_unscoped_token_is_invalid_after_changing_user_password(self):
- unscoped_token = self._get_unscoped_token()
- # Make sure the token is valid
- self._validate_token(unscoped_token)
- # Change user's password
- self.user['password'] = 'Password1'
- self.identity_api.update_user(self.user['id'], self.user)
- # Ensure updating user's password revokes existing user's tokens
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- unscoped_token)
+class TestAuthTOTP(test_v3.RestfulTestCase):
- def test_validate_project_scoped_token(self):
- project_scoped_token = self._get_project_scoped_token()
- self._validate_token(project_scoped_token)
+ def setUp(self):
+ super(TestAuthTOTP, self).setUp()
- def test_validate_domain_scoped_token(self):
- # Grant user access to domain
- self.assignment_api.create_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
- domain_scoped_token = self._get_domain_scoped_token()
- resp = self._validate_token(domain_scoped_token)
- resp_json = json.loads(resp.body)
- self.assertIsNotNone(resp_json['token']['catalog'])
- self.assertIsNotNone(resp_json['token']['roles'])
- self.assertIsNotNone(resp_json['token']['domain'])
+ ref = unit.new_totp_credential(
+ user_id=self.default_domain_user['id'],
+ project_id=self.default_domain_project['id'])
- def test_validate_tampered_project_scoped_token_fails(self):
- project_scoped_token = self._get_project_scoped_token()
- tampered_token = (project_scoped_token[:50] + uuid.uuid4().hex +
- project_scoped_token[50 + 32:])
- self._validate_token(tampered_token,
- expected_status=http_client.NOT_FOUND)
+ self.secret = ref['blob']
- def test_revoke_project_scoped_token(self):
- project_scoped_token = self._get_project_scoped_token()
- self._validate_token(project_scoped_token)
- self._revoke_token(project_scoped_token)
- self._validate_token(project_scoped_token,
- expected_status=http_client.NOT_FOUND)
+ r = self.post('/credentials', body={'credential': ref})
+ self.assertValidCredentialResponse(r, ref)
- def test_project_scoped_token_is_invalid_after_disabling_user(self):
- project_scoped_token = self._get_project_scoped_token()
- # Make sure the token is valid
- self._validate_token(project_scoped_token)
- # Disable the user
- self._set_user_enabled(self.user, enabled=False)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- project_scoped_token)
+ self.addCleanup(self.cleanup)
- def test_domain_scoped_token_is_invalid_after_disabling_user(self):
- # Grant user access to domain
- self.assignment_api.create_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
- domain_scoped_token = self._get_domain_scoped_token()
- # Make sure the token is valid
- self._validate_token(domain_scoped_token)
- # Disable user
- self._set_user_enabled(self.user, enabled=False)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- domain_scoped_token)
+ def auth_plugin_config_override(self):
+ methods = ['totp', 'token', 'password']
+ super(TestAuthTOTP, self).auth_plugin_config_override(methods)
- def test_domain_scoped_token_is_invalid_after_deleting_grant(self):
- # Grant user access to domain
- self.assignment_api.create_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
- domain_scoped_token = self._get_domain_scoped_token()
- # Make sure the token is valid
- self._validate_token(domain_scoped_token)
- # Delete access to domain
- self.assignment_api.delete_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- domain_scoped_token)
+ def _make_credentials(self, cred_type, count=1, user_id=None,
+ project_id=None, blob=None):
+ user_id = user_id or self.default_domain_user['id']
+ project_id = project_id or self.default_domain_project['id']
- def test_project_scoped_token_invalid_after_changing_user_password(self):
- project_scoped_token = self._get_project_scoped_token()
- # Make sure the token is valid
- self._validate_token(project_scoped_token)
- # Update user's password
- self.user['password'] = 'Password1'
- self.identity_api.update_user(self.user['id'], self.user)
- # Ensure updating user's password revokes existing tokens
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- project_scoped_token)
+ creds = []
+ for __ in range(count):
+ if cred_type == 'totp':
+ ref = unit.new_totp_credential(
+ user_id=user_id, project_id=project_id, blob=blob)
+ else:
+ ref = unit.new_credential_ref(
+ user_id=user_id, project_id=project_id)
+ resp = self.post('/credentials', body={'credential': ref})
+ creds.append(resp.json['credential'])
+ return creds
+
+ def _make_auth_data_by_id(self, passcode, user_id=None):
+ return self.build_authentication_request(
+ user_id=user_id or self.default_domain_user['id'],
+ passcode=passcode,
+ project_id=self.project['id'])
- def test_project_scoped_token_invalid_after_disabling_project(self):
- project_scoped_token = self._get_project_scoped_token()
- # Make sure the token is valid
- self._validate_token(project_scoped_token)
- # Disable project
- self.project['enabled'] = False
- self.resource_api.update_project(self.project['id'], self.project)
- # Ensure validating a token for a disabled project fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- project_scoped_token)
+ def _make_auth_data_by_name(self, passcode, username, user_domain_id):
+ return self.build_authentication_request(
+ username=username,
+ user_domain_id=user_domain_id,
+ passcode=passcode,
+ project_id=self.project['id'])
- def test_domain_scoped_token_invalid_after_disabling_domain(self):
- # Grant user access to domain
+ def cleanup(self):
+ totp_creds = self.credential_api.list_credentials_for_user(
+ self.default_domain_user['id'], type='totp')
+
+ other_creds = self.credential_api.list_credentials_for_user(
+ self.default_domain_user['id'], type='other')
+
+ for cred in itertools.chain(other_creds, totp_creds):
+ self.delete('/credentials/%s' % cred['id'],
+ expected_status=http_client.NO_CONTENT)
+
+ def test_with_a_valid_passcode(self):
+ creds = self._make_credentials('totp')
+ secret = creds[-1]['blob']
+ auth_data = self._make_auth_data_by_id(
+ totp._generate_totp_passcode(secret))
+
+ self.v3_create_token(auth_data, expected_status=http_client.CREATED)
+
+ def test_with_an_invalid_passcode_and_user_credentials(self):
+ self._make_credentials('totp')
+ auth_data = self._make_auth_data_by_id('000000')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_with_an_invalid_passcode_with_no_user_credentials(self):
+ auth_data = self._make_auth_data_by_id('000000')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_with_a_corrupt_totp_credential(self):
+ self._make_credentials('totp', count=1, blob='0')
+ auth_data = self._make_auth_data_by_id('000000')
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_with_multiple_credentials(self):
+ self._make_credentials('other', 3)
+ creds = self._make_credentials('totp', count=3)
+ secret = creds[-1]['blob']
+
+ auth_data = self._make_auth_data_by_id(
+ totp._generate_totp_passcode(secret))
+ self.v3_create_token(auth_data, expected_status=http_client.CREATED)
+
+ def test_with_multiple_users(self):
+ # make some credentials for the existing user
+ self._make_credentials('totp', count=3)
+
+ # create a new user and their credentials
+ user = unit.create_user(self.identity_api, domain_id=self.domain_id)
self.assignment_api.create_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
- domain_scoped_token = self._get_domain_scoped_token()
- # Make sure the token is valid
- self._validate_token(domain_scoped_token)
- # Disable domain
- self.domain['enabled'] = False
- self.resource_api.update_domain(self.domain['id'], self.domain)
- # Ensure validating a token for a disabled domain fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- domain_scoped_token)
-
- def test_rescope_unscoped_token_with_trust(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- self.assertLess(len(trust_scoped_token), 255)
-
- def test_validate_a_trust_scoped_token(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
-
- def test_validate_tampered_trust_scoped_token_fails(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Get a trust scoped token
- tampered_token = (trust_scoped_token[:50] + uuid.uuid4().hex +
- trust_scoped_token[50 + 32:])
- self._validate_token(tampered_token,
- expected_status=http_client.NOT_FOUND)
+ user_id=user['id'],
+ project_id=self.project['id'])
+ creds = self._make_credentials('totp', count=1, user_id=user['id'])
+ secret = creds[-1]['blob']
- def test_revoke_trust_scoped_token(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
- self._revoke_token(trust_scoped_token)
- self._validate_token(trust_scoped_token,
- expected_status=http_client.NOT_FOUND)
-
- def test_trust_scoped_token_is_invalid_after_disabling_trustee(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
-
- # Disable trustee
- trustee_update_ref = dict(enabled=False)
- self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- trust_scoped_token)
-
- def test_trust_scoped_token_invalid_after_changing_trustee_password(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
- # Change trustee's password
- trustee_update_ref = dict(password='Password1')
- self.identity_api.update_user(trustee_user['id'], trustee_update_ref)
- # Ensure updating trustee's password revokes existing tokens
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- trust_scoped_token)
-
- def test_trust_scoped_token_is_invalid_after_disabling_trustor(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
-
- # Disable the trustor
- trustor_update_ref = dict(enabled=False)
- self.identity_api.update_user(self.user['id'], trustor_update_ref)
- # Ensure validating a token for a disabled user fails
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- trust_scoped_token)
+ # Stop the clock otherwise there is a chance of auth failure due to
+ # getting a different TOTP between the call here and the call in the
+ # auth plugin.
+ self.useFixture(fixture.TimeFixture())
- def test_trust_scoped_token_invalid_after_changing_trustor_password(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
+ auth_data = self._make_auth_data_by_id(
+ totp._generate_totp_passcode(secret), user_id=user['id'])
+ self.v3_create_token(auth_data, expected_status=http_client.CREATED)
- # Change trustor's password
- trustor_update_ref = dict(password='Password1')
- self.identity_api.update_user(self.user['id'], trustor_update_ref)
- # Ensure updating trustor's password revokes existing user's tokens
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- trust_scoped_token)
+ def test_with_multiple_users_and_invalid_credentials(self):
+ """Prevent logging in with someone else's credentials.
- def test_trust_scoped_token_invalid_after_disabled_trustor_domain(self):
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- # Validate a trust scoped token
- self._validate_token(trust_scoped_token)
+ It's very easy to forget to limit the credentials query by user.
+ Let's just test it for a sanity check.
+ """
+ # make some credentials for the existing user
+ self._make_credentials('totp', count=3)
- # Disable trustor's domain
- self.domain['enabled'] = False
- self.resource_api.update_domain(self.domain['id'], self.domain)
+ # create a new user and their credentials
+ new_user = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=new_user['id'],
+ project_id=self.project['id'])
+ user2_creds = self._make_credentials(
+ 'totp', count=1, user_id=new_user['id'])
+
+ user_id = self.default_domain_user['id'] # user1
+ secret = user2_creds[-1]['blob']
+
+ auth_data = self._make_auth_data_by_id(
+ totp._generate_totp_passcode(secret), user_id=user_id)
+ self.v3_create_token(auth_data,
+ expected_status=http_client.UNAUTHORIZED)
+
+ def test_with_username_and_domain_id(self):
+ creds = self._make_credentials('totp')
+ secret = creds[-1]['blob']
+ auth_data = self._make_auth_data_by_name(
+ totp._generate_totp_passcode(secret),
+ username=self.default_domain_user['name'],
+ user_domain_id=self.default_domain_user['domain_id'])
- trustor_update_ref = dict(password='Password1')
- self.identity_api.update_user(self.user['id'], trustor_update_ref)
- # Ensure updating trustor's password revokes existing user's tokens
- self.assertRaises(exception.TokenNotFound,
- self.token_provider_api.validate_token,
- trust_scoped_token)
+ self.v3_create_token(auth_data, expected_status=http_client.CREATED)
- def test_v2_validate_unscoped_token_returns_unauthorized(self):
- """Test raised exception when validating unscoped token.
- Test that validating an unscoped token in v2.0 of a v3 user of a
- non-default domain returns unauthorized.
- """
- unscoped_token = self._get_unscoped_token()
- self.assertRaises(exception.Unauthorized,
- self.token_provider_api.validate_v2_token,
- unscoped_token)
+class TestFetchRevocationList(test_v3.RestfulTestCase):
+ """Test fetch token revocation list on the v3 Identity API."""
- def test_v2_validate_domain_scoped_token_returns_unauthorized(self):
- """Test raised exception when validating a domain scoped token.
+ def config_overrides(self):
+ super(TestFetchRevocationList, self).config_overrides()
+ self.config_fixture.config(group='token', revoke_by_id=True)
+
+ def test_ids_no_tokens(self):
+ # When there's no revoked tokens the response is an empty list, and
+ # the response is signed.
+ res = self.get('/auth/tokens/OS-PKI/revoked')
+ signed = res.json['signed']
+ clear = cms.cms_verify(signed, CONF.signing.certfile,
+ CONF.signing.ca_certs)
+ payload = json.loads(clear)
+ self.assertEqual({'revoked': []}, payload)
+
+ def test_ids_token(self):
+ # When there's a revoked token, it's in the response, and the response
+ # is signed.
+ token_res = self.v3_create_token(
+ self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
- Test that validating an domain scoped token in v2.0
- returns unauthorized.
- """
+ token_id = token_res.headers.get('X-Subject-Token')
+ token_data = token_res.json['token']
- # Grant user access to domain
- self.assignment_api.create_grant(self.role['id'],
- user_id=self.user['id'],
- domain_id=self.domain['id'])
+ self.delete('/auth/tokens', headers={'X-Subject-Token': token_id})
- scoped_token = self._get_domain_scoped_token()
- self.assertRaises(exception.Unauthorized,
- self.token_provider_api.validate_v2_token,
- scoped_token)
+ res = self.get('/auth/tokens/OS-PKI/revoked')
+ signed = res.json['signed']
+ clear = cms.cms_verify(signed, CONF.signing.certfile,
+ CONF.signing.ca_certs)
+ payload = json.loads(clear)
- def test_v2_validate_trust_scoped_token(self):
- """Test raised exception when validating a trust scoped token.
+ def truncate(ts_str):
+ return ts_str[:19] + 'Z' # 2016-01-21T15:53:52 == 19 chars.
- Test that validating an trust scoped token in v2.0 returns
- unauthorized.
- """
+ exp_token_revoke_data = {
+ 'id': token_id,
+ 'audit_id': token_data['audit_ids'][0],
+ 'expires': truncate(token_data['expires_at']),
+ }
- trustee_user, trust = self._create_trust()
- trust_scoped_token = self._get_trust_scoped_token(trustee_user, trust)
- self.assertRaises(exception.Unauthorized,
- self.token_provider_api.validate_v2_token,
- trust_scoped_token)
+ self.assertEqual({'revoked': [exp_token_revoke_data]}, payload)
+ def test_audit_id_only_no_tokens(self):
+ # When there's no revoked tokens and ?audit_id_only is used, the
+ # response is an empty list and is not signed.
+ res = self.get('/auth/tokens/OS-PKI/revoked?audit_id_only')
+ self.assertEqual({'revoked': []}, res.json)
-class TestAuthFernetTokenProvider(TestAuth):
- def setUp(self):
- super(TestAuthFernetTokenProvider, self).setUp()
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+ def test_audit_id_only_token(self):
+ # When there's a revoked token and ?audit_id_only is used, the
+ # response contains the audit_id of the token and is not signed.
+ token_res = self.v3_create_token(
+ self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id']))
- def config_overrides(self):
- super(TestAuthFernetTokenProvider, self).config_overrides()
- self.config_fixture.config(group='token', provider='fernet')
+ token_id = token_res.headers.get('X-Subject-Token')
+ token_data = token_res.json['token']
- def test_verify_with_bound_token(self):
- self.config_fixture.config(group='token', bind='kerberos')
- auth_data = self.build_authentication_request(
- project_id=self.project['id'])
- remote_user = self.default_domain_user['name']
- self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
- 'AUTH_TYPE': 'Negotiate'})
- # Bind not current supported by Fernet, see bug 1433311.
- self.v3_authenticate_token(auth_data, expected_status=501)
+ self.delete('/auth/tokens', headers={'X-Subject-Token': token_id})
- def test_v2_v3_bind_token_intermix(self):
- self.config_fixture.config(group='token', bind='kerberos')
+ res = self.get('/auth/tokens/OS-PKI/revoked?audit_id_only')
- # we need our own user registered to the default domain because of
- # the way external auth works.
- remote_user = self.default_domain_user['name']
- self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
- 'AUTH_TYPE': 'Negotiate'})
- body = {'auth': {}}
- # Bind not current supported by Fernet, see bug 1433311.
- self.admin_request(path='/v2.0/tokens',
- method='POST',
- body=body,
- expected_status=501)
+ def truncate(ts_str):
+ return ts_str[:19] + 'Z' # 2016-01-21T15:53:52 == 19 chars.
- def test_auth_with_bind_token(self):
- self.config_fixture.config(group='token', bind=['kerberos'])
+ exp_token_revoke_data = {
+ 'audit_id': token_data['audit_ids'][0],
+ 'expires': truncate(token_data['expires_at']),
+ }
- auth_data = self.build_authentication_request()
- remote_user = self.default_domain_user['name']
- self.admin_app.extra_environ.update({'REMOTE_USER': remote_user,
- 'AUTH_TYPE': 'Negotiate'})
- # Bind not current supported by Fernet, see bug 1433311.
- self.v3_authenticate_token(auth_data, expected_status=501)
+ self.assertEqual({'revoked': [exp_token_revoke_data]}, res.json)