diff options
author | asteroide <thomas.duval@orange.com> | 2015-09-24 16:27:16 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-09-24 16:27:16 +0200 |
commit | 92d11d139e9f76d4fd76859aea78643fc32ef36b (patch) | |
tree | bd5a2e7b50853498074ab55bdaee4452c460010b /keystone-moon/keystone/tests/unit/test_v3_federation.py | |
parent | 49325d99acfadaadfad99c596c4ada6b5ec849de (diff) |
Update Keystone code from repository.
Change-Id: Ib3d0a06b10902fcc6d520f58e85aa617bc326d00
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_federation.py | 293 |
1 files changed, 170 insertions, 123 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py index e646bc0a..5717e67b 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_federation.py +++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py @@ -12,7 +12,6 @@ import os import random -import subprocess from testtools import matchers import uuid @@ -26,12 +25,14 @@ from oslotest import mockpatch import saml2 from saml2 import saml from saml2 import sigver +from six.moves import http_client from six.moves import range, urllib, zip xmldsig = importutils.try_import("saml2.xmldsig") if not xmldsig: xmldsig = importutils.try_import("xmldsig") from keystone.auth import controllers as auth_controllers +from keystone.common import environment from keystone.contrib.federation import controllers as federation_controllers from keystone.contrib.federation import idp as keystone_idp from keystone import exception @@ -44,6 +45,8 @@ from keystone.tests.unit import test_v3 from keystone.token.providers import common as token_common +subprocess = environment.subprocess + CONF = cfg.CONF LOG = log.getLogger(__name__) ROOTDIR = os.path.dirname(os.path.abspath(__file__)) @@ -109,25 +112,43 @@ class FederatedSetupMixin(object): self.assertEqual(token_projects, projects_ref) def _check_scoped_token_attributes(self, token): - def xor_project_domain(token_keys): - return sum(('project' in token_keys, 'domain' in token_keys)) % 2 for obj in ('user', 'catalog', 'expires_at', 'issued_at', 'methods', 'roles'): self.assertIn(obj, token) - # Check for either project or domain - if not xor_project_domain(list(token.keys())): - raise AssertionError("You must specify either" - "project or domain.") - self.assertIn('OS-FEDERATION', token['user']) os_federation = token['user']['OS-FEDERATION'] + + self.assertIn('groups', os_federation) + self.assertIn('identity_provider', os_federation) + self.assertIn('protocol', os_federation) + self.assertThat(os_federation, matchers.HasLength(3)) + self.assertEqual(self.IDP, os_federation['identity_provider']['id']) self.assertEqual(self.PROTOCOL, os_federation['protocol']['id']) - self.assertListEqual(sorted(['groups', - 'identity_provider', - 'protocol']), - sorted(os_federation.keys())) + + def _check_project_scoped_token_attributes(self, token, project_id): + self.assertEqual(project_id, token['project']['id']) + self._check_scoped_token_attributes(token) + + def _check_domain_scoped_token_attributes(self, token, domain_id): + self.assertEqual(domain_id, token['domain']['id']) + self._check_scoped_token_attributes(token) + + def assertValidMappedUser(self, token): + """Check if user object meets all the criteria.""" + + user = token['user'] + self.assertIn('id', user) + self.assertIn('name', user) + self.assertIn('domain', user) + + self.assertIn('groups', user['OS-FEDERATION']) + self.assertIn('identity_provider', user['OS-FEDERATION']) + self.assertIn('protocol', user['OS-FEDERATION']) + + # Make sure user_id is url safe + self.assertEqual(urllib.parse.quote(user['name']), user['id']) def _issue_unscoped_token(self, idp=None, @@ -794,7 +815,7 @@ class FederatedIdentityProviderTests(FederationTests): if body is None: body = self._http_idp_input() resp = self.put(url, body={'identity_provider': body}, - expected_status=201) + expected_status=http_client.CREATED) return resp def _http_idp_input(self, **kwargs): @@ -881,7 +902,7 @@ class FederatedIdentityProviderTests(FederationTests): body['remote_ids'] = [uuid.uuid4().hex, repeated_remote_id] self.put(url, body={'identity_provider': body}, - expected_status=409) + expected_status=http_client.CONFLICT) def test_create_idp_remote_empty(self): """Creates an IdP with empty remote_ids.""" @@ -1006,9 +1027,9 @@ class FederatedIdentityProviderTests(FederationTests): url = self.base_url(suffix=uuid.uuid4().hex) body = self._http_idp_input() self.put(url, body={'identity_provider': body}, - expected_status=201) + expected_status=http_client.CREATED) self.put(url, body={'identity_provider': body}, - expected_status=409) + expected_status=http_client.CONFLICT) def test_get_idp(self): """Create and later fetch IdP.""" @@ -1033,7 +1054,7 @@ class FederatedIdentityProviderTests(FederationTests): self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_delete_existing_idp(self): """Create and later delete IdP. @@ -1047,7 +1068,7 @@ class FederatedIdentityProviderTests(FederationTests): self.assertIsNotNone(idp_id) url = self.base_url(suffix=idp_id) self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_delete_idp_also_deletes_assigned_protocols(self): """Deleting an IdP will delete its assigned protocol.""" @@ -1063,7 +1084,7 @@ class FederatedIdentityProviderTests(FederationTests): idp_url = self.base_url(suffix=idp_id) # assign protocol to IdP - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} resp, idp_id, proto = self._assign_protocol_to_idp( url=url, idp_id=idp_id, @@ -1073,7 +1094,7 @@ class FederatedIdentityProviderTests(FederationTests): # removing IdP will remove the assigned protocol as well self.assertEqual(1, len(self.federation_api.list_protocols(idp_id))) self.delete(idp_url) - self.get(idp_url, expected_status=404) + self.get(idp_url, expected_status=http_client.NOT_FOUND) self.assertEqual(0, len(self.federation_api.list_protocols(idp_id))) def test_delete_nonexisting_idp(self): @@ -1083,7 +1104,7 @@ class FederatedIdentityProviderTests(FederationTests): """ idp_id = uuid.uuid4().hex url = self.base_url(suffix=idp_id) - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) def test_update_idp_mutable_attributes(self): """Update IdP's mutable parameters.""" @@ -1124,7 +1145,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_idp_immutable_attributes(self): """Update IdP's immutable parameters. - Expect HTTP 403 code. + Expect HTTP FORBIDDEN. """ default_resp = self._create_default_idp() @@ -1138,7 +1159,8 @@ class FederatedIdentityProviderTests(FederationTests): body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] url = self.base_url(suffix=idp_id) - self.patch(url, body={'identity_provider': body}, expected_status=403) + self.patch(url, body={'identity_provider': body}, + expected_status=http_client.FORBIDDEN) def test_update_nonexistent_idp(self): """Update nonexistent IdP @@ -1152,12 +1174,12 @@ class FederatedIdentityProviderTests(FederationTests): body['enabled'] = False body = {'identity_provider': body} - self.patch(url, body=body, expected_status=404) + self.patch(url, body=body, expected_status=http_client.NOT_FOUND) def test_assign_protocol_to_idp(self): """Assign a protocol to existing IdP.""" - self._assign_protocol_to_idp(expected_status=201) + self._assign_protocol_to_idp(expected_status=http_client.CREATED) def test_protocol_composite_pk(self): """Test whether Keystone let's add two entities with identical @@ -1171,7 +1193,7 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) @@ -1187,10 +1209,10 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) - kwargs = {'expected_status': 409} + kwargs = {'expected_status': http_client.CONFLICT} resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id, proto='saml2', validate=False, @@ -1204,7 +1226,7 @@ class FederatedIdentityProviderTests(FederationTests): """ idp_id = uuid.uuid4().hex - kwargs = {'expected_status': 404} + kwargs = {'expected_status': http_client.NOT_FOUND} self._assign_protocol_to_idp(proto='saml2', idp_id=idp_id, validate=False, @@ -1213,7 +1235,8 @@ class FederatedIdentityProviderTests(FederationTests): def test_get_protocol(self): """Create and later fetch protocol tied to IdP.""" - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] url = "%s/protocols/%s" % (idp_id, proto_id) url = self.base_url(suffix=url) @@ -1232,12 +1255,14 @@ class FederatedIdentityProviderTests(FederationTests): Compare input and output id sets. """ - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) iterations = random.randint(0, 16) protocol_ids = [] for _ in range(iterations): - resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id, - expected_status=201) + resp, _, proto = self._assign_protocol_to_idp( + idp_id=idp_id, + expected_status=http_client.CREATED) proto_id = self._fetch_attribute_from_response(resp, 'protocol') proto_id = proto_id['id'] protocol_ids.append(proto_id) @@ -1256,7 +1281,8 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_protocols_attribute(self): """Update protocol's attribute.""" - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) new_mapping_id = uuid.uuid4().hex url = "%s/protocols/%s" % (idp_id, proto) @@ -1277,11 +1303,12 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='/%(idp_id)s/' 'protocols/%(protocol_id)s') - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) url = url % {'idp_id': idp_id, 'protocol_id': proto} self.delete(url) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) class MappingCRUDTests(FederationTests): @@ -1318,7 +1345,7 @@ class MappingCRUDTests(FederationTests): url = self.MAPPING_URL + uuid.uuid4().hex resp = self.put(url, body={'mapping': mapping_fixtures.MAPPING_LARGE}, - expected_status=201) + expected_status=http_client.CREATED) return resp def _get_id_from_response(self, resp): @@ -1335,7 +1362,7 @@ class MappingCRUDTests(FederationTests): resp = self.get(url) entities = resp.result.get('mappings') self.assertIsNotNone(entities) - self.assertResponseStatus(resp, 200) + self.assertResponseStatus(resp, http_client.OK) self.assertValidListLinks(resp.result.get('links')) self.assertEqual(1, len(entities)) @@ -1345,8 +1372,8 @@ class MappingCRUDTests(FederationTests): mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': str(mapping_id)} resp = self.delete(url) - self.assertResponseStatus(resp, 204) - self.get(url, expected_status=404) + self.assertResponseStatus(resp, http_client.NO_CONTENT) + self.get(url, expected_status=http_client.NOT_FOUND) def test_mapping_get(self): url = self.MAPPING_URL + '%(mapping_id)s' @@ -1369,70 +1396,73 @@ class MappingCRUDTests(FederationTests): def test_delete_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) def test_get_mapping_dne(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_create_mapping_bad_requirements(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) def test_create_mapping_no_rules(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) def test_create_mapping_no_remote_objects(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) def test_create_mapping_bad_value(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) def test_create_mapping_missing_local(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) def test_create_mapping_missing_type(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) def test_create_mapping_wrong_type(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) def test_create_mapping_extra_remote_properties_not_any_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_any_one_of(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_extra_remote_properties_just_type(self): url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) def test_create_mapping_empty_map(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': {}}) def test_create_mapping_extra_rules_properties(self): url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=400, + self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) def test_create_mapping_with_blacklist_and_whitelist(self): @@ -1444,7 +1474,8 @@ class MappingCRUDTests(FederationTests): """ url = self.MAPPING_URL + uuid.uuid4().hex mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST - self.put(url, expected_status=400, body={'mapping': mapping}) + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': mapping}) class FederatedTokenTests(FederationTests, FederatedSetupMixin): @@ -1494,6 +1525,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_issue_unscoped_token(self): r = self._issue_unscoped_token() self.assertIsNotNone(r.headers.get('X-Subject-Token')) + self.assertValidMappedUser(r.json['token']) def test_issue_unscoped_token_disabled_idp(self): """Checks if authentication works with disabled identity providers. @@ -1632,11 +1664,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) token_resp = r.result['token'] project_id = token_resp['project']['id'] - self.assertEqual(project_id, self.proj_employees['id']) - self._check_scoped_token_attributes(token_resp) + self._check_project_scoped_token_attributes(token_resp, project_id) roles_ref = [self.role_employee] + projects_ref = self.proj_employees self._check_projects_and_roles(token_resp, roles_ref, projects_ref) + self.assertValidMappedUser(token_resp) def test_scope_token_with_idp_disabled(self): """Scope token issued by disabled IdP. @@ -1659,14 +1692,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.federation_api.update_idp(self.IDP, enabled_false) self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_scope_to_bad_project(self): """Scope unscoped token with a project we don't have access to.""" self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_scope_to_project_multiple_times(self): """Try to scope the unscoped token multiple times. @@ -1685,9 +1718,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): for body, project_id_ref in zip(bodies, project_ids): r = self.v3_authenticate_token(body) token_resp = r.result['token'] - project_id = token_resp['project']['id'] - self.assertEqual(project_id, project_id_ref) - self._check_scoped_token_attributes(token_resp) + self._check_project_scoped_token_attributes(token_resp, + project_id_ref) def test_scope_to_project_with_only_inherited_roles(self): """Try to scope token whose only roles are inherited.""" @@ -1695,18 +1727,18 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): r = self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER) token_resp = r.result['token'] - project_id = token_resp['project']['id'] - self.assertEqual(project_id, self.project_inherited['id']) - self._check_scoped_token_attributes(token_resp) + self._check_project_scoped_token_attributes( + token_resp, self.project_inherited['id']) roles_ref = [self.role_customer] projects_ref = self.project_inherited self._check_projects_and_roles(token_resp, roles_ref, projects_ref) + self.assertValidMappedUser(token_resp) def test_scope_token_from_nonexistent_unscoped_token(self): """Try to scope token from non-existent unscoped token.""" self.v3_authenticate_token( self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_issue_token_from_rules_without_user(self): api = auth_controllers.Auth() @@ -1730,9 +1762,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_scope_to_domain_once(self): r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER) token_resp = r.result['token'] - domain_id = token_resp['domain']['id'] - self.assertEqual(self.domainA['id'], domain_id) - self._check_scoped_token_attributes(token_resp) + self._check_domain_scoped_token_attributes(token_resp, + self.domainA['id']) def test_scope_to_domain_multiple_tokens(self): """Issue multiple tokens scoping to different domains. @@ -1754,15 +1785,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): for body, domain_id_ref in zip(bodies, domain_ids): r = self.v3_authenticate_token(body) token_resp = r.result['token'] - domain_id = token_resp['domain']['id'] - self.assertEqual(domain_id_ref, domain_id) - self._check_scoped_token_attributes(token_resp) + self._check_domain_scoped_token_attributes(token_resp, + domain_id_ref) def test_scope_to_domain_with_only_inherited_roles_fails(self): """Try to scope to a domain that has no direct roles.""" self.v3_authenticate_token( self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, - expected_status=401) + expected_status=http_client.UNAUTHORIZED) def test_list_projects(self): urls = ('/OS-FEDERATION/projects', '/auth/projects') @@ -1863,9 +1893,10 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """ r = self._issue_unscoped_token() + token_resp = r.json_body['token'] + self.assertValidMappedUser(token_resp) employee_unscoped_token_id = r.headers.get('X-Subject-Token') - r = self.get('/OS-FEDERATION/projects', - token=employee_unscoped_token_id) + r = self.get('/auth/projects', token=employee_unscoped_token_id) projects = r.result['projects'] random_project = random.randint(0, len(projects)) - 1 project = projects[random_project] @@ -1875,9 +1906,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): r = self.v3_authenticate_token(v3_scope_request) token_resp = r.result['token'] - project_id = token_resp['project']['id'] - self.assertEqual(project['id'], project_id) - self._check_scoped_token_attributes(token_resp) + self._check_project_scoped_token_attributes(token_resp, project['id']) def test_workflow_with_groups_deletion(self): """Test full workflow with groups deletion before token scoping. @@ -1947,7 +1976,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): token_id, 'project', self.project_all['id']) - self.v3_authenticate_token(scoped_token, expected_status=500) + self.v3_authenticate_token( + scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR) def test_lists_with_missing_group_in_backend(self): """Test a mapping that points to a group that does not exist @@ -2379,11 +2409,13 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): def test_federated_unscoped_token(self): resp = self._issue_unscoped_token() self.assertEqual(204, len(resp.headers['X-Subject-Token'])) + self.assertValidMappedUser(resp.json_body['token']) def test_federated_unscoped_token_with_multiple_groups(self): assertion = 'ANOTHER_CUSTOMER_ASSERTION' resp = self._issue_unscoped_token(assertion=assertion) - self.assertEqual(232, len(resp.headers['X-Subject-Token'])) + self.assertEqual(226, len(resp.headers['X-Subject-Token'])) + self.assertValidMappedUser(resp.json_body['token']) def test_validate_federated_unscoped_token(self): resp = self._issue_unscoped_token() @@ -2400,9 +2432,9 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): """ resp = self._issue_unscoped_token() + self.assertValidMappedUser(resp.json_body['token']) unscoped_token = resp.headers.get('X-Subject-Token') - resp = self.get('/OS-FEDERATION/projects', - token=unscoped_token) + resp = self.get('/auth/projects', token=unscoped_token) projects = resp.result['projects'] random_project = random.randint(0, len(projects)) - 1 project = projects[random_project] @@ -2412,9 +2444,7 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): resp = self.v3_authenticate_token(v3_scope_request) token_resp = resp.result['token'] - project_id = token_resp['project']['id'] - self.assertEqual(project['id'], project_id) - self._check_scoped_token_attributes(token_resp) + self._check_project_scoped_token_attributes(token_resp, project['id']) class FederatedTokenTestsMethodToken(FederatedTokenTests): @@ -2499,7 +2529,7 @@ class SAMLGenerationTests(FederationTests): self.sp = self.sp_ref() url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID self.put(url, body={'service_provider': self.sp}, - expected_status=201) + expected_status=http_client.CREATED) def test_samlize_token_values(self): """Test the SAML generator produces a SAML object. @@ -2615,8 +2645,8 @@ class SAMLGenerationTests(FederationTests): # the assertion as is without signing it return assertion_content - with mock.patch('subprocess.check_output', - side_effect=mocked_subprocess_check_output): + with mock.patch.object(subprocess, 'check_output', + side_effect=mocked_subprocess_check_output): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, self.SUBJECT, @@ -2711,7 +2741,7 @@ class SAMLGenerationTests(FederationTests): with mock.patch.object(keystone_idp, '_sign_assertion', return_value=self.signed_assertion): self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=403) + expected_status=http_client.FORBIDDEN) def test_generate_saml_route(self): """Test that the SAML generation endpoint produces XML. @@ -2733,7 +2763,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.SAML_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=200) + expected_status=http_client.OK) response = etree.fromstring(http_response.result) issuer = response[0] @@ -2774,7 +2804,8 @@ class SAMLGenerationTests(FederationTests): self.SERVICE_PROVDIER_ID) del body['auth']['scope'] - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.BAD_REQUEST) def test_invalid_token_body(self): """Test that missing the token in request body raises an exception. @@ -2788,7 +2819,8 @@ class SAMLGenerationTests(FederationTests): self.SERVICE_PROVDIER_ID) del body['auth']['identity']['token'] - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.BAD_REQUEST) def test_sp_not_found(self): """Test SAML generation with an invalid service provider ID. @@ -2799,7 +2831,8 @@ class SAMLGenerationTests(FederationTests): sp_id = uuid.uuid4().hex token_id = self._fetch_valid_token() body = self._create_generate_saml_request(token_id, sp_id) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.NOT_FOUND) def test_sp_disabled(self): """Try generating assertion for disabled Service Provider.""" @@ -2811,7 +2844,8 @@ class SAMLGenerationTests(FederationTests): token_id = self._fetch_valid_token() body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=403) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.FORBIDDEN) def test_token_not_found(self): """Test that an invalid token in the request body raises an exception. @@ -2823,7 +2857,8 @@ class SAMLGenerationTests(FederationTests): token_id = uuid.uuid4().hex body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404) + self.post(self.SAML_GENERATION_ROUTE, body=body, + expected_status=http_client.NOT_FOUND) def test_generate_ecp_route(self): """Test that the ECP generation endpoint produces XML. @@ -2844,7 +2879,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=200) + expected_status=http_client.OK) env_response = etree.fromstring(http_response.result) header = env_response[0] @@ -2879,7 +2914,7 @@ class SAMLGenerationTests(FederationTests): @mock.patch('saml2.create_class_from_xml_string') @mock.patch('oslo_utils.fileutils.write_to_tempfile') - @mock.patch('subprocess.check_output') + @mock.patch.object(subprocess, 'check_output') def test__sign_assertion(self, check_output_mock, write_to_tempfile_mock, create_class_mock): write_to_tempfile_mock.return_value = 'tmp_path' @@ -2890,7 +2925,7 @@ class SAMLGenerationTests(FederationTests): create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput') @mock.patch('oslo_utils.fileutils.write_to_tempfile') - @mock.patch('subprocess.check_output') + @mock.patch.object(subprocess, 'check_output') def test__sign_assertion_exc(self, check_output_mock, write_to_tempfile_mock): # If the command fails the command output is logged. @@ -2903,12 +2938,15 @@ class SAMLGenerationTests(FederationTests): returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary, output=sample_output) - # FIXME(blk-u): This should raise exception.SAMLSigningError instead, - # but fails with TypeError due to concatenating string to Message, see - # bug 1484735. - self.assertRaises(TypeError, + logger_fixture = self.useFixture(fixtures.LoggerFixture()) + self.assertRaises(exception.SAMLSigningError, keystone_idp._sign_assertion, self.signed_assertion) + expected_log = ( + "Error when signing assertion, reason: Command '%s' returned " + "non-zero exit status %s %s\n" % + (CONF.saml.xmlsec1_binary, sample_returncode, sample_output)) + self.assertEqual(expected_log, logger_fixture.output) @mock.patch('oslo_utils.fileutils.write_to_tempfile') def test__sign_assertion_fileutils_exc(self, write_to_tempfile_mock): @@ -3041,13 +3079,13 @@ class IdPMetadataGenerationTests(FederationTests): self.generator.generate_metadata) def test_get_metadata_with_no_metadata_file_configured(self): - self.get(self.METADATA_URL, expected_status=500) + self.get(self.METADATA_URL, + expected_status=http_client.INTERNAL_SERVER_ERROR) def test_get_metadata(self): self.config_fixture.config( group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml') - r = self.get(self.METADATA_URL, response_content_type='text/xml', - expected_status=200) + r = self.get(self.METADATA_URL, response_content_type='text/xml') self.assertEqual('text/xml', r.headers.get('Content-Type')) reference_file = _load_xml('idp_saml2_metadata.xml') @@ -3070,7 +3108,7 @@ class ServiceProviderTests(FederationTests): self.SP_REF = self.sp_ref() self.SERVICE_PROVIDER = self.put( url, body={'service_provider': self.SP_REF}, - expected_status=201).result + expected_status=http_client.CREATED).result def sp_ref(self): ref = { @@ -3089,19 +3127,19 @@ class ServiceProviderTests(FederationTests): def test_get_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.get(url, expected_status=200) + resp = self.get(url) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) def test_get_service_provider_fail(self): url = self.base_url(suffix=uuid.uuid4().hex) - self.get(url, expected_status=404) + self.get(url, expected_status=http_client.NOT_FOUND) def test_create_service_provider(self): url = self.base_url(suffix=uuid.uuid4().hex) sp = self.sp_ref() resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) @@ -3111,7 +3149,7 @@ class ServiceProviderTests(FederationTests): sp = self.sp_ref() del sp['relay_state_prefix'] resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) sp_result = resp.result['service_provider'] self.assertEqual(CONF.saml.relay_state_prefix, sp_result['relay_state_prefix']) @@ -3123,7 +3161,7 @@ class ServiceProviderTests(FederationTests): non_default_prefix = uuid.uuid4().hex sp['relay_state_prefix'] = non_default_prefix resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) @@ -3134,7 +3172,7 @@ class ServiceProviderTests(FederationTests): sp = self.sp_ref() sp[uuid.uuid4().hex] = uuid.uuid4().hex self.put(url, body={'service_provider': sp}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_list_service_providers(self): """Test listing of service provider objects. @@ -3150,7 +3188,8 @@ class ServiceProviderTests(FederationTests): } for id, sp in ref_service_providers.items(): url = self.base_url(suffix=id) - self.put(url, body={'service_provider': sp}, expected_status=201) + self.put(url, body={'service_provider': sp}, + expected_status=http_client.CREATED) # Insert ids into service provider object, we will compare it with # responses from server and those include 'id' attribute. @@ -3177,15 +3216,14 @@ class ServiceProviderTests(FederationTests): """ new_sp_ref = self.sp_ref() url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=200) + resp = self.patch(url, body={'service_provider': new_sp_ref}) patch_result = resp.result new_sp_ref['id'] = self.SERVICE_PROVIDER_ID self.assertValidEntity(patch_result['service_provider'], ref=new_sp_ref, keys_to_check=self.SP_KEYS) - resp = self.get(url, expected_status=200) + resp = self.get(url) get_result = resp.result self.assertDictEqual(patch_result['service_provider'], @@ -3201,21 +3239,21 @@ class ServiceProviderTests(FederationTests): new_sp_ref = {'id': uuid.uuid4().hex} url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_service_provider_unknown_parameter(self): new_sp_ref = self.sp_ref() new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=400) + expected_status=http_client.BAD_REQUEST) def test_update_service_provider_404(self): new_sp_ref = self.sp_ref() new_sp_ref['description'] = uuid.uuid4().hex url = self.base_url(suffix=uuid.uuid4().hex) self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=404) + expected_status=http_client.NOT_FOUND) def test_update_sp_relay_state(self): """Update an SP with custome relay state.""" @@ -3223,19 +3261,18 @@ class ServiceProviderTests(FederationTests): non_default_prefix = uuid.uuid4().hex new_sp_ref['relay_state_prefix'] = non_default_prefix url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=200) + resp = self.patch(url, body={'service_provider': new_sp_ref}) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) def test_delete_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.delete(url, expected_status=204) + self.delete(url) def test_delete_service_provider_404(self): url = self.base_url(suffix=uuid.uuid4().hex) - self.delete(url, expected_status=404) + self.delete(url, expected_status=http_client.NOT_FOUND) class WebSSOTests(FederatedTokenTests): @@ -3337,6 +3374,16 @@ class WebSSOTests(FederatedTokenTests): self.api.federated_sso_auth, context, self.PROTOCOL) + def test_identity_provider_specific_federated_authentication(self): + environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} + context = {'environment': environment} + query_string = {'origin': self.ORIGIN} + self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) + resp = self.api.federated_idp_specific_sso_auth(context, + self.idp['id'], + self.PROTOCOL) + self.assertIn(self.TRUSTED_DASHBOARD, resp.body) + class K2KServiceCatalogTests(FederationTests): SP1 = 'SP1' |