diff options
author | Ruan HE <ruan.he@orange.com> | 2016-06-09 08:12:34 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@172.30.200.206> | 2016-06-09 08:12:34 +0000 |
commit | 4bc079a2664f9a407e332291f34d174625a9d5ea (patch) | |
tree | 7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_v3_protection.py | |
parent | 2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff) | |
parent | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff) |
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_protection.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_protection.py | 739 |
1 files changed, 610 insertions, 129 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_protection.py b/keystone-moon/keystone/tests/unit/test_v3_protection.py index 9922ae5e..f77a1528 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_protection.py +++ b/keystone-moon/keystone/tests/unit/test_v3_protection.py @@ -20,19 +20,22 @@ from oslo_serialization import jsonutils from six.moves import http_client from keystone import exception -from keystone.policy.backends import rules from keystone.tests import unit +from keystone.tests.unit import ksfixtures from keystone.tests.unit.ksfixtures import temporaryfile from keystone.tests.unit import test_v3 +from keystone.tests.unit import utils CONF = cfg.CONF -DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id class IdentityTestProtectedCase(test_v3.RestfulTestCase): """Test policy enforcement on the v3 Identity API.""" + def _policy_fixture(self): + return ksfixtures.Policy(self.tmpfilename, self.config_fixture) + def setUp(self): """Setup for Identity Protection Test Cases. @@ -49,14 +52,9 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): the default domain. """ - # Ensure that test_v3.RestfulTestCase doesn't load its own - # sample data, which would make checking the results of our - # tests harder - super(IdentityTestProtectedCase, self).setUp() self.tempfile = self.useFixture(temporaryfile.SecureTempFile()) self.tmpfilename = self.tempfile.file_name - self.config_fixture.config(group='oslo_policy', - policy_file=self.tmpfilename) + super(IdentityTestProtectedCase, self).setUp() # A default auth request we can use - un-scoped user token self.auth = self.build_authentication_request( @@ -66,45 +64,33 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase): def load_sample_data(self): self._populate_default_domain() # Start by creating a couple of domains - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = self.new_domain_ref() + self.domainB = unit.new_domain_ref() self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.domainC = self.new_domain_ref() - self.domainC['enabled'] = False + self.domainC = unit.new_domain_ref(enabled=False) self.resource_api.create_domain(self.domainC['id'], self.domainC) # Now create some users, one in domainA and two of them in domainB - self.user1 = self.new_user_ref(domain_id=self.domainA['id']) - password = uuid.uuid4().hex - self.user1['password'] = password - self.user1 = self.identity_api.create_user(self.user1) - self.user1['password'] = password - - self.user2 = self.new_user_ref(domain_id=self.domainB['id']) - password = uuid.uuid4().hex - self.user2['password'] = password - self.user2 = self.identity_api.create_user(self.user2) - self.user2['password'] = password - - self.user3 = self.new_user_ref(domain_id=self.domainB['id']) - password = uuid.uuid4().hex - self.user3['password'] = password - self.user3 = self.identity_api.create_user(self.user3) - self.user3['password'] = password - - self.group1 = self.new_group_ref(domain_id=self.domainA['id']) + self.user1 = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) + self.user2 = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) + self.user3 = unit.create_user(self.identity_api, + domain_id=self.domainB['id']) + + self.group1 = unit.new_group_ref(domain_id=self.domainA['id']) self.group1 = self.identity_api.create_group(self.group1) - self.group2 = self.new_group_ref(domain_id=self.domainA['id']) + self.group2 = unit.new_group_ref(domain_id=self.domainA['id']) self.group2 = self.identity_api.create_group(self.group2) - self.group3 = self.new_group_ref(domain_id=self.domainB['id']) + self.group3 = unit.new_group_ref(domain_id=self.domainB['id']) self.group3 = self.identity_api.create_group(self.group3) - self.role = self.new_role_ref() + self.role = unit.new_role_ref() self.role_api.create_role(self.role['id'], self.role) - self.role1 = self.new_role_ref() + self.role1 = unit.new_role_ref() self.role_api.create_role(self.role1['id'], self.role1) self.assignment_api.create_grant(self.role['id'], user_id=self.user1['id'], @@ -348,34 +334,23 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): def load_sample_data(self): self._populate_default_domain() - self.just_a_user = self.new_user_ref( + self.just_a_user = unit.create_user( + self.identity_api, domain_id=CONF.identity.default_domain_id) - password = uuid.uuid4().hex - self.just_a_user['password'] = password - self.just_a_user = self.identity_api.create_user(self.just_a_user) - self.just_a_user['password'] = password - - self.another_user = self.new_user_ref( + self.another_user = unit.create_user( + self.identity_api, domain_id=CONF.identity.default_domain_id) - password = uuid.uuid4().hex - self.another_user['password'] = password - self.another_user = self.identity_api.create_user(self.another_user) - self.another_user['password'] = password - - self.admin_user = self.new_user_ref( + self.admin_user = unit.create_user( + self.identity_api, domain_id=CONF.identity.default_domain_id) - password = uuid.uuid4().hex - self.admin_user['password'] = password - self.admin_user = self.identity_api.create_user(self.admin_user) - self.admin_user['password'] = password - self.role = self.new_role_ref() + self.role = unit.new_role_ref() self.role_api.create_role(self.role['id'], self.role) - self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'} + self.admin_role = unit.new_role_ref(name='admin') self.role_api.create_role(self.admin_role['id'], self.admin_role) # Create and assign roles to the project - self.project = self.new_project_ref( + self.project = unit.new_project_ref( domain_id=CONF.identity.default_domain_id) self.resource_api.create_project(self.project['id'], self.project) self.assignment_api.create_grant(self.role['id'], @@ -461,7 +436,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): token = self.get_requested_token(auth) self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, expected_status=200) + headers={'X-Subject-Token': token}, + expected_status=http_client.OK) def test_user_check_user_token(self): # A user can check one of their own tokens. @@ -474,7 +450,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): token2 = self.get_requested_token(auth) self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, expected_status=200) + headers={'X-Subject-Token': token2}, + expected_status=http_client.OK) def test_user_check_other_user_token_rejected(self): # A user cannot check another user's token. @@ -510,7 +487,8 @@ class IdentityTestPolicySample(test_v3.RestfulTestCase): user_token = self.get_requested_token(user_auth) self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, expected_status=200) + headers={'X-Subject-Token': user_token}, + expected_status=http_client.OK) def test_user_revoke_same_token(self): # Given a non-admin user token, the token can be used to revoke @@ -579,6 +557,10 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin): """Test policy enforcement of the sample v3 cloud policy file.""" + def _policy_fixture(self): + return ksfixtures.Policy(unit.dirs.etc('policy.v3cloudsample.json'), + self.config_fixture) + def setUp(self): """Setup for v3 Cloud Policy Sample Test Cases. @@ -592,8 +574,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, - domain_admin_user has role 'admin' on domainA, - project_admin_user has role 'admin' on the project, - just_a_user has a non-admin role on both domainA and the project. - - admin_domain has user cloud_admin_user, with an 'admin' role - on admin_domain. + - admin_domain has admin_project, and user cloud_admin_user, with an + 'admin' role on admin_project. We test various api protection rules from the cloud sample policy file to make sure the sample is valid and that we correctly enforce it. @@ -604,62 +586,61 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, # tests harder super(IdentityTestv3CloudPolicySample, self).setUp() - # Finally, switch to the v3 sample policy file - self.addCleanup(rules.reset) - rules.reset() self.config_fixture.config( - group='oslo_policy', - policy_file=unit.dirs.etc('policy.v3cloudsample.json')) + group='resource', + admin_project_name=self.admin_project['name']) + self.config_fixture.config( + group='resource', + admin_project_domain_name=self.admin_domain['name']) def load_sample_data(self): # Start by creating a couple of domains self._populate_default_domain() - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = self.new_domain_ref() + self.domainB = unit.new_domain_ref() self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.admin_domain = {'id': 'admin_domain_id', 'name': 'Admin_domain'} + self.admin_domain = unit.new_domain_ref() self.resource_api.create_domain(self.admin_domain['id'], self.admin_domain) + self.admin_project = unit.new_project_ref( + domain_id=self.admin_domain['id']) + self.resource_api.create_project(self.admin_project['id'], + self.admin_project) + # And our users - self.cloud_admin_user = self.new_user_ref( + self.cloud_admin_user = unit.create_user( + self.identity_api, domain_id=self.admin_domain['id']) - password = uuid.uuid4().hex - self.cloud_admin_user['password'] = password - self.cloud_admin_user = ( - self.identity_api.create_user(self.cloud_admin_user)) - self.cloud_admin_user['password'] = password - self.just_a_user = self.new_user_ref(domain_id=self.domainA['id']) - password = uuid.uuid4().hex - self.just_a_user['password'] = password - self.just_a_user = self.identity_api.create_user(self.just_a_user) - self.just_a_user['password'] = password - self.domain_admin_user = self.new_user_ref( + self.just_a_user = unit.create_user( + self.identity_api, domain_id=self.domainA['id']) - password = uuid.uuid4().hex - self.domain_admin_user['password'] = password - self.domain_admin_user = ( - self.identity_api.create_user(self.domain_admin_user)) - self.domain_admin_user['password'] = password - self.project_admin_user = self.new_user_ref( + self.domain_admin_user = unit.create_user( + self.identity_api, + domain_id=self.domainA['id']) + self.domainB_admin_user = unit.create_user( + self.identity_api, + domain_id=self.domainB['id']) + self.project_admin_user = unit.create_user( + self.identity_api, domain_id=self.domainA['id']) - password = uuid.uuid4().hex - self.project_admin_user['password'] = password - self.project_admin_user = ( - self.identity_api.create_user(self.project_admin_user)) - self.project_admin_user['password'] = password - - # The admin role and another plain role - self.admin_role = {'id': uuid.uuid4().hex, 'name': 'admin'} + self.project_adminB_user = unit.create_user( + self.identity_api, + domain_id=self.domainB['id']) + + # The admin role, a domain specific role and another plain role + self.admin_role = unit.new_role_ref(name='admin') self.role_api.create_role(self.admin_role['id'], self.admin_role) - self.role = self.new_role_ref() + self.roleA = unit.new_role_ref(domain_id=self.domainA['id']) + self.role_api.create_role(self.roleA['id'], self.roleA) + self.role = unit.new_role_ref() self.role_api.create_role(self.role['id'], self.role) - # The cloud admin just gets the admin role + # The cloud admin just gets the admin role on the special admin project self.assignment_api.create_grant(self.admin_role['id'], user_id=self.cloud_admin_user['id'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) # Assign roles to the domain self.assignment_api.create_grant(self.admin_role['id'], @@ -668,13 +649,21 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.assignment_api.create_grant(self.role['id'], user_id=self.just_a_user['id'], domain_id=self.domainA['id']) + self.assignment_api.create_grant(self.admin_role['id'], + user_id=self.domainB_admin_user['id'], + domain_id=self.domainB['id']) # Create and assign roles to the project - self.project = self.new_project_ref(domain_id=self.domainA['id']) + self.project = unit.new_project_ref(domain_id=self.domainA['id']) self.resource_api.create_project(self.project['id'], self.project) + self.projectB = unit.new_project_ref(domain_id=self.domainB['id']) + self.resource_api.create_project(self.projectB['id'], self.projectB) self.assignment_api.create_grant(self.admin_role['id'], user_id=self.project_admin_user['id'], project_id=self.project['id']) + self.assignment_api.create_grant( + self.admin_role['id'], user_id=self.project_adminB_user['id'], + project_id=self.projectB['id']) self.assignment_api.create_grant(self.role['id'], user_id=self.just_a_user['id'], project_id=self.project['id']) @@ -683,7 +672,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, # Return the expected return codes for APIs with and without data # with any specified status overriding the normal values if expected_status is None: - return (200, 201, 204) + return (http_client.OK, http_client.CREATED, + http_client.NO_CONTENT) else: return (expected_status, expected_status, expected_status) @@ -702,7 +692,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.delete(entity_url, auth=self.auth, expected_status=status_no_data) - user_ref = self.new_user_ref(domain_id=domain_id) + user_ref = unit.new_user_ref(domain_id=domain_id) self.post('/users', auth=self.auth, body={'user': user_ref}, expected_status=status_created) @@ -721,7 +711,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.delete(entity_url, auth=self.auth, expected_status=status_no_data) - proj_ref = self.new_project_ref(domain_id=domain_id) + proj_ref = unit.new_project_ref(domain_id=domain_id) self.post('/projects', auth=self.auth, body={'project': proj_ref}, expected_status=status_created) @@ -740,13 +730,14 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.delete(entity_url, auth=self.auth, expected_status=status_no_data) - domain_ref = self.new_domain_ref() + domain_ref = unit.new_domain_ref() self.post('/domains', auth=self.auth, body={'domain': domain_ref}, expected_status=status_created) - def _test_grants(self, target, entity_id, expected=None): + def _test_grants(self, target, entity_id, role_domain_id=None, + list_status_OK=False, expected=None): status_OK, status_created, status_no_data = self._stati(expected) - a_role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex} + a_role = unit.new_role_ref(domain_id=role_domain_id) self.role_api.create_role(a_role['id'], a_role) collection_url = ( @@ -762,11 +753,67 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, expected_status=status_no_data) self.head(member_url, auth=self.auth, expected_status=status_no_data) - self.get(collection_url, auth=self.auth, - expected_status=status_OK) + if list_status_OK: + self.get(collection_url, auth=self.auth) + else: + self.get(collection_url, auth=self.auth, + expected_status=status_OK) self.delete(member_url, auth=self.auth, expected_status=status_no_data) + def _role_management_cases(self, read_status_OK=False, expected=None): + # Set the different status values for different types of call depending + # on whether we expect the calls to fail or not. + status_OK, status_created, status_no_data = self._stati(expected) + entity_url = '/roles/%s' % self.role['id'] + list_url = '/roles' + + if read_status_OK: + self.get(entity_url, auth=self.auth) + self.get(list_url, auth=self.auth) + else: + self.get(entity_url, auth=self.auth, + expected_status=status_OK) + self.get(list_url, auth=self.auth, + expected_status=status_OK) + + role = {'name': 'Updated'} + self.patch(entity_url, auth=self.auth, body={'role': role}, + expected_status=status_OK) + self.delete(entity_url, auth=self.auth, + expected_status=status_no_data) + + role_ref = unit.new_role_ref() + self.post('/roles', auth=self.auth, body={'role': role_ref}, + expected_status=status_created) + + def _domain_role_management_cases(self, domain_id, read_status_OK=False, + expected=None): + # Set the different status values for different types of call depending + # on whether we expect the calls to fail or not. + status_OK, status_created, status_no_data = self._stati(expected) + entity_url = '/roles/%s' % self.roleA['id'] + list_url = '/roles?domain_id=%s' % domain_id + + if read_status_OK: + self.get(entity_url, auth=self.auth) + self.get(list_url, auth=self.auth) + else: + self.get(entity_url, auth=self.auth, + expected_status=status_OK) + self.get(list_url, auth=self.auth, + expected_status=status_OK) + + role = {'name': 'Updated'} + self.patch(entity_url, auth=self.auth, body={'role': role}, + expected_status=status_OK) + self.delete(entity_url, auth=self.auth, + expected_status=status_no_data) + + role_ref = unit.new_role_ref(domain_id=domain_id) + self.post('/roles', auth=self.auth, body={'role': role_ref}, + expected_status=status_created) + def test_user_management(self): # First, authenticate with a user that does not have the domain # admin role - shouldn't be able to do much. @@ -786,13 +833,90 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self._test_user_management(self.domainA['id']) + def test_user_management_normalized_keys(self): + """Illustrate the inconsistent handling of hyphens in keys. + + To quote Morgan in bug 1526244: + + the reason this is converted from "domain-id" to "domain_id" is + because of how we process/normalize data. The way we have to handle + specific data types for known columns requires avoiding "-" in the + actual python code since "-" is not valid for attributes in python + w/o significant use of "getattr" etc. + + In short, historically we handle some things in conversions. The + use of "extras" has long been a poor design choice that leads to + odd/strange inconsistent behaviors because of other choices made in + handling data from within the body. (In many cases we convert from + "-" to "_" throughout openstack) + + Source: https://bugs.launchpad.net/keystone/+bug/1526244/comments/9 + + """ + # Authenticate with a user that has the domain admin role + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + # Show that we can read a normal user without any surprises. + r = self.get( + '/users/%s' % self.just_a_user['id'], + auth=self.auth, + expected_status=http_client.OK) + self.assertValidUserResponse(r) + + # We don't normalize query string keys, so both of these result in a + # 403, because we didn't specify a domain_id query string in either + # case, and we explicitly require one (it doesn't matter what + # 'domain-id' value you use). + self.get( + '/users?domain-id=%s' % self.domainA['id'], + auth=self.auth, + expected_status=exception.ForbiddenAction.code) + self.get( + '/users?domain-id=%s' % self.domainB['id'], + auth=self.auth, + expected_status=exception.ForbiddenAction.code) + + # If we try updating the user's 'domain_id' by specifying a + # 'domain-id', then it'll be stored into extras rather than normalized, + # and the user's actual 'domain_id' is not affected. + r = self.patch( + '/users/%s' % self.just_a_user['id'], + auth=self.auth, + body={'user': {'domain-id': self.domainB['id']}}, + expected_status=http_client.OK) + self.assertEqual(self.domainB['id'], r.json['user']['domain-id']) + self.assertEqual(self.domainA['id'], r.json['user']['domain_id']) + self.assertNotEqual(self.domainB['id'], self.just_a_user['domain_id']) + self.assertValidUserResponse(r, self.just_a_user) + + # Finally, show that we can create a new user without any surprises. + # But if we specify a 'domain-id' instead of a 'domain_id', we get a + # Forbidden response because we fail a policy check before + # normalization occurs. + user_ref = unit.new_user_ref(domain_id=self.domainA['id']) + r = self.post( + '/users', + auth=self.auth, + body={'user': user_ref}, + expected_status=http_client.CREATED) + self.assertValidUserResponse(r, ref=user_ref) + user_ref['domain-id'] = user_ref.pop('domain_id') + self.post( + '/users', + auth=self.auth, + body={'user': user_ref}, + expected_status=exception.ForbiddenAction.code) + def test_user_management_by_cloud_admin(self): # Test users management with a cloud admin. This user should # be able to manage users in any domain. self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) self._test_user_management(self.domainA['id']) @@ -824,7 +948,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) # Check whether cloud admin can operate a domain # other than its own domain or not @@ -858,10 +982,56 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) self._test_grants('domains', self.domainA['id']) + def test_domain_grants_by_cloud_admin_for_domain_specific_role(self): + # Test domain grants with a cloud admin. This user should be + # able to manage domain roles on any domain. + self.auth = self.build_authentication_request( + user_id=self.cloud_admin_user['id'], + password=self.cloud_admin_user['password'], + project_id=self.admin_project['id']) + + self._test_grants('domains', self.domainA['id'], + role_domain_id=self.domainB['id']) + + def test_domain_grants_by_non_admin_for_domain_specific_role(self): + # A non-admin shouldn't be able to do anything + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + domain_id=self.domainA['id']) + + self._test_grants('domains', self.domainA['id'], + role_domain_id=self.domainA['id'], + expected=exception.ForbiddenAction.code) + self._test_grants('domains', self.domainA['id'], + role_domain_id=self.domainB['id'], + expected=exception.ForbiddenAction.code) + + def test_domain_grants_by_domain_admin_for_domain_specific_role(self): + # Authenticate with a user that does have the domain admin role, + # should not be able to assign a domain_specific role from another + # domain + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + self._test_grants('domains', self.domainA['id'], + role_domain_id=self.domainB['id'], + # List status will always be OK, since we are not + # granting/checking/deleting assignments + list_status_OK=True, + expected=exception.ForbiddenAction.code) + + # They should be able to assign a domain specific role from the same + # domain + self._test_grants('domains', self.domainA['id'], + role_domain_id=self.domainA['id']) + def test_project_grants(self): self.auth = self.build_authentication_request( user_id=self.just_a_user['id'], @@ -890,11 +1060,67 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self._test_grants('projects', self.project['id']) + def test_project_grants_by_non_admin_for_domain_specific_role(self): + # A non-admin shouldn't be able to do anything + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + project_id=self.project['id']) + + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainA['id'], + expected=exception.ForbiddenAction.code) + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainB['id'], + expected=exception.ForbiddenAction.code) + + def test_project_grants_by_project_admin_for_domain_specific_role(self): + # Authenticate with a user that does have the project admin role, + # should not be able to assign a domain_specific role from another + # domain + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainB['id'], + # List status will always be OK, since we are not + # granting/checking/deleting assignments + list_status_OK=True, + expected=exception.ForbiddenAction.code) + + # They should be able to assign a domain specific role from the same + # domain + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainA['id']) + + def test_project_grants_by_domain_admin_for_domain_specific_role(self): + # Authenticate with a user that does have the domain admin role, + # should not be able to assign a domain_specific role from another + # domain + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainB['id'], + # List status will always be OK, since we are not + # granting/checking/deleting assignments + list_status_OK=True, + expected=exception.ForbiddenAction.code) + + # They should be able to assign a domain specific role from the same + # domain + self._test_grants('projects', self.project['id'], + role_domain_id=self.domainA['id']) + def test_cloud_admin_list_assignments_of_domain(self): self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) collection_url = self.build_role_assignment_query_url( domain_id=self.domainA['id']) @@ -968,7 +1194,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) collection_url = self.build_role_assignment_query_url( project_id=self.project['id']) @@ -990,7 +1216,33 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, project_admin_entity) self.assertRoleAssignmentInListResponse(r, project_user_entity) - @unit.utils.wip('waiting on bug #1437407') + def test_admin_project_list_assignments_of_project(self): + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + collection_url = self.build_role_assignment_query_url( + project_id=self.project['id']) + r = self.get(collection_url, auth=self.auth) + self.assertValidRoleAssignmentListResponse( + r, expected_length=2, resource_url=collection_url) + + project_admin_entity = self.build_role_assignment_entity( + project_id=self.project['id'], + user_id=self.project_admin_user['id'], + role_id=self.admin_role['id'], + inherited_to_projects=False) + project_user_entity = self.build_role_assignment_entity( + project_id=self.project['id'], + user_id=self.just_a_user['id'], + role_id=self.role['id'], + inherited_to_projects=False) + + self.assertRoleAssignmentInListResponse(r, project_admin_entity) + self.assertRoleAssignmentInListResponse(r, project_user_entity) + + @utils.wip('waiting on bug #1437407') def test_domain_admin_list_assignments_of_project(self): self.auth = self.build_authentication_request( user_id=self.domain_admin_user['id'], @@ -1017,6 +1269,53 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, project_admin_entity) self.assertRoleAssignmentInListResponse(r, project_user_entity) + def test_domain_admin_list_assignment_tree(self): + # Add a child project to the standard test data + sub_project = unit.new_project_ref(domain_id=self.domainA['id'], + parent_id=self.project['id']) + self.resource_api.create_project(sub_project['id'], sub_project) + self.assignment_api.create_grant(self.role['id'], + user_id=self.just_a_user['id'], + project_id=sub_project['id']) + + collection_url = self.build_role_assignment_query_url( + project_id=self.project['id']) + collection_url += '&include_subtree=True' + + # The domain admin should be able to list the assignment tree + auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + r = self.get(collection_url, auth=auth) + self.assertValidRoleAssignmentListResponse( + r, expected_length=3, resource_url=collection_url) + + # A project admin should not be able to + auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + r = self.get(collection_url, auth=auth, + expected_status=http_client.FORBIDDEN) + + # A neither should a domain admin from a different domain + domainB_admin_user = unit.create_user( + self.identity_api, + domain_id=self.domainB['id']) + self.assignment_api.create_grant(self.admin_role['id'], + user_id=domainB_admin_user['id'], + domain_id=self.domainB['id']) + auth = self.build_authentication_request( + user_id=domainB_admin_user['id'], + password=domainB_admin_user['password'], + domain_id=self.domainB['id']) + + r = self.get(collection_url, auth=auth, + expected_status=http_client.FORBIDDEN) + def test_domain_user_list_assignments_of_project_failed(self): self.auth = self.build_authentication_request( user_id=self.just_a_user['id'], @@ -1040,7 +1339,23 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) + + self._test_domain_management() + + def test_admin_project(self): + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + self._test_domain_management( + expected=exception.ForbiddenAction.code) + + self.auth = self.build_authentication_request( + user_id=self.cloud_admin_user['id'], + password=self.cloud_admin_user['password'], + project_id=self.admin_project['id']) self._test_domain_management() @@ -1050,16 +1365,15 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, password=self.domain_admin_user['password'], domain_id=self.domainA['id']) entity_url = '/domains/%s' % self.domainA['id'] - self.get(entity_url, auth=self.auth, expected_status=200) + self.get(entity_url, auth=self.auth) def test_list_user_credentials(self): - self.credential_user = self.new_credential_ref(self.just_a_user['id']) - self.credential_api.create_credential(self.credential_user['id'], - self.credential_user) - self.credential_admin = self.new_credential_ref( - self.cloud_admin_user['id']) - self.credential_api.create_credential(self.credential_admin['id'], - self.credential_admin) + credential_user = unit.new_credential_ref(self.just_a_user['id']) + self.credential_api.create_credential(credential_user['id'], + credential_user) + credential_admin = unit.new_credential_ref(self.cloud_admin_user['id']) + self.credential_api.create_credential(credential_admin['id'], + credential_admin) self.auth = self.build_authentication_request( user_id=self.just_a_user['id'], @@ -1075,9 +1389,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, def test_get_and_delete_ec2_credentials(self): """Tests getting and deleting ec2 credentials through the ec2 API.""" - another_user = self.new_user_ref(domain_id=self.domainA['id']) - password = another_user['password'] - another_user = self.identity_api.create_user(another_user) + another_user = unit.create_user(self.identity_api, + domain_id=self.domainA['id']) # create a credential for just_a_user just_user_auth = self.build_authentication_request( @@ -1091,7 +1404,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, # another normal user can't get the credential another_user_auth = self.build_authentication_request( user_id=another_user['id'], - password=password) + password=another_user['password']) another_user_url = '/users/%s/credentials/OS-EC2/%s' % ( another_user['id'], r.result['credential']['access']) self.get(another_user_url, auth=another_user_auth, @@ -1160,7 +1473,26 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, admin_auth = self.build_authentication_request( user_id=self.cloud_admin_user['id'], password=self.cloud_admin_user['password'], - domain_id=self.admin_domain['id']) + project_id=self.admin_project['id']) + admin_token = self.get_requested_token(admin_auth) + + user_auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password']) + user_token = self.get_requested_token(user_auth) + + self.get('/auth/tokens', token=admin_token, + headers={'X-Subject-Token': user_token}) + + def test_admin_project_validate_user_token(self): + # An admin can validate a user's token. + # This is GET /v3/auth/tokens + + admin_auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + admin_token = self.get_requested_token(admin_auth) user_auth = self.build_authentication_request( @@ -1182,7 +1514,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, token = self.get_requested_token(auth) self.head('/auth/tokens', token=token, - headers={'X-Subject-Token': token}, expected_status=200) + headers={'X-Subject-Token': token}, + expected_status=http_client.OK) def test_user_check_user_token(self): # A user can check one of their own tokens. @@ -1195,7 +1528,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, token2 = self.get_requested_token(auth) self.head('/auth/tokens', token=token1, - headers={'X-Subject-Token': token2}, expected_status=200) + headers={'X-Subject-Token': token2}, + expected_status=http_client.OK) def test_user_check_other_user_token_rejected(self): # A user cannot check another user's token. @@ -1231,7 +1565,8 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, user_token = self.get_requested_token(user_auth) self.head('/auth/tokens', token=admin_token, - headers={'X-Subject-Token': user_token}, expected_status=200) + headers={'X-Subject-Token': user_token}, + expected_status=http_client.OK) def test_user_revoke_same_token(self): # Given a non-admin user token, the token can be used to revoke @@ -1294,3 +1629,149 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase, self.delete('/auth/tokens', token=admin_token, headers={'X-Subject-Token': user_token}) + + def test_user_with_a_role_get_project(self): + user_auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + project_id=self.project['id']) + + # Test user can get project for one they have a role in + self.get('/projects/%s' % self.project['id'], auth=user_auth) + + # Test user can not get project for one they don't have a role in, + # even if they have a role on another project + project2 = unit.new_project_ref(domain_id=self.domainA['id']) + self.resource_api.create_project(project2['id'], project2) + self.get('/projects/%s' % project2['id'], auth=user_auth, + expected_status=exception.ForbiddenAction.code) + + def test_project_admin_get_project(self): + admin_auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + resp = self.get('/projects/%s' % self.project['id'], auth=admin_auth) + self.assertEqual(self.project['id'], + jsonutils.loads(resp.body)['project']['id']) + + def test_role_management_no_admin_no_rights(self): + # A non-admin domain user shouldn't be able to manipulate roles + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + domain_id=self.domainA['id']) + + self._role_management_cases(expected=exception.ForbiddenAction.code) + + # ...and nor should non-admin project user + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + project_id=self.project['id']) + + self._role_management_cases(expected=exception.ForbiddenAction.code) + + def test_role_management_with_project_admin(self): + # A project admin user should be able to get and list, but not be able + # to create/update/delete global roles + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + self._role_management_cases(read_status_OK=True, + expected=exception.ForbiddenAction.code) + + def test_role_management_with_domain_admin(self): + # A domain admin user should be able to get and list, but not be able + # to create/update/delete global roles + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + self._role_management_cases(read_status_OK=True, + expected=exception.ForbiddenAction.code) + + def test_role_management_with_cloud_admin(self): + # A cloud admin user should have rights to manipulate global roles + self.auth = self.build_authentication_request( + user_id=self.cloud_admin_user['id'], + password=self.cloud_admin_user['password'], + project_id=self.admin_project['id']) + + self._role_management_cases() + + def test_domain_role_management_no_admin_no_rights(self): + # A non-admin domain user shouldn't be able to manipulate domain roles + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + domain_id=self.domainA['id']) + + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...and nor should non-admin project user + self.auth = self.build_authentication_request( + user_id=self.just_a_user['id'], + password=self.just_a_user['password'], + project_id=self.project['id']) + + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + def test_domain_role_management_with_cloud_admin(self): + # A cloud admin user should have rights to manipulate domain roles + self.auth = self.build_authentication_request( + user_id=self.cloud_admin_user['id'], + password=self.cloud_admin_user['password'], + project_id=self.admin_project['id']) + + self._domain_role_management_cases(self.domainA['id']) + + def test_domain_role_management_with_domain_admin(self): + # A domain admin user should only be able to manipulate the domain + # specific roles in their own domain + self.auth = self.build_authentication_request( + user_id=self.domainB_admin_user['id'], + password=self.domainB_admin_user['password'], + domain_id=self.domainB['id']) + + # Try to access the domain specific roles in another domain + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...but they should be able to work with those in their own domain + self.auth = self.build_authentication_request( + user_id=self.domain_admin_user['id'], + password=self.domain_admin_user['password'], + domain_id=self.domainA['id']) + + self._domain_role_management_cases(self.domainA['id']) + + def test_domain_role_management_with_project_admin(self): + # A project admin user should have not access to domain specific roles + # in another domain. They should be able to get and list domain + # specific roles from their own domain, but not be able to create, + # update or delete them, + self.auth = self.build_authentication_request( + user_id=self.project_adminB_user['id'], + password=self.project_adminB_user['password'], + project_id=self.projectB['id']) + + # Try access the domain specific roless in another domain + self._domain_role_management_cases( + self.domainA['id'], expected=exception.ForbiddenAction.code) + + # ...but they should be ablet to work with those in their own domain + self.auth = self.build_authentication_request( + user_id=self.project_admin_user['id'], + password=self.project_admin_user['password'], + project_id=self.project['id']) + + self._domain_role_management_cases( + self.domainA['id'], read_status_OK=True, + expected=exception.ForbiddenAction.code) |