summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_protection.py
diff options
context:
space:
mode:
authorWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
committerWuKong <rebirthmonkey@gmail.com>2015-06-30 18:47:29 +0200
commitb8c756ecdd7cced1db4300935484e8c83701c82e (patch)
tree87e51107d82b217ede145de9d9d59e2100725bd7 /keystone-moon/keystone/tests/unit/test_v3_protection.py
parentc304c773bae68fb854ed9eab8fb35c4ef17cf136 (diff)
migrate moon code from github to opnfv
Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_protection.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_protection.py1170
1 files changed, 1170 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py
new file mode 100644
index 00000000..2b2c96d1
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_v3_protection.py
@@ -0,0 +1,1170 @@
+# Copyright 2012 OpenStack Foundation
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from oslo_config import cfg
+from oslo_serialization import jsonutils
+
+from keystone import exception
+from keystone.policy.backends import rules
+from keystone.tests import unit as tests
+from keystone.tests.unit.ksfixtures import temporaryfile
+from keystone.tests.unit import test_v3
+
+
+CONF = cfg.CONF
+DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
+
+
+class IdentityTestProtectedCase(test_v3.RestfulTestCase):
+ """Test policy enforcement on the v3 Identity API."""
+
+ def setUp(self):
+ """Setup for Identity Protection Test Cases.
+
+ As well as the usual housekeeping, create a set of domains,
+ users, roles and projects for the subsequent tests:
+
+ - Three domains: A,B & C. C is disabled.
+ - DomainA has user1, DomainB has user2 and user3
+ - DomainA has group1 and group2, DomainB has group3
+ - User1 has two roles on DomainA
+ - User2 has one role on DomainA
+
+ Remember that there will also be a fourth domain in existence,
+ the default domain.
+
+ """
+ # Ensure that test_v3.RestfulTestCase doesn't load its own
+ # sample data, which would make checking the results of our
+ # tests harder
+ super(IdentityTestProtectedCase, self).setUp()
+ self.tempfile = self.useFixture(temporaryfile.SecureTempFile())
+ self.tmpfilename = self.tempfile.file_name
+ self.config_fixture.config(group='oslo_policy',
+ policy_file=self.tmpfilename)
+
+ # A default auth request we can use - un-scoped user token
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'])
+
+ def load_sample_data(self):
+ self._populate_default_domain()
+ # Start by creating a couple of domains
+ self.domainA = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainA['id'], self.domainA)
+ self.domainB = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainB['id'], self.domainB)
+ self.domainC = self.new_domain_ref()
+ self.domainC['enabled'] = False
+ self.resource_api.create_domain(self.domainC['id'], self.domainC)
+
+ # Now create some users, one in domainA and two of them in domainB
+ self.user1 = self.new_user_ref(domain_id=self.domainA['id'])
+ password = uuid.uuid4().hex
+ self.user1['password'] = password
+ self.user1 = self.identity_api.create_user(self.user1)
+ self.user1['password'] = password
+
+ self.user2 = self.new_user_ref(domain_id=self.domainB['id'])
+ password = uuid.uuid4().hex
+ self.user2['password'] = password
+ self.user2 = self.identity_api.create_user(self.user2)
+ self.user2['password'] = password
+
+ self.user3 = self.new_user_ref(domain_id=self.domainB['id'])
+ password = uuid.uuid4().hex
+ self.user3['password'] = password
+ self.user3 = self.identity_api.create_user(self.user3)
+ self.user3['password'] = password
+
+ self.group1 = self.new_group_ref(domain_id=self.domainA['id'])
+ self.group1 = self.identity_api.create_group(self.group1)
+
+ self.group2 = self.new_group_ref(domain_id=self.domainA['id'])
+ self.group2 = self.identity_api.create_group(self.group2)
+
+ self.group3 = self.new_group_ref(domain_id=self.domainB['id'])
+ self.group3 = self.identity_api.create_group(self.group3)
+
+ self.role = self.new_role_ref()
+ self.role_api.create_role(self.role['id'], self.role)
+ self.role1 = self.new_role_ref()
+ self.role_api.create_role(self.role1['id'], self.role1)
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user1['id'],
+ domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.user2['id'],
+ domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.role1['id'],
+ user_id=self.user1['id'],
+ domain_id=self.domainA['id'])
+
+ def _get_id_list_from_ref_list(self, ref_list):
+ result_list = []
+ for x in ref_list:
+ result_list.append(x['id'])
+ return result_list
+
+ def _set_policy(self, new_policy):
+ with open(self.tmpfilename, "w") as policyfile:
+ policyfile.write(jsonutils.dumps(new_policy))
+
+ def test_list_users_unprotected(self):
+ """GET /users (unprotected)
+
+ Test Plan:
+
+ - Update policy so api is unprotected
+ - Use an un-scoped token to make sure we can get back all
+ the users independent of domain
+
+ """
+ self._set_policy({"identity:list_users": []})
+ r = self.get('/users', auth=self.auth)
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
+ self.assertIn(self.user1['id'], id_list)
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_list_users_filtered_by_domain(self):
+ """GET /users?domain_id=mydomain (filtered)
+
+ Test Plan:
+
+ - Update policy so api is unprotected
+ - Use an un-scoped token to make sure we can filter the
+ users by domainB, getting back the 2 users in that domain
+
+ """
+ self._set_policy({"identity:list_users": []})
+ url_by_name = '/users?domain_id=%s' % self.domainB['id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should get back two users, those in DomainB
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
+ self.assertIn(self.user2['id'], id_list)
+ self.assertIn(self.user3['id'], id_list)
+
+ def test_get_user_protected_match_id(self):
+ """GET /users/{id} (match payload)
+
+ Test Plan:
+
+ - Update policy to protect api by user_id
+ - List users with user_id of user1 as filter, to check that
+ this will correctly match user_id in the flattened
+ payload
+
+ """
+ # TODO(henry-nash, ayoung): It would be good to expand this
+ # test for further test flattening, e.g. protect on, say, an
+ # attribute of an object being created
+ new_policy = {"identity:get_user": [["user_id:%(user_id)s"]]}
+ self._set_policy(new_policy)
+ url_by_name = '/users/%s' % self.user1['id']
+ r = self.get(url_by_name, auth=self.auth)
+ self.assertEqual(self.user1['id'], r.result['user']['id'])
+
+ def test_get_user_protected_match_target(self):
+ """GET /users/{id} (match target)
+
+ Test Plan:
+
+ - Update policy to protect api by domain_id
+ - Try and read a user who is in DomainB with a token scoped
+ to Domain A - this should fail
+ - Retry this for a user who is in Domain A, which should succeed.
+ - Finally, try getting a user that does not exist, which should
+ still return UserNotFound
+
+ """
+ new_policy = {'identity:get_user':
+ [["domain_id:%(target.user.domain_id)s"]]}
+ self._set_policy(new_policy)
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'],
+ domain_id=self.domainA['id'])
+ url_by_name = '/users/%s' % self.user2['id']
+ r = self.get(url_by_name, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ url_by_name = '/users/%s' % self.user1['id']
+ r = self.get(url_by_name, auth=self.auth)
+ self.assertEqual(self.user1['id'], r.result['user']['id'])
+
+ url_by_name = '/users/%s' % uuid.uuid4().hex
+ r = self.get(url_by_name, auth=self.auth,
+ expected_status=exception.UserNotFound.code)
+
+ def test_revoke_grant_protected_match_target(self):
+ """DELETE /domains/{id}/users/{id}/roles/{id} (match target)
+
+ Test Plan:
+
+ - Update policy to protect api by domain_id of entities in
+ the grant
+ - Try and delete the existing grant that has a user who is
+ from a different domain - this should fail.
+ - Retry this for a user who is in Domain A, which should succeed.
+
+ """
+ new_policy = {'identity:revoke_grant':
+ [["domain_id:%(target.user.domain_id)s"]]}
+ self._set_policy(new_policy)
+ collection_url = (
+ '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domainA['id'],
+ 'user_id': self.user2['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role['id']}
+
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'],
+ domain_id=self.domainA['id'])
+ self.delete(member_url, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ collection_url = (
+ '/domains/%(domain_id)s/users/%(user_id)s/roles' % {
+ 'domain_id': self.domainA['id'],
+ 'user_id': self.user1['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': self.role1['id']}
+ self.delete(member_url, auth=self.auth)
+
+ def test_list_users_protected_by_domain(self):
+ """GET /users?domain_id=mydomain (protected)
+
+ Test Plan:
+
+ - Update policy to protect api by domain_id
+ - List groups using a token scoped to domainA with a filter
+ specifying domainA - we should only get back the one user
+ that is in domainA.
+ - Try and read the users from domainB - this should fail since
+ we don't have a token scoped for domainB
+
+ """
+ new_policy = {"identity:list_users": ["domain_id:%(domain_id)s"]}
+ self._set_policy(new_policy)
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'],
+ domain_id=self.domainA['id'])
+ url_by_name = '/users?domain_id=%s' % self.domainA['id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should only get back one user, the one in DomainA
+ id_list = self._get_id_list_from_ref_list(r.result.get('users'))
+ self.assertEqual(1, len(id_list))
+ self.assertIn(self.user1['id'], id_list)
+
+ # Now try for domainB, which should fail
+ url_by_name = '/users?domain_id=%s' % self.domainB['id']
+ r = self.get(url_by_name, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_list_groups_protected_by_domain(self):
+ """GET /groups?domain_id=mydomain (protected)
+
+ Test Plan:
+
+ - Update policy to protect api by domain_id
+ - List groups using a token scoped to domainA and make sure
+ we only get back the two groups that are in domainA
+ - Try and read the groups from domainB - this should fail since
+ we don't have a token scoped for domainB
+
+ """
+ new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
+ self._set_policy(new_policy)
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'],
+ domain_id=self.domainA['id'])
+ url_by_name = '/groups?domain_id=%s' % self.domainA['id']
+ r = self.get(url_by_name, auth=self.auth)
+ # We should only get back two groups, the ones in DomainA
+ id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
+ self.assertEqual(2, len(id_list))
+ self.assertIn(self.group1['id'], id_list)
+ self.assertIn(self.group2['id'], id_list)
+
+ # Now try for domainB, which should fail
+ url_by_name = '/groups?domain_id=%s' % self.domainB['id']
+ r = self.get(url_by_name, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_list_groups_protected_by_domain_and_filtered(self):
+ """GET /groups?domain_id=mydomain&name=myname (protected)
+
+ Test Plan:
+
+ - Update policy to protect api by domain_id
+ - List groups using a token scoped to domainA with a filter
+ specifying both domainA and the name of group.
+ - We should only get back the group in domainA that matches
+ the name
+
+ """
+ new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]}
+ self._set_policy(new_policy)
+ self.auth = self.build_authentication_request(
+ user_id=self.user1['id'],
+ password=self.user1['password'],
+ domain_id=self.domainA['id'])
+ url_by_name = '/groups?domain_id=%s&name=%s' % (
+ self.domainA['id'], self.group2['name'])
+ r = self.get(url_by_name, auth=self.auth)
+ # We should only get back one user, the one in DomainA that matches
+ # the name supplied
+ id_list = self._get_id_list_from_ref_list(r.result.get('groups'))
+ self.assertEqual(1, len(id_list))
+ self.assertIn(self.group2['id'], id_list)
+
+
+class IdentityTestPolicySample(test_v3.RestfulTestCase):
+ """Test policy enforcement of the policy.json file."""
+
+ def load_sample_data(self):
+ self._populate_default_domain()
+
+ self.just_a_user = self.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ password = uuid.uuid4().hex
+ self.just_a_user['password'] = password
+ self.just_a_user = self.identity_api.create_user(self.just_a_user)
+ self.just_a_user['password'] = password
+
+ self.another_user = self.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ password = uuid.uuid4().hex
+ self.another_user['password'] = password
+ self.another_user = self.identity_api.create_user(self.another_user)
+ self.another_user['password'] = password
+
+ self.admin_user = self.new_user_ref(
+ domain_id=CONF.identity.default_domain_id)
+ password = uuid.uuid4().hex
+ self.admin_user['password'] = password
+ self.admin_user = self.identity_api.create_user(self.admin_user)
+ self.admin_user['password'] = password
+
+ self.role = self.new_role_ref()
+ self.role_api.create_role(self.role['id'], self.role)
+ self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'}
+ self.role_api.create_role(self.admin_role['id'], self.admin_role)
+
+ # Create and assign roles to the project
+ self.project = self.new_project_ref(
+ domain_id=CONF.identity.default_domain_id)
+ self.resource_api.create_project(self.project['id'], self.project)
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.just_a_user['id'],
+ project_id=self.project['id'])
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.another_user['id'],
+ project_id=self.project['id'])
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=self.admin_user['id'],
+ project_id=self.project['id'])
+
+ def test_user_validate_same_token(self):
+ # Given a non-admin user token, the token can be used to validate
+ # itself.
+ # This is GET /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+ # FIXME(blk-u): This test fails, a user can't validate their own token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403.
+ self.get('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token}, expected_status=403)
+
+ def test_user_validate_user_token(self):
+ # A user can validate one of their own tokens.
+ # This is GET /v3/auth/tokens
+ # FIXME(blk-u): This test fails, a user can't validate their own token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403.
+ self.get('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2}, expected_status=403)
+
+ def test_user_validate_other_user_token_rejected(self):
+ # A user cannot validate another user's token.
+ # This is GET /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.another_user['id'],
+ password=self.another_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.get('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token}, expected_status=403)
+
+ def test_admin_validate_user_token(self):
+ # An admin can validate a user's token.
+ # This is GET /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.admin_user['id'],
+ password=self.admin_user['password'],
+ project_id=self.project['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.get('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token})
+
+ def test_user_check_same_token(self):
+ # Given a non-admin user token, the token can be used to check
+ # itself.
+ # This is HEAD /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+ # FIXME(blk-u): This test fails, a user can't check the same token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ # FIXME(blk-u): change to expected_status=200
+ self.head('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token}, expected_status=403)
+
+ def test_user_check_user_token(self):
+ # A user can check one of their own tokens.
+ # This is HEAD /v3/auth/tokens
+ # FIXME(blk-u): This test fails, a user can't check the same token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ # FIXME(blk-u): change to expected_status=200
+ self.head('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2}, expected_status=403)
+
+ def test_user_check_other_user_token_rejected(self):
+ # A user cannot check another user's token.
+ # This is HEAD /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.another_user['id'],
+ password=self.another_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.head('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token},
+ expected_status=403)
+
+ def test_admin_check_user_token(self):
+ # An admin can check a user's token.
+ # This is HEAD /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.admin_user['id'],
+ password=self.admin_user['password'],
+ project_id=self.project['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.head('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token}, expected_status=200)
+
+ def test_user_revoke_same_token(self):
+ # Given a non-admin user token, the token can be used to revoke
+ # itself.
+ # This is DELETE /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+ # FIXME(blk-u): This test fails, a user can't revoke the same token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403
+ self.delete('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token}, expected_status=403)
+
+ def test_user_revoke_user_token(self):
+ # A user can revoke one of their own tokens.
+ # This is DELETE /v3/auth/tokens
+ # FIXME(blk-u): This test fails, a user can't revoke the same token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403
+ self.delete('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2}, expected_status=403)
+
+ def test_user_revoke_other_user_token_rejected(self):
+ # A user cannot revoke another user's token.
+ # This is DELETE /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.another_user['id'],
+ password=self.another_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.delete('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token},
+ expected_status=403)
+
+ def test_admin_revoke_user_token(self):
+ # An admin can revoke a user's token.
+ # This is DELETE /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.admin_user['id'],
+ password=self.admin_user['password'],
+ project_id=self.project['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.delete('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token})
+
+
+class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase):
+ """Test policy enforcement of the sample v3 cloud policy file."""
+
+ def setUp(self):
+ """Setup for v3 Cloud Policy Sample Test Cases.
+
+ The following data is created:
+
+ - Three domains: domainA, domainB and admin_domain
+ - One project, which name is 'project'
+ - domainA has three users: domain_admin_user, project_admin_user and
+ just_a_user:
+
+ - domain_admin_user has role 'admin' on domainA,
+ - project_admin_user has role 'admin' on the project,
+ - just_a_user has a non-admin role on both domainA and the project.
+ - admin_domain has user cloud_admin_user, with an 'admin' role
+ on admin_domain.
+
+ We test various api protection rules from the cloud sample policy
+ file to make sure the sample is valid and that we correctly enforce it.
+
+ """
+ # Ensure that test_v3.RestfulTestCase doesn't load its own
+ # sample data, which would make checking the results of our
+ # tests harder
+ super(IdentityTestv3CloudPolicySample, self).setUp()
+
+ # Finally, switch to the v3 sample policy file
+ self.addCleanup(rules.reset)
+ rules.reset()
+ self.config_fixture.config(
+ group='oslo_policy',
+ policy_file=tests.dirs.etc('policy.v3cloudsample.json'))
+
+ def load_sample_data(self):
+ # Start by creating a couple of domains
+ self._populate_default_domain()
+ self.domainA = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainA['id'], self.domainA)
+ self.domainB = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainB['id'], self.domainB)
+ self.admin_domain = {'id': 'admin_domain_id', 'name': 'Admin_domain'}
+ self.resource_api.create_domain(self.admin_domain['id'],
+ self.admin_domain)
+
+ # And our users
+ self.cloud_admin_user = self.new_user_ref(
+ domain_id=self.admin_domain['id'])
+ password = uuid.uuid4().hex
+ self.cloud_admin_user['password'] = password
+ self.cloud_admin_user = (
+ self.identity_api.create_user(self.cloud_admin_user))
+ self.cloud_admin_user['password'] = password
+ self.just_a_user = self.new_user_ref(domain_id=self.domainA['id'])
+ password = uuid.uuid4().hex
+ self.just_a_user['password'] = password
+ self.just_a_user = self.identity_api.create_user(self.just_a_user)
+ self.just_a_user['password'] = password
+ self.domain_admin_user = self.new_user_ref(
+ domain_id=self.domainA['id'])
+ password = uuid.uuid4().hex
+ self.domain_admin_user['password'] = password
+ self.domain_admin_user = (
+ self.identity_api.create_user(self.domain_admin_user))
+ self.domain_admin_user['password'] = password
+ self.project_admin_user = self.new_user_ref(
+ domain_id=self.domainA['id'])
+ password = uuid.uuid4().hex
+ self.project_admin_user['password'] = password
+ self.project_admin_user = (
+ self.identity_api.create_user(self.project_admin_user))
+ self.project_admin_user['password'] = password
+
+ # The admin role and another plain role
+ self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'}
+ self.role_api.create_role(self.admin_role['id'], self.admin_role)
+ self.role = self.new_role_ref()
+ self.role_api.create_role(self.role['id'], self.role)
+
+ # The cloud admin just gets the admin role
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=self.cloud_admin_user['id'],
+ domain_id=self.admin_domain['id'])
+
+ # Assign roles to the domain
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=self.domain_admin_user['id'],
+ domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.just_a_user['id'],
+ domain_id=self.domainA['id'])
+
+ # Create and assign roles to the project
+ self.project = self.new_project_ref(domain_id=self.domainA['id'])
+ self.resource_api.create_project(self.project['id'], self.project)
+ self.assignment_api.create_grant(self.admin_role['id'],
+ user_id=self.project_admin_user['id'],
+ project_id=self.project['id'])
+ self.assignment_api.create_grant(self.role['id'],
+ user_id=self.just_a_user['id'],
+ project_id=self.project['id'])
+
+ def _stati(self, expected_status):
+ # Return the expected return codes for APIs with and without data
+ # with any specified status overriding the normal values
+ if expected_status is None:
+ return (200, 201, 204)
+ else:
+ return (expected_status, expected_status, expected_status)
+
+ def _test_user_management(self, domain_id, expected=None):
+ status_OK, status_created, status_no_data = self._stati(expected)
+ entity_url = '/users/%s' % self.just_a_user['id']
+ list_url = '/users?domain_id=%s' % domain_id
+
+ self.get(entity_url, auth=self.auth,
+ expected_status=status_OK)
+ self.get(list_url, auth=self.auth,
+ expected_status=status_OK)
+ user = {'description': 'Updated'}
+ self.patch(entity_url, auth=self.auth, body={'user': user},
+ expected_status=status_OK)
+ self.delete(entity_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ user_ref = self.new_user_ref(domain_id=domain_id)
+ self.post('/users', auth=self.auth, body={'user': user_ref},
+ expected_status=status_created)
+
+ def _test_project_management(self, domain_id, expected=None):
+ status_OK, status_created, status_no_data = self._stati(expected)
+ entity_url = '/projects/%s' % self.project['id']
+ list_url = '/projects?domain_id=%s' % domain_id
+
+ self.get(entity_url, auth=self.auth,
+ expected_status=status_OK)
+ self.get(list_url, auth=self.auth,
+ expected_status=status_OK)
+ project = {'description': 'Updated'}
+ self.patch(entity_url, auth=self.auth, body={'project': project},
+ expected_status=status_OK)
+ self.delete(entity_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ proj_ref = self.new_project_ref(domain_id=domain_id)
+ self.post('/projects', auth=self.auth, body={'project': proj_ref},
+ expected_status=status_created)
+
+ def _test_domain_management(self, expected=None):
+ status_OK, status_created, status_no_data = self._stati(expected)
+ entity_url = '/domains/%s' % self.domainB['id']
+ list_url = '/domains'
+
+ self.get(entity_url, auth=self.auth,
+ expected_status=status_OK)
+ self.get(list_url, auth=self.auth,
+ expected_status=status_OK)
+ domain = {'description': 'Updated', 'enabled': False}
+ self.patch(entity_url, auth=self.auth, body={'domain': domain},
+ expected_status=status_OK)
+ self.delete(entity_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ domain_ref = self.new_domain_ref()
+ self.post('/domains', auth=self.auth, body={'domain': domain_ref},
+ expected_status=status_created)
+
+ def _test_grants(self, target, entity_id, expected=None):
+ status_OK, status_created, status_no_data = self._stati(expected)
+ a_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
+ self.role_api.create_role(a_role['id'], a_role)
+
+ collection_url = (
+ '/%(target)s/%(target_id)s/users/%(user_id)s/roles' % {
+ 'target': target,
+ 'target_id': entity_id,
+ 'user_id': self.just_a_user['id']})
+ member_url = '%(collection_url)s/%(role_id)s' % {
+ 'collection_url': collection_url,
+ 'role_id': a_role['id']}
+
+ self.put(member_url, auth=self.auth,
+ expected_status=status_no_data)
+ self.head(member_url, auth=self.auth,
+ expected_status=status_no_data)
+ self.get(collection_url, auth=self.auth,
+ expected_status=status_OK)
+ self.delete(member_url, auth=self.auth,
+ expected_status=status_no_data)
+
+ def test_user_management(self):
+ # First, authenticate with a user that does not have the domain
+ # admin role - shouldn't be able to do much.
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_user_management(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ # Now, authenticate with a user that does have the domain admin role
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_user_management(self.domainA['id'])
+
+ def test_user_management_by_cloud_admin(self):
+ # Test users management with a cloud admin. This user should
+ # be able to manage users in any domain.
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ domain_id=self.admin_domain['id'])
+
+ self._test_user_management(self.domainA['id'])
+
+ def test_project_management(self):
+ # First, authenticate with a user that does not have the project
+ # admin role - shouldn't be able to do much.
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_project_management(
+ self.domainA['id'], expected=exception.ForbiddenAction.code)
+
+ # ...but should still be able to list projects of which they are
+ # a member
+ url = '/users/%s/projects' % self.just_a_user['id']
+ self.get(url, auth=self.auth)
+
+ # Now, authenticate with a user that does have the domain admin role
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_project_management(self.domainA['id'])
+
+ def test_project_management_by_cloud_admin(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ domain_id=self.admin_domain['id'])
+
+ # Check whether cloud admin can operate a domain
+ # other than its own domain or not
+ self._test_project_management(self.domainA['id'])
+
+ def test_domain_grants(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('domains', self.domainA['id'],
+ expected=exception.ForbiddenAction.code)
+
+ # Now, authenticate with a user that does have the domain admin role
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('domains', self.domainA['id'])
+
+ # Check that with such a token we cannot modify grants on a
+ # different domain
+ self._test_grants('domains', self.domainB['id'],
+ expected=exception.ForbiddenAction.code)
+
+ def test_domain_grants_by_cloud_admin(self):
+ # Test domain grants with a cloud admin. This user should be
+ # able to manage roles on any domain.
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ domain_id=self.admin_domain['id'])
+
+ self._test_grants('domains', self.domainA['id'])
+
+ def test_project_grants(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+
+ self._test_grants('projects', self.project['id'],
+ expected=exception.ForbiddenAction.code)
+
+ # Now, authenticate with a user that does have the project
+ # admin role
+ self.auth = self.build_authentication_request(
+ user_id=self.project_admin_user['id'],
+ password=self.project_admin_user['password'],
+ project_id=self.project['id'])
+
+ self._test_grants('projects', self.project['id'])
+
+ def test_project_grants_by_domain_admin(self):
+ # Test project grants with a domain admin. This user should be
+ # able to manage roles on any project in its own domain.
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_grants('projects', self.project['id'])
+
+ def test_cloud_admin(self):
+ self.auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+
+ self._test_domain_management(
+ expected=exception.ForbiddenAction.code)
+
+ self.auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ domain_id=self.admin_domain['id'])
+
+ self._test_domain_management()
+
+ def test_list_user_credentials(self):
+ self.credential_user = self.new_credential_ref(self.just_a_user['id'])
+ self.credential_api.create_credential(self.credential_user['id'],
+ self.credential_user)
+ self.credential_admin = self.new_credential_ref(
+ self.cloud_admin_user['id'])
+ self.credential_api.create_credential(self.credential_admin['id'],
+ self.credential_admin)
+
+ self.auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ url = '/credentials?user_id=%s' % self.just_a_user['id']
+ self.get(url, auth=self.auth)
+ url = '/credentials?user_id=%s' % self.cloud_admin_user['id']
+ self.get(url, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+ url = '/credentials'
+ self.get(url, auth=self.auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ def test_get_and_delete_ec2_credentials(self):
+ """Tests getting and deleting ec2 credentials through the ec2 API."""
+ another_user = self.new_user_ref(domain_id=self.domainA['id'])
+ password = another_user['password']
+ another_user = self.identity_api.create_user(another_user)
+
+ # create a credential for just_a_user
+ just_user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'],
+ project_id=self.project['id'])
+ url = '/users/%s/credentials/OS-EC2' % self.just_a_user['id']
+ r = self.post(url, body={'tenant_id': self.project['id']},
+ auth=just_user_auth)
+
+ # another normal user can't get the credential
+ another_user_auth = self.build_authentication_request(
+ user_id=another_user['id'],
+ password=password)
+ another_user_url = '/users/%s/credentials/OS-EC2/%s' % (
+ another_user['id'], r.result['credential']['access'])
+ self.get(another_user_url, auth=another_user_auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ # the owner can get the credential
+ just_user_url = '/users/%s/credentials/OS-EC2/%s' % (
+ self.just_a_user['id'], r.result['credential']['access'])
+ self.get(just_user_url, auth=just_user_auth)
+
+ # another normal user can't delete the credential
+ self.delete(another_user_url, auth=another_user_auth,
+ expected_status=exception.ForbiddenAction.code)
+
+ # the owner can get the credential
+ self.delete(just_user_url, auth=just_user_auth)
+
+ def test_user_validate_same_token(self):
+ # Given a non-admin user token, the token can be used to validate
+ # itself.
+ # This is GET /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+ # FIXME(blk-u): This test fails, a user can't validate their own token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403.
+ self.get('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token}, expected_status=403)
+
+ def test_user_validate_user_token(self):
+ # A user can validate one of their own tokens.
+ # This is GET /v3/auth/tokens
+ # FIXME(blk-u): This test fails, a user can't validate their own token,
+ # see bug 1421825.
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ # FIXME(blk-u): remove expected_status=403.
+ self.get('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2}, expected_status=403)
+
+ def test_user_validate_other_user_token_rejected(self):
+ # A user cannot validate another user's token.
+ # This is GET /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.get('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token}, expected_status=403)
+
+ def test_admin_validate_user_token(self):
+ # An admin can validate a user's token.
+ # This is GET /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'],
+ domain_id=self.admin_domain['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.get('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token})
+
+ def test_user_check_same_token(self):
+ # Given a non-admin user token, the token can be used to check
+ # itself.
+ # This is HEAD /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ self.head('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token}, expected_status=200)
+
+ def test_user_check_user_token(self):
+ # A user can check one of their own tokens.
+ # This is HEAD /v3/auth/tokens
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ self.head('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2}, expected_status=200)
+
+ def test_user_check_other_user_token_rejected(self):
+ # A user cannot check another user's token.
+ # This is HEAD /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.head('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token},
+ expected_status=403)
+
+ def test_admin_check_user_token(self):
+ # An admin can check a user's token.
+ # This is HEAD /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.head('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token}, expected_status=200)
+
+ def test_user_revoke_same_token(self):
+ # Given a non-admin user token, the token can be used to revoke
+ # itself.
+ # This is DELETE /v3/auth/tokens, with X-Auth-Token == X-Subject-Token
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token = self.get_requested_token(auth)
+
+ self.delete('/auth/tokens', token=token,
+ headers={'X-Subject-Token': token})
+
+ def test_user_revoke_user_token(self):
+ # A user can revoke one of their own tokens.
+ # This is DELETE /v3/auth/tokens
+
+ auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ token1 = self.get_requested_token(auth)
+ token2 = self.get_requested_token(auth)
+
+ self.delete('/auth/tokens', token=token1,
+ headers={'X-Subject-Token': token2})
+
+ def test_user_revoke_other_user_token_rejected(self):
+ # A user cannot revoke another user's token.
+ # This is DELETE /v3/auth/tokens
+
+ user1_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user1_token = self.get_requested_token(user1_auth)
+
+ user2_auth = self.build_authentication_request(
+ user_id=self.cloud_admin_user['id'],
+ password=self.cloud_admin_user['password'])
+ user2_token = self.get_requested_token(user2_auth)
+
+ self.delete('/auth/tokens', token=user1_token,
+ headers={'X-Subject-Token': user2_token},
+ expected_status=403)
+
+ def test_admin_revoke_user_token(self):
+ # An admin can revoke a user's token.
+ # This is DELETE /v3/auth/tokens
+
+ admin_auth = self.build_authentication_request(
+ user_id=self.domain_admin_user['id'],
+ password=self.domain_admin_user['password'],
+ domain_id=self.domainA['id'])
+ admin_token = self.get_requested_token(admin_auth)
+
+ user_auth = self.build_authentication_request(
+ user_id=self.just_a_user['id'],
+ password=self.just_a_user['password'])
+ user_token = self.get_requested_token(user_auth)
+
+ self.delete('/auth/tokens', token=admin_token,
+ headers={'X-Subject-Token': user_token})