diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_federation.py | 1450 |
1 files changed, 793 insertions, 657 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py index 3b6f4d8b..e646bc0a 100644 --- a/keystone-moon/keystone/tests/unit/test_v3_federation.py +++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py @@ -13,26 +13,27 @@ import os import random import subprocess +from testtools import matchers import uuid +import fixtures from lxml import etree import mock from oslo_config import cfg from oslo_log import log -from oslo_serialization import jsonutils +from oslo_utils import importutils from oslotest import mockpatch import saml2 from saml2 import saml from saml2 import sigver -from six.moves import urllib -import xmldsig +from six.moves import range, urllib, zip +xmldsig = importutils.try_import("saml2.xmldsig") +if not xmldsig: + xmldsig = importutils.try_import("xmldsig") from keystone.auth import controllers as auth_controllers -from keystone.auth.plugins import mapped -from keystone.contrib import federation from keystone.contrib.federation import controllers as federation_controllers from keystone.contrib.federation import idp as keystone_idp -from keystone.contrib.federation import utils as mapping_utils from keystone import exception from keystone import notifications from keystone.tests.unit import core @@ -68,7 +69,7 @@ class FederatedSetupMixin(object): USER = 'user@ORGANIZATION' ASSERTION_PREFIX = 'PREFIX_' IDP_WITH_REMOTE = 'ORG_IDP_REMOTE' - REMOTE_ID = 'entityID_IDP' + REMOTE_IDS = ['entityID_IDP1', 'entityID_IDP2'] REMOTE_ID_ATTR = uuid.uuid4().hex UNSCOPED_V3_SAML2_REQ = { @@ -108,14 +109,14 @@ class FederatedSetupMixin(object): self.assertEqual(token_projects, projects_ref) def _check_scoped_token_attributes(self, token): - def xor_project_domain(iterable): - return sum(('project' in iterable, 'domain' in iterable)) % 2 + def xor_project_domain(token_keys): + return sum(('project' in token_keys, 'domain' in token_keys)) % 2 for obj in ('user', 'catalog', 'expires_at', 'issued_at', 'methods', 'roles'): self.assertIn(obj, token) # Check for either project or domain - if not xor_project_domain(token.keys()): + if not xor_project_domain(list(token.keys())): raise AssertionError("You must specify either" "project or domain.") @@ -123,6 +124,10 @@ class FederatedSetupMixin(object): os_federation = token['user']['OS-FEDERATION'] self.assertEqual(self.IDP, os_federation['identity_provider']['id']) self.assertEqual(self.PROTOCOL, os_federation['protocol']['id']) + self.assertListEqual(sorted(['groups', + 'identity_provider', + 'protocol']), + sorted(os_federation.keys())) def _issue_unscoped_token(self, idp=None, @@ -327,7 +332,8 @@ class FederatedSetupMixin(object): }, { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -336,6 +342,9 @@ class FederatedSetupMixin(object): 'type': 'UserName' }, { + 'type': 'Email', + }, + { 'type': 'orgPersonType', 'any_one_of': [ 'Employee' @@ -352,7 +361,8 @@ class FederatedSetupMixin(object): }, { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -361,6 +371,9 @@ class FederatedSetupMixin(object): 'type': self.ASSERTION_PREFIX + 'UserName' }, { + 'type': self.ASSERTION_PREFIX + 'Email', + }, + { 'type': self.ASSERTION_PREFIX + 'orgPersonType', 'any_one_of': [ 'SuperEmployee' @@ -377,7 +390,8 @@ class FederatedSetupMixin(object): }, { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -386,6 +400,9 @@ class FederatedSetupMixin(object): 'type': 'UserName' }, { + 'type': 'Email' + }, + { 'type': 'orgPersonType', 'any_one_of': [ 'Customer' @@ -413,7 +430,8 @@ class FederatedSetupMixin(object): { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -422,6 +440,9 @@ class FederatedSetupMixin(object): 'type': 'UserName' }, { + 'type': 'Email' + }, + { 'type': 'orgPersonType', 'any_one_of': [ 'Admin', @@ -444,7 +465,8 @@ class FederatedSetupMixin(object): }, { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -453,6 +475,9 @@ class FederatedSetupMixin(object): 'type': 'UserName', }, { + 'type': 'Email', + }, + { 'type': 'FirstName', 'any_one_of': [ 'Jill' @@ -475,7 +500,8 @@ class FederatedSetupMixin(object): }, { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } } ], @@ -485,6 +511,9 @@ class FederatedSetupMixin(object): }, { 'type': 'Email', + }, + { + 'type': 'Email', 'any_one_of': [ 'testacct@example.com' ] @@ -502,7 +531,8 @@ class FederatedSetupMixin(object): "local": [ { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } }, { @@ -519,6 +549,9 @@ class FederatedSetupMixin(object): 'type': 'UserName', }, { + 'type': 'Email', + }, + { "type": "orgPersonType", "any_one_of": [ "CEO", @@ -531,7 +564,8 @@ class FederatedSetupMixin(object): "local": [ { 'user': { - 'name': '{0}' + 'name': '{0}', + 'id': '{1}' } }, { @@ -548,6 +582,9 @@ class FederatedSetupMixin(object): "type": "UserName", }, { + "type": "Email", + }, + { "type": "orgPersonType", "any_one_of": [ "Managers" @@ -559,7 +596,8 @@ class FederatedSetupMixin(object): "local": [ { "user": { - "name": "{0}" + "name": "{0}", + "id": "{1}" } }, { @@ -576,6 +614,9 @@ class FederatedSetupMixin(object): "type": "UserName", }, { + "type": "Email", + }, + { "type": "UserName", "any_one_of": [ "IamTester" @@ -639,7 +680,7 @@ class FederatedSetupMixin(object): self.idp) # Add IDP with remote self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE) - self.idp_with_remote['remote_id'] = self.REMOTE_ID + self.idp_with_remote['remote_ids'] = self.REMOTE_IDS self.federation_api.create_idp(self.idp_with_remote['id'], self.idp_with_remote) # Add a mapping @@ -793,28 +834,137 @@ class FederatedIdentityProviderTests(FederationTests): return r def test_create_idp(self): - """Creates the IdentityProvider entity.""" + """Creates the IdentityProvider entity associated to remote_ids.""" - keys_to_check = self.idp_keys - body = self._http_idp_input() + keys_to_check = list(self.idp_keys) + body = self.default_body.copy() + body['description'] = uuid.uuid4().hex resp = self._create_default_idp(body=body) self.assertValidResponse(resp, 'identity_provider', dummy_validator, keys_to_check=keys_to_check, ref=body) def test_create_idp_remote(self): - """Creates the IdentityProvider entity associated to a remote_id.""" + """Creates the IdentityProvider entity associated to remote_ids.""" keys_to_check = list(self.idp_keys) - keys_to_check.append('remote_id') + keys_to_check.append('remote_ids') body = self.default_body.copy() body['description'] = uuid.uuid4().hex - body['remote_id'] = uuid.uuid4().hex + body['remote_ids'] = [uuid.uuid4().hex, + uuid.uuid4().hex, + uuid.uuid4().hex] resp = self._create_default_idp(body=body) self.assertValidResponse(resp, 'identity_provider', dummy_validator, keys_to_check=keys_to_check, ref=body) + def test_create_idp_remote_repeated(self): + """Creates two IdentityProvider entities with some remote_ids + + A remote_id is the same for both so the second IdP is not + created because of the uniqueness of the remote_ids + + Expect HTTP 409 code for the latter call. + + """ + + body = self.default_body.copy() + repeated_remote_id = uuid.uuid4().hex + body['remote_ids'] = [uuid.uuid4().hex, + uuid.uuid4().hex, + uuid.uuid4().hex, + repeated_remote_id] + self._create_default_idp(body=body) + + url = self.base_url(suffix=uuid.uuid4().hex) + body['remote_ids'] = [uuid.uuid4().hex, + repeated_remote_id] + self.put(url, body={'identity_provider': body}, + expected_status=409) + + def test_create_idp_remote_empty(self): + """Creates an IdP with empty remote_ids.""" + + keys_to_check = list(self.idp_keys) + keys_to_check.append('remote_ids') + body = self.default_body.copy() + body['description'] = uuid.uuid4().hex + body['remote_ids'] = [] + resp = self._create_default_idp(body=body) + self.assertValidResponse(resp, 'identity_provider', dummy_validator, + keys_to_check=keys_to_check, + ref=body) + + def test_create_idp_remote_none(self): + """Creates an IdP with a None remote_ids.""" + + keys_to_check = list(self.idp_keys) + keys_to_check.append('remote_ids') + body = self.default_body.copy() + body['description'] = uuid.uuid4().hex + body['remote_ids'] = None + resp = self._create_default_idp(body=body) + expected = body.copy() + expected['remote_ids'] = [] + self.assertValidResponse(resp, 'identity_provider', dummy_validator, + keys_to_check=keys_to_check, + ref=expected) + + def test_update_idp_remote_ids(self): + """Update IdP's remote_ids parameter.""" + body = self.default_body.copy() + body['remote_ids'] = [uuid.uuid4().hex] + default_resp = self._create_default_idp(body=body) + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + url = self.base_url(suffix=idp_id) + self.assertIsNotNone(idp_id) + + body['remote_ids'] = [uuid.uuid4().hex, uuid.uuid4().hex] + + body = {'identity_provider': body} + resp = self.patch(url, body=body) + updated_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + body = body['identity_provider'] + self.assertEqual(sorted(body['remote_ids']), + sorted(updated_idp.get('remote_ids'))) + + resp = self.get(url) + returned_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + self.assertEqual(sorted(body['remote_ids']), + sorted(returned_idp.get('remote_ids'))) + + def test_update_idp_clean_remote_ids(self): + """Update IdP's remote_ids parameter with an empty list.""" + body = self.default_body.copy() + body['remote_ids'] = [uuid.uuid4().hex] + default_resp = self._create_default_idp(body=body) + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + url = self.base_url(suffix=idp_id) + self.assertIsNotNone(idp_id) + + body['remote_ids'] = [] + + body = {'identity_provider': body} + resp = self.patch(url, body=body) + updated_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + body = body['identity_provider'] + self.assertEqual(sorted(body['remote_ids']), + sorted(updated_idp.get('remote_ids'))) + + resp = self.get(url) + returned_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + self.assertEqual(sorted(body['remote_ids']), + sorted(returned_idp.get('remote_ids'))) + def test_list_idps(self, iterations=5): """Lists all available IdentityProviders. @@ -899,6 +1049,33 @@ class FederatedIdentityProviderTests(FederationTests): self.delete(url) self.get(url, expected_status=404) + def test_delete_idp_also_deletes_assigned_protocols(self): + """Deleting an IdP will delete its assigned protocol.""" + + # create default IdP + default_resp = self._create_default_idp() + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp['id'] + protocol_id = uuid.uuid4().hex + + url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') + idp_url = self.base_url(suffix=idp_id) + + # assign protocol to IdP + kwargs = {'expected_status': 201} + resp, idp_id, proto = self._assign_protocol_to_idp( + url=url, + idp_id=idp_id, + proto=protocol_id, + **kwargs) + + # removing IdP will remove the assigned protocol as well + self.assertEqual(1, len(self.federation_api.list_protocols(idp_id))) + self.delete(idp_url) + self.get(idp_url, expected_status=404) + self.assertEqual(0, len(self.federation_api.list_protocols(idp_id))) + def test_delete_nonexisting_idp(self): """Delete nonexisting IdP. @@ -918,7 +1095,7 @@ class FederatedIdentityProviderTests(FederationTests): self.assertIsNotNone(idp_id) _enabled = not default_idp.get('enabled') - body = {'remote_id': uuid.uuid4().hex, + body = {'remote_ids': [uuid.uuid4().hex, uuid.uuid4().hex], 'description': uuid.uuid4().hex, 'enabled': _enabled} @@ -928,13 +1105,21 @@ class FederatedIdentityProviderTests(FederationTests): 'identity_provider') body = body['identity_provider'] for key in body.keys(): - self.assertEqual(body[key], updated_idp.get(key)) + if isinstance(body[key], list): + self.assertEqual(sorted(body[key]), + sorted(updated_idp.get(key))) + else: + self.assertEqual(body[key], updated_idp.get(key)) resp = self.get(url) updated_idp = self._fetch_attribute_from_response(resp, 'identity_provider') for key in body.keys(): - self.assertEqual(body[key], updated_idp.get(key)) + if isinstance(body[key], list): + self.assertEqual(sorted(body[key]), + sorted(updated_idp.get(key))) + else: + self.assertEqual(body[key], updated_idp.get(key)) def test_update_idp_immutable_attributes(self): """Update IdP's immutable parameters. @@ -1126,7 +1311,7 @@ class MappingCRUDTests(FederationTests): self.assertIsNotNone(entity.get('id')) self.assertIsNotNone(entity.get('rules')) if ref: - self.assertEqual(jsonutils.loads(entity['rules']), ref['rules']) + self.assertEqual(entity['rules'], ref['rules']) return entity def _create_default_mapping_entry(self): @@ -1262,594 +1447,11 @@ class MappingCRUDTests(FederationTests): self.put(url, expected_status=400, body={'mapping': mapping}) -class MappingRuleEngineTests(FederationTests): - """A class for testing the mapping rule engine.""" - - def assertValidMappedUserObject(self, mapped_properties, - user_type='ephemeral', - domain_id=None): - """Check whether mapped properties object has 'user' within. - - According to today's rules, RuleProcessor does not have to issue user's - id or name. What's actually required is user's type and for ephemeral - users that would be service domain named 'Federated'. - """ - self.assertIn('user', mapped_properties, - message='Missing user object in mapped properties') - user = mapped_properties['user'] - self.assertIn('type', user) - self.assertEqual(user_type, user['type']) - self.assertIn('domain', user) - domain = user['domain'] - domain_name_or_id = domain.get('id') or domain.get('name') - domain_ref = domain_id or federation.FEDERATED_DOMAIN_KEYWORD - self.assertEqual(domain_ref, domain_name_or_id) - - def test_rule_engine_any_one_of_and_direct_mapping(self): - """Should return user's name and group id EMPLOYEE_GROUP_ID. - - The ADMIN_ASSERTION should successfully have a match in MAPPING_LARGE. - They will test the case where `any_one_of` is valid, and there is - a direct mapping for the users name. - - """ - - mapping = mapping_fixtures.MAPPING_LARGE - assertion = mapping_fixtures.ADMIN_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - fn = assertion.get('FirstName') - ln = assertion.get('LastName') - full_name = '%s %s' % (fn, ln) - group_ids = values.get('group_ids') - user_name = values.get('user', {}).get('name') - - self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) - self.assertEqual(full_name, user_name) - - def test_rule_engine_no_regex_match(self): - """Should deny authorization, the email of the tester won't match. - - This will not match since the email in the assertion will fail - the regex test. It is set to match any @example.com address. - But the incoming value is set to eviltester@example.org. - RuleProcessor should return list of empty group_ids. - - """ - - mapping = mapping_fixtures.MAPPING_LARGE - assertion = mapping_fixtures.BAD_TESTER_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - - self.assertValidMappedUserObject(mapped_properties) - self.assertIsNone(mapped_properties['user'].get('name')) - self.assertListEqual(list(), mapped_properties['group_ids']) - - def test_rule_engine_regex_many_groups(self): - """Should return group CONTRACTOR_GROUP_ID. - - The TESTER_ASSERTION should successfully have a match in - MAPPING_TESTER_REGEX. This will test the case where many groups - are in the assertion, and a regex value is used to try and find - a match. - - """ - - mapping = mapping_fixtures.MAPPING_TESTER_REGEX - assertion = mapping_fixtures.TESTER_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - self.assertValidMappedUserObject(values) - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertEqual(user_name, name) - self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids) - - def test_rule_engine_any_one_of_many_rules(self): - """Should return group CONTRACTOR_GROUP_ID. - - The CONTRACTOR_ASSERTION should successfully have a match in - MAPPING_SMALL. This will test the case where many rules - must be matched, including an `any_one_of`, and a direct - mapping. - - """ - - mapping = mapping_fixtures.MAPPING_SMALL - assertion = mapping_fixtures.CONTRACTOR_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - self.assertValidMappedUserObject(values) - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertEqual(user_name, name) - self.assertIn(mapping_fixtures.CONTRACTOR_GROUP_ID, group_ids) - - def test_rule_engine_not_any_of_and_direct_mapping(self): - """Should return user's name and email. - - The CUSTOMER_ASSERTION should successfully have a match in - MAPPING_LARGE. This will test the case where a requirement - has `not_any_of`, and direct mapping to a username, no group. - - """ - - mapping = mapping_fixtures.MAPPING_LARGE - assertion = mapping_fixtures.CUSTOMER_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - self.assertValidMappedUserObject(values) - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertEqual(user_name, name) - self.assertEqual([], group_ids,) - - def test_rule_engine_not_any_of_many_rules(self): - """Should return group EMPLOYEE_GROUP_ID. - - The EMPLOYEE_ASSERTION should successfully have a match in - MAPPING_SMALL. This will test the case where many remote - rules must be matched, including a `not_any_of`. - - """ - - mapping = mapping_fixtures.MAPPING_SMALL - assertion = mapping_fixtures.EMPLOYEE_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - self.assertValidMappedUserObject(values) - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertEqual(user_name, name) - self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) - - def test_rule_engine_not_any_of_regex_verify_pass(self): - """Should return group DEVELOPER_GROUP_ID. - - The DEVELOPER_ASSERTION should successfully have a match in - MAPPING_DEVELOPER_REGEX. This will test the case where many - remote rules must be matched, including a `not_any_of`, with - regex set to True. - - """ - - mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX - assertion = mapping_fixtures.DEVELOPER_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - self.assertValidMappedUserObject(values) - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertEqual(user_name, name) - self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids) - - def test_rule_engine_not_any_of_regex_verify_fail(self): - """Should deny authorization. - - The email in the assertion will fail the regex test. - It is set to reject any @example.org address, but the - incoming value is set to evildeveloper@example.org. - RuleProcessor should return list of empty group_ids. - - """ - - mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX - assertion = mapping_fixtures.BAD_DEVELOPER_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - - self.assertValidMappedUserObject(mapped_properties) - self.assertIsNone(mapped_properties['user'].get('name')) - self.assertListEqual(list(), mapped_properties['group_ids']) - - def _rule_engine_regex_match_and_many_groups(self, assertion): - """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID. - - A helper function injecting assertion passed as an argument. - Expect DEVELOPER_GROUP_ID and TESTER_GROUP_ID in the results. - - """ - - mapping = mapping_fixtures.MAPPING_LARGE - rp = mapping_utils.RuleProcessor(mapping['rules']) - values = rp.process(assertion) - - user_name = assertion.get('UserName') - group_ids = values.get('group_ids') - name = values.get('user', {}).get('name') - - self.assertValidMappedUserObject(values) - self.assertEqual(user_name, name) - self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids) - self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids) - - def test_rule_engine_regex_match_and_many_groups(self): - """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID. - - The TESTER_ASSERTION should successfully have a match in - MAPPING_LARGE. This will test a successful regex match - for an `any_one_of` evaluation type, and will have many - groups returned. - - """ - self._rule_engine_regex_match_and_many_groups( - mapping_fixtures.TESTER_ASSERTION) - - def test_rule_engine_discards_nonstring_objects(self): - """Check whether RuleProcessor discards non string objects. - - Despite the fact that assertion is malformed and contains - non string objects, RuleProcessor should correctly discard them and - successfully have a match in MAPPING_LARGE. - - """ - self._rule_engine_regex_match_and_many_groups( - mapping_fixtures.MALFORMED_TESTER_ASSERTION) - - def test_rule_engine_fails_after_discarding_nonstring(self): - """Check whether RuleProcessor discards non string objects. - - Expect RuleProcessor to discard non string object, which - is required for a correct rule match. RuleProcessor will result with - empty list of groups. - - """ - mapping = mapping_fixtures.MAPPING_SMALL - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CONTRACTOR_MALFORMED_ASSERTION - mapped_properties = rp.process(assertion) - self.assertValidMappedUserObject(mapped_properties) - self.assertIsNone(mapped_properties['user'].get('name')) - self.assertListEqual(list(), mapped_properties['group_ids']) - - def test_rule_engine_returns_group_names(self): - """Check whether RuleProcessor returns group names with their domains. - - RuleProcessor should return 'group_names' entry with a list of - dictionaries with two entries 'name' and 'domain' identifying group by - its name and domain. - - """ - mapping = mapping_fixtures.MAPPING_GROUP_NAMES - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.EMPLOYEE_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - reference = { - mapping_fixtures.DEVELOPER_GROUP_NAME: - { - "name": mapping_fixtures.DEVELOPER_GROUP_NAME, - "domain": { - "name": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_NAME - } - }, - mapping_fixtures.TESTER_GROUP_NAME: - { - "name": mapping_fixtures.TESTER_GROUP_NAME, - "domain": { - "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID - } - } - } - for rule in mapped_properties['group_names']: - self.assertDictEqual(reference.get(rule.get('name')), rule) - - def test_rule_engine_whitelist_and_direct_groups_mapping(self): - """Should return user's groups Developer and Contractor. - - The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match - in MAPPING_GROUPS_WHITELIST. It will test the case where 'whitelist' - correctly filters out Manager and only allows Developer and Contractor. - - """ - - mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST - assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - - reference = { - mapping_fixtures.DEVELOPER_GROUP_NAME: - { - "name": mapping_fixtures.DEVELOPER_GROUP_NAME, - "domain": { - "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID - } - }, - mapping_fixtures.CONTRACTOR_GROUP_NAME: - { - "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, - "domain": { - "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID - } - } - } - for rule in mapped_properties['group_names']: - self.assertDictEqual(reference.get(rule.get('name')), rule) - - self.assertEqual('tbo', mapped_properties['user']['name']) - self.assertEqual([], mapped_properties['group_ids']) - - def test_rule_engine_blacklist_and_direct_groups_mapping(self): - """Should return user's group Developer. - - The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match - in MAPPING_GROUPS_BLACKLIST. It will test the case where 'blacklist' - correctly filters out Manager and Developer and only allows Contractor. - - """ - - mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST - assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - - reference = { - mapping_fixtures.CONTRACTOR_GROUP_NAME: - { - "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, - "domain": { - "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID - } - } - } - for rule in mapped_properties['group_names']: - self.assertDictEqual(reference.get(rule.get('name')), rule) - self.assertEqual('tbo', mapped_properties['user']['name']) - self.assertEqual([], mapped_properties['group_ids']) - - def test_rule_engine_blacklist_and_direct_groups_mapping_multiples(self): - """Tests matching multiple values before the blacklist. - - Verifies that the local indexes are correct when matching multiple - remote values for a field when the field occurs before the blacklist - entry in the remote rules. - - """ - - mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MULTIPLES - assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - - reference = { - mapping_fixtures.CONTRACTOR_GROUP_NAME: - { - "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, - "domain": { - "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID - } - } - } - for rule in mapped_properties['group_names']: - self.assertDictEqual(reference.get(rule.get('name')), rule) - self.assertEqual('tbo', mapped_properties['user']['name']) - self.assertEqual([], mapped_properties['group_ids']) - - def test_rule_engine_whitelist_direct_group_mapping_missing_domain(self): - """Test if the local rule is rejected upon missing domain value - - This is a variation with a ``whitelist`` filter. - - """ - mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_MISSING_DOMAIN - assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS - rp = mapping_utils.RuleProcessor(mapping['rules']) - self.assertRaises(exception.ValidationError, rp.process, assertion) - - def test_rule_engine_blacklist_direct_group_mapping_missing_domain(self): - """Test if the local rule is rejected upon missing domain value - - This is a variation with a ``blacklist`` filter. - - """ - mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MISSING_DOMAIN - assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS - rp = mapping_utils.RuleProcessor(mapping['rules']) - self.assertRaises(exception.ValidationError, rp.process, assertion) - - def test_rule_engine_no_groups_allowed(self): - """Should return user mapped to no groups. - - The EMPLOYEE_ASSERTION should successfully have a match - in MAPPING_GROUPS_WHITELIST, but 'whitelist' should filter out - the group values from the assertion and thus map to no groups. - - """ - mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST - assertion = mapping_fixtures.EMPLOYEE_ASSERTION - rp = mapping_utils.RuleProcessor(mapping['rules']) - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertListEqual(mapped_properties['group_names'], []) - self.assertListEqual(mapped_properties['group_ids'], []) - self.assertEqual('tbo', mapped_properties['user']['name']) - - def test_mapping_federated_domain_specified(self): - """Test mapping engine when domain 'ephemeral' is explicitely set. - - For that, we use mapping rule MAPPING_EPHEMERAL_USER and assertion - EMPLOYEE_ASSERTION - - """ - mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.EMPLOYEE_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - - def test_create_user_object_with_bad_mapping(self): - """Test if user object is created even with bad mapping. - - User objects will be created by mapping engine always as long as there - is corresponding local rule. This test shows, that even with assertion - where no group names nor ids are matched, but there is 'blind' rule for - mapping user, such object will be created. - - In this test MAPPING_EHPEMERAL_USER expects UserName set to jsmith - whereas value from assertion is 'tbo'. - - """ - mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CONTRACTOR_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - - self.assertNotIn('id', mapped_properties['user']) - self.assertNotIn('name', mapped_properties['user']) - - def test_set_ephemeral_domain_to_ephemeral_users(self): - """Test auto assigning service domain to ephemeral users. - - Test that ephemeral users will always become members of federated - service domain. The check depends on ``type`` value which must be set - to ``ephemeral`` in case of ephemeral user. - - """ - mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER_LOCAL_DOMAIN - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CONTRACTOR_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - - def test_local_user_local_domain(self): - """Test that local users can have non-service domains assigned.""" - mapping = mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CONTRACTOR_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject( - mapped_properties, user_type='local', - domain_id=mapping_fixtures.LOCAL_DOMAIN) - - def test_user_identifications_name(self): - """Test varius mapping options and how users are identified. - - This test calls mapped.setup_username() for propagating user object. - - Test plan: - - Check if the user has proper domain ('federated') set - - Check if the user has property type set ('ephemeral') - - Check if user's name is properly mapped from the assertion - - Check if user's id is properly set and equal to name, as it was not - explicitely specified in the mapping. - - """ - mapping = mapping_fixtures.MAPPING_USER_IDS - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CONTRACTOR_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - mapped.setup_username({}, mapped_properties) - self.assertEqual('jsmith', mapped_properties['user']['id']) - self.assertEqual('jsmith', mapped_properties['user']['name']) - - def test_user_identifications_name_and_federated_domain(self): - """Test varius mapping options and how users are identified. - - This test calls mapped.setup_username() for propagating user object. - - Test plan: - - Check if the user has proper domain ('federated') set - - Check if the user has propert type set ('ephemeral') - - Check if user's name is properly mapped from the assertion - - Check if user's id is properly set and equal to name, as it was not - explicitely specified in the mapping. - - """ - mapping = mapping_fixtures.MAPPING_USER_IDS - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.EMPLOYEE_ASSERTION - mapped_properties = rp.process(assertion) - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - mapped.setup_username({}, mapped_properties) - self.assertEqual('tbo', mapped_properties['user']['name']) - self.assertEqual('tbo', mapped_properties['user']['id']) - - def test_user_identification_id(self): - """Test varius mapping options and how users are identified. - - This test calls mapped.setup_username() for propagating user object. - - Test plan: - - Check if the user has proper domain ('federated') set - - Check if the user has propert type set ('ephemeral') - - Check if user's id is properly mapped from the assertion - - Check if user's name is properly set and equal to id, as it was not - explicitely specified in the mapping. - - """ - mapping = mapping_fixtures.MAPPING_USER_IDS - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.ADMIN_ASSERTION - mapped_properties = rp.process(assertion) - context = {'environment': {}} - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - mapped.setup_username(context, mapped_properties) - self.assertEqual('bob', mapped_properties['user']['name']) - self.assertEqual('bob', mapped_properties['user']['id']) - - def test_user_identification_id_and_name(self): - """Test varius mapping options and how users are identified. - - This test calls mapped.setup_username() for propagating user object. - - Test plan: - - Check if the user has proper domain ('federated') set - - Check if the user has proper type set ('ephemeral') - - Check if user's name is properly mapped from the assertion - - Check if user's id is properly set and and equal to value hardcoded - in the mapping - - """ - mapping = mapping_fixtures.MAPPING_USER_IDS - rp = mapping_utils.RuleProcessor(mapping['rules']) - assertion = mapping_fixtures.CUSTOMER_ASSERTION - mapped_properties = rp.process(assertion) - context = {'environment': {}} - self.assertIsNotNone(mapped_properties) - self.assertValidMappedUserObject(mapped_properties) - mapped.setup_username(context, mapped_properties) - self.assertEqual('bwilliams', mapped_properties['user']['name']) - self.assertEqual('abc123', mapped_properties['user']['id']) - - class FederatedTokenTests(FederationTests, FederatedSetupMixin): def auth_plugin_config_override(self): methods = ['saml2'] - method_classes = {'saml2': 'keystone.auth.plugins.saml2.Saml2'} - super(FederatedTokenTests, self).auth_plugin_config_override( - methods, **method_classes) + super(FederatedTokenTests, self).auth_plugin_config_override(methods) def setUp(self): super(FederatedTokenTests, self).setUp() @@ -1923,7 +1525,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): def test_issue_unscoped_token_with_remote_no_attribute(self): r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, environment={ - self.REMOTE_ID_ATTR: self.REMOTE_ID + self.REMOTE_ID_ATTR: + self.REMOTE_IDS[0] }) self.assertIsNotNone(r.headers.get('X-Subject-Token')) @@ -1932,7 +1535,18 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): remote_id_attribute=self.REMOTE_ID_ATTR) r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, environment={ - self.REMOTE_ID_ATTR: self.REMOTE_ID + self.REMOTE_ID_ATTR: + self.REMOTE_IDS[0] + }) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_issue_unscoped_token_with_saml2_remote(self): + self.config_fixture.config(group='saml2', + remote_id_attribute=self.REMOTE_ID_ATTR) + r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, + environment={ + self.REMOTE_ID_ATTR: + self.REMOTE_IDS[0] }) self.assertIsNotNone(r.headers.get('X-Subject-Token')) @@ -1946,6 +1560,25 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.REMOTE_ID_ATTR: uuid.uuid4().hex }) + def test_issue_unscoped_token_with_remote_default_overwritten(self): + """Test that protocol remote_id_attribute has higher priority. + + Make sure the parameter stored under ``protocol`` section has higher + priority over parameter from default ``federation`` configuration + section. + + """ + self.config_fixture.config(group='saml2', + remote_id_attribute=self.REMOTE_ID_ATTR) + self.config_fixture.config(group='federation', + remote_id_attribute=uuid.uuid4().hex) + r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, + environment={ + self.REMOTE_ID_ATTR: + self.REMOTE_IDS[0] + }) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + def test_issue_unscoped_token_with_remote_unavailable(self): self.config_fixture.config(group='federation', remote_id_attribute=self.REMOTE_ID_ATTR) @@ -1979,7 +1612,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): context = { 'environment': { 'malformed_object': object(), - 'another_bad_idea': tuple(xrange(10)), + 'another_bad_idea': tuple(range(10)), 'yet_another_bad_param': dict(zip(uuid.uuid4().hex, range(32))) } @@ -2156,6 +1789,44 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): self.assertEqual(projects_ref, projects, 'match failed for url %s' % url) + # TODO(samueldmq): Create another test class for role inheritance tests. + # The advantage would be to reduce the complexity of this test class and + # have tests specific to this fuctionality grouped, easing readability and + # maintenability. + def test_list_projects_for_inherited_project_assignment(self): + # Enable os_inherit extension + self.config_fixture.config(group='os_inherit', enabled=True) + + # Create a subproject + subproject_inherited = self.new_project_ref( + domain_id=self.domainD['id'], + parent_id=self.project_inherited['id']) + self.resource_api.create_project(subproject_inherited['id'], + subproject_inherited) + + # Create an inherited role assignment + self.assignment_api.create_grant( + role_id=self.role_employee['id'], + group_id=self.group_employees['id'], + project_id=self.project_inherited['id'], + inherited_to_projects=True) + + # Define expected projects from employee assertion, which contain + # the created subproject + expected_project_ids = [self.project_all['id'], + self.proj_employees['id'], + subproject_inherited['id']] + + # Assert expected projects for both available URLs + for url in ('/OS-FEDERATION/projects', '/auth/projects'): + r = self.get(url, token=self.tokens['EMPLOYEE_ASSERTION']) + project_ids = [project['id'] for project in r.result['projects']] + + self.assertEqual(len(expected_project_ids), len(project_ids)) + for expected_project_id in expected_project_ids: + self.assertIn(expected_project_id, project_ids, + 'Projects match failed for url %s' % url) + def test_list_domains(self): urls = ('/OS-FEDERATION/domains', '/auth/domains') @@ -2325,7 +1996,6 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): "remote": [ { "type": "REMOTE_USER_GROUPS", - "blacklist": ["noblacklist"] } ] } @@ -2333,10 +2003,290 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin): } self.federation_api.update_mapping(self.mapping['id'], rules) + def test_empty_blacklist_passess_all_values(self): + """Test a mapping with empty blacklist specified + + Not adding a ``blacklist`` keyword to the mapping rules has the same + effect as adding an empty ``blacklist``. + In both cases, the mapping engine will not discard any groups that are + associated with apache environment variables. + + This test checks scenario where an empty blacklist was specified. + Expected result is to allow any value. + + The test scenario is as follows: + - Create group ``EXISTS`` + - Create group ``NO_EXISTS`` + - Set mapping rules for existing IdP with a blacklist + that passes through as REMOTE_USER_GROUPS + - Issue unscoped token with groups ``EXISTS`` and ``NO_EXISTS`` + assigned + + """ + + domain_id = self.domainA['id'] + domain_name = self.domainA['name'] + + # Add a group "EXISTS" + group_exists = self.new_group_ref(domain_id=domain_id) + group_exists['name'] = 'EXISTS' + group_exists = self.identity_api.create_group(group_exists) + + # Add a group "NO_EXISTS" + group_no_exists = self.new_group_ref(domain_id=domain_id) + group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = self.identity_api.create_group(group_no_exists) + + group_ids = set([group_exists['id'], group_no_exists['id']]) + + rules = { + 'rules': [ + { + "local": [ + { + "user": { + "name": "{0}", + "id": "{0}" + } + } + ], + "remote": [ + { + "type": "REMOTE_USER" + } + ] + }, + { + "local": [ + { + "groups": "{0}", + "domain": {"name": domain_name} + } + ], + "remote": [ + { + "type": "REMOTE_USER_GROUPS", + "blacklist": [] + } + ] + } + ] + } + self.federation_api.update_mapping(self.mapping['id'], rules) + r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') + assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] + self.assertEqual(len(group_ids), len(assigned_group_ids)) + for group in assigned_group_ids: + self.assertIn(group['id'], group_ids) + + def test_not_adding_blacklist_passess_all_values(self): + """Test a mapping without blacklist specified. + + Not adding a ``blacklist`` keyword to the mapping rules has the same + effect as adding an empty ``blacklist``. In both cases all values will + be accepted and passed. + + This test checks scenario where an blacklist was not specified. + Expected result is to allow any value. + + The test scenario is as follows: + - Create group ``EXISTS`` + - Create group ``NO_EXISTS`` + - Set mapping rules for existing IdP with a blacklist + that passes through as REMOTE_USER_GROUPS + - Issue unscoped token with on groups ``EXISTS`` and ``NO_EXISTS`` + assigned + + """ + + domain_id = self.domainA['id'] + domain_name = self.domainA['name'] + + # Add a group "EXISTS" + group_exists = self.new_group_ref(domain_id=domain_id) + group_exists['name'] = 'EXISTS' + group_exists = self.identity_api.create_group(group_exists) + + # Add a group "NO_EXISTS" + group_no_exists = self.new_group_ref(domain_id=domain_id) + group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = self.identity_api.create_group(group_no_exists) + + group_ids = set([group_exists['id'], group_no_exists['id']]) + + rules = { + 'rules': [ + { + "local": [ + { + "user": { + "name": "{0}", + "id": "{0}" + } + } + ], + "remote": [ + { + "type": "REMOTE_USER" + } + ] + }, + { + "local": [ + { + "groups": "{0}", + "domain": {"name": domain_name} + } + ], + "remote": [ + { + "type": "REMOTE_USER_GROUPS", + } + ] + } + ] + } + self.federation_api.update_mapping(self.mapping['id'], rules) + r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') + assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] + self.assertEqual(len(group_ids), len(assigned_group_ids)) + for group in assigned_group_ids: + self.assertIn(group['id'], group_ids) + + def test_empty_whitelist_discards_all_values(self): + """Test that empty whitelist blocks all the values + + Not adding a ``whitelist`` keyword to the mapping value is different + than adding empty whitelist. The former case will simply pass all the + values, whereas the latter would discard all the values. + + This test checks scenario where an empty whitelist was specified. + The expected result is that no groups are matched. + + The test scenario is as follows: + - Create group ``EXISTS`` + - Set mapping rules for existing IdP with an empty whitelist + that whould discard any values from the assertion + - Try issuing unscoped token, expect server to raise + ``exception.MissingGroups`` as no groups were matched and ephemeral + user does not have any group assigned. + + """ + domain_id = self.domainA['id'] + domain_name = self.domainA['name'] + group = self.new_group_ref(domain_id=domain_id) + group['name'] = 'EXISTS' + group = self.identity_api.create_group(group) + rules = { + 'rules': [ + { + "local": [ + { + "user": { + "name": "{0}", + "id": "{0}" + } + } + ], + "remote": [ + { + "type": "REMOTE_USER" + } + ] + }, + { + "local": [ + { + "groups": "{0}", + "domain": {"name": domain_name} + } + ], + "remote": [ + { + "type": "REMOTE_USER_GROUPS", + "whitelist": [] + } + ] + } + ] + } + self.federation_api.update_mapping(self.mapping['id'], rules) + + self.assertRaises(exception.MissingGroups, + self._issue_unscoped_token, + assertion='UNMATCHED_GROUP_ASSERTION') + + def test_not_setting_whitelist_accepts_all_values(self): + """Test that not setting whitelist passes + + Not adding a ``whitelist`` keyword to the mapping value is different + than adding empty whitelist. The former case will simply pass all the + values, whereas the latter would discard all the values. + + This test checks a scenario where a ``whitelist`` was not specified. + Expected result is that no groups are ignored. + + The test scenario is as follows: + - Create group ``EXISTS`` + - Set mapping rules for existing IdP with an empty whitelist + that whould discard any values from the assertion + - Issue an unscoped token and make sure ephemeral user is a member of + two groups. + + """ + domain_id = self.domainA['id'] + domain_name = self.domainA['name'] + + # Add a group "EXISTS" + group_exists = self.new_group_ref(domain_id=domain_id) + group_exists['name'] = 'EXISTS' + group_exists = self.identity_api.create_group(group_exists) + + # Add a group "NO_EXISTS" + group_no_exists = self.new_group_ref(domain_id=domain_id) + group_no_exists['name'] = 'NO_EXISTS' + group_no_exists = self.identity_api.create_group(group_no_exists) + + group_ids = set([group_exists['id'], group_no_exists['id']]) + + rules = { + 'rules': [ + { + "local": [ + { + "user": { + "name": "{0}", + "id": "{0}" + } + } + ], + "remote": [ + { + "type": "REMOTE_USER" + } + ] + }, + { + "local": [ + { + "groups": "{0}", + "domain": {"name": domain_name} + } + ], + "remote": [ + { + "type": "REMOTE_USER_GROUPS", + } + ] + } + ] + } + self.federation_api.update_mapping(self.mapping['id'], rules) r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] - self.assertEqual(1, len(assigned_group_ids)) - self.assertEqual(group['id'], assigned_group_ids[0]['id']) + self.assertEqual(len(group_ids), len(assigned_group_ids)) + for group in assigned_group_ids: + self.assertIn(group['id'], group_ids) def test_assertion_prefix_parameter(self): """Test parameters filtering based on the prefix. @@ -2416,27 +2366,24 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): super(FernetFederatedTokenTests, self).load_fixtures(fixtures) self.load_federation_sample_data() + def config_overrides(self): + super(FernetFederatedTokenTests, self).config_overrides() + self.config_fixture.config(group='token', provider='fernet') + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + def auth_plugin_config_override(self): methods = ['saml2', 'token', 'password'] - method_classes = dict( - password='keystone.auth.plugins.password.Password', - token='keystone.auth.plugins.token.Token', - saml2='keystone.auth.plugins.saml2.Saml2') super(FernetFederatedTokenTests, - self).auth_plugin_config_override(methods, **method_classes) - self.config_fixture.config( - group='token', - provider='keystone.token.providers.fernet.Provider') - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + self).auth_plugin_config_override(methods) def test_federated_unscoped_token(self): resp = self._issue_unscoped_token() - self.assertEqual(186, len(resp.headers['X-Subject-Token'])) + self.assertEqual(204, len(resp.headers['X-Subject-Token'])) def test_federated_unscoped_token_with_multiple_groups(self): assertion = 'ANOTHER_CUSTOMER_ASSERTION' resp = self._issue_unscoped_token(assertion=assertion) - self.assertEqual(204, len(resp.headers['X-Subject-Token'])) + self.assertEqual(232, len(resp.headers['X-Subject-Token'])) def test_validate_federated_unscoped_token(self): resp = self._issue_unscoped_token() @@ -2481,11 +2428,8 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests): def auth_plugin_config_override(self): methods = ['saml2', 'token'] - method_classes = dict( - token='keystone.auth.plugins.token.Token', - saml2='keystone.auth.plugins.saml2.Saml2') super(FederatedTokenTests, - self).auth_plugin_config_override(methods, **method_classes) + self).auth_plugin_config_override(methods) class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin): @@ -2520,12 +2464,20 @@ class SAMLGenerationTests(FederationTests): SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers' '/BETA/protocols/saml2/auth') + + ASSERTION_FILE = 'signed_saml2_assertion.xml' + + # The values of the following variables match the attributes values found + # in ASSERTION_FILE ISSUER = 'https://acme.com/FIM/sps/openstack/saml20' RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST' SUBJECT = 'test_user' + SUBJECT_DOMAIN = 'user_domain' ROLES = ['admin', 'member'] PROJECT = 'development' + PROJECT_DOMAIN = 'project_domain' SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2' + ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp' ASSERTION_VERSION = "2.0" SERVICE_PROVDIER_ID = 'ACME' @@ -2535,6 +2487,7 @@ class SAMLGenerationTests(FederationTests): 'enabled': True, 'description': uuid.uuid4().hex, 'sp_url': self.RECIPIENT, + 'relay_state_prefix': CONF.saml.relay_state_prefix, } return ref @@ -2542,9 +2495,11 @@ class SAMLGenerationTests(FederationTests): def setUp(self): super(SAMLGenerationTests, self).setUp() self.signed_assertion = saml2.create_class_from_xml_string( - saml.Assertion, _load_xml('signed_saml2_assertion.xml')) + saml.Assertion, _load_xml(self.ASSERTION_FILE)) self.sp = self.sp_ref() - self.federation_api.create_sp(self.SERVICE_PROVDIER_ID, self.sp) + url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID + self.put(url, body={'service_provider': self.sp}, + expected_status=201) def test_samlize_token_values(self): """Test the SAML generator produces a SAML object. @@ -2558,8 +2513,10 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.ROLES, - self.PROJECT) + self.SUBJECT, + self.SUBJECT_DOMAIN, + self.ROLES, self.PROJECT, + self.PROJECT_DOMAIN) assertion = response.assertion self.assertIsNotNone(assertion) @@ -2571,14 +2528,24 @@ class SAMLGenerationTests(FederationTests): user_attribute = assertion.attribute_statement[0].attribute[0] self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text) - role_attribute = assertion.attribute_statement[0].attribute[1] + user_domain_attribute = ( + assertion.attribute_statement[0].attribute[1]) + self.assertEqual(self.SUBJECT_DOMAIN, + user_domain_attribute.attribute_value[0].text) + + role_attribute = assertion.attribute_statement[0].attribute[2] for attribute_value in role_attribute.attribute_value: self.assertIn(attribute_value.text, self.ROLES) - project_attribute = assertion.attribute_statement[0].attribute[2] + project_attribute = assertion.attribute_statement[0].attribute[3] self.assertEqual(self.PROJECT, project_attribute.attribute_value[0].text) + project_domain_attribute = ( + assertion.attribute_statement[0].attribute[4]) + self.assertEqual(self.PROJECT_DOMAIN, + project_domain_attribute.attribute_value[0].text) + def test_verify_assertion_object(self): """Test that the Assertion object is built properly. @@ -2590,8 +2557,10 @@ class SAMLGenerationTests(FederationTests): side_effect=lambda x: x): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.ROLES, - self.PROJECT) + self.SUBJECT, + self.SUBJECT_DOMAIN, + self.ROLES, self.PROJECT, + self.PROJECT_DOMAIN) assertion = response.assertion self.assertEqual(self.ASSERTION_VERSION, assertion.version) @@ -2607,8 +2576,10 @@ class SAMLGenerationTests(FederationTests): return_value=self.signed_assertion): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.ROLES, - self.PROJECT) + self.SUBJECT, + self.SUBJECT_DOMAIN, + self.ROLES, self.PROJECT, + self.PROJECT_DOMAIN) saml_str = response.to_string() response = etree.fromstring(saml_str) @@ -2621,13 +2592,19 @@ class SAMLGenerationTests(FederationTests): user_attribute = assertion[4][0] self.assertEqual(self.SUBJECT, user_attribute[0].text) - role_attribute = assertion[4][1] + user_domain_attribute = assertion[4][1] + self.assertEqual(self.SUBJECT_DOMAIN, user_domain_attribute[0].text) + + role_attribute = assertion[4][2] for attribute_value in role_attribute: self.assertIn(attribute_value.text, self.ROLES) - project_attribute = assertion[4][2] + project_attribute = assertion[4][3] self.assertEqual(self.PROJECT, project_attribute[0].text) + project_domain_attribute = assertion[4][4] + self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text) + def test_assertion_using_explicit_namespace_prefixes(self): def mocked_subprocess_check_output(*popenargs, **kwargs): # the last option is the assertion file to be signed @@ -2642,8 +2619,10 @@ class SAMLGenerationTests(FederationTests): side_effect=mocked_subprocess_check_output): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.ROLES, - self.PROJECT) + self.SUBJECT, + self.SUBJECT_DOMAIN, + self.ROLES, self.PROJECT, + self.PROJECT_DOMAIN) assertion_xml = response.assertion.to_string() # make sure we have the proper tag and prefix for the assertion # namespace @@ -2666,8 +2645,9 @@ class SAMLGenerationTests(FederationTests): generator = keystone_idp.SAMLGenerator() response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.ROLES, - self.PROJECT) + self.SUBJECT, self.SUBJECT_DOMAIN, + self.ROLES, self.PROJECT, + self.PROJECT_DOMAIN) signature = response.assertion.signature self.assertIsNotNone(signature) @@ -2770,12 +2750,18 @@ class SAMLGenerationTests(FederationTests): user_attribute = assertion[4][0] self.assertIsInstance(user_attribute[0].text, str) - role_attribute = assertion[4][1] + user_domain_attribute = assertion[4][1] + self.assertIsInstance(user_domain_attribute[0].text, str) + + role_attribute = assertion[4][2] self.assertIsInstance(role_attribute[0].text, str) - project_attribute = assertion[4][2] + project_attribute = assertion[4][3] self.assertIsInstance(project_attribute[0].text, str) + project_domain_attribute = assertion[4][4] + self.assertIsInstance(project_domain_attribute[0].text, str) + def test_invalid_scope_body(self): """Test that missing the scope in request body raises an exception. @@ -2839,6 +2825,104 @@ class SAMLGenerationTests(FederationTests): self.SERVICE_PROVDIER_ID) self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404) + def test_generate_ecp_route(self): + """Test that the ECP generation endpoint produces XML. + + The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same + input as the SAML generation endpoint (scoped token ID + Service + Provider ID). + The controller should return a SAML assertion that is wrapped in a + SOAP envelope. + """ + + self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) + token_id = self._fetch_valid_token() + body = self._create_generate_saml_request(token_id, + self.SERVICE_PROVDIER_ID) + + with mock.patch.object(keystone_idp, '_sign_assertion', + return_value=self.signed_assertion): + http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, + response_content_type='text/xml', + expected_status=200) + + env_response = etree.fromstring(http_response.result) + header = env_response[0] + + # Verify the relay state starts with 'ss:mem' + prefix = CONF.saml.relay_state_prefix + self.assertThat(header[0].text, matchers.StartsWith(prefix)) + + # Verify that the content in the body matches the expected assertion + body = env_response[1] + response = body[0] + issuer = response[0] + assertion = response[2] + + self.assertEqual(self.RECIPIENT, response.get('Destination')) + self.assertEqual(self.ISSUER, issuer.text) + + user_attribute = assertion[4][0] + self.assertIsInstance(user_attribute[0].text, str) + + user_domain_attribute = assertion[4][1] + self.assertIsInstance(user_domain_attribute[0].text, str) + + role_attribute = assertion[4][2] + self.assertIsInstance(role_attribute[0].text, str) + + project_attribute = assertion[4][3] + self.assertIsInstance(project_attribute[0].text, str) + + project_domain_attribute = assertion[4][4] + self.assertIsInstance(project_domain_attribute[0].text, str) + + @mock.patch('saml2.create_class_from_xml_string') + @mock.patch('oslo_utils.fileutils.write_to_tempfile') + @mock.patch('subprocess.check_output') + def test__sign_assertion(self, check_output_mock, + write_to_tempfile_mock, create_class_mock): + write_to_tempfile_mock.return_value = 'tmp_path' + check_output_mock.return_value = 'fakeoutput' + + keystone_idp._sign_assertion(self.signed_assertion) + + create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput') + + @mock.patch('oslo_utils.fileutils.write_to_tempfile') + @mock.patch('subprocess.check_output') + def test__sign_assertion_exc(self, check_output_mock, + write_to_tempfile_mock): + # If the command fails the command output is logged. + + write_to_tempfile_mock.return_value = 'tmp_path' + + sample_returncode = 1 + sample_output = self.getUniqueString() + check_output_mock.side_effect = subprocess.CalledProcessError( + returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary, + output=sample_output) + + # FIXME(blk-u): This should raise exception.SAMLSigningError instead, + # but fails with TypeError due to concatenating string to Message, see + # bug 1484735. + self.assertRaises(TypeError, + keystone_idp._sign_assertion, + self.signed_assertion) + + @mock.patch('oslo_utils.fileutils.write_to_tempfile') + def test__sign_assertion_fileutils_exc(self, write_to_tempfile_mock): + exception_msg = 'fake' + write_to_tempfile_mock.side_effect = Exception(exception_msg) + + logger_fixture = self.useFixture(fixtures.LoggerFixture()) + self.assertRaises(exception.SAMLSigningError, + keystone_idp._sign_assertion, + self.signed_assertion) + expected_log = ( + 'Error when signing assertion, reason: %s\n' % exception_msg) + self.assertEqual(expected_log, logger_fixture.output) + class IdPMetadataGenerationTests(FederationTests): """A class for testing Identity Provider Metadata generation.""" @@ -2976,7 +3060,8 @@ class ServiceProviderTests(FederationTests): MEMBER_NAME = 'service_provider' COLLECTION_NAME = 'service_providers' SERVICE_PROVIDER_ID = 'ACME' - SP_KEYS = ['auth_url', 'id', 'enabled', 'description', 'sp_url'] + SP_KEYS = ['auth_url', 'id', 'enabled', 'description', + 'relay_state_prefix', 'sp_url'] def setUp(self): super(FederationTests, self).setUp() @@ -2993,6 +3078,7 @@ class ServiceProviderTests(FederationTests): 'enabled': True, 'description': uuid.uuid4().hex, 'sp_url': 'https://' + uuid.uuid4().hex + '.com', + 'relay_state_prefix': CONF.saml.relay_state_prefix } return ref @@ -3019,6 +3105,29 @@ class ServiceProviderTests(FederationTests): self.assertValidEntity(resp.result['service_provider'], keys_to_check=self.SP_KEYS) + def test_create_sp_relay_state_default(self): + """Create an SP without relay state, should default to `ss:mem`.""" + url = self.base_url(suffix=uuid.uuid4().hex) + sp = self.sp_ref() + del sp['relay_state_prefix'] + resp = self.put(url, body={'service_provider': sp}, + expected_status=201) + sp_result = resp.result['service_provider'] + self.assertEqual(CONF.saml.relay_state_prefix, + sp_result['relay_state_prefix']) + + def test_create_sp_relay_state_non_default(self): + """Create an SP with custom relay state.""" + url = self.base_url(suffix=uuid.uuid4().hex) + sp = self.sp_ref() + non_default_prefix = uuid.uuid4().hex + sp['relay_state_prefix'] = non_default_prefix + resp = self.put(url, body={'service_provider': sp}, + expected_status=201) + sp_result = resp.result['service_provider'] + self.assertEqual(non_default_prefix, + sp_result['relay_state_prefix']) + def test_create_service_provider_fail(self): """Try adding SP object with unallowed attribute.""" url = self.base_url(suffix=uuid.uuid4().hex) @@ -3108,6 +3217,18 @@ class ServiceProviderTests(FederationTests): self.patch(url, body={'service_provider': new_sp_ref}, expected_status=404) + def test_update_sp_relay_state(self): + """Update an SP with custome relay state.""" + new_sp_ref = self.sp_ref() + non_default_prefix = uuid.uuid4().hex + new_sp_ref['relay_state_prefix'] = non_default_prefix + url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) + resp = self.patch(url, body={'service_provider': new_sp_ref}, + expected_status=200) + sp_result = resp.result['service_provider'] + self.assertEqual(non_default_prefix, + sp_result['relay_state_prefix']) + def test_delete_service_provider(self): url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) self.delete(url, expected_status=204) @@ -3125,6 +3246,7 @@ class WebSSOTests(FederatedTokenTests): SSO_TEMPLATE_PATH = os.path.join(core.dirs.etc(), SSO_TEMPLATE_NAME) TRUSTED_DASHBOARD = 'http://horizon.com' ORIGIN = urllib.parse.quote_plus(TRUSTED_DASHBOARD) + PROTOCOL_REMOTE_ID_ATTR = uuid.uuid4().hex def setUp(self): super(WebSSOTests, self).setUp() @@ -3145,7 +3267,19 @@ class WebSSOTests(FederatedTokenTests): self.assertIn(self.TRUSTED_DASHBOARD, resp.body) def test_federated_sso_auth(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID} + environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} + context = {'environment': environment} + query_string = {'origin': self.ORIGIN} + self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) + resp = self.api.federated_sso_auth(context, self.PROTOCOL) + self.assertIn(self.TRUSTED_DASHBOARD, resp.body) + + def test_federated_sso_auth_with_protocol_specific_remote_id(self): + self.config_fixture.config( + group=self.PROTOCOL, + remote_id_attribute=self.PROTOCOL_REMOTE_ID_ATTR) + + environment = {self.PROTOCOL_REMOTE_ID_ATTR: self.REMOTE_IDS[0]} context = {'environment': environment} query_string = {'origin': self.ORIGIN} self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) @@ -3162,7 +3296,7 @@ class WebSSOTests(FederatedTokenTests): context, self.PROTOCOL) def test_federated_sso_missing_query(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID} + environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} context = {'environment': environment} self._inject_assertion(context, 'EMPLOYEE_ASSERTION') self.assertRaises(exception.ValidationError, @@ -3178,7 +3312,7 @@ class WebSSOTests(FederatedTokenTests): context, self.PROTOCOL) def test_federated_sso_untrusted_dashboard(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID} + environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} context = {'environment': environment} query_string = {'origin': uuid.uuid4().hex} self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) @@ -3229,6 +3363,7 @@ class K2KServiceCatalogTests(FederationTests): def sp_response(self, id, ref): ref.pop('enabled') ref.pop('description') + ref.pop('relay_state_prefix') ref['id'] = id return ref @@ -3238,6 +3373,7 @@ class K2KServiceCatalogTests(FederationTests): 'enabled': True, 'description': uuid.uuid4().hex, 'sp_url': uuid.uuid4().hex, + 'relay_state_prefix': CONF.saml.relay_state_prefix, } return ref |