diff options
author | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
---|---|---|
committer | DUVAL Thomas <thomas.duval@orange.com> | 2016-06-09 09:11:50 +0200 |
commit | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (patch) | |
tree | 8b8d94001ebe6cc34106cf813b538911a8d66d9a /keystone-moon/keystone/tests/unit/test_v3_federation.py | |
parent | a33bdcb627102a01244630a54cb4b5066b385a6a (diff) |
Update Keystone core to Mitaka.
Change-Id: Ia10d6add16f4a9d25d1f42d420661c46332e69db
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_federation.py | 562 |
1 files changed, 405 insertions, 157 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py index 4d7dcaab..f4ec8e51 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_federation.py +++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py @@ -10,6 +10,7 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import os import random from testtools import matchers @@ -19,7 +20,8 @@ import fixtures from lxml import etree import mock from oslo_config import cfg -from oslo_log import log +from oslo_log import versionutils +from oslo_serialization import jsonutils from oslo_utils import importutils from oslotest import mockpatch import saml2 @@ -33,22 +35,24 @@ if not xmldsig: from keystone.auth import controllers as auth_controllers from keystone.common import environment -from keystone.contrib.federation import controllers as federation_controllers -from keystone.contrib.federation import idp as keystone_idp +from keystone.contrib.federation import routers from keystone import exception +from keystone.federation import controllers as federation_controllers +from keystone.federation import idp as keystone_idp from keystone import notifications +from keystone.tests import unit from keystone.tests.unit import core from keystone.tests.unit import federation_fixtures from keystone.tests.unit import ksfixtures from keystone.tests.unit import mapping_fixtures from keystone.tests.unit import test_v3 +from keystone.tests.unit import utils from keystone.token.providers import common as token_common subprocess = environment.subprocess CONF = cfg.CONF -LOG = log.getLogger(__name__) ROOTDIR = os.path.dirname(os.path.abspath(__file__)) XMLDIR = os.path.join(ROOTDIR, 'saml2/') @@ -59,8 +63,12 @@ def dummy_validator(*args, **kwargs): class FederationTests(test_v3.RestfulTestCase): - EXTENSION_NAME = 'federation' - EXTENSION_TO_ADD = 'federation_extension' + @mock.patch.object(versionutils, 'report_deprecated_feature') + def test_exception_happens(self, mock_deprecator): + routers.FederationExtension(mock.ANY) + mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY) + args, _kwargs = mock_deprecator.call_args + self.assertIn("Remove federation_extension from", args[1]) class FederatedSetupMixin(object): @@ -137,7 +145,6 @@ class FederatedSetupMixin(object): def assertValidMappedUser(self, token): """Check if user object meets all the criteria.""" - user = token['user'] self.assertIn('id', user) self.assertIn('name', user) @@ -209,66 +216,62 @@ class FederatedSetupMixin(object): def load_federation_sample_data(self): """Inject additional data.""" - # Create and add domains - self.domainA = self.new_domain_ref() + self.domainA = unit.new_domain_ref() self.resource_api.create_domain(self.domainA['id'], self.domainA) - self.domainB = self.new_domain_ref() + self.domainB = unit.new_domain_ref() self.resource_api.create_domain(self.domainB['id'], self.domainB) - self.domainC = self.new_domain_ref() + self.domainC = unit.new_domain_ref() self.resource_api.create_domain(self.domainC['id'], self.domainC) - self.domainD = self.new_domain_ref() + self.domainD = unit.new_domain_ref() self.resource_api.create_domain(self.domainD['id'], self.domainD) # Create and add projects - self.proj_employees = self.new_project_ref( + self.proj_employees = unit.new_project_ref( domain_id=self.domainA['id']) self.resource_api.create_project(self.proj_employees['id'], self.proj_employees) - self.proj_customers = self.new_project_ref( + self.proj_customers = unit.new_project_ref( domain_id=self.domainA['id']) self.resource_api.create_project(self.proj_customers['id'], self.proj_customers) - self.project_all = self.new_project_ref( + self.project_all = unit.new_project_ref( domain_id=self.domainA['id']) self.resource_api.create_project(self.project_all['id'], self.project_all) - self.project_inherited = self.new_project_ref( + self.project_inherited = unit.new_project_ref( domain_id=self.domainD['id']) self.resource_api.create_project(self.project_inherited['id'], self.project_inherited) # Create and add groups - self.group_employees = self.new_group_ref( - domain_id=self.domainA['id']) + self.group_employees = unit.new_group_ref(domain_id=self.domainA['id']) self.group_employees = ( self.identity_api.create_group(self.group_employees)) - self.group_customers = self.new_group_ref( - domain_id=self.domainA['id']) + self.group_customers = unit.new_group_ref(domain_id=self.domainA['id']) self.group_customers = ( self.identity_api.create_group(self.group_customers)) - self.group_admins = self.new_group_ref( - domain_id=self.domainA['id']) + self.group_admins = unit.new_group_ref(domain_id=self.domainA['id']) self.group_admins = self.identity_api.create_group(self.group_admins) # Create and add roles - self.role_employee = self.new_role_ref() + self.role_employee = unit.new_role_ref() self.role_api.create_role(self.role_employee['id'], self.role_employee) - self.role_customer = self.new_role_ref() + self.role_customer = unit.new_role_ref() self.role_api.create_role(self.role_customer['id'], self.role_customer) - self.role_admin = self.new_role_ref() + self.role_admin = unit.new_role_ref() self.role_api.create_role(self.role_admin['id'], self.role_admin) # Employees can access @@ -774,7 +777,7 @@ class FederatedSetupMixin(object): self.domainC['id']) -class FederatedIdentityProviderTests(FederationTests): +class FederatedIdentityProviderTests(test_v3.RestfulTestCase): """A test class for Identity Providers.""" idp_keys = ['description', 'enabled'] @@ -815,7 +818,7 @@ class FederatedIdentityProviderTests(FederationTests): if body is None: body = self._http_idp_input() resp = self.put(url, body={'identity_provider': body}, - expected_status=201) + expected_status=http_client.CREATED) return resp def _http_idp_input(self, **kwargs): @@ -856,7 +859,6 @@ class FederatedIdentityProviderTests(FederationTests): def test_create_idp(self): """Creates the IdentityProvider entity associated to remote_ids.""" - keys_to_check = list(self.idp_keys) body = self.default_body.copy() body['description'] = uuid.uuid4().hex @@ -867,7 +869,6 @@ class FederatedIdentityProviderTests(FederationTests): def test_create_idp_remote(self): """Creates the IdentityProvider entity associated to remote_ids.""" - keys_to_check = list(self.idp_keys) keys_to_check.append('remote_ids') body = self.default_body.copy() @@ -886,10 +887,9 @@ class FederatedIdentityProviderTests(FederationTests): A remote_id is the same for both so the second IdP is not created because of the uniqueness of the remote_ids - Expect HTTP 409 code for the latter call. + Expect HTTP 409 Conflict code for the latter call. """ - body = self.default_body.copy() repeated_remote_id = uuid.uuid4().hex body['remote_ids'] = [uuid.uuid4().hex, @@ -901,12 +901,15 @@ class FederatedIdentityProviderTests(FederationTests): url = self.base_url(suffix=uuid.uuid4().hex) body['remote_ids'] = [uuid.uuid4().hex, repeated_remote_id] - self.put(url, body={'identity_provider': body}, - expected_status=http_client.CONFLICT) + resp = self.put(url, body={'identity_provider': body}, + expected_status=http_client.CONFLICT) + + resp_data = jsonutils.loads(resp.body) + self.assertIn('Duplicate remote ID', + resp_data.get('error', {}).get('message')) def test_create_idp_remote_empty(self): """Creates an IdP with empty remote_ids.""" - keys_to_check = list(self.idp_keys) keys_to_check.append('remote_ids') body = self.default_body.copy() @@ -919,7 +922,6 @@ class FederatedIdentityProviderTests(FederationTests): def test_create_idp_remote_none(self): """Creates an IdP with a None remote_ids.""" - keys_to_check = list(self.idp_keys) keys_to_check.append('remote_ids') body = self.default_body.copy() @@ -986,6 +988,37 @@ class FederatedIdentityProviderTests(FederationTests): self.assertEqual(sorted(body['remote_ids']), sorted(returned_idp.get('remote_ids'))) + def test_update_idp_remote_repeated(self): + """Update an IdentityProvider entity reusing a remote_id. + + A remote_id is the same for both so the second IdP is not + updated because of the uniqueness of the remote_ids. + + Expect HTTP 409 Conflict code for the latter call. + + """ + # Create first identity provider + body = self.default_body.copy() + repeated_remote_id = uuid.uuid4().hex + body['remote_ids'] = [uuid.uuid4().hex, + repeated_remote_id] + self._create_default_idp(body=body) + + # Create second identity provider (without remote_ids) + body = self.default_body.copy() + default_resp = self._create_default_idp(body=body) + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + url = self.base_url(suffix=idp_id) + + body['remote_ids'] = [repeated_remote_id] + resp = self.patch(url, body={'identity_provider': body}, + expected_status=http_client.CONFLICT) + resp_data = jsonutils.loads(resp.body) + self.assertIn('Duplicate remote ID', + resp_data['error']['message']) + def test_list_idps(self, iterations=5): """Lists all available IdentityProviders. @@ -1018,18 +1051,73 @@ class FederatedIdentityProviderTests(FederationTests): ids_intersection = entities_ids.intersection(ids) self.assertEqual(ids_intersection, ids) + def test_filter_list_idp_by_id(self): + def get_id(resp): + r = self._fetch_attribute_from_response(resp, + 'identity_provider') + return r.get('id') + + idp1_id = get_id(self._create_default_idp()) + idp2_id = get_id(self._create_default_idp()) + + # list the IdP, should get two IdP. + url = self.base_url() + resp = self.get(url) + entities = self._fetch_attribute_from_response(resp, + 'identity_providers') + entities_ids = [e['id'] for e in entities] + self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) + + # filter the IdP by ID. + url = self.base_url() + '?id=' + idp1_id + resp = self.get(url) + filtered_service_list = resp.json['identity_providers'] + self.assertThat(filtered_service_list, matchers.HasLength(1)) + self.assertEqual(idp1_id, filtered_service_list[0].get('id')) + + def test_filter_list_idp_by_enabled(self): + def get_id(resp): + r = self._fetch_attribute_from_response(resp, + 'identity_provider') + return r.get('id') + + idp1_id = get_id(self._create_default_idp()) + + body = self.default_body.copy() + body['enabled'] = False + idp2_id = get_id(self._create_default_idp(body=body)) + + # list the IdP, should get two IdP. + url = self.base_url() + resp = self.get(url) + entities = self._fetch_attribute_from_response(resp, + 'identity_providers') + entities_ids = [e['id'] for e in entities] + self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) + + # filter the IdP by 'enabled'. + url = self.base_url() + '?enabled=True' + resp = self.get(url) + filtered_service_list = resp.json['identity_providers'] + self.assertThat(filtered_service_list, matchers.HasLength(1)) + self.assertEqual(idp1_id, filtered_service_list[0].get('id')) + def test_check_idp_uniqueness(self): """Add same IdP twice. - Expect HTTP 409 code for the latter call. + Expect HTTP 409 Conflict code for the latter call. """ url = self.base_url(suffix=uuid.uuid4().hex) body = self._http_idp_input() self.put(url, body={'identity_provider': body}, - expected_status=201) - self.put(url, body={'identity_provider': body}, - expected_status=http_client.CONFLICT) + expected_status=http_client.CREATED) + resp = self.put(url, body={'identity_provider': body}, + expected_status=http_client.CONFLICT) + + resp_data = jsonutils.loads(resp.body) + self.assertIn('Duplicate entry', + resp_data.get('error', {}).get('message')) def test_get_idp(self): """Create and later fetch IdP.""" @@ -1047,7 +1135,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_get_nonexisting_idp(self): """Fetch nonexisting IdP entity. - Expected HTTP 404 status code. + Expected HTTP 404 Not Found status code. """ idp_id = uuid.uuid4().hex @@ -1059,7 +1147,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_delete_existing_idp(self): """Create and later delete IdP. - Expect HTTP 404 for the GET IdP call. + Expect HTTP 404 Not Found for the GET IdP call. """ default_resp = self._create_default_idp() default_idp = self._fetch_attribute_from_response(default_resp, @@ -1072,7 +1160,6 @@ class FederatedIdentityProviderTests(FederationTests): def test_delete_idp_also_deletes_assigned_protocols(self): """Deleting an IdP will delete its assigned protocol.""" - # create default IdP default_resp = self._create_default_idp() default_idp = self._fetch_attribute_from_response(default_resp, @@ -1084,7 +1171,7 @@ class FederatedIdentityProviderTests(FederationTests): idp_url = self.base_url(suffix=idp_id) # assign protocol to IdP - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} resp, idp_id, proto = self._assign_protocol_to_idp( url=url, idp_id=idp_id, @@ -1100,7 +1187,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_delete_nonexisting_idp(self): """Delete nonexisting IdP. - Expect HTTP 404 for the GET IdP call. + Expect HTTP 404 Not Found for the GET IdP call. """ idp_id = uuid.uuid4().hex url = self.base_url(suffix=idp_id) @@ -1145,7 +1232,7 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_idp_immutable_attributes(self): """Update IdP's immutable parameters. - Expect HTTP FORBIDDEN. + Expect HTTP BAD REQUEST. """ default_resp = self._create_default_idp() @@ -1160,12 +1247,12 @@ class FederatedIdentityProviderTests(FederationTests): url = self.base_url(suffix=idp_id) self.patch(url, body={'identity_provider': body}, - expected_status=http_client.FORBIDDEN) + expected_status=http_client.BAD_REQUEST) def test_update_nonexistent_idp(self): """Update nonexistent IdP - Expect HTTP 404 code. + Expect HTTP 404 Not Found code. """ idp_id = uuid.uuid4().hex @@ -1178,12 +1265,13 @@ class FederatedIdentityProviderTests(FederationTests): def test_assign_protocol_to_idp(self): """Assign a protocol to existing IdP.""" - - self._assign_protocol_to_idp(expected_status=201) + self._assign_protocol_to_idp(expected_status=http_client.CREATED) def test_protocol_composite_pk(self): - """Test whether Keystone let's add two entities with identical - names, however attached to different IdPs. + """Test that Keystone can add two entities. + + The entities have identical names, however, attached to different + IdPs. 1. Add IdP and assign it protocol with predefined name 2. Add another IdP and assign it a protocol with same name. @@ -1193,7 +1281,7 @@ class FederatedIdentityProviderTests(FederationTests): """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) @@ -1204,12 +1292,12 @@ class FederatedIdentityProviderTests(FederationTests): """Test whether Keystone checks for unique idp/protocol values. Add same protocol twice, expect Keystone to reject a latter call and - return HTTP 409 code. + return HTTP 409 Conflict code. """ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - kwargs = {'expected_status': 201} + kwargs = {'expected_status': http_client.CREATED} resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', url=url, **kwargs) kwargs = {'expected_status': http_client.CONFLICT} @@ -1221,10 +1309,9 @@ class FederatedIdentityProviderTests(FederationTests): def test_assign_protocol_to_nonexistent_idp(self): """Assign protocol to IdP that doesn't exist. - Expect HTTP 404 code. + Expect HTTP 404 Not Found code. """ - idp_id = uuid.uuid4().hex kwargs = {'expected_status': http_client.NOT_FOUND} self._assign_protocol_to_idp(proto='saml2', @@ -1234,8 +1321,8 @@ class FederatedIdentityProviderTests(FederationTests): def test_get_protocol(self): """Create and later fetch protocol tied to IdP.""" - - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] url = "%s/protocols/%s" % (idp_id, proto_id) url = self.base_url(suffix=url) @@ -1254,12 +1341,14 @@ class FederatedIdentityProviderTests(FederationTests): Compare input and output id sets. """ - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) iterations = random.randint(0, 16) protocol_ids = [] for _ in range(iterations): - resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id, - expected_status=201) + resp, _, proto = self._assign_protocol_to_idp( + idp_id=idp_id, + expected_status=http_client.CREATED) proto_id = self._fetch_attribute_from_response(resp, 'protocol') proto_id = proto_id['id'] protocol_ids.append(proto_id) @@ -1277,8 +1366,8 @@ class FederatedIdentityProviderTests(FederationTests): def test_update_protocols_attribute(self): """Update protocol's attribute.""" - - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) new_mapping_id = uuid.uuid4().hex url = "%s/protocols/%s" % (idp_id, proto) @@ -1294,19 +1383,21 @@ class FederatedIdentityProviderTests(FederationTests): def test_delete_protocol(self): """Delete protocol. - Expect HTTP 404 code for the GET call after the protocol is deleted. + Expect HTTP 404 Not Found code for the GET call after the protocol is + deleted. """ url = self.base_url(suffix='/%(idp_id)s/' 'protocols/%(protocol_id)s') - resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + resp, idp_id, proto = self._assign_protocol_to_idp( + expected_status=http_client.CREATED) url = url % {'idp_id': idp_id, 'protocol_id': proto} self.delete(url) self.get(url, expected_status=http_client.NOT_FOUND) -class MappingCRUDTests(FederationTests): +class MappingCRUDTests(test_v3.RestfulTestCase): """A class for testing CRUD operations for Mappings.""" MAPPING_URL = '/OS-FEDERATION/mappings/' @@ -1340,7 +1431,7 @@ class MappingCRUDTests(FederationTests): url = self.MAPPING_URL + uuid.uuid4().hex resp = self.put(url, body={'mapping': mapping_fixtures.MAPPING_LARGE}, - expected_status=201) + expected_status=http_client.CREATED) return resp def _get_id_from_response(self, resp): @@ -1357,7 +1448,7 @@ class MappingCRUDTests(FederationTests): resp = self.get(url) entities = resp.result.get('mappings') self.assertIsNotNone(entities) - self.assertResponseStatus(resp, 200) + self.assertResponseStatus(resp, http_client.OK) self.assertValidListLinks(resp.result.get('links')) self.assertEqual(1, len(entities)) @@ -1367,7 +1458,7 @@ class MappingCRUDTests(FederationTests): mapping_id = self._get_id_from_response(resp) url = url % {'mapping_id': str(mapping_id)} resp = self.delete(url) - self.assertResponseStatus(resp, 204) + self.assertResponseStatus(resp, http_client.NO_CONTENT) self.get(url, expected_status=http_client.NOT_FOUND) def test_mapping_get(self): @@ -1463,8 +1554,8 @@ class MappingCRUDTests(FederationTests): def test_create_mapping_with_blacklist_and_whitelist(self): """Test for adding whitelist and blacklist in the rule - Server should respond with HTTP 400 error upon discovering both - ``whitelist`` and ``blacklist`` keywords in the same rule. + Server should respond with HTTP 400 Bad Request error upon discovering + both ``whitelist`` and ``blacklist`` keywords in the same rule. """ url = self.MAPPING_URL + uuid.uuid4().hex @@ -1472,8 +1563,37 @@ class MappingCRUDTests(FederationTests): self.put(url, expected_status=http_client.BAD_REQUEST, body={'mapping': mapping}) + def test_create_mapping_with_local_user_and_local_domain(self): + url = self.MAPPING_URL + uuid.uuid4().hex + resp = self.put( + url, + body={ + 'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN + }, + expected_status=http_client.CREATED) + self.assertValidMappingResponse( + resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN) + + def test_create_mapping_with_ephemeral(self): + url = self.MAPPING_URL + uuid.uuid4().hex + resp = self.put( + url, + body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER}, + expected_status=http_client.CREATED) + self.assertValidMappingResponse( + resp, mapping_fixtures.MAPPING_EPHEMERAL_USER) + + def test_create_mapping_with_bad_user_type(self): + url = self.MAPPING_URL + uuid.uuid4().hex + # get a copy of a known good map + bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER) + # now sabotage the user type + bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex + self.put(url, expected_status=http_client.BAD_REQUEST, + body={'mapping': bad_mapping}) + -class FederatedTokenTests(FederationTests, FederatedSetupMixin): +class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin): def auth_plugin_config_override(self): methods = ['saml2'] @@ -1510,7 +1630,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.assertTrue(note['send_notification_called']) def load_fixtures(self, fixtures): - super(FederationTests, self).load_fixtures(fixtures) + super(FederatedTokenTests, self).load_fixtures(fixtures) self.load_federation_sample_data() def test_issue_unscoped_token_notify(self): @@ -1609,7 +1729,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_issue_unscoped_token_with_remote_unavailable(self): self.config_fixture.config(group='federation', remote_id_attribute=self.REMOTE_ID_ATTR) - self.assertRaises(exception.ValidationError, + self.assertRaises(exception.Unauthorized, self._issue_unscoped_token, idp=self.IDP_WITH_REMOTE, environment={ @@ -1649,13 +1769,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.assertIsNotNone(r.headers.get('X-Subject-Token')) def test_scope_to_project_once_notify(self): - r = self.v3_authenticate_token( + r = self.v3_create_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) user_id = r.json['token']['user']['id'] self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id) def test_scope_to_project_once(self): - r = self.v3_authenticate_token( + r = self.v3_create_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) token_resp = r.result['token'] project_id = token_resp['project']['id'] @@ -1685,14 +1805,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """ enabled_false = {'enabled': False} self.federation_api.update_idp(self.IDP, enabled_false) - self.v3_authenticate_token( + self.v3_create_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, expected_status=http_client.FORBIDDEN) def test_scope_to_bad_project(self): """Scope unscoped token with a project we don't have access to.""" - - self.v3_authenticate_token( + self.v3_create_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, expected_status=http_client.UNAUTHORIZED) @@ -1705,13 +1824,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): * Employees' project """ - bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN, self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN) project_ids = (self.proj_employees['id'], self.proj_customers['id']) for body, project_id_ref in zip(bodies, project_ids): - r = self.v3_authenticate_token(body) + r = self.v3_create_token(body) token_resp = r.result['token'] self._check_project_scoped_token_attributes(token_resp, project_id_ref) @@ -1719,7 +1837,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_scope_to_project_with_only_inherited_roles(self): """Try to scope token whose only roles are inherited.""" self.config_fixture.config(group='os_inherit', enabled=True) - r = self.v3_authenticate_token( + r = self.v3_create_token( self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER) token_resp = r.result['token'] self._check_project_scoped_token_attributes( @@ -1731,7 +1849,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_scope_token_from_nonexistent_unscoped_token(self): """Try to scope token from non-existent unscoped token.""" - self.v3_authenticate_token( + self.v3_create_token( self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, expected_status=http_client.NOT_FOUND) @@ -1755,7 +1873,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): assertion='CONTRACTOR_ASSERTION') def test_scope_to_domain_once(self): - r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER) + r = self.v3_create_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER) token_resp = r.result['token'] self._check_domain_scoped_token_attributes(token_resp, self.domainA['id']) @@ -1778,14 +1896,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.domainC['id']) for body, domain_id_ref in zip(bodies, domain_ids): - r = self.v3_authenticate_token(body) + r = self.v3_create_token(body) token_resp = r.result['token'] self._check_domain_scoped_token_attributes(token_resp, domain_id_ref) def test_scope_to_domain_with_only_inherited_roles_fails(self): """Try to scope to a domain that has no direct roles.""" - self.v3_authenticate_token( + self.v3_create_token( self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, expected_status=http_client.UNAUTHORIZED) @@ -1816,14 +1934,14 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): # TODO(samueldmq): Create another test class for role inheritance tests. # The advantage would be to reduce the complexity of this test class and - # have tests specific to this fuctionality grouped, easing readability and + # have tests specific to this functionality grouped, easing readability and # maintenability. def test_list_projects_for_inherited_project_assignment(self): # Enable os_inherit extension self.config_fixture.config(group='os_inherit', enabled=True) # Create a subproject - subproject_inherited = self.new_project_ref( + subproject_inherited = unit.new_project_ref( domain_id=self.domainD['id'], parent_id=self.project_inherited['id']) self.resource_api.create_project(subproject_inherited['id'], @@ -1878,6 +1996,9 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.assertEqual(domains_ref, domains, 'match failed for url %s' % url) + @utils.wip('This will fail because of bug #1501032. The returned method' + 'list should contain "saml2". This is documented in bug ' + '1501032.') def test_full_workflow(self): """Test 'standard' workflow for granting access tokens. @@ -1886,9 +2007,10 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): * Scope token to one of available projects """ - r = self._issue_unscoped_token() token_resp = r.json_body['token'] + # NOTE(lbragstad): Ensure only 'saml2' is in the method list. + self.assertListEqual(['saml2'], token_resp['methods']) self.assertValidMappedUser(token_resp) employee_unscoped_token_id = r.headers.get('X-Subject-Token') r = self.get('/auth/projects', token=employee_unscoped_token_id) @@ -1899,8 +2021,12 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): v3_scope_request = self._scope_request(employee_unscoped_token_id, 'project', project['id']) - r = self.v3_authenticate_token(v3_scope_request) + r = self.v3_create_token(v3_scope_request) token_resp = r.result['token'] + # FIXME(lbragstad): 'token' should be in the list of methods returned + # but it isn't. This is documented in bug 1501032. + self.assertIn('token', token_resp['methods']) + self.assertIn('saml2', token_resp['methods']) self._check_project_scoped_token_attributes(token_resp, project['id']) def test_workflow_with_groups_deletion(self): @@ -1917,10 +2043,9 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """ # create group and role - group = self.new_group_ref( - domain_id=self.domainA['id']) + group = unit.new_group_ref(domain_id=self.domainA['id']) group = self.identity_api.create_group(group) - role = self.new_role_ref() + role = unit.new_role_ref() self.role_api.create_role(role['id'], role) # assign role to group and project_admins @@ -1971,7 +2096,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): token_id, 'project', self.project_all['id']) - self.v3_authenticate_token(scoped_token, expected_status=500) + self.v3_create_token( + scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR) def test_lists_with_missing_group_in_backend(self): """Test a mapping that points to a group that does not exist @@ -1990,8 +2116,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """ domain_id = self.domainA['id'] domain_name = self.domainA['name'] - group = self.new_group_ref(domain_id=domain_id) - group['name'] = 'EXISTS' + group = unit.new_group_ref(domain_id=domain_id, name='EXISTS') group = self.identity_api.create_group(group) rules = { 'rules': [ @@ -2047,18 +2172,16 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): assigned """ - domain_id = self.domainA['id'] domain_name = self.domainA['name'] # Add a group "EXISTS" - group_exists = self.new_group_ref(domain_id=domain_id) - group_exists['name'] = 'EXISTS' + group_exists = unit.new_group_ref(domain_id=domain_id, name='EXISTS') group_exists = self.identity_api.create_group(group_exists) # Add a group "NO_EXISTS" - group_no_exists = self.new_group_ref(domain_id=domain_id) - group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = unit.new_group_ref(domain_id=domain_id, + name='NO_EXISTS') group_no_exists = self.identity_api.create_group(group_no_exists) group_ids = set([group_exists['id'], group_no_exists['id']]) @@ -2122,18 +2245,17 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): assigned """ - domain_id = self.domainA['id'] domain_name = self.domainA['name'] # Add a group "EXISTS" - group_exists = self.new_group_ref(domain_id=domain_id) - group_exists['name'] = 'EXISTS' + group_exists = unit.new_group_ref(domain_id=domain_id, + name='EXISTS') group_exists = self.identity_api.create_group(group_exists) # Add a group "NO_EXISTS" - group_no_exists = self.new_group_ref(domain_id=domain_id) - group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = unit.new_group_ref(domain_id=domain_id, + name='NO_EXISTS') group_no_exists = self.identity_api.create_group(group_no_exists) group_ids = set([group_exists['id'], group_no_exists['id']]) @@ -2198,8 +2320,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): """ domain_id = self.domainA['id'] domain_name = self.domainA['name'] - group = self.new_group_ref(domain_id=domain_id) - group['name'] = 'EXISTS' + group = unit.new_group_ref(domain_id=domain_id, name='EXISTS') group = self.identity_api.create_group(group) rules = { 'rules': [ @@ -2262,13 +2383,13 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): domain_name = self.domainA['name'] # Add a group "EXISTS" - group_exists = self.new_group_ref(domain_id=domain_id) - group_exists['name'] = 'EXISTS' + group_exists = unit.new_group_ref(domain_id=domain_id, + name='EXISTS') group_exists = self.identity_api.create_group(group_exists) # Add a group "NO_EXISTS" - group_no_exists = self.new_group_ref(domain_id=domain_id) - group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = unit.new_group_ref(domain_id=domain_id, + name='NO_EXISTS') group_no_exists = self.identity_api.create_group(group_no_exists) group_ids = set([group_exists['id'], group_no_exists['id']]) @@ -2362,7 +2483,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self._check_domains_are_valid(r.json_body['token']) def test_scoped_token_has_user_domain(self): - r = self.v3_authenticate_token( + r = self.v3_create_token( self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) self._check_domains_are_valid(r.result['token']) @@ -2383,7 +2504,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): assertion='ANOTHER_LOCAL_USER_ASSERTION') -class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): +class FernetFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin): AUTH_METHOD = 'token' def load_fixtures(self, fixtures): @@ -2436,7 +2557,7 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): v3_scope_request = self._scope_request(unscoped_token, 'project', project['id']) - resp = self.v3_authenticate_token(v3_scope_request) + resp = self.v3_create_token(v3_scope_request) token_resp = resp.result['token'] self._check_project_scoped_token_attributes(token_resp, project['id']) @@ -2448,6 +2569,7 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests): way for scoping all the tokens. """ + AUTH_METHOD = 'token' def auth_plugin_config_override(self): @@ -2455,8 +2577,67 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests): super(FederatedTokenTests, self).auth_plugin_config_override(methods) + @utils.wip('This will fail because of bug #1501032. The returned method' + 'list should contain "saml2". This is documented in bug ' + '1501032.') + def test_full_workflow(self): + """Test 'standard' workflow for granting access tokens. + + * Issue unscoped token + * List available projects based on groups + * Scope token to one of available projects + + """ + r = self._issue_unscoped_token() + token_resp = r.json_body['token'] + # NOTE(lbragstad): Ensure only 'saml2' is in the method list. + self.assertListEqual(['saml2'], token_resp['methods']) + self.assertValidMappedUser(token_resp) + employee_unscoped_token_id = r.headers.get('X-Subject-Token') + r = self.get('/auth/projects', token=employee_unscoped_token_id) + projects = r.result['projects'] + random_project = random.randint(0, len(projects)) - 1 + project = projects[random_project] + + v3_scope_request = self._scope_request(employee_unscoped_token_id, + 'project', project['id']) + + r = self.v3_authenticate_token(v3_scope_request) + token_resp = r.result['token'] + self.assertIn('token', token_resp['methods']) + self.assertIn('saml2', token_resp['methods']) + self._check_project_scoped_token_attributes(token_resp, project['id']) + + +class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin): + """Tests for federated users + + Tests new shadow users functionality + + """ + + def auth_plugin_config_override(self): + methods = ['saml2'] + super(FederatedUserTests, self).auth_plugin_config_override(methods) + + def setUp(self): + super(FederatedUserTests, self).setUp() + + def load_fixtures(self, fixtures): + super(FederatedUserTests, self).load_fixtures(fixtures) + self.load_federation_sample_data() + + def test_user_id_persistense(self): + """Ensure user_id is persistend for multiple federated authn calls.""" + r = self._issue_unscoped_token() + user_id = r.json_body['token']['user']['id'] -class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin): + r = self._issue_unscoped_token() + user_id2 = r.json_body['token']['user']['id'] + self.assertEqual(user_id, user_id2) + + +class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin): JSON_HOME_DATA = { 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/' '1.0/rel/identity_provider': { @@ -2484,7 +2665,7 @@ def _load_xml(filename): return xml.read() -class SAMLGenerationTests(FederationTests): +class SAMLGenerationTests(test_v3.RestfulTestCase): SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers' '/BETA/protocols/saml2/auth') @@ -2523,7 +2704,7 @@ class SAMLGenerationTests(FederationTests): self.sp = self.sp_ref() url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID self.put(url, body={'service_provider': self.sp}, - expected_status=201) + expected_status=http_client.CREATED) def test_samlize_token_values(self): """Test the SAML generator produces a SAML object. @@ -2665,7 +2846,7 @@ class SAMLGenerationTests(FederationTests): """ if not _is_xmlsec1_installed(): - self.skip('xmlsec1 is not installed') + self.skipTest('xmlsec1 is not installed') generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, @@ -2709,7 +2890,7 @@ class SAMLGenerationTests(FederationTests): user_id=self.user['id'], password=self.user['password'], project_id=self.project['id']) - resp = self.v3_authenticate_token(auth_data) + resp = self.v3_create_token(auth_data) token_id = resp.headers.get('X-Subject-Token') return token_id @@ -2718,7 +2899,7 @@ class SAMLGenerationTests(FederationTests): user_id=self.user['id'], password=self.user['password'], user_domain_id=self.domain['id']) - resp = self.v3_authenticate_token(auth_data) + resp = self.v3_create_token(auth_data) token_id = resp.headers.get('X-Subject-Token') return token_id @@ -2757,7 +2938,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.SAML_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=200) + expected_status=http_client.OK) response = etree.fromstring(http_response.result) issuer = response[0] @@ -2789,10 +2970,9 @@ class SAMLGenerationTests(FederationTests): def test_invalid_scope_body(self): """Test that missing the scope in request body raises an exception. - Raises exception.SchemaValidationError() - error code 400 + Raises exception.SchemaValidationError() - error 400 Bad Request """ - token_id = uuid.uuid4().hex body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) @@ -2804,10 +2984,9 @@ class SAMLGenerationTests(FederationTests): def test_invalid_token_body(self): """Test that missing the token in request body raises an exception. - Raises exception.SchemaValidationError() - error code 400 + Raises exception.SchemaValidationError() - error 400 Bad Request """ - token_id = uuid.uuid4().hex body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) @@ -2819,7 +2998,7 @@ class SAMLGenerationTests(FederationTests): def test_sp_not_found(self): """Test SAML generation with an invalid service provider ID. - Raises exception.ServiceProviderNotFound() - error code 404 + Raises exception.ServiceProviderNotFound() - error Not Found 404 """ sp_id = uuid.uuid4().hex @@ -2830,7 +3009,6 @@ class SAMLGenerationTests(FederationTests): def test_sp_disabled(self): """Try generating assertion for disabled Service Provider.""" - # Disable Service Provider sp_ref = {'enabled': False} self.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref) @@ -2844,10 +3022,9 @@ class SAMLGenerationTests(FederationTests): def test_token_not_found(self): """Test that an invalid token in the request body raises an exception. - Raises exception.TokenNotFound() - error code 404 + Raises exception.TokenNotFound() - error Not Found 404 """ - token_id = uuid.uuid4().hex body = self._create_generate_saml_request(token_id, self.SERVICE_PROVDIER_ID) @@ -2863,7 +3040,6 @@ class SAMLGenerationTests(FederationTests): The controller should return a SAML assertion that is wrapped in a SOAP envelope. """ - self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) token_id = self._fetch_valid_token() body = self._create_generate_saml_request(token_id, @@ -2873,7 +3049,7 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, response_content_type='text/xml', - expected_status=200) + expected_status=http_client.OK) env_response = etree.fromstring(http_response.result) header = env_response[0] @@ -2956,7 +3132,7 @@ class SAMLGenerationTests(FederationTests): self.assertEqual(expected_log, logger_fixture.output) -class IdPMetadataGenerationTests(FederationTests): +class IdPMetadataGenerationTests(test_v3.RestfulTestCase): """A class for testing Identity Provider Metadata generation.""" METADATA_URL = '/OS-FEDERATION/saml2/metadata' @@ -3073,20 +3249,20 @@ class IdPMetadataGenerationTests(FederationTests): self.generator.generate_metadata) def test_get_metadata_with_no_metadata_file_configured(self): - self.get(self.METADATA_URL, expected_status=500) + self.get(self.METADATA_URL, + expected_status=http_client.INTERNAL_SERVER_ERROR) def test_get_metadata(self): self.config_fixture.config( group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml') - r = self.get(self.METADATA_URL, response_content_type='text/xml', - expected_status=200) + r = self.get(self.METADATA_URL, response_content_type='text/xml') self.assertEqual('text/xml', r.headers.get('Content-Type')) reference_file = _load_xml('idp_saml2_metadata.xml') self.assertEqual(reference_file, r.result) -class ServiceProviderTests(FederationTests): +class ServiceProviderTests(test_v3.RestfulTestCase): """A test class for Service Providers.""" MEMBER_NAME = 'service_provider' @@ -3096,13 +3272,13 @@ class ServiceProviderTests(FederationTests): 'relay_state_prefix', 'sp_url'] def setUp(self): - super(FederationTests, self).setUp() + super(ServiceProviderTests, self).setUp() # Add a Service Provider url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.SP_REF = self.sp_ref() self.SERVICE_PROVIDER = self.put( url, body={'service_provider': self.SP_REF}, - expected_status=201).result + expected_status=http_client.CREATED).result def sp_ref(self): ref = { @@ -3119,9 +3295,18 @@ class ServiceProviderTests(FederationTests): return '/OS-FEDERATION/service_providers/' + str(suffix) return '/OS-FEDERATION/service_providers' + def _create_default_sp(self, body=None): + """Create default Service Provider.""" + url = self.base_url(suffix=uuid.uuid4().hex) + if body is None: + body = self.sp_ref() + resp = self.put(url, body={'service_provider': body}, + expected_status=http_client.CREATED) + return resp + def test_get_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.get(url, expected_status=200) + resp = self.get(url) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) @@ -3133,7 +3318,7 @@ class ServiceProviderTests(FederationTests): url = self.base_url(suffix=uuid.uuid4().hex) sp = self.sp_ref() resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) @@ -3143,7 +3328,7 @@ class ServiceProviderTests(FederationTests): sp = self.sp_ref() del sp['relay_state_prefix'] resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) sp_result = resp.result['service_provider'] self.assertEqual(CONF.saml.relay_state_prefix, sp_result['relay_state_prefix']) @@ -3155,7 +3340,7 @@ class ServiceProviderTests(FederationTests): non_default_prefix = uuid.uuid4().hex sp['relay_state_prefix'] = non_default_prefix resp = self.put(url, body={'service_provider': sp}, - expected_status=201) + expected_status=http_client.CREATED) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) @@ -3182,7 +3367,8 @@ class ServiceProviderTests(FederationTests): } for id, sp in ref_service_providers.items(): url = self.base_url(suffix=id) - self.put(url, body={'service_provider': sp}, expected_status=201) + self.put(url, body={'service_provider': sp}, + expected_status=http_client.CREATED) # Insert ids into service provider object, we will compare it with # responses from server and those include 'id' attribute. @@ -3209,15 +3395,14 @@ class ServiceProviderTests(FederationTests): """ new_sp_ref = self.sp_ref() url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=200) + resp = self.patch(url, body={'service_provider': new_sp_ref}) patch_result = resp.result new_sp_ref['id'] = self.SERVICE_PROVIDER_ID self.assertValidEntity(patch_result['service_provider'], ref=new_sp_ref, keys_to_check=self.SP_KEYS) - resp = self.get(url, expected_status=200) + resp = self.get(url) get_result = resp.result self.assertDictEqual(patch_result['service_provider'], @@ -3227,7 +3412,7 @@ class ServiceProviderTests(FederationTests): """Update immutable attributes in service provider. In this particular case the test will try to change ``id`` attribute. - The server should return an HTTP 403 error code. + The server should return an HTTP 403 Forbidden error code. """ new_sp_ref = {'id': uuid.uuid4().hex} @@ -3242,7 +3427,7 @@ class ServiceProviderTests(FederationTests): self.patch(url, body={'service_provider': new_sp_ref}, expected_status=http_client.BAD_REQUEST) - def test_update_service_provider_404(self): + def test_update_service_provider_returns_not_found(self): new_sp_ref = self.sp_ref() new_sp_ref['description'] = uuid.uuid4().hex url = self.base_url(suffix=uuid.uuid4().hex) @@ -3250,25 +3435,74 @@ class ServiceProviderTests(FederationTests): expected_status=http_client.NOT_FOUND) def test_update_sp_relay_state(self): - """Update an SP with custome relay state.""" + """Update an SP with custom relay state.""" new_sp_ref = self.sp_ref() non_default_prefix = uuid.uuid4().hex new_sp_ref['relay_state_prefix'] = non_default_prefix url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=200) + resp = self.patch(url, body={'service_provider': new_sp_ref}) sp_result = resp.result['service_provider'] self.assertEqual(non_default_prefix, sp_result['relay_state_prefix']) def test_delete_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.delete(url, expected_status=204) + self.delete(url) - def test_delete_service_provider_404(self): + def test_delete_service_provider_returns_not_found(self): url = self.base_url(suffix=uuid.uuid4().hex) self.delete(url, expected_status=http_client.NOT_FOUND) + def test_filter_list_sp_by_id(self): + def get_id(resp): + sp = resp.result.get('service_provider') + return sp.get('id') + + sp1_id = get_id(self._create_default_sp()) + sp2_id = get_id(self._create_default_sp()) + + # list the SP, should get SPs. + url = self.base_url() + resp = self.get(url) + sps = resp.result.get('service_providers') + entities_ids = [e['id'] for e in sps] + self.assertIn(sp1_id, entities_ids) + self.assertIn(sp2_id, entities_ids) + + # filter the SP by 'id'. Only SP1 should appear. + url = self.base_url() + '?id=' + sp1_id + resp = self.get(url) + sps = resp.result.get('service_providers') + entities_ids = [e['id'] for e in sps] + self.assertIn(sp1_id, entities_ids) + self.assertNotIn(sp2_id, entities_ids) + + def test_filter_list_sp_by_enabled(self): + def get_id(resp): + sp = resp.result.get('service_provider') + return sp.get('id') + + sp1_id = get_id(self._create_default_sp()) + sp2_ref = self.sp_ref() + sp2_ref['enabled'] = False + sp2_id = get_id(self._create_default_sp(body=sp2_ref)) + + # list the SP, should get two SPs. + url = self.base_url() + resp = self.get(url) + sps = resp.result.get('service_providers') + entities_ids = [e['id'] for e in sps] + self.assertIn(sp1_id, entities_ids) + self.assertIn(sp2_id, entities_ids) + + # filter the SP by 'enabled'. Only SP1 should appear. + url = self.base_url() + '?enabled=True' + resp = self.get(url) + sps = resp.result.get('service_providers') + entities_ids = [e['id'] for e in sps] + self.assertIn(sp1_id, entities_ids) + self.assertNotIn(sp2_id, entities_ids) + class WebSSOTests(FederatedTokenTests): """A class for testing Web SSO.""" @@ -3306,6 +3540,21 @@ class WebSSOTests(FederatedTokenTests): resp = self.api.federated_sso_auth(context, self.PROTOCOL) self.assertIn(self.TRUSTED_DASHBOARD, resp.body) + def test_get_sso_origin_host_case_insensitive(self): + # test lowercase hostname in trusted_dashboard + context = { + 'query_string': { + 'origin': "http://horizon.com", + }, + } + host = self.api._get_sso_origin_host(context) + self.assertEqual("http://horizon.com", host) + # test uppercase hostname in trusted_dashboard + self.config_fixture.config(group='federation', + trusted_dashboard=['http://Horizon.com']) + host = self.api._get_sso_origin_host(context) + self.assertEqual("http://horizon.com", host) + def test_federated_sso_auth_with_protocol_specific_remote_id(self): self.config_fixture.config( group=self.PROTOCOL, @@ -3380,7 +3629,7 @@ class WebSSOTests(FederatedTokenTests): self.assertIn(self.TRUSTED_DASHBOARD, resp.body) -class K2KServiceCatalogTests(FederationTests): +class K2KServiceCatalogTests(test_v3.RestfulTestCase): SP1 = 'SP1' SP2 = 'SP2' SP3 = 'SP3' @@ -3429,11 +3678,10 @@ class K2KServiceCatalogTests(FederationTests): for entity in service_providers: id = entity.get('id') ref_entity = self.sp_response(id, ref.get(id)) - self.assertDictEqual(ref_entity, entity) + self.assertDictEqual(entity, ref_entity) def test_service_providers_in_token(self): """Check if service providers are listed in service catalog.""" - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) ref = {} for r in (self.sp_alpha, self.sp_beta, self.sp_gamma): |