diff options
author | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
---|---|---|
committer | RHE <rebirthmonkey@gmail.com> | 2017-11-24 13:54:26 +0100 |
commit | 920a49cfa055733d575282973e23558c33087a4a (patch) | |
tree | d371dab34efa5028600dad2e7ca58063626e7ba4 /keystone-moon/keystone/tests/unit/test_v3_protection.py | |
parent | ef3eefca70d8abb4a00dafb9419ad32738e934b2 (diff) |
remove keystone-moon
Change-Id: I80d7c9b669f19d5f6607e162de8e0e55c2f80fdd
Signed-off-by: RHE <rebirthmonkey@gmail.com>
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_protection.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_protection.py | 1777 |
1 files changed, 0 insertions, 1777 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py deleted file mode 100644 index f77a1528..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_protection.py +++ /dev/null @@ -1,1777 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# Copyright 2013 IBM Corp. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import uuid - -from oslo_config import cfg -from oslo_serialization import jsonutils -from six.moves import http_client - -from keystone import exception -from keystone.tests import unit -from keystone.tests.unit import ksfixtures -from keystone.tests.unit.ksfixtures import temporaryfile -from keystone.tests.unit import test_v3 -from keystone.tests.unit import utils - - -CONF = cfg.CONF - - -class IdentityTestProtectedCase(test_v3.RestfulTestCase): - """Test policy enforcement on the v3 Identity API.""" - - def _policy_fixture(self): - return ksfixtures.Policy(self.tmpfilename, self.config_fixture) - - def setUp(self): - """Setup for Identity Protection Test Cases. - - As well as the usual housekeeping, create a set of domains, - users, roles and projects for the subsequent tests: - - - Three domains: A,B & C. C is disabled. - - DomainA has user1, DomainB has user2 and user3 - - DomainA has group1 and group2, DomainB has group3 - - User1 has two roles on DomainA - - User2 has one role on DomainA - - Remember that there will also be a fourth domain in existence, - the default domain. - - """ - self.tempfile = self.useFixture(temporaryfile.SecureTempFile()) - self.tmpfilename = self.tempfile.file_name - super(IdentityTestProtectedCase, self).setUp() - - # A default auth request we can use - un-scoped user token - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password']) - - def load_sample_data(self): - self._populate_default_domain() - # Start by creating a couple of domains - self.domainA = unit.new_domain_ref() - self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = unit.new_domain_ref() - self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.domainC = unit.new_domain_ref(enabled=False) - self.resource_api.create_domain(self.domainC['id'], self.domainC) - - # Now create some users, one in domainA and two of them in domainB - self.user1 = unit.create_user(self.identity_api, - domain_id=self.domainA['id']) - self.user2 = unit.create_user(self.identity_api, - domain_id=self.domainB['id']) - self.user3 = unit.create_user(self.identity_api, - domain_id=self.domainB['id']) - - self.group1 = unit.new_group_ref(domain_id=self.domainA['id']) - self.group1 = self.identity_api.create_group(self.group1) - - self.group2 = unit.new_group_ref(domain_id=self.domainA['id']) - self.group2 = self.identity_api.create_group(self.group2) - - self.group3 = unit.new_group_ref(domain_id=self.domainB['id']) - self.group3 = self.identity_api.create_group(self.group3) - - self.role = unit.new_role_ref() - self.role_api.create_role(self.role['id'], self.role) - self.role1 = unit.new_role_ref() - self.role_api.create_role(self.role1['id'], self.role1) - self.assignment_api.create_grant(self.role['id'], - user_id=self.user1['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.role['id'], - user_id=self.user2['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.role1['id'], - user_id=self.user1['id'], - domain_id=self.domainA['id']) - - def _get_id_list_from_ref_list(self, ref_list): - result_list = [] - for x in ref_list: - result_list.append(x['id']) - return result_list - - def _set_policy(self, new_policy): - with open(self.tmpfilename, "w") as policyfile: - policyfile.write(jsonutils.dumps(new_policy)) - - def test_list_users_unprotected(self): - """GET /users (unprotected) - - Test Plan: - - - Update policy so api is unprotected - - Use an un-scoped token to make sure we can get back all - the users independent of domain - - """ - self._set_policy({"identity:list_users": []}) - r = self.get('/users', auth=self.auth) - id_list = self._get_id_list_from_ref_list(r.result.get('users')) - self.assertIn(self.user1['id'], id_list) - self.assertIn(self.user2['id'], id_list) - self.assertIn(self.user3['id'], id_list) - - def test_list_users_filtered_by_domain(self): - """GET /users?domain_id=mydomain (filtered) - - Test Plan: - - - Update policy so api is unprotected - - Use an un-scoped token to make sure we can filter the - users by domainB, getting back the 2 users in that domain - - """ - self._set_policy({"identity:list_users": []}) - url_by_name = '/users?domain_id=%s' % self.domainB['id'] - r = self.get(url_by_name, auth=self.auth) - # We should get back two users, those in DomainB - id_list = self._get_id_list_from_ref_list(r.result.get('users')) - self.assertIn(self.user2['id'], id_list) - self.assertIn(self.user3['id'], id_list) - - def test_get_user_protected_match_id(self): - """GET /users/{id} (match payload) - - Test Plan: - - - Update policy to protect api by user_id - - List users with user_id of user1 as filter, to check that - this will correctly match user_id in the flattened - payload - - """ - # TODO(henry-nash, ayoung): It would be good to expand this - # test for further test flattening, e.g. protect on, say, an - # attribute of an object being created - new_policy = {"identity:get_user": [["user_id:%(user_id)s"]]} - self._set_policy(new_policy) - url_by_name = '/users/%s' % self.user1['id'] - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(self.user1['id'], r.result['user']['id']) - - def test_get_user_protected_match_target(self): - """GET /users/{id} (match target) - - Test Plan: - - - Update policy to protect api by domain_id - - Try and read a user who is in DomainB with a token scoped - to Domain A - this should fail - - Retry this for a user who is in Domain A, which should succeed. - - Finally, try getting a user that does not exist, which should - still return UserNotFound - - """ - new_policy = {'identity:get_user': - [["domain_id:%(target.user.domain_id)s"]]} - self._set_policy(new_policy) - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password'], - domain_id=self.domainA['id']) - url_by_name = '/users/%s' % self.user2['id'] - r = self.get(url_by_name, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - url_by_name = '/users/%s' % self.user1['id'] - r = self.get(url_by_name, auth=self.auth) - self.assertEqual(self.user1['id'], r.result['user']['id']) - - url_by_name = '/users/%s' % uuid.uuid4().hex - r = self.get(url_by_name, auth=self.auth, - expected_status=exception.UserNotFound.code) - - def test_revoke_grant_protected_match_target(self): - """DELETE /domains/{id}/users/{id}/roles/{id} (match target) - - Test Plan: - - - Update policy to protect api by domain_id of entities in - the grant - - Try and delete the existing grant that has a user who is - from a different domain - this should fail. - - Retry this for a user who is in Domain A, which should succeed. - - """ - new_policy = {'identity:revoke_grant': - [["domain_id:%(target.user.domain_id)s"]]} - self._set_policy(new_policy) - collection_url = ( - '/domains/%(domain_id)s/users/%(user_id)s/roles' % { - 'domain_id': self.domainA['id'], - 'user_id': self.user2['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': self.role['id']} - - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password'], - domain_id=self.domainA['id']) - self.delete(member_url, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - collection_url = ( - '/domains/%(domain_id)s/users/%(user_id)s/roles' % { - 'domain_id': self.domainA['id'], - 'user_id': self.user1['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': self.role1['id']} - self.delete(member_url, auth=self.auth) - - def test_list_users_protected_by_domain(self): - """GET /users?domain_id=mydomain (protected) - - Test Plan: - - - Update policy to protect api by domain_id - - List groups using a token scoped to domainA with a filter - specifying domainA - we should only get back the one user - that is in domainA. - - Try and read the users from domainB - this should fail since - we don't have a token scoped for domainB - - """ - new_policy = {"identity:list_users": ["domain_id:%(domain_id)s"]} - self._set_policy(new_policy) - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password'], - domain_id=self.domainA['id']) - url_by_name = '/users?domain_id=%s' % self.domainA['id'] - r = self.get(url_by_name, auth=self.auth) - # We should only get back one user, the one in DomainA - id_list = self._get_id_list_from_ref_list(r.result.get('users')) - self.assertEqual(1, len(id_list)) - self.assertIn(self.user1['id'], id_list) - - # Now try for domainB, which should fail - url_by_name = '/users?domain_id=%s' % self.domainB['id'] - r = self.get(url_by_name, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - def test_list_groups_protected_by_domain(self): - """GET /groups?domain_id=mydomain (protected) - - Test Plan: - - - Update policy to protect api by domain_id - - List groups using a token scoped to domainA and make sure - we only get back the two groups that are in domainA - - Try and read the groups from domainB - this should fail since - we don't have a token scoped for domainB - - """ - new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]} - self._set_policy(new_policy) - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password'], - domain_id=self.domainA['id']) - url_by_name = '/groups?domain_id=%s' % self.domainA['id'] - r = self.get(url_by_name, auth=self.auth) - # We should only get back two groups, the ones in DomainA - id_list = self._get_id_list_from_ref_list(r.result.get('groups')) - self.assertEqual(2, len(id_list)) - self.assertIn(self.group1['id'], id_list) - self.assertIn(self.group2['id'], id_list) - - # Now try for domainB, which should fail - url_by_name = '/groups?domain_id=%s' % self.domainB['id'] - r = self.get(url_by_name, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - def test_list_groups_protected_by_domain_and_filtered(self): - """GET /groups?domain_id=mydomain&name=myname (protected) - - Test Plan: - - - Update policy to protect api by domain_id - - List groups using a token scoped to domainA with a filter - specifying both domainA and the name of group. - - We should only get back the group in domainA that matches - the name - - """ - new_policy = {"identity:list_groups": ["domain_id:%(domain_id)s"]} - self._set_policy(new_policy) - self.auth = self.build_authentication_request( - user_id=self.user1['id'], - password=self.user1['password'], - domain_id=self.domainA['id']) - url_by_name = '/groups?domain_id=%s&name=%s' % ( - self.domainA['id'], self.group2['name']) - r = self.get(url_by_name, auth=self.auth) - # We should only get back one user, the one in DomainA that matches - # the name supplied - id_list = self._get_id_list_from_ref_list(r.result.get('groups')) - self.assertEqual(1, len(id_list)) - self.assertIn(self.group2['id'], id_list) - - -class IdentityTestPolicySample(test_v3.RestfulTestCase): - """Test policy enforcement of the policy.json file.""" - - def load_sample_data(self): - self._populate_default_domain() - - self.just_a_user = unit.create_user( - self.identity_api, - domain_id=CONF.identity.default_domain_id) - self.another_user = unit.create_user( - self.identity_api, - domain_id=CONF.identity.default_domain_id) - self.admin_user = unit.create_user( - self.identity_api, - domain_id=CONF.identity.default_domain_id) - - self.role = unit.new_role_ref() - self.role_api.create_role(self.role['id'], self.role) - self.admin_role = unit.new_role_ref(name='admin') - self.role_api.create_role(self.admin_role['id'], self.admin_role) - - # Create and assign roles to the project - self.project = unit.new_project_ref( - domain_id=CONF.identity.default_domain_id) - self.resource_api.create_project(self.project['id'], self.project) - self.assignment_api.create_grant(self.role['id'], - user_id=self.just_a_user['id'], - project_id=self.project['id']) - self.assignment_api.create_grant(self.role['id'], - user_id=self.another_user['id'], - project_id=self.project['id']) - self.assignment_api.create_grant(self.admin_role['id'], - user_id=self.admin_user['id'], - project_id=self.project['id']) - - def test_user_validate_same_token(self): - # Given a non-admin user token, the token can be used to validate - # itself. - # This is GET /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.get('/auth/tokens', token=token, - headers={'X-Subject-Token': token}) - - def test_user_validate_user_token(self): - # A user can validate one of their own tokens. - # This is GET /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.get('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}) - - def test_user_validate_other_user_token_rejected(self): - # A user cannot validate another user's token. - # This is GET /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.another_user['id'], - password=self.another_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.get('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_validate_user_token(self): - # An admin can validate a user's token. - # This is GET /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.admin_user['id'], - password=self.admin_user['password'], - project_id=self.project['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.get('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}) - - def test_user_check_same_token(self): - # Given a non-admin user token, the token can be used to check - # itself. - # This is HEAD /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, - expected_status=http_client.OK) - - def test_user_check_user_token(self): - # A user can check one of their own tokens. - # This is HEAD /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) - - def test_user_check_other_user_token_rejected(self): - # A user cannot check another user's token. - # This is HEAD /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.another_user['id'], - password=self.another_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.head('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_check_user_token(self): - # An admin can check a user's token. - # This is HEAD /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.admin_user['id'], - password=self.admin_user['password'], - project_id=self.project['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, - expected_status=http_client.OK) - - def test_user_revoke_same_token(self): - # Given a non-admin user token, the token can be used to revoke - # itself. - # This is DELETE /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.delete('/auth/tokens', token=token, - headers={'X-Subject-Token': token}) - - def test_user_revoke_user_token(self): - # A user can revoke one of their own tokens. - # This is DELETE /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.delete('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}) - - def test_user_revoke_other_user_token_rejected(self): - # A user cannot revoke another user's token. - # This is DELETE /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.another_user['id'], - password=self.another_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.delete('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_revoke_user_token(self): - # An admin can revoke a user's token. - # This is DELETE /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.admin_user['id'], - password=self.admin_user['password'], - project_id=self.project['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.delete('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}) - - -class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, - test_v3.AssignmentTestMixin): - """Test policy enforcement of the sample v3 cloud policy file.""" - - def _policy_fixture(self): - return ksfixtures.Policy(unit.dirs.etc('policy.v3cloudsample.json'), - self.config_fixture) - - def setUp(self): - """Setup for v3 Cloud Policy Sample Test Cases. - - The following data is created: - - - Three domains: domainA, domainB and admin_domain - - One project, which name is 'project' - - domainA has three users: domain_admin_user, project_admin_user and - just_a_user: - - - domain_admin_user has role 'admin' on domainA, - - project_admin_user has role 'admin' on the project, - - just_a_user has a non-admin role on both domainA and the project. - - admin_domain has admin_project, and user cloud_admin_user, with an - 'admin' role on admin_project. - - We test various api protection rules from the cloud sample policy - file to make sure the sample is valid and that we correctly enforce it. - - """ - # Ensure that test_v3.RestfulTestCase doesn't load its own - # sample data, which would make checking the results of our - # tests harder - super(IdentityTestv3CloudPolicySample, self).setUp() - - self.config_fixture.config( - group='resource', - admin_project_name=self.admin_project['name']) - self.config_fixture.config( - group='resource', - admin_project_domain_name=self.admin_domain['name']) - - def load_sample_data(self): - # Start by creating a couple of domains - self._populate_default_domain() - self.domainA = unit.new_domain_ref() - self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = unit.new_domain_ref() - self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.admin_domain = unit.new_domain_ref() - self.resource_api.create_domain(self.admin_domain['id'], - self.admin_domain) - - self.admin_project = unit.new_project_ref( - domain_id=self.admin_domain['id']) - self.resource_api.create_project(self.admin_project['id'], - self.admin_project) - - # And our users - self.cloud_admin_user = unit.create_user( - self.identity_api, - domain_id=self.admin_domain['id']) - self.just_a_user = unit.create_user( - self.identity_api, - domain_id=self.domainA['id']) - self.domain_admin_user = unit.create_user( - self.identity_api, - domain_id=self.domainA['id']) - self.domainB_admin_user = unit.create_user( - self.identity_api, - domain_id=self.domainB['id']) - self.project_admin_user = unit.create_user( - self.identity_api, - domain_id=self.domainA['id']) - self.project_adminB_user = unit.create_user( - self.identity_api, - domain_id=self.domainB['id']) - - # The admin role, a domain specific role and another plain role - self.admin_role = unit.new_role_ref(name='admin') - self.role_api.create_role(self.admin_role['id'], self.admin_role) - self.roleA = unit.new_role_ref(domain_id=self.domainA['id']) - self.role_api.create_role(self.roleA['id'], self.roleA) - self.role = unit.new_role_ref() - self.role_api.create_role(self.role['id'], self.role) - - # The cloud admin just gets the admin role on the special admin project - self.assignment_api.create_grant(self.admin_role['id'], - user_id=self.cloud_admin_user['id'], - project_id=self.admin_project['id']) - - # Assign roles to the domain - self.assignment_api.create_grant(self.admin_role['id'], - user_id=self.domain_admin_user['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.role['id'], - user_id=self.just_a_user['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.admin_role['id'], - user_id=self.domainB_admin_user['id'], - domain_id=self.domainB['id']) - - # Create and assign roles to the project - self.project = unit.new_project_ref(domain_id=self.domainA['id']) - self.resource_api.create_project(self.project['id'], self.project) - self.projectB = unit.new_project_ref(domain_id=self.domainB['id']) - self.resource_api.create_project(self.projectB['id'], self.projectB) - self.assignment_api.create_grant(self.admin_role['id'], - user_id=self.project_admin_user['id'], - project_id=self.project['id']) - self.assignment_api.create_grant( - self.admin_role['id'], user_id=self.project_adminB_user['id'], - project_id=self.projectB['id']) - self.assignment_api.create_grant(self.role['id'], - user_id=self.just_a_user['id'], - project_id=self.project['id']) - - def _stati(self, expected_status): - # Return the expected return codes for APIs with and without data - # with any specified status overriding the normal values - if expected_status is None: - return (http_client.OK, http_client.CREATED, - http_client.NO_CONTENT) - else: - return (expected_status, expected_status, expected_status) - - def _test_user_management(self, domain_id, expected=None): - status_OK, status_created, status_no_data = self._stati(expected) - entity_url = '/users/%s' % self.just_a_user['id'] - list_url = '/users?domain_id=%s' % domain_id - - self.get(entity_url, auth=self.auth, - expected_status=status_OK) - self.get(list_url, auth=self.auth, - expected_status=status_OK) - user = {'description': 'Updated'} - self.patch(entity_url, auth=self.auth, body={'user': user}, - expected_status=status_OK) - self.delete(entity_url, auth=self.auth, - expected_status=status_no_data) - - user_ref = unit.new_user_ref(domain_id=domain_id) - self.post('/users', auth=self.auth, body={'user': user_ref}, - expected_status=status_created) - - def _test_project_management(self, domain_id, expected=None): - status_OK, status_created, status_no_data = self._stati(expected) - entity_url = '/projects/%s' % self.project['id'] - list_url = '/projects?domain_id=%s' % domain_id - - self.get(entity_url, auth=self.auth, - expected_status=status_OK) - self.get(list_url, auth=self.auth, - expected_status=status_OK) - project = {'description': 'Updated'} - self.patch(entity_url, auth=self.auth, body={'project': project}, - expected_status=status_OK) - self.delete(entity_url, auth=self.auth, - expected_status=status_no_data) - - proj_ref = unit.new_project_ref(domain_id=domain_id) - self.post('/projects', auth=self.auth, body={'project': proj_ref}, - expected_status=status_created) - - def _test_domain_management(self, expected=None): - status_OK, status_created, status_no_data = self._stati(expected) - entity_url = '/domains/%s' % self.domainB['id'] - list_url = '/domains' - - self.get(entity_url, auth=self.auth, - expected_status=status_OK) - self.get(list_url, auth=self.auth, - expected_status=status_OK) - domain = {'description': 'Updated', 'enabled': False} - self.patch(entity_url, auth=self.auth, body={'domain': domain}, - expected_status=status_OK) - self.delete(entity_url, auth=self.auth, - expected_status=status_no_data) - - domain_ref = unit.new_domain_ref() - self.post('/domains', auth=self.auth, body={'domain': domain_ref}, - expected_status=status_created) - - def _test_grants(self, target, entity_id, role_domain_id=None, - list_status_OK=False, expected=None): - status_OK, status_created, status_no_data = self._stati(expected) - a_role = unit.new_role_ref(domain_id=role_domain_id) - self.role_api.create_role(a_role['id'], a_role) - - collection_url = ( - '/%(target)s/%(target_id)s/users/%(user_id)s/roles' % { - 'target': target, - 'target_id': entity_id, - 'user_id': self.just_a_user['id']}) - member_url = '%(collection_url)s/%(role_id)s' % { - 'collection_url': collection_url, - 'role_id': a_role['id']} - - self.put(member_url, auth=self.auth, - expected_status=status_no_data) - self.head(member_url, auth=self.auth, - expected_status=status_no_data) - if list_status_OK: - self.get(collection_url, auth=self.auth) - else: - self.get(collection_url, auth=self.auth, - expected_status=status_OK) - self.delete(member_url, auth=self.auth, - expected_status=status_no_data) - - def _role_management_cases(self, read_status_OK=False, expected=None): - # Set the different status values for different types of call depending - # on whether we expect the calls to fail or not. - status_OK, status_created, status_no_data = self._stati(expected) - entity_url = '/roles/%s' % self.role['id'] - list_url = '/roles' - - if read_status_OK: - self.get(entity_url, auth=self.auth) - self.get(list_url, auth=self.auth) - else: - self.get(entity_url, auth=self.auth, - expected_status=status_OK) - self.get(list_url, auth=self.auth, - expected_status=status_OK) - - role = {'name': 'Updated'} - self.patch(entity_url, auth=self.auth, body={'role': role}, - expected_status=status_OK) - self.delete(entity_url, auth=self.auth, - expected_status=status_no_data) - - role_ref = unit.new_role_ref() - self.post('/roles', auth=self.auth, body={'role': role_ref}, - expected_status=status_created) - - def _domain_role_management_cases(self, domain_id, read_status_OK=False, - expected=None): - # Set the different status values for different types of call depending - # on whether we expect the calls to fail or not. - status_OK, status_created, status_no_data = self._stati(expected) - entity_url = '/roles/%s' % self.roleA['id'] - list_url = '/roles?domain_id=%s' % domain_id - - if read_status_OK: - self.get(entity_url, auth=self.auth) - self.get(list_url, auth=self.auth) - else: - self.get(entity_url, auth=self.auth, - expected_status=status_OK) - self.get(list_url, auth=self.auth, - expected_status=status_OK) - - role = {'name': 'Updated'} - self.patch(entity_url, auth=self.auth, body={'role': role}, - expected_status=status_OK) - self.delete(entity_url, auth=self.auth, - expected_status=status_no_data) - - role_ref = unit.new_role_ref(domain_id=domain_id) - self.post('/roles', auth=self.auth, body={'role': role_ref}, - expected_status=status_created) - - def test_user_management(self): - # First, authenticate with a user that does not have the domain - # admin role - shouldn't be able to do much. - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._test_user_management( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - # Now, authenticate with a user that does have the domain admin role - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_user_management(self.domainA['id']) - - def test_user_management_normalized_keys(self): - """Illustrate the inconsistent handling of hyphens in keys. - - To quote Morgan in bug 1526244: - - the reason this is converted from "domain-id" to "domain_id" is - because of how we process/normalize data. The way we have to handle - specific data types for known columns requires avoiding "-" in the - actual python code since "-" is not valid for attributes in python - w/o significant use of "getattr" etc. - - In short, historically we handle some things in conversions. The - use of "extras" has long been a poor design choice that leads to - odd/strange inconsistent behaviors because of other choices made in - handling data from within the body. (In many cases we convert from - "-" to "_" throughout openstack) - - Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9 - - """ - # Authenticate with a user that has the domain admin role - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - # Show that we can read a normal user without any surprises. - r = self.get( - '/users/%s' % self.just_a_user['id'], - auth=self.auth, - expected_status=http_client.OK) - self.assertValidUserResponse(r) - - # We don't normalize query string keys, so both of these result in a - # 403, because we didn't specify a domain_id query string in either - # case, and we explicitly require one (it doesn't matter what - # 'domain-id' value you use). - self.get( - '/users?domain-id=%s' % self.domainA['id'], - auth=self.auth, - expected_status=exception.ForbiddenAction.code) - self.get( - '/users?domain-id=%s' % self.domainB['id'], - auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - # If we try updating the user's 'domain_id' by specifying a - # 'domain-id', then it'll be stored into extras rather than normalized, - # and the user's actual 'domain_id' is not affected. - r = self.patch( - '/users/%s' % self.just_a_user['id'], - auth=self.auth, - body={'user': {'domain-id': self.domainB['id']}}, - expected_status=http_client.OK) - self.assertEqual(self.domainB['id'], r.json['user']['domain-id']) - self.assertEqual(self.domainA['id'], r.json['user']['domain_id']) - self.assertNotEqual(self.domainB['id'], self.just_a_user['domain_id']) - self.assertValidUserResponse(r, self.just_a_user) - - # Finally, show that we can create a new user without any surprises. - # But if we specify a 'domain-id' instead of a 'domain_id', we get a - # Forbidden response because we fail a policy check before - # normalization occurs. - user_ref = unit.new_user_ref(domain_id=self.domainA['id']) - r = self.post( - '/users', - auth=self.auth, - body={'user': user_ref}, - expected_status=http_client.CREATED) - self.assertValidUserResponse(r, ref=user_ref) - user_ref['domain-id'] = user_ref.pop('domain_id') - self.post( - '/users', - auth=self.auth, - body={'user': user_ref}, - expected_status=exception.ForbiddenAction.code) - - def test_user_management_by_cloud_admin(self): - # Test users management with a cloud admin. This user should - # be able to manage users in any domain. - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._test_user_management(self.domainA['id']) - - def test_project_management(self): - # First, authenticate with a user that does not have the project - # admin role - shouldn't be able to do much. - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._test_project_management( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - # ...but should still be able to list projects of which they are - # a member - url = '/users/%s/projects' % self.just_a_user['id'] - self.get(url, auth=self.auth) - - # Now, authenticate with a user that does have the domain admin role - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_project_management(self.domainA['id']) - - def test_project_management_by_cloud_admin(self): - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - # Check whether cloud admin can operate a domain - # other than its own domain or not - self._test_project_management(self.domainA['id']) - - def test_domain_grants(self): - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('domains', self.domainA['id'], - expected=exception.ForbiddenAction.code) - - # Now, authenticate with a user that does have the domain admin role - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('domains', self.domainA['id']) - - # Check that with such a token we cannot modify grants on a - # different domain - self._test_grants('domains', self.domainB['id'], - expected=exception.ForbiddenAction.code) - - def test_domain_grants_by_cloud_admin(self): - # Test domain grants with a cloud admin. This user should be - # able to manage roles on any domain. - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._test_grants('domains', self.domainA['id']) - - def test_domain_grants_by_cloud_admin_for_domain_specific_role(self): - # Test domain grants with a cloud admin. This user should be - # able to manage domain roles on any domain. - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._test_grants('domains', self.domainA['id'], - role_domain_id=self.domainB['id']) - - def test_domain_grants_by_non_admin_for_domain_specific_role(self): - # A non-admin shouldn't be able to do anything - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('domains', self.domainA['id'], - role_domain_id=self.domainA['id'], - expected=exception.ForbiddenAction.code) - self._test_grants('domains', self.domainA['id'], - role_domain_id=self.domainB['id'], - expected=exception.ForbiddenAction.code) - - def test_domain_grants_by_domain_admin_for_domain_specific_role(self): - # Authenticate with a user that does have the domain admin role, - # should not be able to assign a domain_specific role from another - # domain - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('domains', self.domainA['id'], - role_domain_id=self.domainB['id'], - # List status will always be OK, since we are not - # granting/checking/deleting assignments - list_status_OK=True, - expected=exception.ForbiddenAction.code) - - # They should be able to assign a domain specific role from the same - # domain - self._test_grants('domains', self.domainA['id'], - role_domain_id=self.domainA['id']) - - def test_project_grants(self): - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - - self._test_grants('projects', self.project['id'], - expected=exception.ForbiddenAction.code) - - # Now, authenticate with a user that does have the project - # admin role - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - self._test_grants('projects', self.project['id']) - - def test_project_grants_by_domain_admin(self): - # Test project grants with a domain admin. This user should be - # able to manage roles on any project in its own domain. - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('projects', self.project['id']) - - def test_project_grants_by_non_admin_for_domain_specific_role(self): - # A non-admin shouldn't be able to do anything - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainA['id'], - expected=exception.ForbiddenAction.code) - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainB['id'], - expected=exception.ForbiddenAction.code) - - def test_project_grants_by_project_admin_for_domain_specific_role(self): - # Authenticate with a user that does have the project admin role, - # should not be able to assign a domain_specific role from another - # domain - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainB['id'], - # List status will always be OK, since we are not - # granting/checking/deleting assignments - list_status_OK=True, - expected=exception.ForbiddenAction.code) - - # They should be able to assign a domain specific role from the same - # domain - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainA['id']) - - def test_project_grants_by_domain_admin_for_domain_specific_role(self): - # Authenticate with a user that does have the domain admin role, - # should not be able to assign a domain_specific role from another - # domain - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainB['id'], - # List status will always be OK, since we are not - # granting/checking/deleting assignments - list_status_OK=True, - expected=exception.ForbiddenAction.code) - - # They should be able to assign a domain specific role from the same - # domain - self._test_grants('projects', self.project['id'], - role_domain_id=self.domainA['id']) - - def test_cloud_admin_list_assignments_of_domain(self): - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - collection_url = self.build_role_assignment_query_url( - domain_id=self.domainA['id']) - r = self.get(collection_url, auth=self.auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=2, resource_url=collection_url) - - domainA_admin_entity = self.build_role_assignment_entity( - domain_id=self.domainA['id'], - user_id=self.domain_admin_user['id'], - role_id=self.admin_role['id'], - inherited_to_projects=False) - domainA_user_entity = self.build_role_assignment_entity( - domain_id=self.domainA['id'], - user_id=self.just_a_user['id'], - role_id=self.role['id'], - inherited_to_projects=False) - - self.assertRoleAssignmentInListResponse(r, domainA_admin_entity) - self.assertRoleAssignmentInListResponse(r, domainA_user_entity) - - def test_domain_admin_list_assignments_of_domain(self): - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - collection_url = self.build_role_assignment_query_url( - domain_id=self.domainA['id']) - r = self.get(collection_url, auth=self.auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=2, resource_url=collection_url) - - domainA_admin_entity = self.build_role_assignment_entity( - domain_id=self.domainA['id'], - user_id=self.domain_admin_user['id'], - role_id=self.admin_role['id'], - inherited_to_projects=False) - domainA_user_entity = self.build_role_assignment_entity( - domain_id=self.domainA['id'], - user_id=self.just_a_user['id'], - role_id=self.role['id'], - inherited_to_projects=False) - - self.assertRoleAssignmentInListResponse(r, domainA_admin_entity) - self.assertRoleAssignmentInListResponse(r, domainA_user_entity) - - def test_domain_admin_list_assignments_of_another_domain_failed(self): - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - collection_url = self.build_role_assignment_query_url( - domain_id=self.domainB['id']) - self.get(collection_url, auth=self.auth, - expected_status=http_client.FORBIDDEN) - - def test_domain_user_list_assignments_of_domain_failed(self): - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - collection_url = self.build_role_assignment_query_url( - domain_id=self.domainA['id']) - self.get(collection_url, auth=self.auth, - expected_status=http_client.FORBIDDEN) - - def test_cloud_admin_list_assignments_of_project(self): - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - collection_url = self.build_role_assignment_query_url( - project_id=self.project['id']) - r = self.get(collection_url, auth=self.auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=2, resource_url=collection_url) - - project_admin_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.project_admin_user['id'], - role_id=self.admin_role['id'], - inherited_to_projects=False) - project_user_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.just_a_user['id'], - role_id=self.role['id'], - inherited_to_projects=False) - - self.assertRoleAssignmentInListResponse(r, project_admin_entity) - self.assertRoleAssignmentInListResponse(r, project_user_entity) - - def test_admin_project_list_assignments_of_project(self): - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - collection_url = self.build_role_assignment_query_url( - project_id=self.project['id']) - r = self.get(collection_url, auth=self.auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=2, resource_url=collection_url) - - project_admin_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.project_admin_user['id'], - role_id=self.admin_role['id'], - inherited_to_projects=False) - project_user_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.just_a_user['id'], - role_id=self.role['id'], - inherited_to_projects=False) - - self.assertRoleAssignmentInListResponse(r, project_admin_entity) - self.assertRoleAssignmentInListResponse(r, project_user_entity) - - @utils.wip('waiting on bug #1437407') - def test_domain_admin_list_assignments_of_project(self): - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - collection_url = self.build_role_assignment_query_url( - project_id=self.project['id']) - r = self.get(collection_url, auth=self.auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=2, resource_url=collection_url) - - project_admin_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.project_admin_user['id'], - role_id=self.admin_role['id'], - inherited_to_projects=False) - project_user_entity = self.build_role_assignment_entity( - project_id=self.project['id'], - user_id=self.just_a_user['id'], - role_id=self.role['id'], - inherited_to_projects=False) - - self.assertRoleAssignmentInListResponse(r, project_admin_entity) - self.assertRoleAssignmentInListResponse(r, project_user_entity) - - def test_domain_admin_list_assignment_tree(self): - # Add a child project to the standard test data - sub_project = unit.new_project_ref(domain_id=self.domainA['id'], - parent_id=self.project['id']) - self.resource_api.create_project(sub_project['id'], sub_project) - self.assignment_api.create_grant(self.role['id'], - user_id=self.just_a_user['id'], - project_id=sub_project['id']) - - collection_url = self.build_role_assignment_query_url( - project_id=self.project['id']) - collection_url += '&include_subtree=True' - - # The domain admin should be able to list the assignment tree - auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - r = self.get(collection_url, auth=auth) - self.assertValidRoleAssignmentListResponse( - r, expected_length=3, resource_url=collection_url) - - # A project admin should not be able to - auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - r = self.get(collection_url, auth=auth, - expected_status=http_client.FORBIDDEN) - - # A neither should a domain admin from a different domain - domainB_admin_user = unit.create_user( - self.identity_api, - domain_id=self.domainB['id']) - self.assignment_api.create_grant(self.admin_role['id'], - user_id=domainB_admin_user['id'], - domain_id=self.domainB['id']) - auth = self.build_authentication_request( - user_id=domainB_admin_user['id'], - password=domainB_admin_user['password'], - domain_id=self.domainB['id']) - - r = self.get(collection_url, auth=auth, - expected_status=http_client.FORBIDDEN) - - def test_domain_user_list_assignments_of_project_failed(self): - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - collection_url = self.build_role_assignment_query_url( - project_id=self.project['id']) - self.get(collection_url, auth=self.auth, - expected_status=http_client.FORBIDDEN) - - def test_cloud_admin(self): - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._test_domain_management( - expected=exception.ForbiddenAction.code) - - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._test_domain_management() - - def test_admin_project(self): - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - self._test_domain_management( - expected=exception.ForbiddenAction.code) - - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._test_domain_management() - - def test_domain_admin_get_domain(self): - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - entity_url = '/domains/%s' % self.domainA['id'] - self.get(entity_url, auth=self.auth) - - def test_list_user_credentials(self): - credential_user = unit.new_credential_ref(self.just_a_user['id']) - self.credential_api.create_credential(credential_user['id'], - credential_user) - credential_admin = unit.new_credential_ref(self.cloud_admin_user['id']) - self.credential_api.create_credential(credential_admin['id'], - credential_admin) - - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - url = '/credentials?user_id=%s' % self.just_a_user['id'] - self.get(url, auth=self.auth) - url = '/credentials?user_id=%s' % self.cloud_admin_user['id'] - self.get(url, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - url = '/credentials' - self.get(url, auth=self.auth, - expected_status=exception.ForbiddenAction.code) - - def test_get_and_delete_ec2_credentials(self): - """Tests getting and deleting ec2 credentials through the ec2 API.""" - another_user = unit.create_user(self.identity_api, - domain_id=self.domainA['id']) - - # create a credential for just_a_user - just_user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - url = '/users/%s/credentials/OS-EC2' % self.just_a_user['id'] - r = self.post(url, body={'tenant_id': self.project['id']}, - auth=just_user_auth) - - # another normal user can't get the credential - another_user_auth = self.build_authentication_request( - user_id=another_user['id'], - password=another_user['password']) - another_user_url = '/users/%s/credentials/OS-EC2/%s' % ( - another_user['id'], r.result['credential']['access']) - self.get(another_user_url, auth=another_user_auth, - expected_status=exception.ForbiddenAction.code) - - # the owner can get the credential - just_user_url = '/users/%s/credentials/OS-EC2/%s' % ( - self.just_a_user['id'], r.result['credential']['access']) - self.get(just_user_url, auth=just_user_auth) - - # another normal user can't delete the credential - self.delete(another_user_url, auth=another_user_auth, - expected_status=exception.ForbiddenAction.code) - - # the owner can get the credential - self.delete(just_user_url, auth=just_user_auth) - - def test_user_validate_same_token(self): - # Given a non-admin user token, the token can be used to validate - # itself. - # This is GET /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.get('/auth/tokens', token=token, - headers={'X-Subject-Token': token}) - - def test_user_validate_user_token(self): - # A user can validate one of their own tokens. - # This is GET /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.get('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}) - - def test_user_validate_other_user_token_rejected(self): - # A user cannot validate another user's token. - # This is GET /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.get('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_validate_user_token(self): - # An admin can validate a user's token. - # This is GET /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.get('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}) - - def test_admin_project_validate_user_token(self): - # An admin can validate a user's token. - # This is GET /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.get('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}) - - def test_user_check_same_token(self): - # Given a non-admin user token, the token can be used to check - # itself. - # This is HEAD /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, - expected_status=http_client.OK) - - def test_user_check_user_token(self): - # A user can check one of their own tokens. - # This is HEAD /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, - expected_status=http_client.OK) - - def test_user_check_other_user_token_rejected(self): - # A user cannot check another user's token. - # This is HEAD /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.head('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_check_user_token(self): - # An admin can check a user's token. - # This is HEAD /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, - expected_status=http_client.OK) - - def test_user_revoke_same_token(self): - # Given a non-admin user token, the token can be used to revoke - # itself. - # This is DELETE /v3/auth/tokens, with X-Auth-Token == X-Subject-Token - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token = self.get_requested_token(auth) - - self.delete('/auth/tokens', token=token, - headers={'X-Subject-Token': token}) - - def test_user_revoke_user_token(self): - # A user can revoke one of their own tokens. - # This is DELETE /v3/auth/tokens - - auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - token1 = self.get_requested_token(auth) - token2 = self.get_requested_token(auth) - - self.delete('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}) - - def test_user_revoke_other_user_token_rejected(self): - # A user cannot revoke another user's token. - # This is DELETE /v3/auth/tokens - - user1_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user1_token = self.get_requested_token(user1_auth) - - user2_auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password']) - user2_token = self.get_requested_token(user2_auth) - - self.delete('/auth/tokens', token=user1_token, - headers={'X-Subject-Token': user2_token}, - expected_status=http_client.FORBIDDEN) - - def test_admin_revoke_user_token(self): - # An admin can revoke a user's token. - # This is DELETE /v3/auth/tokens - - admin_auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - admin_token = self.get_requested_token(admin_auth) - - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password']) - user_token = self.get_requested_token(user_auth) - - self.delete('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}) - - def test_user_with_a_role_get_project(self): - user_auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - - # Test user can get project for one they have a role in - self.get('/projects/%s' % self.project['id'], auth=user_auth) - - # Test user can not get project for one they don't have a role in, - # even if they have a role on another project - project2 = unit.new_project_ref(domain_id=self.domainA['id']) - self.resource_api.create_project(project2['id'], project2) - self.get('/projects/%s' % project2['id'], auth=user_auth, - expected_status=exception.ForbiddenAction.code) - - def test_project_admin_get_project(self): - admin_auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - resp = self.get('/projects/%s' % self.project['id'], auth=admin_auth) - self.assertEqual(self.project['id'], - jsonutils.loads(resp.body)['project']['id']) - - def test_role_management_no_admin_no_rights(self): - # A non-admin domain user shouldn't be able to manipulate roles - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._role_management_cases(expected=exception.ForbiddenAction.code) - - # ...and nor should non-admin project user - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - - self._role_management_cases(expected=exception.ForbiddenAction.code) - - def test_role_management_with_project_admin(self): - # A project admin user should be able to get and list, but not be able - # to create/update/delete global roles - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - self._role_management_cases(read_status_OK=True, - expected=exception.ForbiddenAction.code) - - def test_role_management_with_domain_admin(self): - # A domain admin user should be able to get and list, but not be able - # to create/update/delete global roles - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._role_management_cases(read_status_OK=True, - expected=exception.ForbiddenAction.code) - - def test_role_management_with_cloud_admin(self): - # A cloud admin user should have rights to manipulate global roles - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._role_management_cases() - - def test_domain_role_management_no_admin_no_rights(self): - # A non-admin domain user shouldn't be able to manipulate domain roles - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - domain_id=self.domainA['id']) - - self._domain_role_management_cases( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - # ...and nor should non-admin project user - self.auth = self.build_authentication_request( - user_id=self.just_a_user['id'], - password=self.just_a_user['password'], - project_id=self.project['id']) - - self._domain_role_management_cases( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - def test_domain_role_management_with_cloud_admin(self): - # A cloud admin user should have rights to manipulate domain roles - self.auth = self.build_authentication_request( - user_id=self.cloud_admin_user['id'], - password=self.cloud_admin_user['password'], - project_id=self.admin_project['id']) - - self._domain_role_management_cases(self.domainA['id']) - - def test_domain_role_management_with_domain_admin(self): - # A domain admin user should only be able to manipulate the domain - # specific roles in their own domain - self.auth = self.build_authentication_request( - user_id=self.domainB_admin_user['id'], - password=self.domainB_admin_user['password'], - domain_id=self.domainB['id']) - - # Try to access the domain specific roles in another domain - self._domain_role_management_cases( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - # ...but they should be able to work with those in their own domain - self.auth = self.build_authentication_request( - user_id=self.domain_admin_user['id'], - password=self.domain_admin_user['password'], - domain_id=self.domainA['id']) - - self._domain_role_management_cases(self.domainA['id']) - - def test_domain_role_management_with_project_admin(self): - # A project admin user should have not access to domain specific roles - # in another domain. They should be able to get and list domain - # specific roles from their own domain, but not be able to create, - # update or delete them, - self.auth = self.build_authentication_request( - user_id=self.project_adminB_user['id'], - password=self.project_adminB_user['password'], - project_id=self.projectB['id']) - - # Try access the domain specific roless in another domain - self._domain_role_management_cases( - self.domainA['id'], expected=exception.ForbiddenAction.code) - - # ...but they should be ablet to work with those in their own domain - self.auth = self.build_authentication_request( - user_id=self.project_admin_user['id'], - password=self.project_admin_user['password'], - project_id=self.project['id']) - - self._domain_role_management_cases( - self.domainA['id'], read_status_OK=True, - expected=exception.ForbiddenAction.code) |