summaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_federation.py
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-09-01 16:03:26 +0200
committerasteroide <thomas.duval@orange.com>2015-09-01 16:04:53 +0200
commit92fd2dbfb672d7b2b1cdfd5dd5cf89f7716b3e12 (patch)
tree7ba22297042019e7363fa1d4ad26d1c32c5908c6 /keystone-moon/keystone/tests/unit/test_v3_federation.py
parent26e753254f3e43399cc76e62892908b7742415e8 (diff)
Update Keystone code from official Github repository with branch Master on 09/01/2015.
Change-Id: I0ff6099e6e2580f87f502002a998bbfe12673498
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_federation.py1450
1 files changed, 793 insertions, 657 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py
index 3b6f4d8b..e646bc0a 100644
--- a/keystone-moon/keystone/tests/unit/test_v3_federation.py
+++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py
@@ -13,26 +13,27 @@
import os
import random
import subprocess
+from testtools import matchers
import uuid
+import fixtures
from lxml import etree
import mock
from oslo_config import cfg
from oslo_log import log
-from oslo_serialization import jsonutils
+from oslo_utils import importutils
from oslotest import mockpatch
import saml2
from saml2 import saml
from saml2 import sigver
-from six.moves import urllib
-import xmldsig
+from six.moves import range, urllib, zip
+xmldsig = importutils.try_import("saml2.xmldsig")
+if not xmldsig:
+ xmldsig = importutils.try_import("xmldsig")
from keystone.auth import controllers as auth_controllers
-from keystone.auth.plugins import mapped
-from keystone.contrib import federation
from keystone.contrib.federation import controllers as federation_controllers
from keystone.contrib.federation import idp as keystone_idp
-from keystone.contrib.federation import utils as mapping_utils
from keystone import exception
from keystone import notifications
from keystone.tests.unit import core
@@ -68,7 +69,7 @@ class FederatedSetupMixin(object):
USER = 'user@ORGANIZATION'
ASSERTION_PREFIX = 'PREFIX_'
IDP_WITH_REMOTE = 'ORG_IDP_REMOTE'
- REMOTE_ID = 'entityID_IDP'
+ REMOTE_IDS = ['entityID_IDP1', 'entityID_IDP2']
REMOTE_ID_ATTR = uuid.uuid4().hex
UNSCOPED_V3_SAML2_REQ = {
@@ -108,14 +109,14 @@ class FederatedSetupMixin(object):
self.assertEqual(token_projects, projects_ref)
def _check_scoped_token_attributes(self, token):
- def xor_project_domain(iterable):
- return sum(('project' in iterable, 'domain' in iterable)) % 2
+ def xor_project_domain(token_keys):
+ return sum(('project' in token_keys, 'domain' in token_keys)) % 2
for obj in ('user', 'catalog', 'expires_at', 'issued_at',
'methods', 'roles'):
self.assertIn(obj, token)
# Check for either project or domain
- if not xor_project_domain(token.keys()):
+ if not xor_project_domain(list(token.keys())):
raise AssertionError("You must specify either"
"project or domain.")
@@ -123,6 +124,10 @@ class FederatedSetupMixin(object):
os_federation = token['user']['OS-FEDERATION']
self.assertEqual(self.IDP, os_federation['identity_provider']['id'])
self.assertEqual(self.PROTOCOL, os_federation['protocol']['id'])
+ self.assertListEqual(sorted(['groups',
+ 'identity_provider',
+ 'protocol']),
+ sorted(os_federation.keys()))
def _issue_unscoped_token(self,
idp=None,
@@ -327,7 +332,8 @@ class FederatedSetupMixin(object):
},
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -336,6 +342,9 @@ class FederatedSetupMixin(object):
'type': 'UserName'
},
{
+ 'type': 'Email',
+ },
+ {
'type': 'orgPersonType',
'any_one_of': [
'Employee'
@@ -352,7 +361,8 @@ class FederatedSetupMixin(object):
},
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -361,6 +371,9 @@ class FederatedSetupMixin(object):
'type': self.ASSERTION_PREFIX + 'UserName'
},
{
+ 'type': self.ASSERTION_PREFIX + 'Email',
+ },
+ {
'type': self.ASSERTION_PREFIX + 'orgPersonType',
'any_one_of': [
'SuperEmployee'
@@ -377,7 +390,8 @@ class FederatedSetupMixin(object):
},
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -386,6 +400,9 @@ class FederatedSetupMixin(object):
'type': 'UserName'
},
{
+ 'type': 'Email'
+ },
+ {
'type': 'orgPersonType',
'any_one_of': [
'Customer'
@@ -413,7 +430,8 @@ class FederatedSetupMixin(object):
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -422,6 +440,9 @@ class FederatedSetupMixin(object):
'type': 'UserName'
},
{
+ 'type': 'Email'
+ },
+ {
'type': 'orgPersonType',
'any_one_of': [
'Admin',
@@ -444,7 +465,8 @@ class FederatedSetupMixin(object):
},
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -453,6 +475,9 @@ class FederatedSetupMixin(object):
'type': 'UserName',
},
{
+ 'type': 'Email',
+ },
+ {
'type': 'FirstName',
'any_one_of': [
'Jill'
@@ -475,7 +500,8 @@ class FederatedSetupMixin(object):
},
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
}
],
@@ -485,6 +511,9 @@ class FederatedSetupMixin(object):
},
{
'type': 'Email',
+ },
+ {
+ 'type': 'Email',
'any_one_of': [
'testacct@example.com'
]
@@ -502,7 +531,8 @@ class FederatedSetupMixin(object):
"local": [
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
},
{
@@ -519,6 +549,9 @@ class FederatedSetupMixin(object):
'type': 'UserName',
},
{
+ 'type': 'Email',
+ },
+ {
"type": "orgPersonType",
"any_one_of": [
"CEO",
@@ -531,7 +564,8 @@ class FederatedSetupMixin(object):
"local": [
{
'user': {
- 'name': '{0}'
+ 'name': '{0}',
+ 'id': '{1}'
}
},
{
@@ -548,6 +582,9 @@ class FederatedSetupMixin(object):
"type": "UserName",
},
{
+ "type": "Email",
+ },
+ {
"type": "orgPersonType",
"any_one_of": [
"Managers"
@@ -559,7 +596,8 @@ class FederatedSetupMixin(object):
"local": [
{
"user": {
- "name": "{0}"
+ "name": "{0}",
+ "id": "{1}"
}
},
{
@@ -576,6 +614,9 @@ class FederatedSetupMixin(object):
"type": "UserName",
},
{
+ "type": "Email",
+ },
+ {
"type": "UserName",
"any_one_of": [
"IamTester"
@@ -639,7 +680,7 @@ class FederatedSetupMixin(object):
self.idp)
# Add IDP with remote
self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE)
- self.idp_with_remote['remote_id'] = self.REMOTE_ID
+ self.idp_with_remote['remote_ids'] = self.REMOTE_IDS
self.federation_api.create_idp(self.idp_with_remote['id'],
self.idp_with_remote)
# Add a mapping
@@ -793,28 +834,137 @@ class FederatedIdentityProviderTests(FederationTests):
return r
def test_create_idp(self):
- """Creates the IdentityProvider entity."""
+ """Creates the IdentityProvider entity associated to remote_ids."""
- keys_to_check = self.idp_keys
- body = self._http_idp_input()
+ keys_to_check = list(self.idp_keys)
+ body = self.default_body.copy()
+ body['description'] = uuid.uuid4().hex
resp = self._create_default_idp(body=body)
self.assertValidResponse(resp, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check,
ref=body)
def test_create_idp_remote(self):
- """Creates the IdentityProvider entity associated to a remote_id."""
+ """Creates the IdentityProvider entity associated to remote_ids."""
keys_to_check = list(self.idp_keys)
- keys_to_check.append('remote_id')
+ keys_to_check.append('remote_ids')
body = self.default_body.copy()
body['description'] = uuid.uuid4().hex
- body['remote_id'] = uuid.uuid4().hex
+ body['remote_ids'] = [uuid.uuid4().hex,
+ uuid.uuid4().hex,
+ uuid.uuid4().hex]
resp = self._create_default_idp(body=body)
self.assertValidResponse(resp, 'identity_provider', dummy_validator,
keys_to_check=keys_to_check,
ref=body)
+ def test_create_idp_remote_repeated(self):
+ """Creates two IdentityProvider entities with some remote_ids
+
+ A remote_id is the same for both so the second IdP is not
+ created because of the uniqueness of the remote_ids
+
+ Expect HTTP 409 code for the latter call.
+
+ """
+
+ body = self.default_body.copy()
+ repeated_remote_id = uuid.uuid4().hex
+ body['remote_ids'] = [uuid.uuid4().hex,
+ uuid.uuid4().hex,
+ uuid.uuid4().hex,
+ repeated_remote_id]
+ self._create_default_idp(body=body)
+
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ body['remote_ids'] = [uuid.uuid4().hex,
+ repeated_remote_id]
+ self.put(url, body={'identity_provider': body},
+ expected_status=409)
+
+ def test_create_idp_remote_empty(self):
+ """Creates an IdP with empty remote_ids."""
+
+ keys_to_check = list(self.idp_keys)
+ keys_to_check.append('remote_ids')
+ body = self.default_body.copy()
+ body['description'] = uuid.uuid4().hex
+ body['remote_ids'] = []
+ resp = self._create_default_idp(body=body)
+ self.assertValidResponse(resp, 'identity_provider', dummy_validator,
+ keys_to_check=keys_to_check,
+ ref=body)
+
+ def test_create_idp_remote_none(self):
+ """Creates an IdP with a None remote_ids."""
+
+ keys_to_check = list(self.idp_keys)
+ keys_to_check.append('remote_ids')
+ body = self.default_body.copy()
+ body['description'] = uuid.uuid4().hex
+ body['remote_ids'] = None
+ resp = self._create_default_idp(body=body)
+ expected = body.copy()
+ expected['remote_ids'] = []
+ self.assertValidResponse(resp, 'identity_provider', dummy_validator,
+ keys_to_check=keys_to_check,
+ ref=expected)
+
+ def test_update_idp_remote_ids(self):
+ """Update IdP's remote_ids parameter."""
+ body = self.default_body.copy()
+ body['remote_ids'] = [uuid.uuid4().hex]
+ default_resp = self._create_default_idp(body=body)
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ url = self.base_url(suffix=idp_id)
+ self.assertIsNotNone(idp_id)
+
+ body['remote_ids'] = [uuid.uuid4().hex, uuid.uuid4().hex]
+
+ body = {'identity_provider': body}
+ resp = self.patch(url, body=body)
+ updated_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ body = body['identity_provider']
+ self.assertEqual(sorted(body['remote_ids']),
+ sorted(updated_idp.get('remote_ids')))
+
+ resp = self.get(url)
+ returned_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ self.assertEqual(sorted(body['remote_ids']),
+ sorted(returned_idp.get('remote_ids')))
+
+ def test_update_idp_clean_remote_ids(self):
+ """Update IdP's remote_ids parameter with an empty list."""
+ body = self.default_body.copy()
+ body['remote_ids'] = [uuid.uuid4().hex]
+ default_resp = self._create_default_idp(body=body)
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ url = self.base_url(suffix=idp_id)
+ self.assertIsNotNone(idp_id)
+
+ body['remote_ids'] = []
+
+ body = {'identity_provider': body}
+ resp = self.patch(url, body=body)
+ updated_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ body = body['identity_provider']
+ self.assertEqual(sorted(body['remote_ids']),
+ sorted(updated_idp.get('remote_ids')))
+
+ resp = self.get(url)
+ returned_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ self.assertEqual(sorted(body['remote_ids']),
+ sorted(returned_idp.get('remote_ids')))
+
def test_list_idps(self, iterations=5):
"""Lists all available IdentityProviders.
@@ -899,6 +1049,33 @@ class FederatedIdentityProviderTests(FederationTests):
self.delete(url)
self.get(url, expected_status=404)
+ def test_delete_idp_also_deletes_assigned_protocols(self):
+ """Deleting an IdP will delete its assigned protocol."""
+
+ # create default IdP
+ default_resp = self._create_default_idp()
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp['id']
+ protocol_id = uuid.uuid4().hex
+
+ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
+ idp_url = self.base_url(suffix=idp_id)
+
+ # assign protocol to IdP
+ kwargs = {'expected_status': 201}
+ resp, idp_id, proto = self._assign_protocol_to_idp(
+ url=url,
+ idp_id=idp_id,
+ proto=protocol_id,
+ **kwargs)
+
+ # removing IdP will remove the assigned protocol as well
+ self.assertEqual(1, len(self.federation_api.list_protocols(idp_id)))
+ self.delete(idp_url)
+ self.get(idp_url, expected_status=404)
+ self.assertEqual(0, len(self.federation_api.list_protocols(idp_id)))
+
def test_delete_nonexisting_idp(self):
"""Delete nonexisting IdP.
@@ -918,7 +1095,7 @@ class FederatedIdentityProviderTests(FederationTests):
self.assertIsNotNone(idp_id)
_enabled = not default_idp.get('enabled')
- body = {'remote_id': uuid.uuid4().hex,
+ body = {'remote_ids': [uuid.uuid4().hex, uuid.uuid4().hex],
'description': uuid.uuid4().hex,
'enabled': _enabled}
@@ -928,13 +1105,21 @@ class FederatedIdentityProviderTests(FederationTests):
'identity_provider')
body = body['identity_provider']
for key in body.keys():
- self.assertEqual(body[key], updated_idp.get(key))
+ if isinstance(body[key], list):
+ self.assertEqual(sorted(body[key]),
+ sorted(updated_idp.get(key)))
+ else:
+ self.assertEqual(body[key], updated_idp.get(key))
resp = self.get(url)
updated_idp = self._fetch_attribute_from_response(resp,
'identity_provider')
for key in body.keys():
- self.assertEqual(body[key], updated_idp.get(key))
+ if isinstance(body[key], list):
+ self.assertEqual(sorted(body[key]),
+ sorted(updated_idp.get(key)))
+ else:
+ self.assertEqual(body[key], updated_idp.get(key))
def test_update_idp_immutable_attributes(self):
"""Update IdP's immutable parameters.
@@ -1126,7 +1311,7 @@ class MappingCRUDTests(FederationTests):
self.assertIsNotNone(entity.get('id'))
self.assertIsNotNone(entity.get('rules'))
if ref:
- self.assertEqual(jsonutils.loads(entity['rules']), ref['rules'])
+ self.assertEqual(entity['rules'], ref['rules'])
return entity
def _create_default_mapping_entry(self):
@@ -1262,594 +1447,11 @@ class MappingCRUDTests(FederationTests):
self.put(url, expected_status=400, body={'mapping': mapping})
-class MappingRuleEngineTests(FederationTests):
- """A class for testing the mapping rule engine."""
-
- def assertValidMappedUserObject(self, mapped_properties,
- user_type='ephemeral',
- domain_id=None):
- """Check whether mapped properties object has 'user' within.
-
- According to today's rules, RuleProcessor does not have to issue user's
- id or name. What's actually required is user's type and for ephemeral
- users that would be service domain named 'Federated'.
- """
- self.assertIn('user', mapped_properties,
- message='Missing user object in mapped properties')
- user = mapped_properties['user']
- self.assertIn('type', user)
- self.assertEqual(user_type, user['type'])
- self.assertIn('domain', user)
- domain = user['domain']
- domain_name_or_id = domain.get('id') or domain.get('name')
- domain_ref = domain_id or federation.FEDERATED_DOMAIN_KEYWORD
- self.assertEqual(domain_ref, domain_name_or_id)
-
- def test_rule_engine_any_one_of_and_direct_mapping(self):
- """Should return user's name and group id EMPLOYEE_GROUP_ID.
-
- The ADMIN_ASSERTION should successfully have a match in MAPPING_LARGE.
- They will test the case where `any_one_of` is valid, and there is
- a direct mapping for the users name.
-
- """
-
- mapping = mapping_fixtures.MAPPING_LARGE
- assertion = mapping_fixtures.ADMIN_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- fn = assertion.get('FirstName')
- ln = assertion.get('LastName')
- full_name = '%s %s' % (fn, ln)
- group_ids = values.get('group_ids')
- user_name = values.get('user', {}).get('name')
-
- self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids)
- self.assertEqual(full_name, user_name)
-
- def test_rule_engine_no_regex_match(self):
- """Should deny authorization, the email of the tester won't match.
-
- This will not match since the email in the assertion will fail
- the regex test. It is set to match any @example.com address.
- But the incoming value is set to eviltester@example.org.
- RuleProcessor should return list of empty group_ids.
-
- """
-
- mapping = mapping_fixtures.MAPPING_LARGE
- assertion = mapping_fixtures.BAD_TESTER_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
-
- self.assertValidMappedUserObject(mapped_properties)
- self.assertIsNone(mapped_properties['user'].get('name'))
- self.assertListEqual(list(), mapped_properties['group_ids'])
-
- def test_rule_engine_regex_many_groups(self):
- """Should return group CONTRACTOR_GROUP_ID.
-
- The TESTER_ASSERTION should successfully have a match in
- MAPPING_TESTER_REGEX. This will test the case where many groups
- are in the assertion, and a regex value is used to try and find
- a match.
-
- """
-
- mapping = mapping_fixtures.MAPPING_TESTER_REGEX
- assertion = mapping_fixtures.TESTER_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- self.assertValidMappedUserObject(values)
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertEqual(user_name, name)
- self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids)
-
- def test_rule_engine_any_one_of_many_rules(self):
- """Should return group CONTRACTOR_GROUP_ID.
-
- The CONTRACTOR_ASSERTION should successfully have a match in
- MAPPING_SMALL. This will test the case where many rules
- must be matched, including an `any_one_of`, and a direct
- mapping.
-
- """
-
- mapping = mapping_fixtures.MAPPING_SMALL
- assertion = mapping_fixtures.CONTRACTOR_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- self.assertValidMappedUserObject(values)
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertEqual(user_name, name)
- self.assertIn(mapping_fixtures.CONTRACTOR_GROUP_ID, group_ids)
-
- def test_rule_engine_not_any_of_and_direct_mapping(self):
- """Should return user's name and email.
-
- The CUSTOMER_ASSERTION should successfully have a match in
- MAPPING_LARGE. This will test the case where a requirement
- has `not_any_of`, and direct mapping to a username, no group.
-
- """
-
- mapping = mapping_fixtures.MAPPING_LARGE
- assertion = mapping_fixtures.CUSTOMER_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- self.assertValidMappedUserObject(values)
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertEqual(user_name, name)
- self.assertEqual([], group_ids,)
-
- def test_rule_engine_not_any_of_many_rules(self):
- """Should return group EMPLOYEE_GROUP_ID.
-
- The EMPLOYEE_ASSERTION should successfully have a match in
- MAPPING_SMALL. This will test the case where many remote
- rules must be matched, including a `not_any_of`.
-
- """
-
- mapping = mapping_fixtures.MAPPING_SMALL
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- self.assertValidMappedUserObject(values)
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertEqual(user_name, name)
- self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids)
-
- def test_rule_engine_not_any_of_regex_verify_pass(self):
- """Should return group DEVELOPER_GROUP_ID.
-
- The DEVELOPER_ASSERTION should successfully have a match in
- MAPPING_DEVELOPER_REGEX. This will test the case where many
- remote rules must be matched, including a `not_any_of`, with
- regex set to True.
-
- """
-
- mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX
- assertion = mapping_fixtures.DEVELOPER_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- self.assertValidMappedUserObject(values)
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertEqual(user_name, name)
- self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids)
-
- def test_rule_engine_not_any_of_regex_verify_fail(self):
- """Should deny authorization.
-
- The email in the assertion will fail the regex test.
- It is set to reject any @example.org address, but the
- incoming value is set to evildeveloper@example.org.
- RuleProcessor should return list of empty group_ids.
-
- """
-
- mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX
- assertion = mapping_fixtures.BAD_DEVELOPER_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
-
- self.assertValidMappedUserObject(mapped_properties)
- self.assertIsNone(mapped_properties['user'].get('name'))
- self.assertListEqual(list(), mapped_properties['group_ids'])
-
- def _rule_engine_regex_match_and_many_groups(self, assertion):
- """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID.
-
- A helper function injecting assertion passed as an argument.
- Expect DEVELOPER_GROUP_ID and TESTER_GROUP_ID in the results.
-
- """
-
- mapping = mapping_fixtures.MAPPING_LARGE
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- values = rp.process(assertion)
-
- user_name = assertion.get('UserName')
- group_ids = values.get('group_ids')
- name = values.get('user', {}).get('name')
-
- self.assertValidMappedUserObject(values)
- self.assertEqual(user_name, name)
- self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids)
- self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids)
-
- def test_rule_engine_regex_match_and_many_groups(self):
- """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID.
-
- The TESTER_ASSERTION should successfully have a match in
- MAPPING_LARGE. This will test a successful regex match
- for an `any_one_of` evaluation type, and will have many
- groups returned.
-
- """
- self._rule_engine_regex_match_and_many_groups(
- mapping_fixtures.TESTER_ASSERTION)
-
- def test_rule_engine_discards_nonstring_objects(self):
- """Check whether RuleProcessor discards non string objects.
-
- Despite the fact that assertion is malformed and contains
- non string objects, RuleProcessor should correctly discard them and
- successfully have a match in MAPPING_LARGE.
-
- """
- self._rule_engine_regex_match_and_many_groups(
- mapping_fixtures.MALFORMED_TESTER_ASSERTION)
-
- def test_rule_engine_fails_after_discarding_nonstring(self):
- """Check whether RuleProcessor discards non string objects.
-
- Expect RuleProcessor to discard non string object, which
- is required for a correct rule match. RuleProcessor will result with
- empty list of groups.
-
- """
- mapping = mapping_fixtures.MAPPING_SMALL
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CONTRACTOR_MALFORMED_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertValidMappedUserObject(mapped_properties)
- self.assertIsNone(mapped_properties['user'].get('name'))
- self.assertListEqual(list(), mapped_properties['group_ids'])
-
- def test_rule_engine_returns_group_names(self):
- """Check whether RuleProcessor returns group names with their domains.
-
- RuleProcessor should return 'group_names' entry with a list of
- dictionaries with two entries 'name' and 'domain' identifying group by
- its name and domain.
-
- """
- mapping = mapping_fixtures.MAPPING_GROUP_NAMES
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
- reference = {
- mapping_fixtures.DEVELOPER_GROUP_NAME:
- {
- "name": mapping_fixtures.DEVELOPER_GROUP_NAME,
- "domain": {
- "name": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_NAME
- }
- },
- mapping_fixtures.TESTER_GROUP_NAME:
- {
- "name": mapping_fixtures.TESTER_GROUP_NAME,
- "domain": {
- "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
- }
- }
- }
- for rule in mapped_properties['group_names']:
- self.assertDictEqual(reference.get(rule.get('name')), rule)
-
- def test_rule_engine_whitelist_and_direct_groups_mapping(self):
- """Should return user's groups Developer and Contractor.
-
- The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match
- in MAPPING_GROUPS_WHITELIST. It will test the case where 'whitelist'
- correctly filters out Manager and only allows Developer and Contractor.
-
- """
-
- mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
-
- reference = {
- mapping_fixtures.DEVELOPER_GROUP_NAME:
- {
- "name": mapping_fixtures.DEVELOPER_GROUP_NAME,
- "domain": {
- "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
- }
- },
- mapping_fixtures.CONTRACTOR_GROUP_NAME:
- {
- "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
- "domain": {
- "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
- }
- }
- }
- for rule in mapped_properties['group_names']:
- self.assertDictEqual(reference.get(rule.get('name')), rule)
-
- self.assertEqual('tbo', mapped_properties['user']['name'])
- self.assertEqual([], mapped_properties['group_ids'])
-
- def test_rule_engine_blacklist_and_direct_groups_mapping(self):
- """Should return user's group Developer.
-
- The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match
- in MAPPING_GROUPS_BLACKLIST. It will test the case where 'blacklist'
- correctly filters out Manager and Developer and only allows Contractor.
-
- """
-
- mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
-
- reference = {
- mapping_fixtures.CONTRACTOR_GROUP_NAME:
- {
- "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
- "domain": {
- "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
- }
- }
- }
- for rule in mapped_properties['group_names']:
- self.assertDictEqual(reference.get(rule.get('name')), rule)
- self.assertEqual('tbo', mapped_properties['user']['name'])
- self.assertEqual([], mapped_properties['group_ids'])
-
- def test_rule_engine_blacklist_and_direct_groups_mapping_multiples(self):
- """Tests matching multiple values before the blacklist.
-
- Verifies that the local indexes are correct when matching multiple
- remote values for a field when the field occurs before the blacklist
- entry in the remote rules.
-
- """
-
- mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MULTIPLES
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
-
- reference = {
- mapping_fixtures.CONTRACTOR_GROUP_NAME:
- {
- "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
- "domain": {
- "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
- }
- }
- }
- for rule in mapped_properties['group_names']:
- self.assertDictEqual(reference.get(rule.get('name')), rule)
- self.assertEqual('tbo', mapped_properties['user']['name'])
- self.assertEqual([], mapped_properties['group_ids'])
-
- def test_rule_engine_whitelist_direct_group_mapping_missing_domain(self):
- """Test if the local rule is rejected upon missing domain value
-
- This is a variation with a ``whitelist`` filter.
-
- """
- mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_MISSING_DOMAIN
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- self.assertRaises(exception.ValidationError, rp.process, assertion)
-
- def test_rule_engine_blacklist_direct_group_mapping_missing_domain(self):
- """Test if the local rule is rejected upon missing domain value
-
- This is a variation with a ``blacklist`` filter.
-
- """
- mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MISSING_DOMAIN
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- self.assertRaises(exception.ValidationError, rp.process, assertion)
-
- def test_rule_engine_no_groups_allowed(self):
- """Should return user mapped to no groups.
-
- The EMPLOYEE_ASSERTION should successfully have a match
- in MAPPING_GROUPS_WHITELIST, but 'whitelist' should filter out
- the group values from the assertion and thus map to no groups.
-
- """
- mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertListEqual(mapped_properties['group_names'], [])
- self.assertListEqual(mapped_properties['group_ids'], [])
- self.assertEqual('tbo', mapped_properties['user']['name'])
-
- def test_mapping_federated_domain_specified(self):
- """Test mapping engine when domain 'ephemeral' is explicitely set.
-
- For that, we use mapping rule MAPPING_EPHEMERAL_USER and assertion
- EMPLOYEE_ASSERTION
-
- """
- mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
-
- def test_create_user_object_with_bad_mapping(self):
- """Test if user object is created even with bad mapping.
-
- User objects will be created by mapping engine always as long as there
- is corresponding local rule. This test shows, that even with assertion
- where no group names nor ids are matched, but there is 'blind' rule for
- mapping user, such object will be created.
-
- In this test MAPPING_EHPEMERAL_USER expects UserName set to jsmith
- whereas value from assertion is 'tbo'.
-
- """
- mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CONTRACTOR_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
-
- self.assertNotIn('id', mapped_properties['user'])
- self.assertNotIn('name', mapped_properties['user'])
-
- def test_set_ephemeral_domain_to_ephemeral_users(self):
- """Test auto assigning service domain to ephemeral users.
-
- Test that ephemeral users will always become members of federated
- service domain. The check depends on ``type`` value which must be set
- to ``ephemeral`` in case of ephemeral user.
-
- """
- mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER_LOCAL_DOMAIN
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CONTRACTOR_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
-
- def test_local_user_local_domain(self):
- """Test that local users can have non-service domains assigned."""
- mapping = mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CONTRACTOR_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(
- mapped_properties, user_type='local',
- domain_id=mapping_fixtures.LOCAL_DOMAIN)
-
- def test_user_identifications_name(self):
- """Test varius mapping options and how users are identified.
-
- This test calls mapped.setup_username() for propagating user object.
-
- Test plan:
- - Check if the user has proper domain ('federated') set
- - Check if the user has property type set ('ephemeral')
- - Check if user's name is properly mapped from the assertion
- - Check if user's id is properly set and equal to name, as it was not
- explicitely specified in the mapping.
-
- """
- mapping = mapping_fixtures.MAPPING_USER_IDS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CONTRACTOR_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
- mapped.setup_username({}, mapped_properties)
- self.assertEqual('jsmith', mapped_properties['user']['id'])
- self.assertEqual('jsmith', mapped_properties['user']['name'])
-
- def test_user_identifications_name_and_federated_domain(self):
- """Test varius mapping options and how users are identified.
-
- This test calls mapped.setup_username() for propagating user object.
-
- Test plan:
- - Check if the user has proper domain ('federated') set
- - Check if the user has propert type set ('ephemeral')
- - Check if user's name is properly mapped from the assertion
- - Check if user's id is properly set and equal to name, as it was not
- explicitely specified in the mapping.
-
- """
- mapping = mapping_fixtures.MAPPING_USER_IDS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.EMPLOYEE_ASSERTION
- mapped_properties = rp.process(assertion)
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
- mapped.setup_username({}, mapped_properties)
- self.assertEqual('tbo', mapped_properties['user']['name'])
- self.assertEqual('tbo', mapped_properties['user']['id'])
-
- def test_user_identification_id(self):
- """Test varius mapping options and how users are identified.
-
- This test calls mapped.setup_username() for propagating user object.
-
- Test plan:
- - Check if the user has proper domain ('federated') set
- - Check if the user has propert type set ('ephemeral')
- - Check if user's id is properly mapped from the assertion
- - Check if user's name is properly set and equal to id, as it was not
- explicitely specified in the mapping.
-
- """
- mapping = mapping_fixtures.MAPPING_USER_IDS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.ADMIN_ASSERTION
- mapped_properties = rp.process(assertion)
- context = {'environment': {}}
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
- mapped.setup_username(context, mapped_properties)
- self.assertEqual('bob', mapped_properties['user']['name'])
- self.assertEqual('bob', mapped_properties['user']['id'])
-
- def test_user_identification_id_and_name(self):
- """Test varius mapping options and how users are identified.
-
- This test calls mapped.setup_username() for propagating user object.
-
- Test plan:
- - Check if the user has proper domain ('federated') set
- - Check if the user has proper type set ('ephemeral')
- - Check if user's name is properly mapped from the assertion
- - Check if user's id is properly set and and equal to value hardcoded
- in the mapping
-
- """
- mapping = mapping_fixtures.MAPPING_USER_IDS
- rp = mapping_utils.RuleProcessor(mapping['rules'])
- assertion = mapping_fixtures.CUSTOMER_ASSERTION
- mapped_properties = rp.process(assertion)
- context = {'environment': {}}
- self.assertIsNotNone(mapped_properties)
- self.assertValidMappedUserObject(mapped_properties)
- mapped.setup_username(context, mapped_properties)
- self.assertEqual('bwilliams', mapped_properties['user']['name'])
- self.assertEqual('abc123', mapped_properties['user']['id'])
-
-
class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def auth_plugin_config_override(self):
methods = ['saml2']
- method_classes = {'saml2': 'keystone.auth.plugins.saml2.Saml2'}
- super(FederatedTokenTests, self).auth_plugin_config_override(
- methods, **method_classes)
+ super(FederatedTokenTests, self).auth_plugin_config_override(methods)
def setUp(self):
super(FederatedTokenTests, self).setUp()
@@ -1923,7 +1525,8 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
def test_issue_unscoped_token_with_remote_no_attribute(self):
r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
environment={
- self.REMOTE_ID_ATTR: self.REMOTE_ID
+ self.REMOTE_ID_ATTR:
+ self.REMOTE_IDS[0]
})
self.assertIsNotNone(r.headers.get('X-Subject-Token'))
@@ -1932,7 +1535,18 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
remote_id_attribute=self.REMOTE_ID_ATTR)
r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
environment={
- self.REMOTE_ID_ATTR: self.REMOTE_ID
+ self.REMOTE_ID_ATTR:
+ self.REMOTE_IDS[0]
+ })
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_issue_unscoped_token_with_saml2_remote(self):
+ self.config_fixture.config(group='saml2',
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+ r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
+ environment={
+ self.REMOTE_ID_ATTR:
+ self.REMOTE_IDS[0]
})
self.assertIsNotNone(r.headers.get('X-Subject-Token'))
@@ -1946,6 +1560,25 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.REMOTE_ID_ATTR: uuid.uuid4().hex
})
+ def test_issue_unscoped_token_with_remote_default_overwritten(self):
+ """Test that protocol remote_id_attribute has higher priority.
+
+ Make sure the parameter stored under ``protocol`` section has higher
+ priority over parameter from default ``federation`` configuration
+ section.
+
+ """
+ self.config_fixture.config(group='saml2',
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+ self.config_fixture.config(group='federation',
+ remote_id_attribute=uuid.uuid4().hex)
+ r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
+ environment={
+ self.REMOTE_ID_ATTR:
+ self.REMOTE_IDS[0]
+ })
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
def test_issue_unscoped_token_with_remote_unavailable(self):
self.config_fixture.config(group='federation',
remote_id_attribute=self.REMOTE_ID_ATTR)
@@ -1979,7 +1612,7 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
context = {
'environment': {
'malformed_object': object(),
- 'another_bad_idea': tuple(xrange(10)),
+ 'another_bad_idea': tuple(range(10)),
'yet_another_bad_param': dict(zip(uuid.uuid4().hex,
range(32)))
}
@@ -2156,6 +1789,44 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
self.assertEqual(projects_ref, projects,
'match failed for url %s' % url)
+ # TODO(samueldmq): Create another test class for role inheritance tests.
+ # The advantage would be to reduce the complexity of this test class and
+ # have tests specific to this fuctionality grouped, easing readability and
+ # maintenability.
+ def test_list_projects_for_inherited_project_assignment(self):
+ # Enable os_inherit extension
+ self.config_fixture.config(group='os_inherit', enabled=True)
+
+ # Create a subproject
+ subproject_inherited = self.new_project_ref(
+ domain_id=self.domainD['id'],
+ parent_id=self.project_inherited['id'])
+ self.resource_api.create_project(subproject_inherited['id'],
+ subproject_inherited)
+
+ # Create an inherited role assignment
+ self.assignment_api.create_grant(
+ role_id=self.role_employee['id'],
+ group_id=self.group_employees['id'],
+ project_id=self.project_inherited['id'],
+ inherited_to_projects=True)
+
+ # Define expected projects from employee assertion, which contain
+ # the created subproject
+ expected_project_ids = [self.project_all['id'],
+ self.proj_employees['id'],
+ subproject_inherited['id']]
+
+ # Assert expected projects for both available URLs
+ for url in ('/OS-FEDERATION/projects', '/auth/projects'):
+ r = self.get(url, token=self.tokens['EMPLOYEE_ASSERTION'])
+ project_ids = [project['id'] for project in r.result['projects']]
+
+ self.assertEqual(len(expected_project_ids), len(project_ids))
+ for expected_project_id in expected_project_ids:
+ self.assertIn(expected_project_id, project_ids,
+ 'Projects match failed for url %s' % url)
+
def test_list_domains(self):
urls = ('/OS-FEDERATION/domains', '/auth/domains')
@@ -2325,7 +1996,6 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
"remote": [
{
"type": "REMOTE_USER_GROUPS",
- "blacklist": ["noblacklist"]
}
]
}
@@ -2333,10 +2003,290 @@ class FederatedTokenTests(FederationTests, FederatedSetupMixin):
}
self.federation_api.update_mapping(self.mapping['id'], rules)
+ def test_empty_blacklist_passess_all_values(self):
+ """Test a mapping with empty blacklist specified
+
+ Not adding a ``blacklist`` keyword to the mapping rules has the same
+ effect as adding an empty ``blacklist``.
+ In both cases, the mapping engine will not discard any groups that are
+ associated with apache environment variables.
+
+ This test checks scenario where an empty blacklist was specified.
+ Expected result is to allow any value.
+
+ The test scenario is as follows:
+ - Create group ``EXISTS``
+ - Create group ``NO_EXISTS``
+ - Set mapping rules for existing IdP with a blacklist
+ that passes through as REMOTE_USER_GROUPS
+ - Issue unscoped token with groups ``EXISTS`` and ``NO_EXISTS``
+ assigned
+
+ """
+
+ domain_id = self.domainA['id']
+ domain_name = self.domainA['name']
+
+ # Add a group "EXISTS"
+ group_exists = self.new_group_ref(domain_id=domain_id)
+ group_exists['name'] = 'EXISTS'
+ group_exists = self.identity_api.create_group(group_exists)
+
+ # Add a group "NO_EXISTS"
+ group_no_exists = self.new_group_ref(domain_id=domain_id)
+ group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = self.identity_api.create_group(group_no_exists)
+
+ group_ids = set([group_exists['id'], group_no_exists['id']])
+
+ rules = {
+ 'rules': [
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}",
+ "id": "{0}"
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER"
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "groups": "{0}",
+ "domain": {"name": domain_name}
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER_GROUPS",
+ "blacklist": []
+ }
+ ]
+ }
+ ]
+ }
+ self.federation_api.update_mapping(self.mapping['id'], rules)
+ r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
+ assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups']
+ self.assertEqual(len(group_ids), len(assigned_group_ids))
+ for group in assigned_group_ids:
+ self.assertIn(group['id'], group_ids)
+
+ def test_not_adding_blacklist_passess_all_values(self):
+ """Test a mapping without blacklist specified.
+
+ Not adding a ``blacklist`` keyword to the mapping rules has the same
+ effect as adding an empty ``blacklist``. In both cases all values will
+ be accepted and passed.
+
+ This test checks scenario where an blacklist was not specified.
+ Expected result is to allow any value.
+
+ The test scenario is as follows:
+ - Create group ``EXISTS``
+ - Create group ``NO_EXISTS``
+ - Set mapping rules for existing IdP with a blacklist
+ that passes through as REMOTE_USER_GROUPS
+ - Issue unscoped token with on groups ``EXISTS`` and ``NO_EXISTS``
+ assigned
+
+ """
+
+ domain_id = self.domainA['id']
+ domain_name = self.domainA['name']
+
+ # Add a group "EXISTS"
+ group_exists = self.new_group_ref(domain_id=domain_id)
+ group_exists['name'] = 'EXISTS'
+ group_exists = self.identity_api.create_group(group_exists)
+
+ # Add a group "NO_EXISTS"
+ group_no_exists = self.new_group_ref(domain_id=domain_id)
+ group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = self.identity_api.create_group(group_no_exists)
+
+ group_ids = set([group_exists['id'], group_no_exists['id']])
+
+ rules = {
+ 'rules': [
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}",
+ "id": "{0}"
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER"
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "groups": "{0}",
+ "domain": {"name": domain_name}
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER_GROUPS",
+ }
+ ]
+ }
+ ]
+ }
+ self.federation_api.update_mapping(self.mapping['id'], rules)
+ r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
+ assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups']
+ self.assertEqual(len(group_ids), len(assigned_group_ids))
+ for group in assigned_group_ids:
+ self.assertIn(group['id'], group_ids)
+
+ def test_empty_whitelist_discards_all_values(self):
+ """Test that empty whitelist blocks all the values
+
+ Not adding a ``whitelist`` keyword to the mapping value is different
+ than adding empty whitelist. The former case will simply pass all the
+ values, whereas the latter would discard all the values.
+
+ This test checks scenario where an empty whitelist was specified.
+ The expected result is that no groups are matched.
+
+ The test scenario is as follows:
+ - Create group ``EXISTS``
+ - Set mapping rules for existing IdP with an empty whitelist
+ that whould discard any values from the assertion
+ - Try issuing unscoped token, expect server to raise
+ ``exception.MissingGroups`` as no groups were matched and ephemeral
+ user does not have any group assigned.
+
+ """
+ domain_id = self.domainA['id']
+ domain_name = self.domainA['name']
+ group = self.new_group_ref(domain_id=domain_id)
+ group['name'] = 'EXISTS'
+ group = self.identity_api.create_group(group)
+ rules = {
+ 'rules': [
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}",
+ "id": "{0}"
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER"
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "groups": "{0}",
+ "domain": {"name": domain_name}
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER_GROUPS",
+ "whitelist": []
+ }
+ ]
+ }
+ ]
+ }
+ self.federation_api.update_mapping(self.mapping['id'], rules)
+
+ self.assertRaises(exception.MissingGroups,
+ self._issue_unscoped_token,
+ assertion='UNMATCHED_GROUP_ASSERTION')
+
+ def test_not_setting_whitelist_accepts_all_values(self):
+ """Test that not setting whitelist passes
+
+ Not adding a ``whitelist`` keyword to the mapping value is different
+ than adding empty whitelist. The former case will simply pass all the
+ values, whereas the latter would discard all the values.
+
+ This test checks a scenario where a ``whitelist`` was not specified.
+ Expected result is that no groups are ignored.
+
+ The test scenario is as follows:
+ - Create group ``EXISTS``
+ - Set mapping rules for existing IdP with an empty whitelist
+ that whould discard any values from the assertion
+ - Issue an unscoped token and make sure ephemeral user is a member of
+ two groups.
+
+ """
+ domain_id = self.domainA['id']
+ domain_name = self.domainA['name']
+
+ # Add a group "EXISTS"
+ group_exists = self.new_group_ref(domain_id=domain_id)
+ group_exists['name'] = 'EXISTS'
+ group_exists = self.identity_api.create_group(group_exists)
+
+ # Add a group "NO_EXISTS"
+ group_no_exists = self.new_group_ref(domain_id=domain_id)
+ group_no_exists['name'] = 'NO_EXISTS'
+ group_no_exists = self.identity_api.create_group(group_no_exists)
+
+ group_ids = set([group_exists['id'], group_no_exists['id']])
+
+ rules = {
+ 'rules': [
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}",
+ "id": "{0}"
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER"
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "groups": "{0}",
+ "domain": {"name": domain_name}
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER_GROUPS",
+ }
+ ]
+ }
+ ]
+ }
+ self.federation_api.update_mapping(self.mapping['id'], rules)
r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups']
- self.assertEqual(1, len(assigned_group_ids))
- self.assertEqual(group['id'], assigned_group_ids[0]['id'])
+ self.assertEqual(len(group_ids), len(assigned_group_ids))
+ for group in assigned_group_ids:
+ self.assertIn(group['id'], group_ids)
def test_assertion_prefix_parameter(self):
"""Test parameters filtering based on the prefix.
@@ -2416,27 +2366,24 @@ class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
super(FernetFederatedTokenTests, self).load_fixtures(fixtures)
self.load_federation_sample_data()
+ def config_overrides(self):
+ super(FernetFederatedTokenTests, self).config_overrides()
+ self.config_fixture.config(group='token', provider='fernet')
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+
def auth_plugin_config_override(self):
methods = ['saml2', 'token', 'password']
- method_classes = dict(
- password='keystone.auth.plugins.password.Password',
- token='keystone.auth.plugins.token.Token',
- saml2='keystone.auth.plugins.saml2.Saml2')
super(FernetFederatedTokenTests,
- self).auth_plugin_config_override(methods, **method_classes)
- self.config_fixture.config(
- group='token',
- provider='keystone.token.providers.fernet.Provider')
- self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+ self).auth_plugin_config_override(methods)
def test_federated_unscoped_token(self):
resp = self._issue_unscoped_token()
- self.assertEqual(186, len(resp.headers['X-Subject-Token']))
+ self.assertEqual(204, len(resp.headers['X-Subject-Token']))
def test_federated_unscoped_token_with_multiple_groups(self):
assertion = 'ANOTHER_CUSTOMER_ASSERTION'
resp = self._issue_unscoped_token(assertion=assertion)
- self.assertEqual(204, len(resp.headers['X-Subject-Token']))
+ self.assertEqual(232, len(resp.headers['X-Subject-Token']))
def test_validate_federated_unscoped_token(self):
resp = self._issue_unscoped_token()
@@ -2481,11 +2428,8 @@ class FederatedTokenTestsMethodToken(FederatedTokenTests):
def auth_plugin_config_override(self):
methods = ['saml2', 'token']
- method_classes = dict(
- token='keystone.auth.plugins.token.Token',
- saml2='keystone.auth.plugins.saml2.Saml2')
super(FederatedTokenTests,
- self).auth_plugin_config_override(methods, **method_classes)
+ self).auth_plugin_config_override(methods)
class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin):
@@ -2520,12 +2464,20 @@ class SAMLGenerationTests(FederationTests):
SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers'
'/BETA/protocols/saml2/auth')
+
+ ASSERTION_FILE = 'signed_saml2_assertion.xml'
+
+ # The values of the following variables match the attributes values found
+ # in ASSERTION_FILE
ISSUER = 'https://acme.com/FIM/sps/openstack/saml20'
RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST'
SUBJECT = 'test_user'
+ SUBJECT_DOMAIN = 'user_domain'
ROLES = ['admin', 'member']
PROJECT = 'development'
+ PROJECT_DOMAIN = 'project_domain'
SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2'
+ ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp'
ASSERTION_VERSION = "2.0"
SERVICE_PROVDIER_ID = 'ACME'
@@ -2535,6 +2487,7 @@ class SAMLGenerationTests(FederationTests):
'enabled': True,
'description': uuid.uuid4().hex,
'sp_url': self.RECIPIENT,
+ 'relay_state_prefix': CONF.saml.relay_state_prefix,
}
return ref
@@ -2542,9 +2495,11 @@ class SAMLGenerationTests(FederationTests):
def setUp(self):
super(SAMLGenerationTests, self).setUp()
self.signed_assertion = saml2.create_class_from_xml_string(
- saml.Assertion, _load_xml('signed_saml2_assertion.xml'))
+ saml.Assertion, _load_xml(self.ASSERTION_FILE))
self.sp = self.sp_ref()
- self.federation_api.create_sp(self.SERVICE_PROVDIER_ID, self.sp)
+ url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID
+ self.put(url, body={'service_provider': self.sp},
+ expected_status=201)
def test_samlize_token_values(self):
"""Test the SAML generator produces a SAML object.
@@ -2558,8 +2513,10 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
- self.SUBJECT, self.ROLES,
- self.PROJECT)
+ self.SUBJECT,
+ self.SUBJECT_DOMAIN,
+ self.ROLES, self.PROJECT,
+ self.PROJECT_DOMAIN)
assertion = response.assertion
self.assertIsNotNone(assertion)
@@ -2571,14 +2528,24 @@ class SAMLGenerationTests(FederationTests):
user_attribute = assertion.attribute_statement[0].attribute[0]
self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text)
- role_attribute = assertion.attribute_statement[0].attribute[1]
+ user_domain_attribute = (
+ assertion.attribute_statement[0].attribute[1])
+ self.assertEqual(self.SUBJECT_DOMAIN,
+ user_domain_attribute.attribute_value[0].text)
+
+ role_attribute = assertion.attribute_statement[0].attribute[2]
for attribute_value in role_attribute.attribute_value:
self.assertIn(attribute_value.text, self.ROLES)
- project_attribute = assertion.attribute_statement[0].attribute[2]
+ project_attribute = assertion.attribute_statement[0].attribute[3]
self.assertEqual(self.PROJECT,
project_attribute.attribute_value[0].text)
+ project_domain_attribute = (
+ assertion.attribute_statement[0].attribute[4])
+ self.assertEqual(self.PROJECT_DOMAIN,
+ project_domain_attribute.attribute_value[0].text)
+
def test_verify_assertion_object(self):
"""Test that the Assertion object is built properly.
@@ -2590,8 +2557,10 @@ class SAMLGenerationTests(FederationTests):
side_effect=lambda x: x):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
- self.SUBJECT, self.ROLES,
- self.PROJECT)
+ self.SUBJECT,
+ self.SUBJECT_DOMAIN,
+ self.ROLES, self.PROJECT,
+ self.PROJECT_DOMAIN)
assertion = response.assertion
self.assertEqual(self.ASSERTION_VERSION, assertion.version)
@@ -2607,8 +2576,10 @@ class SAMLGenerationTests(FederationTests):
return_value=self.signed_assertion):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
- self.SUBJECT, self.ROLES,
- self.PROJECT)
+ self.SUBJECT,
+ self.SUBJECT_DOMAIN,
+ self.ROLES, self.PROJECT,
+ self.PROJECT_DOMAIN)
saml_str = response.to_string()
response = etree.fromstring(saml_str)
@@ -2621,13 +2592,19 @@ class SAMLGenerationTests(FederationTests):
user_attribute = assertion[4][0]
self.assertEqual(self.SUBJECT, user_attribute[0].text)
- role_attribute = assertion[4][1]
+ user_domain_attribute = assertion[4][1]
+ self.assertEqual(self.SUBJECT_DOMAIN, user_domain_attribute[0].text)
+
+ role_attribute = assertion[4][2]
for attribute_value in role_attribute:
self.assertIn(attribute_value.text, self.ROLES)
- project_attribute = assertion[4][2]
+ project_attribute = assertion[4][3]
self.assertEqual(self.PROJECT, project_attribute[0].text)
+ project_domain_attribute = assertion[4][4]
+ self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text)
+
def test_assertion_using_explicit_namespace_prefixes(self):
def mocked_subprocess_check_output(*popenargs, **kwargs):
# the last option is the assertion file to be signed
@@ -2642,8 +2619,10 @@ class SAMLGenerationTests(FederationTests):
side_effect=mocked_subprocess_check_output):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
- self.SUBJECT, self.ROLES,
- self.PROJECT)
+ self.SUBJECT,
+ self.SUBJECT_DOMAIN,
+ self.ROLES, self.PROJECT,
+ self.PROJECT_DOMAIN)
assertion_xml = response.assertion.to_string()
# make sure we have the proper tag and prefix for the assertion
# namespace
@@ -2666,8 +2645,9 @@ class SAMLGenerationTests(FederationTests):
generator = keystone_idp.SAMLGenerator()
response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
- self.SUBJECT, self.ROLES,
- self.PROJECT)
+ self.SUBJECT, self.SUBJECT_DOMAIN,
+ self.ROLES, self.PROJECT,
+ self.PROJECT_DOMAIN)
signature = response.assertion.signature
self.assertIsNotNone(signature)
@@ -2770,12 +2750,18 @@ class SAMLGenerationTests(FederationTests):
user_attribute = assertion[4][0]
self.assertIsInstance(user_attribute[0].text, str)
- role_attribute = assertion[4][1]
+ user_domain_attribute = assertion[4][1]
+ self.assertIsInstance(user_domain_attribute[0].text, str)
+
+ role_attribute = assertion[4][2]
self.assertIsInstance(role_attribute[0].text, str)
- project_attribute = assertion[4][2]
+ project_attribute = assertion[4][3]
self.assertIsInstance(project_attribute[0].text, str)
+ project_domain_attribute = assertion[4][4]
+ self.assertIsInstance(project_domain_attribute[0].text, str)
+
def test_invalid_scope_body(self):
"""Test that missing the scope in request body raises an exception.
@@ -2839,6 +2825,104 @@ class SAMLGenerationTests(FederationTests):
self.SERVICE_PROVDIER_ID)
self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
+ def test_generate_ecp_route(self):
+ """Test that the ECP generation endpoint produces XML.
+
+ The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same
+ input as the SAML generation endpoint (scoped token ID + Service
+ Provider ID).
+ The controller should return a SAML assertion that is wrapped in a
+ SOAP envelope.
+ """
+
+ self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
+ token_id = self._fetch_valid_token()
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ return_value=self.signed_assertion):
+ http_response = self.post(self.ECP_GENERATION_ROUTE, body=body,
+ response_content_type='text/xml',
+ expected_status=200)
+
+ env_response = etree.fromstring(http_response.result)
+ header = env_response[0]
+
+ # Verify the relay state starts with 'ss:mem'
+ prefix = CONF.saml.relay_state_prefix
+ self.assertThat(header[0].text, matchers.StartsWith(prefix))
+
+ # Verify that the content in the body matches the expected assertion
+ body = env_response[1]
+ response = body[0]
+ issuer = response[0]
+ assertion = response[2]
+
+ self.assertEqual(self.RECIPIENT, response.get('Destination'))
+ self.assertEqual(self.ISSUER, issuer.text)
+
+ user_attribute = assertion[4][0]
+ self.assertIsInstance(user_attribute[0].text, str)
+
+ user_domain_attribute = assertion[4][1]
+ self.assertIsInstance(user_domain_attribute[0].text, str)
+
+ role_attribute = assertion[4][2]
+ self.assertIsInstance(role_attribute[0].text, str)
+
+ project_attribute = assertion[4][3]
+ self.assertIsInstance(project_attribute[0].text, str)
+
+ project_domain_attribute = assertion[4][4]
+ self.assertIsInstance(project_domain_attribute[0].text, str)
+
+ @mock.patch('saml2.create_class_from_xml_string')
+ @mock.patch('oslo_utils.fileutils.write_to_tempfile')
+ @mock.patch('subprocess.check_output')
+ def test__sign_assertion(self, check_output_mock,
+ write_to_tempfile_mock, create_class_mock):
+ write_to_tempfile_mock.return_value = 'tmp_path'
+ check_output_mock.return_value = 'fakeoutput'
+
+ keystone_idp._sign_assertion(self.signed_assertion)
+
+ create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput')
+
+ @mock.patch('oslo_utils.fileutils.write_to_tempfile')
+ @mock.patch('subprocess.check_output')
+ def test__sign_assertion_exc(self, check_output_mock,
+ write_to_tempfile_mock):
+ # If the command fails the command output is logged.
+
+ write_to_tempfile_mock.return_value = 'tmp_path'
+
+ sample_returncode = 1
+ sample_output = self.getUniqueString()
+ check_output_mock.side_effect = subprocess.CalledProcessError(
+ returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary,
+ output=sample_output)
+
+ # FIXME(blk-u): This should raise exception.SAMLSigningError instead,
+ # but fails with TypeError due to concatenating string to Message, see
+ # bug 1484735.
+ self.assertRaises(TypeError,
+ keystone_idp._sign_assertion,
+ self.signed_assertion)
+
+ @mock.patch('oslo_utils.fileutils.write_to_tempfile')
+ def test__sign_assertion_fileutils_exc(self, write_to_tempfile_mock):
+ exception_msg = 'fake'
+ write_to_tempfile_mock.side_effect = Exception(exception_msg)
+
+ logger_fixture = self.useFixture(fixtures.LoggerFixture())
+ self.assertRaises(exception.SAMLSigningError,
+ keystone_idp._sign_assertion,
+ self.signed_assertion)
+ expected_log = (
+ 'Error when signing assertion, reason: %s\n' % exception_msg)
+ self.assertEqual(expected_log, logger_fixture.output)
+
class IdPMetadataGenerationTests(FederationTests):
"""A class for testing Identity Provider Metadata generation."""
@@ -2976,7 +3060,8 @@ class ServiceProviderTests(FederationTests):
MEMBER_NAME = 'service_provider'
COLLECTION_NAME = 'service_providers'
SERVICE_PROVIDER_ID = 'ACME'
- SP_KEYS = ['auth_url', 'id', 'enabled', 'description', 'sp_url']
+ SP_KEYS = ['auth_url', 'id', 'enabled', 'description',
+ 'relay_state_prefix', 'sp_url']
def setUp(self):
super(FederationTests, self).setUp()
@@ -2993,6 +3078,7 @@ class ServiceProviderTests(FederationTests):
'enabled': True,
'description': uuid.uuid4().hex,
'sp_url': 'https://' + uuid.uuid4().hex + '.com',
+ 'relay_state_prefix': CONF.saml.relay_state_prefix
}
return ref
@@ -3019,6 +3105,29 @@ class ServiceProviderTests(FederationTests):
self.assertValidEntity(resp.result['service_provider'],
keys_to_check=self.SP_KEYS)
+ def test_create_sp_relay_state_default(self):
+ """Create an SP without relay state, should default to `ss:mem`."""
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ sp = self.sp_ref()
+ del sp['relay_state_prefix']
+ resp = self.put(url, body={'service_provider': sp},
+ expected_status=201)
+ sp_result = resp.result['service_provider']
+ self.assertEqual(CONF.saml.relay_state_prefix,
+ sp_result['relay_state_prefix'])
+
+ def test_create_sp_relay_state_non_default(self):
+ """Create an SP with custom relay state."""
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ sp = self.sp_ref()
+ non_default_prefix = uuid.uuid4().hex
+ sp['relay_state_prefix'] = non_default_prefix
+ resp = self.put(url, body={'service_provider': sp},
+ expected_status=201)
+ sp_result = resp.result['service_provider']
+ self.assertEqual(non_default_prefix,
+ sp_result['relay_state_prefix'])
+
def test_create_service_provider_fail(self):
"""Try adding SP object with unallowed attribute."""
url = self.base_url(suffix=uuid.uuid4().hex)
@@ -3108,6 +3217,18 @@ class ServiceProviderTests(FederationTests):
self.patch(url, body={'service_provider': new_sp_ref},
expected_status=404)
+ def test_update_sp_relay_state(self):
+ """Update an SP with custome relay state."""
+ new_sp_ref = self.sp_ref()
+ non_default_prefix = uuid.uuid4().hex
+ new_sp_ref['relay_state_prefix'] = non_default_prefix
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ resp = self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=200)
+ sp_result = resp.result['service_provider']
+ self.assertEqual(non_default_prefix,
+ sp_result['relay_state_prefix'])
+
def test_delete_service_provider(self):
url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
self.delete(url, expected_status=204)
@@ -3125,6 +3246,7 @@ class WebSSOTests(FederatedTokenTests):
SSO_TEMPLATE_PATH = os.path.join(core.dirs.etc(), SSO_TEMPLATE_NAME)
TRUSTED_DASHBOARD = 'http://horizon.com'
ORIGIN = urllib.parse.quote_plus(TRUSTED_DASHBOARD)
+ PROTOCOL_REMOTE_ID_ATTR = uuid.uuid4().hex
def setUp(self):
super(WebSSOTests, self).setUp()
@@ -3145,7 +3267,19 @@ class WebSSOTests(FederatedTokenTests):
self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
def test_federated_sso_auth(self):
- environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
+ context = {'environment': environment}
+ query_string = {'origin': self.ORIGIN}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ resp = self.api.federated_sso_auth(context, self.PROTOCOL)
+ self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
+
+ def test_federated_sso_auth_with_protocol_specific_remote_id(self):
+ self.config_fixture.config(
+ group=self.PROTOCOL,
+ remote_id_attribute=self.PROTOCOL_REMOTE_ID_ATTR)
+
+ environment = {self.PROTOCOL_REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
context = {'environment': environment}
query_string = {'origin': self.ORIGIN}
self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
@@ -3162,7 +3296,7 @@ class WebSSOTests(FederatedTokenTests):
context, self.PROTOCOL)
def test_federated_sso_missing_query(self):
- environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
context = {'environment': environment}
self._inject_assertion(context, 'EMPLOYEE_ASSERTION')
self.assertRaises(exception.ValidationError,
@@ -3178,7 +3312,7 @@ class WebSSOTests(FederatedTokenTests):
context, self.PROTOCOL)
def test_federated_sso_untrusted_dashboard(self):
- environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]}
context = {'environment': environment}
query_string = {'origin': uuid.uuid4().hex}
self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
@@ -3229,6 +3363,7 @@ class K2KServiceCatalogTests(FederationTests):
def sp_response(self, id, ref):
ref.pop('enabled')
ref.pop('description')
+ ref.pop('relay_state_prefix')
ref['id'] = id
return ref
@@ -3238,6 +3373,7 @@ class K2KServiceCatalogTests(FederationTests):
'enabled': True,
'description': uuid.uuid4().hex,
'sp_url': uuid.uuid4().hex,
+ 'relay_state_prefix': CONF.saml.relay_state_prefix,
}
return ref