aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_protection.py
diff options
context:
space:
mode:
authorRuan HE <ruan.he@orange.com>2016-06-09 08:12:34 +0000
committerGerrit Code Review <gerrit@172.30.200.206>2016-06-09 08:12:34 +0000
commit4bc079a2664f9a407e332291f34d174625a9d5ea (patch)
tree7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_v3_protection.py
parent2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff)
parent2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff)
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_protection.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_protection.py739
1 files changed, 610 insertions, 129 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py
index 9922ae5e..f77a1528 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_protection.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_protection.py
@@ -20,19 +20,22 @@ from oslo_serialization import jsonutils
from six.moves import http_client
from keystone import exception
-from keystone.policy.backends import rules
from keystone.tests import unit
+from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
from keystone.tests.unit import test_v3
+from keystone.tests.unit import utils
CONF = cfg.CONF
-DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
class IdentityTestProtectedCase(test_v3.RestfulTestCase):
"""Test policy enforcement on the v3 Identity API."""
+ def _policy_fixture(self):
+ return ksfixtures.Policy(self.tmpfilename, self.config_fixture)
+
def setUp(self):
"""Setup for Identity Protection Test Cases.
@@ -49,14 +52,9 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
the default domain.
"""
- # Ensure that test_v3.RestfulTestCase doesn't load its own
- # sample data, which would make checking the results of our
- # tests harder
- super(IdentityTestProtectedCase, self).setUp()
self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
self.tmpfilename = self.tempfile.file_name
- self.config_fixture.config(group='oslo_policy',
- policy_file=self.tmpfilename)
+ super(IdentityTestProtectedCase, self).setUp()
# A default auth request we can use - un-scoped user token
self.auth = self.build_authentication_request(
@@ -66,45 +64,33 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
def load_sample_data(self):
self._populate_default_domain()
# Start by creating a couple of domains
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
- self.domainB = self.new_domain_ref()
+ self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
- self.domainC = self.new_domain_ref()
- self.domainC['enabled'] = False
+ self.domainC = unit.new_domain_ref(enabled=False)
self.resource_api.create_domain(self.domainC['id'], self.domainC)
# Now create some users, one in domainA and two of them in domainB
- self.user1 = self.new_user_ref(domain_id=self.domainA['id'])
- password = uuid.uuid4().hex
- self.user1['password'] = password
- self.user1 = self.identity_api.create_user(self.user1)
- self.user1['password'] = password
-
- self.user2 = self.new_user_ref(domain_id=self.domainB['id'])
- password = uuid.uuid4().hex
- self.user2['password'] = password
- self.user2 = self.identity_api.create_user(self.user2)
- self.user2['password'] = password
-
- self.user3 = self.new_user_ref(domain_id=self.domainB['id'])
- password = uuid.uuid4().hex
- self.user3['password'] = password
- self.user3 = self.identity_api.create_user(self.user3)
- self.user3['password'] = password
-
- self.group1 = self.new_group_ref(domain_id=self.domainA['id'])
+ self.user1 = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
+ self.user2 = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
+ self.user3 = unit.create_user(self.identity_api,
+ domain_id=self.domainB['id'])
+
+ self.group1 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group1 = self.identity_api.create_group(self.group1)
- self.group2 = self.new_group_ref(domain_id=self.domainA['id'])
+ self.group2 = unit.new_group_ref(domain_id=self.domainA['id'])
self.group2 = self.identity_api.create_group(self.group2)
- self.group3 = self.new_group_ref(domain_id=self.domainB['id'])
+ self.group3 = unit.new_group_ref(domain_id=self.domainB['id'])
self.group3 = self.identity_api.create_group(self.group3)
- self.role = self.new_role_ref()
+ self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
- self.role1 = self.new_role_ref()
+ self.role1 = unit.new_role_ref()
self.role_api.create_role(self.role1['id'], self.role1)
self.assignment_api.create_grant(self.role['id'],
user_id=self.user1['id'],
@@ -348,34 +334,23 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
def load_sample_data(self):
self._populate_default_domain()
- self.just_a_user = self.new_user_ref(
+ self.just_a_user = unit.create_user(
+ self.identity_api,
domain_id=CONF.identity.default_domain_id)
- password = uuid.uuid4().hex
- self.just_a_user['password'] = password
- self.just_a_user = self.identity_api.create_user(self.just_a_user)
- self.just_a_user['password'] = password
-
- self.another_user = self.new_user_ref(
+ self.another_user = unit.create_user(
+ self.identity_api,
domain_id=CONF.identity.default_domain_id)
- password = uuid.uuid4().hex
- self.another_user['password'] = password
- self.another_user = self.identity_api.create_user(self.another_user)
- self.another_user['password'] = password
-
- self.admin_user = self.new_user_ref(
+ self.admin_user = unit.create_user(
+ self.identity_api,
domain_id=CONF.identity.default_domain_id)
- password = uuid.uuid4().hex
- self.admin_user['password'] = password
- self.admin_user = self.identity_api.create_user(self.admin_user)
- self.admin_user['password'] = password
- self.role = self.new_role_ref()
+ self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
- self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'}
+ self.admin_role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.admin_role['id'], self.admin_role)
# Create and assign roles to the project
- self.project = self.new_project_ref(
+ self.project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id)
self.resource_api.create_project(self.project['id'], self.project)
self.assignment_api.create_grant(self.role['id'],
@@ -461,7 +436,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
token = self.get_requested_token(auth)
self.head('/auth/tokens', token=token,
- headers={'X-Subject-Token': token}, expected_status=200)
+ headers={'X-Subject-Token': token},
+ expected_status=http_client.OK)
def test_user_check_user_token(self):
# A user can check one of their own tokens.
@@ -474,7 +450,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
token2 = self.get_requested_token(auth)
self.head('/auth/tokens', token=token1,
- headers={'X-Subject-Token': token2}, expected_status=200)
+ headers={'X-Subject-Token': token2},
+ expected_status=http_client.OK)
def test_user_check_other_user_token_rejected(self):
# A user cannot check another user's token.
@@ -510,7 +487,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase):
user_token = self.get_requested_token(user_auth)
self.head('/auth/tokens', token=admin_token,
- headers={'X-Subject-Token': user_token}, expected_status=200)
+ headers={'X-Subject-Token': user_token},
+ expected_status=http_client.OK)
def test_user_revoke_same_token(self):
# Given a non-admin user token, the token can be used to revoke
@@ -579,6 +557,10 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
test_v3.AssignmentTestMixin):
"""Test policy enforcement of the sample v3 cloud policy file."""
+ def _policy_fixture(self):
+ return ksfixtures.Policy(unit.dirs.etc('policy.v3cloudsample.json'),
+ self.config_fixture)
+
def setUp(self):
"""Setup for v3 Cloud Policy Sample Test Cases.
@@ -592,8 +574,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
- domain_admin_user has role 'admin' on domainA,
- project_admin_user has role 'admin' on the project,
- just_a_user has a non-admin role on both domainA and the project.
- - admin_domain has user cloud_admin_user, with an 'admin' role
- on admin_domain.
+ - admin_domain has admin_project, and user cloud_admin_user, with an
+ 'admin' role on admin_project.
We test various api protection rules from the cloud sample policy
file to make sure the sample is valid and that we correctly enforce it.
@@ -604,62 +586,61 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# tests harder
super(IdentityTestv3CloudPolicySample, self).setUp()
- # Finally, switch to the v3 sample policy file
- self.addCleanup(rules.reset)
- rules.reset()
self.config_fixture.config(
- group='oslo_policy',
- policy_file=unit.dirs.etc('policy.v3cloudsample.json'))
+ group='resource',
+ admin_project_name=self.admin_project['name'])
+ self.config_fixture.config(
+ group='resource',
+ admin_project_domain_name=self.admin_domain['name'])
def load_sample_data(self):
# Start by creating a couple of domains
self._populate_default_domain()
- self.domainA = self.new_domain_ref()
+ self.domainA = unit.new_domain_ref()
self.resource_api.create_domain(self.domainA['id'], self.domainA)
- self.domainB = self.new_domain_ref()
+ self.domainB = unit.new_domain_ref()
self.resource_api.create_domain(self.domainB['id'], self.domainB)
- self.admin_domain = {'id': 'admin_domain_id', 'name': 'Admin_domain'}
+ self.admin_domain = unit.new_domain_ref()
self.resource_api.create_domain(self.admin_domain['id'],
self.admin_domain)
+ self.admin_project = unit.new_project_ref(
+ domain_id=self.admin_domain['id'])
+ self.resource_api.create_project(self.admin_project['id'],
+ self.admin_project)
+
# And our users
- self.cloud_admin_user = self.new_user_ref(
+ self.cloud_admin_user = unit.create_user(
+ self.identity_api,
domain_id=self.admin_domain['id'])
- password = uuid.uuid4().hex
- self.cloud_admin_user['password'] = password
- self.cloud_admin_user = (
- self.identity_api.create_user(self.cloud_admin_user))
- self.cloud_admin_user['password'] = password
- self.just_a_user = self.new_user_ref(domain_id=self.domainA['id'])
- password = uuid.uuid4().hex
- self.just_a_user['password'] = password
- self.just_a_user = self.identity_api.create_user(self.just_a_user)
- self.just_a_user['password'] = password
- self.domain_admin_user = self.new_user_ref(
+ self.just_a_user = unit.create_user(
+ self.identity_api,
domain_id=self.domainA['id'])
- password = uuid.uuid4().hex
- self.domain_admin_user['password'] = password
- self.domain_admin_user = (
- self.identity_api.create_user(self.domain_admin_user))
- self.domain_admin_user['password'] = password
- self.project_admin_user = self.new_user_ref(
+ self.domain_admin_user = unit.create_user(
+ self.identity_api,
+ domain_id=self.domainA['id'])
+ self.domainB_admin_user = unit.create_user(
+ self.identity_api,
+ domain_id=self.domainB['id'])
+ self.project_admin_user = unit.create_user(
+ self.identity_api,
domain_id=self.domainA['id'])
- password = uuid.uuid4().hex
- self.project_admin_user['password'] = password
- self.project_admin_user = (
- self.identity_api.create_user(self.project_admin_user))
- self.project_admin_user['password'] = password
-
- # The admin role and another plain role
- self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'}
+ self.project_adminB_user = unit.create_user(
+ self.identity_api,
+ domain_id=self.domainB['id'])
+
+ # The admin role, a domain specific role and another plain role
+ self.admin_role = unit.new_role_ref(name='admin')
self.role_api.create_role(self.admin_role['id'], self.admin_role)
- self.role = self.new_role_ref()
+ self.roleA = unit.new_role_ref(domain_id=self.domainA['id'])
+ self.role_api.create_role(self.roleA['id'], self.roleA)
+ self.role = unit.new_role_ref()
self.role_api.create_role(self.role['id'], self.role)
- # The cloud admin just gets the admin role
+ # The cloud admin just gets the admin role on the special admin project
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.cloud_admin_user['id'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
# Assign roles to the domain
self.assignment_api.create_grant(self.admin_role['id'],
@@ -668,13 +649,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=self.domainB_admin_user['id'],
+ domain_id=self.domainB['id'])
# Create and assign roles to the project
- self.project = self.new_project_ref(domain_id=self.domainA['id'])
+ self.project = unit.new_project_ref(domain_id=self.domainA['id'])
self.resource_api.create_project(self.project['id'], self.project)
+ self.projectB = unit.new_project_ref(domain_id=self.domainB['id'])
+ self.resource_api.create_project(self.projectB['id'], self.projectB)
self.assignment_api.create_grant(self.admin_role['id'],
user_id=self.project_admin_user['id'],
project_id=self.project['id'])
+ self.assignment_api.create_grant(
+ self.admin_role['id'], user_id=self.project_adminB_user['id'],
+ project_id=self.projectB['id'])
self.assignment_api.create_grant(self.role['id'],
user_id=self.just_a_user['id'],
project_id=self.project['id'])
@@ -683,7 +672,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# Return the expected return codes for APIs with and without data
# with any specified status overriding the normal values
if expected_status is None:
- return (200, 201, 204)
+ return (http_client.OK, http_client.CREATED,
+ http_client.NO_CONTENT)
else:
return (expected_status, expected_status, expected_status)
@@ -702,7 +692,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
- user_ref = self.new_user_ref(domain_id=domain_id)
+ user_ref = unit.new_user_ref(domain_id=domain_id)
self.post('/users', auth=self.auth, body={'user': user_ref},
expected_status=status_created)
@@ -721,7 +711,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
- proj_ref = self.new_project_ref(domain_id=domain_id)
+ proj_ref = unit.new_project_ref(domain_id=domain_id)
self.post('/projects', auth=self.auth, body={'project': proj_ref},
expected_status=status_created)
@@ -740,13 +730,14 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.delete(entity_url, auth=self.auth,
expected_status=status_no_data)
- domain_ref = self.new_domain_ref()
+ domain_ref = unit.new_domain_ref()
self.post('/domains', auth=self.auth, body={'domain': domain_ref},
expected_status=status_created)
- def _test_grants(self, target, entity_id, expected=None):
+ def _test_grants(self, target, entity_id, role_domain_id=None,
+ list_status_OK=False, expected=None):
status_OK, status_created, status_no_data = self._stati(expected)
- a_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ a_role = unit.new_role_ref(domain_id=role_domain_id)
self.role_api.create_role(a_role['id'], a_role)
collection_url = (
@@ -762,11 +753,67 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
expected_status=status_no_data)
self.head(member_url, auth=self.auth,
expected_status=status_no_data)
- self.get(collection_url, auth=self.auth,
- expected_status=status_OK)
+ if list_status_OK:
+ self.get(collection_url, auth=self.auth)
+ else:
+ self.get(collection_url, auth=self.auth,
+ expected_status=status_OK)
self.delete(member_url, auth=self.auth,
expected_status=status_no_data)
+ def _role_management_cases(self, read_status_OK=False, expected=None):
+ # Set the different status values for different types of call depending
+ # on whether we expect the calls to fail or not.
+ status_OK, status_created, status_no_data = self._stati(expected)
+ entity_url = '/roles/%s' % self.role['id']
+ list_url = '/roles'
+
+ if read_status_OK:
+ self.get(entity_url, auth=self.auth)
+ self.get(list_url, auth=self.auth)
+ else:
+ self.get(entity_url, auth=self.auth,
+ expected_status=status_OK)
+ self.get(list_url, auth=self.auth,
+ expected_status=status_OK)
+
+ role = {'name': 'Updated'}
+ self.patch(entity_url, auth=self.auth, body={'role': role},
+ expected_status=status_OK)
+ self.delete(entity_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ role_ref = unit.new_role_ref()
+ self.post('/roles', auth=self.auth, body={'role': role_ref},
+ expected_status=status_created)
+
+ def _domain_role_management_cases(self, domain_id, read_status_OK=False,
+ expected=None):
+ # Set the different status values for different types of call depending
+ # on whether we expect the calls to fail or not.
+ status_OK, status_created, status_no_data = self._stati(expected)
+ entity_url = '/roles/%s' % self.roleA['id']
+ list_url = '/roles?domain_id=%s' % domain_id
+
+ if read_status_OK:
+ self.get(entity_url, auth=self.auth)
+ self.get(list_url, auth=self.auth)
+ else:
+ self.get(entity_url, auth=self.auth,
+ expected_status=status_OK)
+ self.get(list_url, auth=self.auth,
+ expected_status=status_OK)
+
+ role = {'name': 'Updated'}
+ self.patch(entity_url, auth=self.auth, body={'role': role},
+ expected_status=status_OK)
+ self.delete(entity_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ role_ref = unit.new_role_ref(domain_id=domain_id)
+ self.post('/roles', auth=self.auth, body={'role': role_ref},
+ expected_status=status_created)
+
def test_user_management(self):
# First, authenticate with a user that does not have the domain
# admin role - shouldn't be able to do much.
@@ -786,13 +833,90 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self._test_user_management(self.domainA['id'])
+ def test_user_management_normalized_keys(self):
+ """Illustrate the inconsistent handling of hyphens in keys.
+
+ To quote Morgan in bug 1526244:
+
+ the reason this is converted from "domain-id" to "domain_id" is
+ because of how we process/normalize data. The way we have to handle
+ specific data types for known columns requires avoiding "-" in the
+ actual python code since "-" is not valid for attributes in python
+ w/o significant use of "getattr" etc.
+
+ In short, historically we handle some things in conversions. The
+ use of "extras" has long been a poor design choice that leads to
+ odd/strange inconsistent behaviors because of other choices made in
+ handling data from within the body. (In many cases we convert from
+ "-" to "_" throughout openstack)
+
+ Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9
+
+ """
+ # Authenticate with a user that has the domain admin role
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ # Show that we can read a normal user without any surprises.
+ r = self.get(
+ '/users/%s' % self.just_a_user['id'],
+ auth=self.auth,
+ expected_status=http_client.OK)
+ self.assertValidUserResponse(r)
+
+ # We don't normalize query string keys, so both of these result in a
+ # 403, because we didn't specify a domain_id query string in either
+ # case, and we explicitly require one (it doesn't matter what
+ # 'domain-id' value you use).
+ self.get(
+ '/users?domain-id=%s' % self.domainA['id'],
+ auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+ self.get(
+ '/users?domain-id=%s' % self.domainB['id'],
+ auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ # If we try updating the user's 'domain_id' by specifying a
+ # 'domain-id', then it'll be stored into extras rather than normalized,
+ # and the user's actual 'domain_id' is not affected.
+ r = self.patch(
+ '/users/%s' % self.just_a_user['id'],
+ auth=self.auth,
+ body={'user': {'domain-id': self.domainB['id']}},
+ expected_status=http_client.OK)
+ self.assertEqual(self.domainB['id'], r.json['user']['domain-id'])
+ self.assertEqual(self.domainA['id'], r.json['user']['domain_id'])
+ self.assertNotEqual(self.domainB['id'], self.just_a_user['domain_id'])
+ self.assertValidUserResponse(r, self.just_a_user)
+
+ # Finally, show that we can create a new user without any surprises.
+ # But if we specify a 'domain-id' instead of a 'domain_id', we get a
+ # Forbidden response because we fail a policy check before
+ # normalization occurs.
+ user_ref = unit.new_user_ref(domain_id=self.domainA['id'])
+ r = self.post(
+ '/users',
+ auth=self.auth,
+ body={'user': user_ref},
+ expected_status=http_client.CREATED)
+ self.assertValidUserResponse(r, ref=user_ref)
+ user_ref['domain-id'] = user_ref.pop('domain_id')
+ self.post(
+ '/users',
+ auth=self.auth,
+ body={'user': user_ref},
+ expected_status=exception.ForbiddenAction.code)
+
def test_user_management_by_cloud_admin(self):
# Test users management with a cloud admin. This user should
# be able to manage users in any domain.
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
self._test_user_management(self.domainA['id'])
@@ -824,7 +948,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
# Check whether cloud admin can operate a domain
# other than its own domain or not
@@ -858,10 +982,56 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
self._test_grants('domains', self.domainA['id'])
+ def test_domain_grants_by_cloud_admin_for_domain_specific_role(self):
+ # Test domain grants with a cloud admin. This user should be
+ # able to manage domain roles on any domain.
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ project_id=self.admin_project['id'])
+
+ self._test_grants('domains', self.domainA['id'],
+ role_domain_id=self.domainB['id'])
+
+ def test_domain_grants_by_non_admin_for_domain_specific_role(self):
+ # A non-admin shouldn't be able to do anything
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('domains', self.domainA['id'],
+ role_domain_id=self.domainA['id'],
+ expected=exception.ForbiddenAction.code)
+ self._test_grants('domains', self.domainA['id'],
+ role_domain_id=self.domainB['id'],
+ expected=exception.ForbiddenAction.code)
+
+ def test_domain_grants_by_domain_admin_for_domain_specific_role(self):
+ # Authenticate with a user that does have the domain admin role,
+ # should not be able to assign a domain_specific role from another
+ # domain
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('domains', self.domainA['id'],
+ role_domain_id=self.domainB['id'],
+ # List status will always be OK, since we are not
+ # granting/checking/deleting assignments
+ list_status_OK=True,
+ expected=exception.ForbiddenAction.code)
+
+ # They should be able to assign a domain specific role from the same
+ # domain
+ self._test_grants('domains', self.domainA['id'],
+ role_domain_id=self.domainA['id'])
+
def test_project_grants(self):
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
@@ -890,11 +1060,67 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self._test_grants('projects', self.project['id'])
+ def test_project_grants_by_non_admin_for_domain_specific_role(self):
+ # A non-admin shouldn't be able to do anything
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainA['id'],
+ expected=exception.ForbiddenAction.code)
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainB['id'],
+ expected=exception.ForbiddenAction.code)
+
+ def test_project_grants_by_project_admin_for_domain_specific_role(self):
+ # Authenticate with a user that does have the project admin role,
+ # should not be able to assign a domain_specific role from another
+ # domain
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainB['id'],
+ # List status will always be OK, since we are not
+ # granting/checking/deleting assignments
+ list_status_OK=True,
+ expected=exception.ForbiddenAction.code)
+
+ # They should be able to assign a domain specific role from the same
+ # domain
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainA['id'])
+
+ def test_project_grants_by_domain_admin_for_domain_specific_role(self):
+ # Authenticate with a user that does have the domain admin role,
+ # should not be able to assign a domain_specific role from another
+ # domain
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainB['id'],
+ # List status will always be OK, since we are not
+ # granting/checking/deleting assignments
+ list_status_OK=True,
+ expected=exception.ForbiddenAction.code)
+
+ # They should be able to assign a domain specific role from the same
+ # domain
+ self._test_grants('projects', self.project['id'],
+ role_domain_id=self.domainA['id'])
+
def test_cloud_admin_list_assignments_of_domain(self):
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
collection_url = self.build_role_assignment_query_url(
domain_id=self.domainA['id'])
@@ -968,7 +1194,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
collection_url = self.build_role_assignment_query_url(
project_id=self.project['id'])
@@ -990,7 +1216,33 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, project_admin_entity)
self.assertRoleAssignmentInListResponse(r, project_user_entity)
- @unit.utils.wip('waiting on bug #1437407')
+ def test_admin_project_list_assignments_of_project(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ collection_url = self.build_role_assignment_query_url(
+ project_id=self.project['id'])
+ r = self.get(collection_url, auth=self.auth)
+ self.assertValidRoleAssignmentListResponse(
+ r, expected_length=2, resource_url=collection_url)
+
+ project_admin_entity = self.build_role_assignment_entity(
+ project_id=self.project['id'],
+ user_id=self.project_admin_user['id'],
+ role_id=self.admin_role['id'],
+ inherited_to_projects=False)
+ project_user_entity = self.build_role_assignment_entity(
+ project_id=self.project['id'],
+ user_id=self.just_a_user['id'],
+ role_id=self.role['id'],
+ inherited_to_projects=False)
+
+ self.assertRoleAssignmentInListResponse(r, project_admin_entity)
+ self.assertRoleAssignmentInListResponse(r, project_user_entity)
+
+ @utils.wip('waiting on bug #1437407')
def test_domain_admin_list_assignments_of_project(self):
self.auth = self.build_authentication_request(
user_id=self.domain_admin_user['id'],
@@ -1017,6 +1269,53 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.assertRoleAssignmentInListResponse(r, project_admin_entity)
self.assertRoleAssignmentInListResponse(r, project_user_entity)
+ def test_domain_admin_list_assignment_tree(self):
+ # Add a child project to the standard test data
+ sub_project = unit.new_project_ref(domain_id=self.domainA['id'],
+ parent_id=self.project['id'])
+ self.resource_api.create_project(sub_project['id'], sub_project)
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.just_a_user['id'],
+ project_id=sub_project['id'])
+
+ collection_url = self.build_role_assignment_query_url(
+ project_id=self.project['id'])
+ collection_url += '&include_subtree=True'
+
+ # The domain admin should be able to list the assignment tree
+ auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ r = self.get(collection_url, auth=auth)
+ self.assertValidRoleAssignmentListResponse(
+ r, expected_length=3, resource_url=collection_url)
+
+ # A project admin should not be able to
+ auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ r = self.get(collection_url, auth=auth,
+ expected_status=http_client.FORBIDDEN)
+
+ # A neither should a domain admin from a different domain
+ domainB_admin_user = unit.create_user(
+ self.identity_api,
+ domain_id=self.domainB['id'])
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=domainB_admin_user['id'],
+ domain_id=self.domainB['id'])
+ auth = self.build_authentication_request(
+ user_id=domainB_admin_user['id'],
+ password=domainB_admin_user['password'],
+ domain_id=self.domainB['id'])
+
+ r = self.get(collection_url, auth=auth,
+ expected_status=http_client.FORBIDDEN)
+
def test_domain_user_list_assignments_of_project_failed(self):
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
@@ -1040,7 +1339,23 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
+
+ self._test_domain_management()
+
+ def test_admin_project(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ self._test_domain_management(
+ expected=exception.ForbiddenAction.code)
+
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ project_id=self.admin_project['id'])
self._test_domain_management()
@@ -1050,16 +1365,15 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
password=self.domain_admin_user['password'],
domain_id=self.domainA['id'])
entity_url = '/domains/%s' % self.domainA['id']
- self.get(entity_url, auth=self.auth, expected_status=200)
+ self.get(entity_url, auth=self.auth)
def test_list_user_credentials(self):
- self.credential_user = self.new_credential_ref(self.just_a_user['id'])
- self.credential_api.create_credential(self.credential_user['id'],
- self.credential_user)
- self.credential_admin = self.new_credential_ref(
- self.cloud_admin_user['id'])
- self.credential_api.create_credential(self.credential_admin['id'],
- self.credential_admin)
+ credential_user = unit.new_credential_ref(self.just_a_user['id'])
+ self.credential_api.create_credential(credential_user['id'],
+ credential_user)
+ credential_admin = unit.new_credential_ref(self.cloud_admin_user['id'])
+ self.credential_api.create_credential(credential_admin['id'],
+ credential_admin)
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],
@@ -1075,9 +1389,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
def test_get_and_delete_ec2_credentials(self):
"""Tests getting and deleting ec2 credentials through the ec2 API."""
- another_user = self.new_user_ref(domain_id=self.domainA['id'])
- password = another_user['password']
- another_user = self.identity_api.create_user(another_user)
+ another_user = unit.create_user(self.identity_api,
+ domain_id=self.domainA['id'])
# create a credential for just_a_user
just_user_auth = self.build_authentication_request(
@@ -1091,7 +1404,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
# another normal user can't get the credential
another_user_auth = self.build_authentication_request(
user_id=another_user['id'],
- password=password)
+ password=another_user['password'])
another_user_url = '/users/%s/credentials/OS-EC2/%s' % (
another_user['id'], r.result['credential']['access'])
self.get(another_user_url, auth=another_user_auth,
@@ -1160,7 +1473,26 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
admin_auth = self.build_authentication_request(
user_id=self.cloud_admin_user['id'],
password=self.cloud_admin_user['password'],
- domain_id=self.admin_domain['id'])
+ project_id=self.admin_project['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.get('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token})
+
+ def test_admin_project_validate_user_token(self):
+ # An admin can validate a user's token.
+ # This is GET /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
admin_token = self.get_requested_token(admin_auth)
user_auth = self.build_authentication_request(
@@ -1182,7 +1514,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
token = self.get_requested_token(auth)
self.head('/auth/tokens', token=token,
- headers={'X-Subject-Token': token}, expected_status=200)
+ headers={'X-Subject-Token': token},
+ expected_status=http_client.OK)
def test_user_check_user_token(self):
# A user can check one of their own tokens.
@@ -1195,7 +1528,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
token2 = self.get_requested_token(auth)
self.head('/auth/tokens', token=token1,
- headers={'X-Subject-Token': token2}, expected_status=200)
+ headers={'X-Subject-Token': token2},
+ expected_status=http_client.OK)
def test_user_check_other_user_token_rejected(self):
# A user cannot check another user's token.
@@ -1231,7 +1565,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
user_token = self.get_requested_token(user_auth)
self.head('/auth/tokens', token=admin_token,
- headers={'X-Subject-Token': user_token}, expected_status=200)
+ headers={'X-Subject-Token': user_token},
+ expected_status=http_client.OK)
def test_user_revoke_same_token(self):
# Given a non-admin user token, the token can be used to revoke
@@ -1294,3 +1629,149 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.delete('/auth/tokens', token=admin_token,
headers={'X-Subject-Token': user_token})
+
+ def test_user_with_a_role_get_project(self):
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+
+ # Test user can get project for one they have a role in
+ self.get('/projects/%s' % self.project['id'], auth=user_auth)
+
+ # Test user can not get project for one they don't have a role in,
+ # even if they have a role on another project
+ project2 = unit.new_project_ref(domain_id=self.domainA['id'])
+ self.resource_api.create_project(project2['id'], project2)
+ self.get('/projects/%s' % project2['id'], auth=user_auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_project_admin_get_project(self):
+ admin_auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ resp = self.get('/projects/%s' % self.project['id'], auth=admin_auth)
+ self.assertEqual(self.project['id'],
+ jsonutils.loads(resp.body)['project']['id'])
+
+ def test_role_management_no_admin_no_rights(self):
+ # A non-admin domain user shouldn't be able to manipulate roles
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._role_management_cases(expected=exception.ForbiddenAction.code)
+
+ # ...and nor should non-admin project user
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+
+ self._role_management_cases(expected=exception.ForbiddenAction.code)
+
+ def test_role_management_with_project_admin(self):
+ # A project admin user should be able to get and list, but not be able
+ # to create/update/delete global roles
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ self._role_management_cases(read_status_OK=True,
+ expected=exception.ForbiddenAction.code)
+
+ def test_role_management_with_domain_admin(self):
+ # A domain admin user should be able to get and list, but not be able
+ # to create/update/delete global roles
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._role_management_cases(read_status_OK=True,
+ expected=exception.ForbiddenAction.code)
+
+ def test_role_management_with_cloud_admin(self):
+ # A cloud admin user should have rights to manipulate global roles
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ project_id=self.admin_project['id'])
+
+ self._role_management_cases()
+
+ def test_domain_role_management_no_admin_no_rights(self):
+ # A non-admin domain user shouldn't be able to manipulate domain roles
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._domain_role_management_cases(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ # ...and nor should non-admin project user
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+
+ self._domain_role_management_cases(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ def test_domain_role_management_with_cloud_admin(self):
+ # A cloud admin user should have rights to manipulate domain roles
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ project_id=self.admin_project['id'])
+
+ self._domain_role_management_cases(self.domainA['id'])
+
+ def test_domain_role_management_with_domain_admin(self):
+ # A domain admin user should only be able to manipulate the domain
+ # specific roles in their own domain
+ self.auth = self.build_authentication_request(
+ user_id=self.domainB_admin_user['id'],
+ password=self.domainB_admin_user['password'],
+ domain_id=self.domainB['id'])
+
+ # Try to access the domain specific roles in another domain
+ self._domain_role_management_cases(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ # ...but they should be able to work with those in their own domain
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._domain_role_management_cases(self.domainA['id'])
+
+ def test_domain_role_management_with_project_admin(self):
+ # A project admin user should have not access to domain specific roles
+ # in another domain. They should be able to get and list domain
+ # specific roles from their own domain, but not be able to create,
+ # update or delete them,
+ self.auth = self.build_authentication_request(
+ user_id=self.project_adminB_user['id'],
+ password=self.project_adminB_user['password'],
+ project_id=self.projectB['id'])
+
+ # Try access the domain specific roless in another domain
+ self._domain_role_management_cases(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ # ...but they should be ablet to work with those in their own domain
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ self._domain_role_management_cases(
+ self.domainA['id'], read_status_OK=True,
+ expected=exception.ForbiddenAction.code)