aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3_federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3_federation.py3296
1 files changed, 3296 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py
new file mode 100644
index 00000000..3b6f4d8b
--- /dev/null
+++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py
@@ -0,0 +1,3296 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+import random
+import subprocess
+import uuid
+
+from lxml import etree
+import mock
+from oslo_config import cfg
+from oslo_log import log
+from oslo_serialization import jsonutils
+from oslotest import mockpatch
+import saml2
+from saml2 import saml
+from saml2 import sigver
+from six.moves import urllib
+import xmldsig
+
+from keystone.auth import controllers as auth_controllers
+from keystone.auth.plugins import mapped
+from keystone.contrib import federation
+from keystone.contrib.federation import controllers as federation_controllers
+from keystone.contrib.federation import idp as keystone_idp
+from keystone.contrib.federation import utils as mapping_utils
+from keystone import exception
+from keystone import notifications
+from keystone.tests.unit import core
+from keystone.tests.unit import federation_fixtures
+from keystone.tests.unit import ksfixtures
+from keystone.tests.unit import mapping_fixtures
+from keystone.tests.unit import test_v3
+from keystone.token.providers import common as token_common
+
+
+CONF = cfg.CONF
+LOG = log.getLogger(__name__)
+ROOTDIR = os.path.dirname(os.path.abspath(__file__))
+XMLDIR = os.path.join(ROOTDIR, 'saml2/')
+
+
+def dummy_validator(*args, **kwargs):
+ pass
+
+
+class FederationTests(test_v3.RestfulTestCase):
+
+ EXTENSION_NAME = 'federation'
+ EXTENSION_TO_ADD = 'federation_extension'
+
+
+class FederatedSetupMixin(object):
+
+ ACTION = 'authenticate'
+ IDP = 'ORG_IDP'
+ PROTOCOL = 'saml2'
+ AUTH_METHOD = 'saml2'
+ USER = 'user@ORGANIZATION'
+ ASSERTION_PREFIX = 'PREFIX_'
+ IDP_WITH_REMOTE = 'ORG_IDP_REMOTE'
+ REMOTE_ID = 'entityID_IDP'
+ REMOTE_ID_ATTR = uuid.uuid4().hex
+
+ UNSCOPED_V3_SAML2_REQ = {
+ "identity": {
+ "methods": [AUTH_METHOD],
+ AUTH_METHOD: {
+ "identity_provider": IDP,
+ "protocol": PROTOCOL
+ }
+ }
+ }
+
+ def _check_domains_are_valid(self, token):
+ self.assertEqual('Federated', token['user']['domain']['id'])
+ self.assertEqual('Federated', token['user']['domain']['name'])
+
+ def _project(self, project):
+ return (project['id'], project['name'])
+
+ def _roles(self, roles):
+ return set([(r['id'], r['name']) for r in roles])
+
+ def _check_projects_and_roles(self, token, roles, projects):
+ """Check whether the projects and the roles match."""
+ token_roles = token.get('roles')
+ if token_roles is None:
+ raise AssertionError('Roles not found in the token')
+ token_roles = self._roles(token_roles)
+ roles_ref = self._roles(roles)
+ self.assertEqual(token_roles, roles_ref)
+
+ token_projects = token.get('project')
+ if token_projects is None:
+ raise AssertionError('Projects not found in the token')
+ token_projects = self._project(token_projects)
+ projects_ref = self._project(projects)
+ self.assertEqual(token_projects, projects_ref)
+
+ def _check_scoped_token_attributes(self, token):
+ def xor_project_domain(iterable):
+ return sum(('project' in iterable, 'domain' in iterable)) % 2
+
+ for obj in ('user', 'catalog', 'expires_at', 'issued_at',
+ 'methods', 'roles'):
+ self.assertIn(obj, token)
+ # Check for either project or domain
+ if not xor_project_domain(token.keys()):
+ raise AssertionError("You must specify either"
+ "project or domain.")
+
+ self.assertIn('OS-FEDERATION', token['user'])
+ os_federation = token['user']['OS-FEDERATION']
+ self.assertEqual(self.IDP, os_federation['identity_provider']['id'])
+ self.assertEqual(self.PROTOCOL, os_federation['protocol']['id'])
+
+ def _issue_unscoped_token(self,
+ idp=None,
+ assertion='EMPLOYEE_ASSERTION',
+ environment=None):
+ api = federation_controllers.Auth()
+ context = {'environment': environment or {}}
+ self._inject_assertion(context, assertion)
+ if idp is None:
+ idp = self.IDP
+ r = api.federated_authentication(context, idp, self.PROTOCOL)
+ return r
+
+ def idp_ref(self, id=None):
+ idp = {
+ 'id': id or uuid.uuid4().hex,
+ 'enabled': True,
+ 'description': uuid.uuid4().hex
+ }
+ return idp
+
+ def proto_ref(self, mapping_id=None):
+ proto = {
+ 'id': uuid.uuid4().hex,
+ 'mapping_id': mapping_id or uuid.uuid4().hex
+ }
+ return proto
+
+ def mapping_ref(self, rules=None):
+ return {
+ 'id': uuid.uuid4().hex,
+ 'rules': rules or self.rules['rules']
+ }
+
+ def _scope_request(self, unscoped_token_id, scope, scope_id):
+ return {
+ 'auth': {
+ 'identity': {
+ 'methods': [
+ self.AUTH_METHOD
+ ],
+ self.AUTH_METHOD: {
+ 'id': unscoped_token_id
+ }
+ },
+ 'scope': {
+ scope: {
+ 'id': scope_id
+ }
+ }
+ }
+ }
+
+ def _inject_assertion(self, context, variant, query_string=None):
+ assertion = getattr(mapping_fixtures, variant)
+ context['environment'].update(assertion)
+ context['query_string'] = query_string or []
+
+ def load_federation_sample_data(self):
+ """Inject additional data."""
+
+ # Create and add domains
+ self.domainA = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainA['id'],
+ self.domainA)
+
+ self.domainB = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainB['id'],
+ self.domainB)
+
+ self.domainC = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainC['id'],
+ self.domainC)
+
+ self.domainD = self.new_domain_ref()
+ self.resource_api.create_domain(self.domainD['id'],
+ self.domainD)
+
+ # Create and add projects
+ self.proj_employees = self.new_project_ref(
+ domain_id=self.domainA['id'])
+ self.resource_api.create_project(self.proj_employees['id'],
+ self.proj_employees)
+ self.proj_customers = self.new_project_ref(
+ domain_id=self.domainA['id'])
+ self.resource_api.create_project(self.proj_customers['id'],
+ self.proj_customers)
+
+ self.project_all = self.new_project_ref(
+ domain_id=self.domainA['id'])
+ self.resource_api.create_project(self.project_all['id'],
+ self.project_all)
+
+ self.project_inherited = self.new_project_ref(
+ domain_id=self.domainD['id'])
+ self.resource_api.create_project(self.project_inherited['id'],
+ self.project_inherited)
+
+ # Create and add groups
+ self.group_employees = self.new_group_ref(
+ domain_id=self.domainA['id'])
+ self.group_employees = (
+ self.identity_api.create_group(self.group_employees))
+
+ self.group_customers = self.new_group_ref(
+ domain_id=self.domainA['id'])
+ self.group_customers = (
+ self.identity_api.create_group(self.group_customers))
+
+ self.group_admins = self.new_group_ref(
+ domain_id=self.domainA['id'])
+ self.group_admins = self.identity_api.create_group(self.group_admins)
+
+ # Create and add roles
+ self.role_employee = self.new_role_ref()
+ self.role_api.create_role(self.role_employee['id'], self.role_employee)
+ self.role_customer = self.new_role_ref()
+ self.role_api.create_role(self.role_customer['id'], self.role_customer)
+
+ self.role_admin = self.new_role_ref()
+ self.role_api.create_role(self.role_admin['id'], self.role_admin)
+
+ # Employees can access
+ # * proj_employees
+ # * project_all
+ self.assignment_api.create_grant(self.role_employee['id'],
+ group_id=self.group_employees['id'],
+ project_id=self.proj_employees['id'])
+ self.assignment_api.create_grant(self.role_employee['id'],
+ group_id=self.group_employees['id'],
+ project_id=self.project_all['id'])
+ # Customers can access
+ # * proj_customers
+ self.assignment_api.create_grant(self.role_customer['id'],
+ group_id=self.group_customers['id'],
+ project_id=self.proj_customers['id'])
+
+ # Admins can access:
+ # * proj_customers
+ # * proj_employees
+ # * project_all
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ project_id=self.proj_customers['id'])
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ project_id=self.proj_employees['id'])
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ project_id=self.project_all['id'])
+
+ self.assignment_api.create_grant(self.role_customer['id'],
+ group_id=self.group_customers['id'],
+ domain_id=self.domainA['id'])
+
+ # Customers can access:
+ # * domain A
+ self.assignment_api.create_grant(self.role_customer['id'],
+ group_id=self.group_customers['id'],
+ domain_id=self.domainA['id'])
+
+ # Customers can access projects via inheritance:
+ # * domain D
+ self.assignment_api.create_grant(self.role_customer['id'],
+ group_id=self.group_customers['id'],
+ domain_id=self.domainD['id'],
+ inherited_to_projects=True)
+
+ # Employees can access:
+ # * domain A
+ # * domain B
+
+ self.assignment_api.create_grant(self.role_employee['id'],
+ group_id=self.group_employees['id'],
+ domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.role_employee['id'],
+ group_id=self.group_employees['id'],
+ domain_id=self.domainB['id'])
+
+ # Admins can access:
+ # * domain A
+ # * domain B
+ # * domain C
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ domain_id=self.domainA['id'])
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ domain_id=self.domainB['id'])
+
+ self.assignment_api.create_grant(self.role_admin['id'],
+ group_id=self.group_admins['id'],
+ domain_id=self.domainC['id'])
+ self.rules = {
+ 'rules': [
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': self.group_employees['id']
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName'
+ },
+ {
+ 'type': 'orgPersonType',
+ 'any_one_of': [
+ 'Employee'
+ ]
+ }
+ ]
+ },
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': self.group_employees['id']
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': self.ASSERTION_PREFIX + 'UserName'
+ },
+ {
+ 'type': self.ASSERTION_PREFIX + 'orgPersonType',
+ 'any_one_of': [
+ 'SuperEmployee'
+ ]
+ }
+ ]
+ },
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': self.group_customers['id']
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName'
+ },
+ {
+ 'type': 'orgPersonType',
+ 'any_one_of': [
+ 'Customer'
+ ]
+ }
+ ]
+ },
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': self.group_admins['id']
+ }
+ },
+ {
+ 'group': {
+ 'id': self.group_employees['id']
+ }
+ },
+ {
+ 'group': {
+ 'id': self.group_customers['id']
+ }
+ },
+
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName'
+ },
+ {
+ 'type': 'orgPersonType',
+ 'any_one_of': [
+ 'Admin',
+ 'Chief'
+ ]
+ }
+ ]
+ },
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': uuid.uuid4().hex
+ }
+ },
+ {
+ 'group': {
+ 'id': self.group_customers['id']
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName',
+ },
+ {
+ 'type': 'FirstName',
+ 'any_one_of': [
+ 'Jill'
+ ]
+ },
+ {
+ 'type': 'LastName',
+ 'any_one_of': [
+ 'Smith'
+ ]
+ }
+ ]
+ },
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': 'this_group_no_longer_exists'
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName',
+ },
+ {
+ 'type': 'Email',
+ 'any_one_of': [
+ 'testacct@example.com'
+ ]
+ },
+ {
+ 'type': 'orgPersonType',
+ 'any_one_of': [
+ 'Tester'
+ ]
+ }
+ ]
+ },
+ # rules with local group names
+ {
+ "local": [
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ },
+ {
+ "group": {
+ "name": self.group_customers['name'],
+ "domain": {
+ "name": self.domainA['name']
+ }
+ }
+ }
+ ],
+ "remote": [
+ {
+ 'type': 'UserName',
+ },
+ {
+ "type": "orgPersonType",
+ "any_one_of": [
+ "CEO",
+ "CTO"
+ ],
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ },
+ {
+ "group": {
+ "name": self.group_admins['name'],
+ "domain": {
+ "id": self.domainA['id']
+ }
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "UserName",
+ },
+ {
+ "type": "orgPersonType",
+ "any_one_of": [
+ "Managers"
+ ]
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}"
+ }
+ },
+ {
+ "group": {
+ "name": "NON_EXISTING",
+ "domain": {
+ "id": self.domainA['id']
+ }
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "UserName",
+ },
+ {
+ "type": "UserName",
+ "any_one_of": [
+ "IamTester"
+ ]
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "user": {
+ "type": "local",
+ "name": self.user['name'],
+ "domain": {
+ "id": self.user['domain_id']
+ }
+ }
+ },
+ {
+ "group": {
+ "id": self.group_customers['id']
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "UserType",
+ "any_one_of": [
+ "random"
+ ]
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "user": {
+ "type": "local",
+ "name": self.user['name'],
+ "domain": {
+ "id": uuid.uuid4().hex
+ }
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "Position",
+ "any_one_of": [
+ "DirectorGeneral"
+ ]
+ }
+ ]
+ }
+ ]
+ }
+
+ # Add IDP
+ self.idp = self.idp_ref(id=self.IDP)
+ self.federation_api.create_idp(self.idp['id'],
+ self.idp)
+ # Add IDP with remote
+ self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE)
+ self.idp_with_remote['remote_id'] = self.REMOTE_ID
+ self.federation_api.create_idp(self.idp_with_remote['id'],
+ self.idp_with_remote)
+ # Add a mapping
+ self.mapping = self.mapping_ref()
+ self.federation_api.create_mapping(self.mapping['id'],
+ self.mapping)
+ # Add protocols
+ self.proto_saml = self.proto_ref(mapping_id=self.mapping['id'])
+ self.proto_saml['id'] = self.PROTOCOL
+ self.federation_api.create_protocol(self.idp['id'],
+ self.proto_saml['id'],
+ self.proto_saml)
+ # Add protocols IDP with remote
+ self.federation_api.create_protocol(self.idp_with_remote['id'],
+ self.proto_saml['id'],
+ self.proto_saml)
+ # Generate fake tokens
+ context = {'environment': {}}
+
+ self.tokens = {}
+ VARIANTS = ('EMPLOYEE_ASSERTION', 'CUSTOMER_ASSERTION',
+ 'ADMIN_ASSERTION')
+ api = auth_controllers.Auth()
+ for variant in VARIANTS:
+ self._inject_assertion(context, variant)
+ r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ)
+ self.tokens[variant] = r.headers.get('X-Subject-Token')
+
+ self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN = self._scope_request(
+ uuid.uuid4().hex, 'project', self.proj_customers['id'])
+
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE = self._scope_request(
+ self.tokens['EMPLOYEE_ASSERTION'], 'project',
+ self.proj_employees['id'])
+
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN = self._scope_request(
+ self.tokens['ADMIN_ASSERTION'], 'project',
+ self.proj_employees['id'])
+
+ self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN = self._scope_request(
+ self.tokens['ADMIN_ASSERTION'], 'project',
+ self.proj_customers['id'])
+
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER = self._scope_request(
+ self.tokens['CUSTOMER_ASSERTION'], 'project',
+ self.proj_employees['id'])
+
+ self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER = self._scope_request(
+ self.tokens['CUSTOMER_ASSERTION'], 'project',
+ self.project_inherited['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER = self._scope_request(
+ self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainA['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_B_FROM_CUSTOMER = self._scope_request(
+ self.tokens['CUSTOMER_ASSERTION'], 'domain',
+ self.domainB['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER = self._scope_request(
+ self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainD['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN = self._scope_request(
+ self.tokens['ADMIN_ASSERTION'], 'domain', self.domainA['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN = self._scope_request(
+ self.tokens['ADMIN_ASSERTION'], 'domain', self.domainB['id'])
+
+ self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN = self._scope_request(
+ self.tokens['ADMIN_ASSERTION'], 'domain',
+ self.domainC['id'])
+
+
+class FederatedIdentityProviderTests(FederationTests):
+ """A test class for Identity Providers."""
+
+ idp_keys = ['description', 'enabled']
+
+ default_body = {'description': None, 'enabled': True}
+
+ def base_url(self, suffix=None):
+ if suffix is not None:
+ return '/OS-FEDERATION/identity_providers/' + str(suffix)
+ return '/OS-FEDERATION/identity_providers'
+
+ def _fetch_attribute_from_response(self, resp, parameter,
+ assert_is_not_none=True):
+ """Fetch single attribute from TestResponse object."""
+ result = resp.result.get(parameter)
+ if assert_is_not_none:
+ self.assertIsNotNone(result)
+ return result
+
+ def _create_and_decapsulate_response(self, body=None):
+ """Create IdP and fetch it's random id along with entity."""
+ default_resp = self._create_default_idp(body=body)
+ idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ self.assertIsNotNone(idp)
+ idp_id = idp.get('id')
+ return (idp_id, idp)
+
+ def _get_idp(self, idp_id):
+ """Fetch IdP entity based on its id."""
+ url = self.base_url(suffix=idp_id)
+ resp = self.get(url)
+ return resp
+
+ def _create_default_idp(self, body=None):
+ """Create default IdP."""
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ if body is None:
+ body = self._http_idp_input()
+ resp = self.put(url, body={'identity_provider': body},
+ expected_status=201)
+ return resp
+
+ def _http_idp_input(self, **kwargs):
+ """Create default input for IdP data."""
+ body = None
+ if 'body' not in kwargs:
+ body = self.default_body.copy()
+ body['description'] = uuid.uuid4().hex
+ else:
+ body = kwargs['body']
+ return body
+
+ def _assign_protocol_to_idp(self, idp_id=None, proto=None, url=None,
+ mapping_id=None, validate=True, **kwargs):
+ if url is None:
+ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
+ if idp_id is None:
+ idp_id, _ = self._create_and_decapsulate_response()
+ if proto is None:
+ proto = uuid.uuid4().hex
+ if mapping_id is None:
+ mapping_id = uuid.uuid4().hex
+ body = {'mapping_id': mapping_id}
+ url = url % {'idp_id': idp_id, 'protocol_id': proto}
+ resp = self.put(url, body={'protocol': body}, **kwargs)
+ if validate:
+ self.assertValidResponse(resp, 'protocol', dummy_validator,
+ keys_to_check=['id', 'mapping_id'],
+ ref={'id': proto,
+ 'mapping_id': mapping_id})
+ return (resp, idp_id, proto)
+
+ def _get_protocol(self, idp_id, protocol_id):
+ url = "%s/protocols/%s" % (idp_id, protocol_id)
+ url = self.base_url(suffix=url)
+ r = self.get(url)
+ return r
+
+ def test_create_idp(self):
+ """Creates the IdentityProvider entity."""
+
+ keys_to_check = self.idp_keys
+ body = self._http_idp_input()
+ resp = self._create_default_idp(body=body)
+ self.assertValidResponse(resp, 'identity_provider', dummy_validator,
+ keys_to_check=keys_to_check,
+ ref=body)
+
+ def test_create_idp_remote(self):
+ """Creates the IdentityProvider entity associated to a remote_id."""
+
+ keys_to_check = list(self.idp_keys)
+ keys_to_check.append('remote_id')
+ body = self.default_body.copy()
+ body['description'] = uuid.uuid4().hex
+ body['remote_id'] = uuid.uuid4().hex
+ resp = self._create_default_idp(body=body)
+ self.assertValidResponse(resp, 'identity_provider', dummy_validator,
+ keys_to_check=keys_to_check,
+ ref=body)
+
+ def test_list_idps(self, iterations=5):
+ """Lists all available IdentityProviders.
+
+ This test collects ids of created IdPs and
+ intersects it with the list of all available IdPs.
+ List of all IdPs can be a superset of IdPs created in this test,
+ because other tests also create IdPs.
+
+ """
+ def get_id(resp):
+ r = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ return r.get('id')
+
+ ids = []
+ for _ in range(iterations):
+ id = get_id(self._create_default_idp())
+ ids.append(id)
+ ids = set(ids)
+
+ keys_to_check = self.idp_keys
+ url = self.base_url()
+ resp = self.get(url)
+ self.assertValidListResponse(resp, 'identity_providers',
+ dummy_validator,
+ keys_to_check=keys_to_check)
+ entities = self._fetch_attribute_from_response(resp,
+ 'identity_providers')
+ entities_ids = set([e['id'] for e in entities])
+ ids_intersection = entities_ids.intersection(ids)
+ self.assertEqual(ids_intersection, ids)
+
+ def test_check_idp_uniqueness(self):
+ """Add same IdP twice.
+
+ Expect HTTP 409 code for the latter call.
+
+ """
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ body = self._http_idp_input()
+ self.put(url, body={'identity_provider': body},
+ expected_status=201)
+ self.put(url, body={'identity_provider': body},
+ expected_status=409)
+
+ def test_get_idp(self):
+ """Create and later fetch IdP."""
+ body = self._http_idp_input()
+ default_resp = self._create_default_idp(body=body)
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ url = self.base_url(suffix=idp_id)
+ resp = self.get(url)
+ self.assertValidResponse(resp, 'identity_provider',
+ dummy_validator, keys_to_check=body.keys(),
+ ref=body)
+
+ def test_get_nonexisting_idp(self):
+ """Fetch nonexisting IdP entity.
+
+ Expected HTTP 404 status code.
+
+ """
+ idp_id = uuid.uuid4().hex
+ self.assertIsNotNone(idp_id)
+
+ url = self.base_url(suffix=idp_id)
+ self.get(url, expected_status=404)
+
+ def test_delete_existing_idp(self):
+ """Create and later delete IdP.
+
+ Expect HTTP 404 for the GET IdP call.
+ """
+ default_resp = self._create_default_idp()
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ self.assertIsNotNone(idp_id)
+ url = self.base_url(suffix=idp_id)
+ self.delete(url)
+ self.get(url, expected_status=404)
+
+ def test_delete_nonexisting_idp(self):
+ """Delete nonexisting IdP.
+
+ Expect HTTP 404 for the GET IdP call.
+ """
+ idp_id = uuid.uuid4().hex
+ url = self.base_url(suffix=idp_id)
+ self.delete(url, expected_status=404)
+
+ def test_update_idp_mutable_attributes(self):
+ """Update IdP's mutable parameters."""
+ default_resp = self._create_default_idp()
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ url = self.base_url(suffix=idp_id)
+ self.assertIsNotNone(idp_id)
+
+ _enabled = not default_idp.get('enabled')
+ body = {'remote_id': uuid.uuid4().hex,
+ 'description': uuid.uuid4().hex,
+ 'enabled': _enabled}
+
+ body = {'identity_provider': body}
+ resp = self.patch(url, body=body)
+ updated_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ body = body['identity_provider']
+ for key in body.keys():
+ self.assertEqual(body[key], updated_idp.get(key))
+
+ resp = self.get(url)
+ updated_idp = self._fetch_attribute_from_response(resp,
+ 'identity_provider')
+ for key in body.keys():
+ self.assertEqual(body[key], updated_idp.get(key))
+
+ def test_update_idp_immutable_attributes(self):
+ """Update IdP's immutable parameters.
+
+ Expect HTTP 403 code.
+
+ """
+ default_resp = self._create_default_idp()
+ default_idp = self._fetch_attribute_from_response(default_resp,
+ 'identity_provider')
+ idp_id = default_idp.get('id')
+ self.assertIsNotNone(idp_id)
+
+ body = self._http_idp_input()
+ body['id'] = uuid.uuid4().hex
+ body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex]
+
+ url = self.base_url(suffix=idp_id)
+ self.patch(url, body={'identity_provider': body}, expected_status=403)
+
+ def test_update_nonexistent_idp(self):
+ """Update nonexistent IdP
+
+ Expect HTTP 404 code.
+
+ """
+ idp_id = uuid.uuid4().hex
+ url = self.base_url(suffix=idp_id)
+ body = self._http_idp_input()
+ body['enabled'] = False
+ body = {'identity_provider': body}
+
+ self.patch(url, body=body, expected_status=404)
+
+ def test_assign_protocol_to_idp(self):
+ """Assign a protocol to existing IdP."""
+
+ self._assign_protocol_to_idp(expected_status=201)
+
+ def test_protocol_composite_pk(self):
+ """Test whether Keystone let's add two entities with identical
+ names, however attached to different IdPs.
+
+ 1. Add IdP and assign it protocol with predefined name
+ 2. Add another IdP and assign it a protocol with same name.
+
+ Expect HTTP 201 code
+
+ """
+ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
+
+ kwargs = {'expected_status': 201}
+ self._assign_protocol_to_idp(proto='saml2',
+ url=url, **kwargs)
+
+ self._assign_protocol_to_idp(proto='saml2',
+ url=url, **kwargs)
+
+ def test_protocol_idp_pk_uniqueness(self):
+ """Test whether Keystone checks for unique idp/protocol values.
+
+ Add same protocol twice, expect Keystone to reject a latter call and
+ return HTTP 409 code.
+
+ """
+ url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s')
+
+ kwargs = {'expected_status': 201}
+ resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2',
+ url=url, **kwargs)
+ kwargs = {'expected_status': 409}
+ resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id,
+ proto='saml2',
+ validate=False,
+ url=url, **kwargs)
+
+ def test_assign_protocol_to_nonexistent_idp(self):
+ """Assign protocol to IdP that doesn't exist.
+
+ Expect HTTP 404 code.
+
+ """
+
+ idp_id = uuid.uuid4().hex
+ kwargs = {'expected_status': 404}
+ self._assign_protocol_to_idp(proto='saml2',
+ idp_id=idp_id,
+ validate=False,
+ **kwargs)
+
+ def test_get_protocol(self):
+ """Create and later fetch protocol tied to IdP."""
+
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id']
+ url = "%s/protocols/%s" % (idp_id, proto_id)
+ url = self.base_url(suffix=url)
+
+ resp = self.get(url)
+
+ reference = {'id': proto_id}
+ self.assertValidResponse(resp, 'protocol',
+ dummy_validator,
+ keys_to_check=reference.keys(),
+ ref=reference)
+
+ def test_list_protocols(self):
+ """Create set of protocols and later list them.
+
+ Compare input and output id sets.
+
+ """
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ iterations = random.randint(0, 16)
+ protocol_ids = []
+ for _ in range(iterations):
+ resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id,
+ expected_status=201)
+ proto_id = self._fetch_attribute_from_response(resp, 'protocol')
+ proto_id = proto_id['id']
+ protocol_ids.append(proto_id)
+
+ url = "%s/protocols" % idp_id
+ url = self.base_url(suffix=url)
+ resp = self.get(url)
+ self.assertValidListResponse(resp, 'protocols',
+ dummy_validator,
+ keys_to_check=['id'])
+ entities = self._fetch_attribute_from_response(resp, 'protocols')
+ entities = set([entity['id'] for entity in entities])
+ protocols_intersection = entities.intersection(protocol_ids)
+ self.assertEqual(protocols_intersection, set(protocol_ids))
+
+ def test_update_protocols_attribute(self):
+ """Update protocol's attribute."""
+
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ new_mapping_id = uuid.uuid4().hex
+
+ url = "%s/protocols/%s" % (idp_id, proto)
+ url = self.base_url(suffix=url)
+ body = {'mapping_id': new_mapping_id}
+ resp = self.patch(url, body={'protocol': body})
+ self.assertValidResponse(resp, 'protocol', dummy_validator,
+ keys_to_check=['id', 'mapping_id'],
+ ref={'id': proto,
+ 'mapping_id': new_mapping_id}
+ )
+
+ def test_delete_protocol(self):
+ """Delete protocol.
+
+ Expect HTTP 404 code for the GET call after the protocol is deleted.
+
+ """
+ url = self.base_url(suffix='/%(idp_id)s/'
+ 'protocols/%(protocol_id)s')
+ resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201)
+ url = url % {'idp_id': idp_id,
+ 'protocol_id': proto}
+ self.delete(url)
+ self.get(url, expected_status=404)
+
+
+class MappingCRUDTests(FederationTests):
+ """A class for testing CRUD operations for Mappings."""
+
+ MAPPING_URL = '/OS-FEDERATION/mappings/'
+
+ def assertValidMappingListResponse(self, resp, *args, **kwargs):
+ return self.assertValidListResponse(
+ resp,
+ 'mappings',
+ self.assertValidMapping,
+ keys_to_check=[],
+ *args,
+ **kwargs)
+
+ def assertValidMappingResponse(self, resp, *args, **kwargs):
+ return self.assertValidResponse(
+ resp,
+ 'mapping',
+ self.assertValidMapping,
+ keys_to_check=[],
+ *args,
+ **kwargs)
+
+ def assertValidMapping(self, entity, ref=None):
+ self.assertIsNotNone(entity.get('id'))
+ self.assertIsNotNone(entity.get('rules'))
+ if ref:
+ self.assertEqual(jsonutils.loads(entity['rules']), ref['rules'])
+ return entity
+
+ def _create_default_mapping_entry(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ resp = self.put(url,
+ body={'mapping': mapping_fixtures.MAPPING_LARGE},
+ expected_status=201)
+ return resp
+
+ def _get_id_from_response(self, resp):
+ r = resp.result.get('mapping')
+ return r.get('id')
+
+ def test_mapping_create(self):
+ resp = self._create_default_mapping_entry()
+ self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
+
+ def test_mapping_list(self):
+ url = self.MAPPING_URL
+ self._create_default_mapping_entry()
+ resp = self.get(url)
+ entities = resp.result.get('mappings')
+ self.assertIsNotNone(entities)
+ self.assertResponseStatus(resp, 200)
+ self.assertValidListLinks(resp.result.get('links'))
+ self.assertEqual(1, len(entities))
+
+ def test_mapping_delete(self):
+ url = self.MAPPING_URL + '%(mapping_id)s'
+ resp = self._create_default_mapping_entry()
+ mapping_id = self._get_id_from_response(resp)
+ url = url % {'mapping_id': str(mapping_id)}
+ resp = self.delete(url)
+ self.assertResponseStatus(resp, 204)
+ self.get(url, expected_status=404)
+
+ def test_mapping_get(self):
+ url = self.MAPPING_URL + '%(mapping_id)s'
+ resp = self._create_default_mapping_entry()
+ mapping_id = self._get_id_from_response(resp)
+ url = url % {'mapping_id': mapping_id}
+ resp = self.get(url)
+ self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE)
+
+ def test_mapping_update(self):
+ url = self.MAPPING_URL + '%(mapping_id)s'
+ resp = self._create_default_mapping_entry()
+ mapping_id = self._get_id_from_response(resp)
+ url = url % {'mapping_id': mapping_id}
+ resp = self.patch(url,
+ body={'mapping': mapping_fixtures.MAPPING_SMALL})
+ self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
+ resp = self.get(url)
+ self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL)
+
+ def test_delete_mapping_dne(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.delete(url, expected_status=404)
+
+ def test_get_mapping_dne(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.get(url, expected_status=404)
+
+ def test_create_mapping_bad_requirements(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_BAD_REQ})
+
+ def test_create_mapping_no_rules(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_NO_RULES})
+
+ def test_create_mapping_no_remote_objects(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE})
+
+ def test_create_mapping_bad_value(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE})
+
+ def test_create_mapping_missing_local(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL})
+
+ def test_create_mapping_missing_type(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE})
+
+ def test_create_mapping_wrong_type(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE})
+
+ def test_create_mapping_extra_remote_properties_not_any_of(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF
+ self.put(url, expected_status=400, body={'mapping': mapping})
+
+ def test_create_mapping_extra_remote_properties_any_one_of(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF
+ self.put(url, expected_status=400, body={'mapping': mapping})
+
+ def test_create_mapping_extra_remote_properties_just_type(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE
+ self.put(url, expected_status=400, body={'mapping': mapping})
+
+ def test_create_mapping_empty_map(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': {}})
+
+ def test_create_mapping_extra_rules_properties(self):
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ self.put(url, expected_status=400,
+ body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS})
+
+ def test_create_mapping_with_blacklist_and_whitelist(self):
+ """Test for adding whitelist and blacklist in the rule
+
+ Server should respond with HTTP 400 error upon discovering both
+ ``whitelist`` and ``blacklist`` keywords in the same rule.
+
+ """
+ url = self.MAPPING_URL + uuid.uuid4().hex
+ mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST
+ self.put(url, expected_status=400, body={'mapping': mapping})
+
+
+class MappingRuleEngineTests(FederationTests):
+ """A class for testing the mapping rule engine."""
+
+ def assertValidMappedUserObject(self, mapped_properties,
+ user_type='ephemeral',
+ domain_id=None):
+ """Check whether mapped properties object has 'user' within.
+
+ According to today's rules, RuleProcessor does not have to issue user's
+ id or name. What's actually required is user's type and for ephemeral
+ users that would be service domain named 'Federated'.
+ """
+ self.assertIn('user', mapped_properties,
+ message='Missing user object in mapped properties')
+ user = mapped_properties['user']
+ self.assertIn('type', user)
+ self.assertEqual(user_type, user['type'])
+ self.assertIn('domain', user)
+ domain = user['domain']
+ domain_name_or_id = domain.get('id') or domain.get('name')
+ domain_ref = domain_id or federation.FEDERATED_DOMAIN_KEYWORD
+ self.assertEqual(domain_ref, domain_name_or_id)
+
+ def test_rule_engine_any_one_of_and_direct_mapping(self):
+ """Should return user's name and group id EMPLOYEE_GROUP_ID.
+
+ The ADMIN_ASSERTION should successfully have a match in MAPPING_LARGE.
+ They will test the case where `any_one_of` is valid, and there is
+ a direct mapping for the users name.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_LARGE
+ assertion = mapping_fixtures.ADMIN_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ fn = assertion.get('FirstName')
+ ln = assertion.get('LastName')
+ full_name = '%s %s' % (fn, ln)
+ group_ids = values.get('group_ids')
+ user_name = values.get('user', {}).get('name')
+
+ self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids)
+ self.assertEqual(full_name, user_name)
+
+ def test_rule_engine_no_regex_match(self):
+ """Should deny authorization, the email of the tester won't match.
+
+ This will not match since the email in the assertion will fail
+ the regex test. It is set to match any @example.com address.
+ But the incoming value is set to eviltester@example.org.
+ RuleProcessor should return list of empty group_ids.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_LARGE
+ assertion = mapping_fixtures.BAD_TESTER_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+
+ self.assertValidMappedUserObject(mapped_properties)
+ self.assertIsNone(mapped_properties['user'].get('name'))
+ self.assertListEqual(list(), mapped_properties['group_ids'])
+
+ def test_rule_engine_regex_many_groups(self):
+ """Should return group CONTRACTOR_GROUP_ID.
+
+ The TESTER_ASSERTION should successfully have a match in
+ MAPPING_TESTER_REGEX. This will test the case where many groups
+ are in the assertion, and a regex value is used to try and find
+ a match.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_TESTER_REGEX
+ assertion = mapping_fixtures.TESTER_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ self.assertValidMappedUserObject(values)
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertEqual(user_name, name)
+ self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids)
+
+ def test_rule_engine_any_one_of_many_rules(self):
+ """Should return group CONTRACTOR_GROUP_ID.
+
+ The CONTRACTOR_ASSERTION should successfully have a match in
+ MAPPING_SMALL. This will test the case where many rules
+ must be matched, including an `any_one_of`, and a direct
+ mapping.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_SMALL
+ assertion = mapping_fixtures.CONTRACTOR_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ self.assertValidMappedUserObject(values)
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertEqual(user_name, name)
+ self.assertIn(mapping_fixtures.CONTRACTOR_GROUP_ID, group_ids)
+
+ def test_rule_engine_not_any_of_and_direct_mapping(self):
+ """Should return user's name and email.
+
+ The CUSTOMER_ASSERTION should successfully have a match in
+ MAPPING_LARGE. This will test the case where a requirement
+ has `not_any_of`, and direct mapping to a username, no group.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_LARGE
+ assertion = mapping_fixtures.CUSTOMER_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ self.assertValidMappedUserObject(values)
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertEqual(user_name, name)
+ self.assertEqual([], group_ids,)
+
+ def test_rule_engine_not_any_of_many_rules(self):
+ """Should return group EMPLOYEE_GROUP_ID.
+
+ The EMPLOYEE_ASSERTION should successfully have a match in
+ MAPPING_SMALL. This will test the case where many remote
+ rules must be matched, including a `not_any_of`.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_SMALL
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ self.assertValidMappedUserObject(values)
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertEqual(user_name, name)
+ self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids)
+
+ def test_rule_engine_not_any_of_regex_verify_pass(self):
+ """Should return group DEVELOPER_GROUP_ID.
+
+ The DEVELOPER_ASSERTION should successfully have a match in
+ MAPPING_DEVELOPER_REGEX. This will test the case where many
+ remote rules must be matched, including a `not_any_of`, with
+ regex set to True.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX
+ assertion = mapping_fixtures.DEVELOPER_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ self.assertValidMappedUserObject(values)
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertEqual(user_name, name)
+ self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids)
+
+ def test_rule_engine_not_any_of_regex_verify_fail(self):
+ """Should deny authorization.
+
+ The email in the assertion will fail the regex test.
+ It is set to reject any @example.org address, but the
+ incoming value is set to evildeveloper@example.org.
+ RuleProcessor should return list of empty group_ids.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX
+ assertion = mapping_fixtures.BAD_DEVELOPER_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+
+ self.assertValidMappedUserObject(mapped_properties)
+ self.assertIsNone(mapped_properties['user'].get('name'))
+ self.assertListEqual(list(), mapped_properties['group_ids'])
+
+ def _rule_engine_regex_match_and_many_groups(self, assertion):
+ """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID.
+
+ A helper function injecting assertion passed as an argument.
+ Expect DEVELOPER_GROUP_ID and TESTER_GROUP_ID in the results.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_LARGE
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ values = rp.process(assertion)
+
+ user_name = assertion.get('UserName')
+ group_ids = values.get('group_ids')
+ name = values.get('user', {}).get('name')
+
+ self.assertValidMappedUserObject(values)
+ self.assertEqual(user_name, name)
+ self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids)
+ self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids)
+
+ def test_rule_engine_regex_match_and_many_groups(self):
+ """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID.
+
+ The TESTER_ASSERTION should successfully have a match in
+ MAPPING_LARGE. This will test a successful regex match
+ for an `any_one_of` evaluation type, and will have many
+ groups returned.
+
+ """
+ self._rule_engine_regex_match_and_many_groups(
+ mapping_fixtures.TESTER_ASSERTION)
+
+ def test_rule_engine_discards_nonstring_objects(self):
+ """Check whether RuleProcessor discards non string objects.
+
+ Despite the fact that assertion is malformed and contains
+ non string objects, RuleProcessor should correctly discard them and
+ successfully have a match in MAPPING_LARGE.
+
+ """
+ self._rule_engine_regex_match_and_many_groups(
+ mapping_fixtures.MALFORMED_TESTER_ASSERTION)
+
+ def test_rule_engine_fails_after_discarding_nonstring(self):
+ """Check whether RuleProcessor discards non string objects.
+
+ Expect RuleProcessor to discard non string object, which
+ is required for a correct rule match. RuleProcessor will result with
+ empty list of groups.
+
+ """
+ mapping = mapping_fixtures.MAPPING_SMALL
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CONTRACTOR_MALFORMED_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertValidMappedUserObject(mapped_properties)
+ self.assertIsNone(mapped_properties['user'].get('name'))
+ self.assertListEqual(list(), mapped_properties['group_ids'])
+
+ def test_rule_engine_returns_group_names(self):
+ """Check whether RuleProcessor returns group names with their domains.
+
+ RuleProcessor should return 'group_names' entry with a list of
+ dictionaries with two entries 'name' and 'domain' identifying group by
+ its name and domain.
+
+ """
+ mapping = mapping_fixtures.MAPPING_GROUP_NAMES
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+ reference = {
+ mapping_fixtures.DEVELOPER_GROUP_NAME:
+ {
+ "name": mapping_fixtures.DEVELOPER_GROUP_NAME,
+ "domain": {
+ "name": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_NAME
+ }
+ },
+ mapping_fixtures.TESTER_GROUP_NAME:
+ {
+ "name": mapping_fixtures.TESTER_GROUP_NAME,
+ "domain": {
+ "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
+ }
+ }
+ }
+ for rule in mapped_properties['group_names']:
+ self.assertDictEqual(reference.get(rule.get('name')), rule)
+
+ def test_rule_engine_whitelist_and_direct_groups_mapping(self):
+ """Should return user's groups Developer and Contractor.
+
+ The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match
+ in MAPPING_GROUPS_WHITELIST. It will test the case where 'whitelist'
+ correctly filters out Manager and only allows Developer and Contractor.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+
+ reference = {
+ mapping_fixtures.DEVELOPER_GROUP_NAME:
+ {
+ "name": mapping_fixtures.DEVELOPER_GROUP_NAME,
+ "domain": {
+ "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
+ }
+ },
+ mapping_fixtures.CONTRACTOR_GROUP_NAME:
+ {
+ "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
+ "domain": {
+ "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
+ }
+ }
+ }
+ for rule in mapped_properties['group_names']:
+ self.assertDictEqual(reference.get(rule.get('name')), rule)
+
+ self.assertEqual('tbo', mapped_properties['user']['name'])
+ self.assertEqual([], mapped_properties['group_ids'])
+
+ def test_rule_engine_blacklist_and_direct_groups_mapping(self):
+ """Should return user's group Developer.
+
+ The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match
+ in MAPPING_GROUPS_BLACKLIST. It will test the case where 'blacklist'
+ correctly filters out Manager and Developer and only allows Contractor.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+
+ reference = {
+ mapping_fixtures.CONTRACTOR_GROUP_NAME:
+ {
+ "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
+ "domain": {
+ "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
+ }
+ }
+ }
+ for rule in mapped_properties['group_names']:
+ self.assertDictEqual(reference.get(rule.get('name')), rule)
+ self.assertEqual('tbo', mapped_properties['user']['name'])
+ self.assertEqual([], mapped_properties['group_ids'])
+
+ def test_rule_engine_blacklist_and_direct_groups_mapping_multiples(self):
+ """Tests matching multiple values before the blacklist.
+
+ Verifies that the local indexes are correct when matching multiple
+ remote values for a field when the field occurs before the blacklist
+ entry in the remote rules.
+
+ """
+
+ mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MULTIPLES
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+
+ reference = {
+ mapping_fixtures.CONTRACTOR_GROUP_NAME:
+ {
+ "name": mapping_fixtures.CONTRACTOR_GROUP_NAME,
+ "domain": {
+ "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID
+ }
+ }
+ }
+ for rule in mapped_properties['group_names']:
+ self.assertDictEqual(reference.get(rule.get('name')), rule)
+ self.assertEqual('tbo', mapped_properties['user']['name'])
+ self.assertEqual([], mapped_properties['group_ids'])
+
+ def test_rule_engine_whitelist_direct_group_mapping_missing_domain(self):
+ """Test if the local rule is rejected upon missing domain value
+
+ This is a variation with a ``whitelist`` filter.
+
+ """
+ mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_MISSING_DOMAIN
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ self.assertRaises(exception.ValidationError, rp.process, assertion)
+
+ def test_rule_engine_blacklist_direct_group_mapping_missing_domain(self):
+ """Test if the local rule is rejected upon missing domain value
+
+ This is a variation with a ``blacklist`` filter.
+
+ """
+ mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MISSING_DOMAIN
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ self.assertRaises(exception.ValidationError, rp.process, assertion)
+
+ def test_rule_engine_no_groups_allowed(self):
+ """Should return user mapped to no groups.
+
+ The EMPLOYEE_ASSERTION should successfully have a match
+ in MAPPING_GROUPS_WHITELIST, but 'whitelist' should filter out
+ the group values from the assertion and thus map to no groups.
+
+ """
+ mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertListEqual(mapped_properties['group_names'], [])
+ self.assertListEqual(mapped_properties['group_ids'], [])
+ self.assertEqual('tbo', mapped_properties['user']['name'])
+
+ def test_mapping_federated_domain_specified(self):
+ """Test mapping engine when domain 'ephemeral' is explicitely set.
+
+ For that, we use mapping rule MAPPING_EPHEMERAL_USER and assertion
+ EMPLOYEE_ASSERTION
+
+ """
+ mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+
+ def test_create_user_object_with_bad_mapping(self):
+ """Test if user object is created even with bad mapping.
+
+ User objects will be created by mapping engine always as long as there
+ is corresponding local rule. This test shows, that even with assertion
+ where no group names nor ids are matched, but there is 'blind' rule for
+ mapping user, such object will be created.
+
+ In this test MAPPING_EHPEMERAL_USER expects UserName set to jsmith
+ whereas value from assertion is 'tbo'.
+
+ """
+ mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CONTRACTOR_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+
+ self.assertNotIn('id', mapped_properties['user'])
+ self.assertNotIn('name', mapped_properties['user'])
+
+ def test_set_ephemeral_domain_to_ephemeral_users(self):
+ """Test auto assigning service domain to ephemeral users.
+
+ Test that ephemeral users will always become members of federated
+ service domain. The check depends on ``type`` value which must be set
+ to ``ephemeral`` in case of ephemeral user.
+
+ """
+ mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER_LOCAL_DOMAIN
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CONTRACTOR_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+
+ def test_local_user_local_domain(self):
+ """Test that local users can have non-service domains assigned."""
+ mapping = mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CONTRACTOR_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(
+ mapped_properties, user_type='local',
+ domain_id=mapping_fixtures.LOCAL_DOMAIN)
+
+ def test_user_identifications_name(self):
+ """Test varius mapping options and how users are identified.
+
+ This test calls mapped.setup_username() for propagating user object.
+
+ Test plan:
+ - Check if the user has proper domain ('federated') set
+ - Check if the user has property type set ('ephemeral')
+ - Check if user's name is properly mapped from the assertion
+ - Check if user's id is properly set and equal to name, as it was not
+ explicitely specified in the mapping.
+
+ """
+ mapping = mapping_fixtures.MAPPING_USER_IDS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CONTRACTOR_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+ mapped.setup_username({}, mapped_properties)
+ self.assertEqual('jsmith', mapped_properties['user']['id'])
+ self.assertEqual('jsmith', mapped_properties['user']['name'])
+
+ def test_user_identifications_name_and_federated_domain(self):
+ """Test varius mapping options and how users are identified.
+
+ This test calls mapped.setup_username() for propagating user object.
+
+ Test plan:
+ - Check if the user has proper domain ('federated') set
+ - Check if the user has propert type set ('ephemeral')
+ - Check if user's name is properly mapped from the assertion
+ - Check if user's id is properly set and equal to name, as it was not
+ explicitely specified in the mapping.
+
+ """
+ mapping = mapping_fixtures.MAPPING_USER_IDS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.EMPLOYEE_ASSERTION
+ mapped_properties = rp.process(assertion)
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+ mapped.setup_username({}, mapped_properties)
+ self.assertEqual('tbo', mapped_properties['user']['name'])
+ self.assertEqual('tbo', mapped_properties['user']['id'])
+
+ def test_user_identification_id(self):
+ """Test varius mapping options and how users are identified.
+
+ This test calls mapped.setup_username() for propagating user object.
+
+ Test plan:
+ - Check if the user has proper domain ('federated') set
+ - Check if the user has propert type set ('ephemeral')
+ - Check if user's id is properly mapped from the assertion
+ - Check if user's name is properly set and equal to id, as it was not
+ explicitely specified in the mapping.
+
+ """
+ mapping = mapping_fixtures.MAPPING_USER_IDS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.ADMIN_ASSERTION
+ mapped_properties = rp.process(assertion)
+ context = {'environment': {}}
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+ mapped.setup_username(context, mapped_properties)
+ self.assertEqual('bob', mapped_properties['user']['name'])
+ self.assertEqual('bob', mapped_properties['user']['id'])
+
+ def test_user_identification_id_and_name(self):
+ """Test varius mapping options and how users are identified.
+
+ This test calls mapped.setup_username() for propagating user object.
+
+ Test plan:
+ - Check if the user has proper domain ('federated') set
+ - Check if the user has proper type set ('ephemeral')
+ - Check if user's name is properly mapped from the assertion
+ - Check if user's id is properly set and and equal to value hardcoded
+ in the mapping
+
+ """
+ mapping = mapping_fixtures.MAPPING_USER_IDS
+ rp = mapping_utils.RuleProcessor(mapping['rules'])
+ assertion = mapping_fixtures.CUSTOMER_ASSERTION
+ mapped_properties = rp.process(assertion)
+ context = {'environment': {}}
+ self.assertIsNotNone(mapped_properties)
+ self.assertValidMappedUserObject(mapped_properties)
+ mapped.setup_username(context, mapped_properties)
+ self.assertEqual('bwilliams', mapped_properties['user']['name'])
+ self.assertEqual('abc123', mapped_properties['user']['id'])
+
+
+class FederatedTokenTests(FederationTests, FederatedSetupMixin):
+
+ def auth_plugin_config_override(self):
+ methods = ['saml2']
+ method_classes = {'saml2': 'keystone.auth.plugins.saml2.Saml2'}
+ super(FederatedTokenTests, self).auth_plugin_config_override(
+ methods, **method_classes)
+
+ def setUp(self):
+ super(FederatedTokenTests, self).setUp()
+ self._notifications = []
+
+ def fake_saml_notify(action, context, user_id, group_ids,
+ identity_provider, protocol, token_id, outcome):
+ note = {
+ 'action': action,
+ 'user_id': user_id,
+ 'identity_provider': identity_provider,
+ 'protocol': protocol,
+ 'send_notification_called': True}
+ self._notifications.append(note)
+
+ self.useFixture(mockpatch.PatchObject(
+ notifications,
+ 'send_saml_audit_notification',
+ fake_saml_notify))
+
+ def _assert_last_notify(self, action, identity_provider, protocol,
+ user_id=None):
+ self.assertTrue(self._notifications)
+ note = self._notifications[-1]
+ if user_id:
+ self.assertEqual(note['user_id'], user_id)
+ self.assertEqual(note['action'], action)
+ self.assertEqual(note['identity_provider'], identity_provider)
+ self.assertEqual(note['protocol'], protocol)
+ self.assertTrue(note['send_notification_called'])
+
+ def load_fixtures(self, fixtures):
+ super(FederationTests, self).load_fixtures(fixtures)
+ self.load_federation_sample_data()
+
+ def test_issue_unscoped_token_notify(self):
+ self._issue_unscoped_token()
+ self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL)
+
+ def test_issue_unscoped_token(self):
+ r = self._issue_unscoped_token()
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_issue_unscoped_token_disabled_idp(self):
+ """Checks if authentication works with disabled identity providers.
+
+ Test plan:
+ 1) Disable default IdP
+ 2) Try issuing unscoped token for that IdP
+ 3) Expect server to forbid authentication
+
+ """
+ enabled_false = {'enabled': False}
+ self.federation_api.update_idp(self.IDP, enabled_false)
+ self.assertRaises(exception.Forbidden,
+ self._issue_unscoped_token)
+
+ def test_issue_unscoped_token_group_names_in_mapping(self):
+ r = self._issue_unscoped_token(assertion='ANOTHER_CUSTOMER_ASSERTION')
+ ref_groups = set([self.group_customers['id'], self.group_admins['id']])
+ token_resp = r.json_body
+ token_groups = token_resp['token']['user']['OS-FEDERATION']['groups']
+ token_groups = set([group['id'] for group in token_groups])
+ self.assertEqual(ref_groups, token_groups)
+
+ def test_issue_unscoped_tokens_nonexisting_group(self):
+ self.assertRaises(exception.MissingGroups,
+ self._issue_unscoped_token,
+ assertion='ANOTHER_TESTER_ASSERTION')
+
+ def test_issue_unscoped_token_with_remote_no_attribute(self):
+ r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
+ environment={
+ self.REMOTE_ID_ATTR: self.REMOTE_ID
+ })
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_issue_unscoped_token_with_remote(self):
+ self.config_fixture.config(group='federation',
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+ r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE,
+ environment={
+ self.REMOTE_ID_ATTR: self.REMOTE_ID
+ })
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_issue_unscoped_token_with_remote_different(self):
+ self.config_fixture.config(group='federation',
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+ self.assertRaises(exception.Forbidden,
+ self._issue_unscoped_token,
+ idp=self.IDP_WITH_REMOTE,
+ environment={
+ self.REMOTE_ID_ATTR: uuid.uuid4().hex
+ })
+
+ def test_issue_unscoped_token_with_remote_unavailable(self):
+ self.config_fixture.config(group='federation',
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+ self.assertRaises(exception.ValidationError,
+ self._issue_unscoped_token,
+ idp=self.IDP_WITH_REMOTE,
+ environment={
+ uuid.uuid4().hex: uuid.uuid4().hex
+ })
+
+ def test_issue_unscoped_token_with_remote_user_as_empty_string(self):
+ # make sure that REMOTE_USER set as the empty string won't interfere
+ r = self._issue_unscoped_token(environment={'REMOTE_USER': ''})
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_issue_unscoped_token_no_groups(self):
+ self.assertRaises(exception.Unauthorized,
+ self._issue_unscoped_token,
+ assertion='BAD_TESTER_ASSERTION')
+
+ def test_issue_unscoped_token_malformed_environment(self):
+ """Test whether non string objects are filtered out.
+
+ Put non string objects into the environment, inject
+ correct assertion and try to get an unscoped token.
+ Expect server not to fail on using split() method on
+ non string objects and return token id in the HTTP header.
+
+ """
+ api = auth_controllers.Auth()
+ context = {
+ 'environment': {
+ 'malformed_object': object(),
+ 'another_bad_idea': tuple(xrange(10)),
+ 'yet_another_bad_param': dict(zip(uuid.uuid4().hex,
+ range(32)))
+ }
+ }
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION')
+ r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ)
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_scope_to_project_once_notify(self):
+ r = self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
+ user_id = r.json['token']['user']['id']
+ self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id)
+
+ def test_scope_to_project_once(self):
+ r = self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
+ token_resp = r.result['token']
+ project_id = token_resp['project']['id']
+ self.assertEqual(project_id, self.proj_employees['id'])
+ self._check_scoped_token_attributes(token_resp)
+ roles_ref = [self.role_employee]
+ projects_ref = self.proj_employees
+ self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
+
+ def test_scope_token_with_idp_disabled(self):
+ """Scope token issued by disabled IdP.
+
+ Try scoping the token issued by an IdP which is disabled now. Expect
+ server to refuse scoping operation.
+
+ This test confirms correct behaviour when IdP was enabled and unscoped
+ token was issued, but disabled before user tries to scope the token.
+ Here we assume the unscoped token was already issued and start from
+ the moment where IdP is being disabled and unscoped token is being
+ used.
+
+ Test plan:
+ 1) Disable IdP
+ 2) Try scoping unscoped token
+
+ """
+ enabled_false = {'enabled': False}
+ self.federation_api.update_idp(self.IDP, enabled_false)
+ self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
+ expected_status=403)
+
+ def test_scope_to_bad_project(self):
+ """Scope unscoped token with a project we don't have access to."""
+
+ self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER,
+ expected_status=401)
+
+ def test_scope_to_project_multiple_times(self):
+ """Try to scope the unscoped token multiple times.
+
+ The new tokens should be scoped to:
+
+ * Customers' project
+ * Employees' project
+
+ """
+
+ bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN,
+ self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN)
+ project_ids = (self.proj_employees['id'],
+ self.proj_customers['id'])
+ for body, project_id_ref in zip(bodies, project_ids):
+ r = self.v3_authenticate_token(body)
+ token_resp = r.result['token']
+ project_id = token_resp['project']['id']
+ self.assertEqual(project_id, project_id_ref)
+ self._check_scoped_token_attributes(token_resp)
+
+ def test_scope_to_project_with_only_inherited_roles(self):
+ """Try to scope token whose only roles are inherited."""
+ self.config_fixture.config(group='os_inherit', enabled=True)
+ r = self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER)
+ token_resp = r.result['token']
+ project_id = token_resp['project']['id']
+ self.assertEqual(project_id, self.project_inherited['id'])
+ self._check_scoped_token_attributes(token_resp)
+ roles_ref = [self.role_customer]
+ projects_ref = self.project_inherited
+ self._check_projects_and_roles(token_resp, roles_ref, projects_ref)
+
+ def test_scope_token_from_nonexistent_unscoped_token(self):
+ """Try to scope token from non-existent unscoped token."""
+ self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN,
+ expected_status=404)
+
+ def test_issue_token_from_rules_without_user(self):
+ api = auth_controllers.Auth()
+ context = {'environment': {}}
+ self._inject_assertion(context, 'BAD_TESTER_ASSERTION')
+ self.assertRaises(exception.Unauthorized,
+ api.authenticate_for_token,
+ context, self.UNSCOPED_V3_SAML2_REQ)
+
+ def test_issue_token_with_nonexistent_group(self):
+ """Inject assertion that matches rule issuing bad group id.
+
+ Expect server to find out that some groups are missing in the
+ backend and raise exception.MappedGroupNotFound exception.
+
+ """
+ self.assertRaises(exception.MappedGroupNotFound,
+ self._issue_unscoped_token,
+ assertion='CONTRACTOR_ASSERTION')
+
+ def test_scope_to_domain_once(self):
+ r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER)
+ token_resp = r.result['token']
+ domain_id = token_resp['domain']['id']
+ self.assertEqual(self.domainA['id'], domain_id)
+ self._check_scoped_token_attributes(token_resp)
+
+ def test_scope_to_domain_multiple_tokens(self):
+ """Issue multiple tokens scoping to different domains.
+
+ The new tokens should be scoped to:
+
+ * domainA
+ * domainB
+ * domainC
+
+ """
+ bodies = (self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN,
+ self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN,
+ self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN)
+ domain_ids = (self.domainA['id'],
+ self.domainB['id'],
+ self.domainC['id'])
+
+ for body, domain_id_ref in zip(bodies, domain_ids):
+ r = self.v3_authenticate_token(body)
+ token_resp = r.result['token']
+ domain_id = token_resp['domain']['id']
+ self.assertEqual(domain_id_ref, domain_id)
+ self._check_scoped_token_attributes(token_resp)
+
+ def test_scope_to_domain_with_only_inherited_roles_fails(self):
+ """Try to scope to a domain that has no direct roles."""
+ self.v3_authenticate_token(
+ self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER,
+ expected_status=401)
+
+ def test_list_projects(self):
+ urls = ('/OS-FEDERATION/projects', '/auth/projects')
+
+ token = (self.tokens['CUSTOMER_ASSERTION'],
+ self.tokens['EMPLOYEE_ASSERTION'],
+ self.tokens['ADMIN_ASSERTION'])
+
+ self.config_fixture.config(group='os_inherit', enabled=True)
+ projects_refs = (set([self.proj_customers['id'],
+ self.project_inherited['id']]),
+ set([self.proj_employees['id'],
+ self.project_all['id']]),
+ set([self.proj_employees['id'],
+ self.project_all['id'],
+ self.proj_customers['id'],
+ self.project_inherited['id']]))
+
+ for token, projects_ref in zip(token, projects_refs):
+ for url in urls:
+ r = self.get(url, token=token)
+ projects_resp = r.result['projects']
+ projects = set(p['id'] for p in projects_resp)
+ self.assertEqual(projects_ref, projects,
+ 'match failed for url %s' % url)
+
+ def test_list_domains(self):
+ urls = ('/OS-FEDERATION/domains', '/auth/domains')
+
+ tokens = (self.tokens['CUSTOMER_ASSERTION'],
+ self.tokens['EMPLOYEE_ASSERTION'],
+ self.tokens['ADMIN_ASSERTION'])
+
+ # NOTE(henry-nash): domain D does not appear in the expected results
+ # since it only had inherited roles (which only apply to projects
+ # within the domain)
+
+ domain_refs = (set([self.domainA['id']]),
+ set([self.domainA['id'],
+ self.domainB['id']]),
+ set([self.domainA['id'],
+ self.domainB['id'],
+ self.domainC['id']]))
+
+ for token, domains_ref in zip(tokens, domain_refs):
+ for url in urls:
+ r = self.get(url, token=token)
+ domains_resp = r.result['domains']
+ domains = set(p['id'] for p in domains_resp)
+ self.assertEqual(domains_ref, domains,
+ 'match failed for url %s' % url)
+
+ def test_full_workflow(self):
+ """Test 'standard' workflow for granting access tokens.
+
+ * Issue unscoped token
+ * List available projects based on groups
+ * Scope token to one of available projects
+
+ """
+
+ r = self._issue_unscoped_token()
+ employee_unscoped_token_id = r.headers.get('X-Subject-Token')
+ r = self.get('/OS-FEDERATION/projects',
+ token=employee_unscoped_token_id)
+ projects = r.result['projects']
+ random_project = random.randint(0, len(projects)) - 1
+ project = projects[random_project]
+
+ v3_scope_request = self._scope_request(employee_unscoped_token_id,
+ 'project', project['id'])
+
+ r = self.v3_authenticate_token(v3_scope_request)
+ token_resp = r.result['token']
+ project_id = token_resp['project']['id']
+ self.assertEqual(project['id'], project_id)
+ self._check_scoped_token_attributes(token_resp)
+
+ def test_workflow_with_groups_deletion(self):
+ """Test full workflow with groups deletion before token scoping.
+
+ The test scenario is as follows:
+ - Create group ``group``
+ - Create and assign roles to ``group`` and ``project_all``
+ - Patch mapping rules for existing IdP so it issues group id
+ - Issue unscoped token with ``group``'s id
+ - Delete group ``group``
+ - Scope token to ``project_all``
+ - Expect HTTP 500 response
+
+ """
+ # create group and role
+ group = self.new_group_ref(
+ domain_id=self.domainA['id'])
+ group = self.identity_api.create_group(group)
+ role = self.new_role_ref()
+ self.role_api.create_role(role['id'], role)
+
+ # assign role to group and project_admins
+ self.assignment_api.create_grant(role['id'],
+ group_id=group['id'],
+ project_id=self.project_all['id'])
+
+ rules = {
+ 'rules': [
+ {
+ 'local': [
+ {
+ 'group': {
+ 'id': group['id']
+ }
+ },
+ {
+ 'user': {
+ 'name': '{0}'
+ }
+ }
+ ],
+ 'remote': [
+ {
+ 'type': 'UserName'
+ },
+ {
+ 'type': 'LastName',
+ 'any_one_of': [
+ 'Account'
+ ]
+ }
+ ]
+ }
+ ]
+ }
+
+ self.federation_api.update_mapping(self.mapping['id'], rules)
+
+ r = self._issue_unscoped_token(assertion='TESTER_ASSERTION')
+ token_id = r.headers.get('X-Subject-Token')
+
+ # delete group
+ self.identity_api.delete_group(group['id'])
+
+ # scope token to project_all, expect HTTP 500
+ scoped_token = self._scope_request(
+ token_id, 'project',
+ self.project_all['id'])
+
+ self.v3_authenticate_token(scoped_token, expected_status=500)
+
+ def test_lists_with_missing_group_in_backend(self):
+ """Test a mapping that points to a group that does not exist
+
+ For explicit mappings, we expect the group to exist in the backend,
+ but for lists, specifically blacklists, a missing group is expected
+ as many groups will be specified by the IdP that are not Keystone
+ groups.
+
+ The test scenario is as follows:
+ - Create group ``EXISTS``
+ - Set mapping rules for existing IdP with a blacklist
+ that passes through as REMOTE_USER_GROUPS
+ - Issue unscoped token with on group ``EXISTS`` id in it
+
+ """
+ domain_id = self.domainA['id']
+ domain_name = self.domainA['name']
+ group = self.new_group_ref(domain_id=domain_id)
+ group['name'] = 'EXISTS'
+ group = self.identity_api.create_group(group)
+ rules = {
+ 'rules': [
+ {
+ "local": [
+ {
+ "user": {
+ "name": "{0}",
+ "id": "{0}"
+ }
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER"
+ }
+ ]
+ },
+ {
+ "local": [
+ {
+ "groups": "{0}",
+ "domain": {"name": domain_name}
+ }
+ ],
+ "remote": [
+ {
+ "type": "REMOTE_USER_GROUPS",
+ "blacklist": ["noblacklist"]
+ }
+ ]
+ }
+ ]
+ }
+ self.federation_api.update_mapping(self.mapping['id'], rules)
+
+ r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION')
+ assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups']
+ self.assertEqual(1, len(assigned_group_ids))
+ self.assertEqual(group['id'], assigned_group_ids[0]['id'])
+
+ def test_assertion_prefix_parameter(self):
+ """Test parameters filtering based on the prefix.
+
+ With ``assertion_prefix`` set to fixed, non default value,
+ issue an unscoped token from assertion EMPLOYEE_ASSERTION_PREFIXED.
+ Expect server to return unscoped token.
+
+ """
+ self.config_fixture.config(group='federation',
+ assertion_prefix=self.ASSERTION_PREFIX)
+ r = self._issue_unscoped_token(assertion='EMPLOYEE_ASSERTION_PREFIXED')
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+
+ def test_assertion_prefix_parameter_expect_fail(self):
+ """Test parameters filtering based on the prefix.
+
+ With ``assertion_prefix`` default value set to empty string
+ issue an unscoped token from assertion EMPLOYEE_ASSERTION.
+ Next, configure ``assertion_prefix`` to value ``UserName``.
+ Try issuing unscoped token with EMPLOYEE_ASSERTION.
+ Expect server to raise exception.Unathorized exception.
+
+ """
+ r = self._issue_unscoped_token()
+ self.assertIsNotNone(r.headers.get('X-Subject-Token'))
+ self.config_fixture.config(group='federation',
+ assertion_prefix='UserName')
+
+ self.assertRaises(exception.Unauthorized,
+ self._issue_unscoped_token)
+
+ def test_v2_auth_with_federation_token_fails(self):
+ """Test that using a federation token with v2 auth fails.
+
+ If an admin sets up a federated Keystone environment, and a user
+ incorrectly configures a service (like Nova) to only use v2 auth, the
+ returned message should be informative.
+
+ """
+ r = self._issue_unscoped_token()
+ token_id = r.headers.get('X-Subject-Token')
+ self.assertRaises(exception.Unauthorized,
+ self.token_provider_api.validate_v2_token,
+ token_id=token_id)
+
+ def test_unscoped_token_has_user_domain(self):
+ r = self._issue_unscoped_token()
+ self._check_domains_are_valid(r.json_body['token'])
+
+ def test_scoped_token_has_user_domain(self):
+ r = self.v3_authenticate_token(
+ self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE)
+ self._check_domains_are_valid(r.result['token'])
+
+ def test_issue_unscoped_token_for_local_user(self):
+ r = self._issue_unscoped_token(assertion='LOCAL_USER_ASSERTION')
+ token_resp = r.json_body['token']
+ self.assertListEqual(['saml2'], token_resp['methods'])
+ self.assertEqual(self.user['id'], token_resp['user']['id'])
+ self.assertEqual(self.user['name'], token_resp['user']['name'])
+ self.assertEqual(self.domain['id'], token_resp['user']['domain']['id'])
+ # Make sure the token is not scoped
+ self.assertNotIn('project', token_resp)
+ self.assertNotIn('domain', token_resp)
+
+ def test_issue_token_for_local_user_user_not_found(self):
+ self.assertRaises(exception.Unauthorized,
+ self._issue_unscoped_token,
+ assertion='ANOTHER_LOCAL_USER_ASSERTION')
+
+
+class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin):
+ AUTH_METHOD = 'token'
+
+ def load_fixtures(self, fixtures):
+ super(FernetFederatedTokenTests, self).load_fixtures(fixtures)
+ self.load_federation_sample_data()
+
+ def auth_plugin_config_override(self):
+ methods = ['saml2', 'token', 'password']
+ method_classes = dict(
+ password='keystone.auth.plugins.password.Password',
+ token='keystone.auth.plugins.token.Token',
+ saml2='keystone.auth.plugins.saml2.Saml2')
+ super(FernetFederatedTokenTests,
+ self).auth_plugin_config_override(methods, **method_classes)
+ self.config_fixture.config(
+ group='token',
+ provider='keystone.token.providers.fernet.Provider')
+ self.useFixture(ksfixtures.KeyRepository(self.config_fixture))
+
+ def test_federated_unscoped_token(self):
+ resp = self._issue_unscoped_token()
+ self.assertEqual(186, len(resp.headers['X-Subject-Token']))
+
+ def test_federated_unscoped_token_with_multiple_groups(self):
+ assertion = 'ANOTHER_CUSTOMER_ASSERTION'
+ resp = self._issue_unscoped_token(assertion=assertion)
+ self.assertEqual(204, len(resp.headers['X-Subject-Token']))
+
+ def test_validate_federated_unscoped_token(self):
+ resp = self._issue_unscoped_token()
+ unscoped_token = resp.headers.get('X-Subject-Token')
+ # assert that the token we received is valid
+ self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token})
+
+ def test_fernet_full_workflow(self):
+ """Test 'standard' workflow for granting Fernet access tokens.
+
+ * Issue unscoped token
+ * List available projects based on groups
+ * Scope token to one of available projects
+
+ """
+ resp = self._issue_unscoped_token()
+ unscoped_token = resp.headers.get('X-Subject-Token')
+ resp = self.get('/OS-FEDERATION/projects',
+ token=unscoped_token)
+ projects = resp.result['projects']
+ random_project = random.randint(0, len(projects)) - 1
+ project = projects[random_project]
+
+ v3_scope_request = self._scope_request(unscoped_token,
+ 'project', project['id'])
+
+ resp = self.v3_authenticate_token(v3_scope_request)
+ token_resp = resp.result['token']
+ project_id = token_resp['project']['id']
+ self.assertEqual(project['id'], project_id)
+ self._check_scoped_token_attributes(token_resp)
+
+
+class FederatedTokenTestsMethodToken(FederatedTokenTests):
+ """Test federation operation with unified scoping auth method.
+
+ Test all the operations with auth method set to ``token`` as a new, unified
+ way for scoping all the tokens.
+
+ """
+ AUTH_METHOD = 'token'
+
+ def auth_plugin_config_override(self):
+ methods = ['saml2', 'token']
+ method_classes = dict(
+ token='keystone.auth.plugins.token.Token',
+ saml2='keystone.auth.plugins.saml2.Saml2')
+ super(FederatedTokenTests,
+ self).auth_plugin_config_override(methods, **method_classes)
+
+
+class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin):
+ JSON_HOME_DATA = {
+ 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/'
+ '1.0/rel/identity_provider': {
+ 'href-template': '/OS-FEDERATION/identity_providers/{idp_id}',
+ 'href-vars': {
+ 'idp_id': 'http://docs.openstack.org/api/openstack-identity/3/'
+ 'ext/OS-FEDERATION/1.0/param/idp_id'
+ },
+ },
+ }
+
+
+def _is_xmlsec1_installed():
+ p = subprocess.Popen(
+ ['which', 'xmlsec1'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ # invert the return code
+ return not bool(p.wait())
+
+
+def _load_xml(filename):
+ with open(os.path.join(XMLDIR, filename), 'r') as xml:
+ return xml.read()
+
+
+class SAMLGenerationTests(FederationTests):
+
+ SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers'
+ '/BETA/protocols/saml2/auth')
+ ISSUER = 'https://acme.com/FIM/sps/openstack/saml20'
+ RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST'
+ SUBJECT = 'test_user'
+ ROLES = ['admin', 'member']
+ PROJECT = 'development'
+ SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2'
+ ASSERTION_VERSION = "2.0"
+ SERVICE_PROVDIER_ID = 'ACME'
+
+ def sp_ref(self):
+ ref = {
+ 'auth_url': self.SP_AUTH_URL,
+ 'enabled': True,
+ 'description': uuid.uuid4().hex,
+ 'sp_url': self.RECIPIENT,
+
+ }
+ return ref
+
+ def setUp(self):
+ super(SAMLGenerationTests, self).setUp()
+ self.signed_assertion = saml2.create_class_from_xml_string(
+ saml.Assertion, _load_xml('signed_saml2_assertion.xml'))
+ self.sp = self.sp_ref()
+ self.federation_api.create_sp(self.SERVICE_PROVDIER_ID, self.sp)
+
+ def test_samlize_token_values(self):
+ """Test the SAML generator produces a SAML object.
+
+ Test the SAML generator directly by passing known arguments, the result
+ should be a SAML object that consistently includes attributes based on
+ the known arguments that were passed in.
+
+ """
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ return_value=self.signed_assertion):
+ generator = keystone_idp.SAMLGenerator()
+ response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
+ self.SUBJECT, self.ROLES,
+ self.PROJECT)
+
+ assertion = response.assertion
+ self.assertIsNotNone(assertion)
+ self.assertIsInstance(assertion, saml.Assertion)
+ issuer = response.issuer
+ self.assertEqual(self.RECIPIENT, response.destination)
+ self.assertEqual(self.ISSUER, issuer.text)
+
+ user_attribute = assertion.attribute_statement[0].attribute[0]
+ self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text)
+
+ role_attribute = assertion.attribute_statement[0].attribute[1]
+ for attribute_value in role_attribute.attribute_value:
+ self.assertIn(attribute_value.text, self.ROLES)
+
+ project_attribute = assertion.attribute_statement[0].attribute[2]
+ self.assertEqual(self.PROJECT,
+ project_attribute.attribute_value[0].text)
+
+ def test_verify_assertion_object(self):
+ """Test that the Assertion object is built properly.
+
+ The Assertion doesn't need to be signed in this test, so
+ _sign_assertion method is patched and doesn't alter the assertion.
+
+ """
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ side_effect=lambda x: x):
+ generator = keystone_idp.SAMLGenerator()
+ response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
+ self.SUBJECT, self.ROLES,
+ self.PROJECT)
+ assertion = response.assertion
+ self.assertEqual(self.ASSERTION_VERSION, assertion.version)
+
+ def test_valid_saml_xml(self):
+ """Test the generated SAML object can become valid XML.
+
+ Test the generator directly by passing known arguments, the result
+ should be a SAML object that consistently includes attributes based on
+ the known arguments that were passed in.
+
+ """
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ return_value=self.signed_assertion):
+ generator = keystone_idp.SAMLGenerator()
+ response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
+ self.SUBJECT, self.ROLES,
+ self.PROJECT)
+
+ saml_str = response.to_string()
+ response = etree.fromstring(saml_str)
+ issuer = response[0]
+ assertion = response[2]
+
+ self.assertEqual(self.RECIPIENT, response.get('Destination'))
+ self.assertEqual(self.ISSUER, issuer.text)
+
+ user_attribute = assertion[4][0]
+ self.assertEqual(self.SUBJECT, user_attribute[0].text)
+
+ role_attribute = assertion[4][1]
+ for attribute_value in role_attribute:
+ self.assertIn(attribute_value.text, self.ROLES)
+
+ project_attribute = assertion[4][2]
+ self.assertEqual(self.PROJECT, project_attribute[0].text)
+
+ def test_assertion_using_explicit_namespace_prefixes(self):
+ def mocked_subprocess_check_output(*popenargs, **kwargs):
+ # the last option is the assertion file to be signed
+ filename = popenargs[0][-1]
+ with open(filename, 'r') as f:
+ assertion_content = f.read()
+ # since we are not testing the signature itself, we can return
+ # the assertion as is without signing it
+ return assertion_content
+
+ with mock.patch('subprocess.check_output',
+ side_effect=mocked_subprocess_check_output):
+ generator = keystone_idp.SAMLGenerator()
+ response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
+ self.SUBJECT, self.ROLES,
+ self.PROJECT)
+ assertion_xml = response.assertion.to_string()
+ # make sure we have the proper tag and prefix for the assertion
+ # namespace
+ self.assertIn('<saml:Assertion', assertion_xml)
+ self.assertIn('xmlns:saml="' + saml2.NAMESPACE + '"',
+ assertion_xml)
+ self.assertIn('xmlns:xmldsig="' + xmldsig.NAMESPACE + '"',
+ assertion_xml)
+
+ def test_saml_signing(self):
+ """Test that the SAML generator produces a SAML object.
+
+ Test the SAML generator directly by passing known arguments, the result
+ should be a SAML object that consistently includes attributes based on
+ the known arguments that were passed in.
+
+ """
+ if not _is_xmlsec1_installed():
+ self.skip('xmlsec1 is not installed')
+
+ generator = keystone_idp.SAMLGenerator()
+ response = generator.samlize_token(self.ISSUER, self.RECIPIENT,
+ self.SUBJECT, self.ROLES,
+ self.PROJECT)
+
+ signature = response.assertion.signature
+ self.assertIsNotNone(signature)
+ self.assertIsInstance(signature, xmldsig.Signature)
+
+ idp_public_key = sigver.read_cert_from_file(CONF.saml.certfile, 'pem')
+ cert_text = signature.key_info.x509_data[0].x509_certificate.text
+ # NOTE(stevemar): Rather than one line of text, the certificate is
+ # printed with newlines for readability, we remove these so we can
+ # match it with the key that we used.
+ cert_text = cert_text.replace(os.linesep, '')
+ self.assertEqual(idp_public_key, cert_text)
+
+ def _create_generate_saml_request(self, token_id, sp_id):
+ return {
+ "auth": {
+ "identity": {
+ "methods": [
+ "token"
+ ],
+ "token": {
+ "id": token_id
+ }
+ },
+ "scope": {
+ "service_provider": {
+ "id": sp_id
+ }
+ }
+ }
+ }
+
+ def _fetch_valid_token(self):
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ project_id=self.project['id'])
+ resp = self.v3_authenticate_token(auth_data)
+ token_id = resp.headers.get('X-Subject-Token')
+ return token_id
+
+ def _fetch_domain_scoped_token(self):
+ auth_data = self.build_authentication_request(
+ user_id=self.user['id'],
+ password=self.user['password'],
+ user_domain_id=self.domain['id'])
+ resp = self.v3_authenticate_token(auth_data)
+ token_id = resp.headers.get('X-Subject-Token')
+ return token_id
+
+ def test_not_project_scoped_token(self):
+ """Ensure SAML generation fails when passing domain-scoped tokens.
+
+ The server should return a 403 Forbidden Action.
+
+ """
+ self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
+ token_id = self._fetch_domain_scoped_token()
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ return_value=self.signed_assertion):
+ self.post(self.SAML_GENERATION_ROUTE, body=body,
+ expected_status=403)
+
+ def test_generate_saml_route(self):
+ """Test that the SAML generation endpoint produces XML.
+
+ The SAML endpoint /v3/auth/OS-FEDERATION/saml2 should take as input,
+ a scoped token ID, and a Service Provider ID.
+ The controller should fetch details about the user from the token,
+ and details about the service provider from its ID.
+ This should be enough information to invoke the SAML generator and
+ provide a valid SAML (XML) document back.
+
+ """
+ self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER)
+ token_id = self._fetch_valid_token()
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+
+ with mock.patch.object(keystone_idp, '_sign_assertion',
+ return_value=self.signed_assertion):
+ http_response = self.post(self.SAML_GENERATION_ROUTE, body=body,
+ response_content_type='text/xml',
+ expected_status=200)
+
+ response = etree.fromstring(http_response.result)
+ issuer = response[0]
+ assertion = response[2]
+
+ self.assertEqual(self.RECIPIENT, response.get('Destination'))
+ self.assertEqual(self.ISSUER, issuer.text)
+
+ # NOTE(stevemar): We should test this against expected values,
+ # but the self.xyz attribute names are uuids, and we mock out
+ # the result. Ideally we should update the mocked result with
+ # some known data, and create the roles/project/user before
+ # these tests run.
+ user_attribute = assertion[4][0]
+ self.assertIsInstance(user_attribute[0].text, str)
+
+ role_attribute = assertion[4][1]
+ self.assertIsInstance(role_attribute[0].text, str)
+
+ project_attribute = assertion[4][2]
+ self.assertIsInstance(project_attribute[0].text, str)
+
+ def test_invalid_scope_body(self):
+ """Test that missing the scope in request body raises an exception.
+
+ Raises exception.SchemaValidationError() - error code 400
+
+ """
+
+ token_id = uuid.uuid4().hex
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+ del body['auth']['scope']
+
+ self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
+
+ def test_invalid_token_body(self):
+ """Test that missing the token in request body raises an exception.
+
+ Raises exception.SchemaValidationError() - error code 400
+
+ """
+
+ token_id = uuid.uuid4().hex
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+ del body['auth']['identity']['token']
+
+ self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=400)
+
+ def test_sp_not_found(self):
+ """Test SAML generation with an invalid service provider ID.
+
+ Raises exception.ServiceProviderNotFound() - error code 404
+
+ """
+ sp_id = uuid.uuid4().hex
+ token_id = self._fetch_valid_token()
+ body = self._create_generate_saml_request(token_id, sp_id)
+ self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
+
+ def test_sp_disabled(self):
+ """Try generating assertion for disabled Service Provider."""
+
+ # Disable Service Provider
+ sp_ref = {'enabled': False}
+ self.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref)
+
+ token_id = self._fetch_valid_token()
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+ self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=403)
+
+ def test_token_not_found(self):
+ """Test that an invalid token in the request body raises an exception.
+
+ Raises exception.TokenNotFound() - error code 404
+
+ """
+
+ token_id = uuid.uuid4().hex
+ body = self._create_generate_saml_request(token_id,
+ self.SERVICE_PROVDIER_ID)
+ self.post(self.SAML_GENERATION_ROUTE, body=body, expected_status=404)
+
+
+class IdPMetadataGenerationTests(FederationTests):
+ """A class for testing Identity Provider Metadata generation."""
+
+ METADATA_URL = '/OS-FEDERATION/saml2/metadata'
+
+ def setUp(self):
+ super(IdPMetadataGenerationTests, self).setUp()
+ self.generator = keystone_idp.MetadataGenerator()
+
+ def config_overrides(self):
+ super(IdPMetadataGenerationTests, self).config_overrides()
+ self.config_fixture.config(
+ group='saml',
+ idp_entity_id=federation_fixtures.IDP_ENTITY_ID,
+ idp_sso_endpoint=federation_fixtures.IDP_SSO_ENDPOINT,
+ idp_organization_name=federation_fixtures.IDP_ORGANIZATION_NAME,
+ idp_organization_display_name=(
+ federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME),
+ idp_organization_url=federation_fixtures.IDP_ORGANIZATION_URL,
+ idp_contact_company=federation_fixtures.IDP_CONTACT_COMPANY,
+ idp_contact_name=federation_fixtures.IDP_CONTACT_GIVEN_NAME,
+ idp_contact_surname=federation_fixtures.IDP_CONTACT_SURNAME,
+ idp_contact_email=federation_fixtures.IDP_CONTACT_EMAIL,
+ idp_contact_telephone=(
+ federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER),
+ idp_contact_type=federation_fixtures.IDP_CONTACT_TYPE)
+
+ def test_check_entity_id(self):
+ metadata = self.generator.generate_metadata()
+ self.assertEqual(federation_fixtures.IDP_ENTITY_ID, metadata.entity_id)
+
+ def test_metadata_validity(self):
+ """Call md.EntityDescriptor method that does internal verification."""
+ self.generator.generate_metadata().verify()
+
+ def test_serialize_metadata_object(self):
+ """Check whether serialization doesn't raise any exceptions."""
+ self.generator.generate_metadata().to_string()
+ # TODO(marek-denis): Check values here
+
+ def test_check_idp_sso(self):
+ metadata = self.generator.generate_metadata()
+ idpsso_descriptor = metadata.idpsso_descriptor
+ self.assertIsNotNone(metadata.idpsso_descriptor)
+ self.assertEqual(federation_fixtures.IDP_SSO_ENDPOINT,
+ idpsso_descriptor.single_sign_on_service.location)
+
+ self.assertIsNotNone(idpsso_descriptor.organization)
+ organization = idpsso_descriptor.organization
+ self.assertEqual(federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME,
+ organization.organization_display_name.text)
+ self.assertEqual(federation_fixtures.IDP_ORGANIZATION_NAME,
+ organization.organization_name.text)
+ self.assertEqual(federation_fixtures.IDP_ORGANIZATION_URL,
+ organization.organization_url.text)
+
+ self.assertIsNotNone(idpsso_descriptor.contact_person)
+ contact_person = idpsso_descriptor.contact_person
+
+ self.assertEqual(federation_fixtures.IDP_CONTACT_GIVEN_NAME,
+ contact_person.given_name.text)
+ self.assertEqual(federation_fixtures.IDP_CONTACT_SURNAME,
+ contact_person.sur_name.text)
+ self.assertEqual(federation_fixtures.IDP_CONTACT_EMAIL,
+ contact_person.email_address.text)
+ self.assertEqual(federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER,
+ contact_person.telephone_number.text)
+ self.assertEqual(federation_fixtures.IDP_CONTACT_TYPE,
+ contact_person.contact_type)
+
+ def test_metadata_no_organization(self):
+ self.config_fixture.config(
+ group='saml',
+ idp_organization_display_name=None,
+ idp_organization_url=None,
+ idp_organization_name=None)
+ metadata = self.generator.generate_metadata()
+ idpsso_descriptor = metadata.idpsso_descriptor
+ self.assertIsNotNone(metadata.idpsso_descriptor)
+ self.assertIsNone(idpsso_descriptor.organization)
+ self.assertIsNotNone(idpsso_descriptor.contact_person)
+
+ def test_metadata_no_contact_person(self):
+ self.config_fixture.config(
+ group='saml',
+ idp_contact_name=None,
+ idp_contact_surname=None,
+ idp_contact_email=None,
+ idp_contact_telephone=None)
+ metadata = self.generator.generate_metadata()
+ idpsso_descriptor = metadata.idpsso_descriptor
+ self.assertIsNotNone(metadata.idpsso_descriptor)
+ self.assertIsNotNone(idpsso_descriptor.organization)
+ self.assertEqual([], idpsso_descriptor.contact_person)
+
+ def test_metadata_invalid_contact_type(self):
+ self.config_fixture.config(
+ group='saml',
+ idp_contact_type="invalid")
+ self.assertRaises(exception.ValidationError,
+ self.generator.generate_metadata)
+
+ def test_metadata_invalid_idp_sso_endpoint(self):
+ self.config_fixture.config(
+ group='saml',
+ idp_sso_endpoint=None)
+ self.assertRaises(exception.ValidationError,
+ self.generator.generate_metadata)
+
+ def test_metadata_invalid_idp_entity_id(self):
+ self.config_fixture.config(
+ group='saml',
+ idp_entity_id=None)
+ self.assertRaises(exception.ValidationError,
+ self.generator.generate_metadata)
+
+ def test_get_metadata_with_no_metadata_file_configured(self):
+ self.get(self.METADATA_URL, expected_status=500)
+
+ def test_get_metadata(self):
+ self.config_fixture.config(
+ group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml')
+ r = self.get(self.METADATA_URL, response_content_type='text/xml',
+ expected_status=200)
+ self.assertEqual('text/xml', r.headers.get('Content-Type'))
+
+ reference_file = _load_xml('idp_saml2_metadata.xml')
+ self.assertEqual(reference_file, r.result)
+
+
+class ServiceProviderTests(FederationTests):
+ """A test class for Service Providers."""
+
+ MEMBER_NAME = 'service_provider'
+ COLLECTION_NAME = 'service_providers'
+ SERVICE_PROVIDER_ID = 'ACME'
+ SP_KEYS = ['auth_url', 'id', 'enabled', 'description', 'sp_url']
+
+ def setUp(self):
+ super(FederationTests, self).setUp()
+ # Add a Service Provider
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ self.SP_REF = self.sp_ref()
+ self.SERVICE_PROVIDER = self.put(
+ url, body={'service_provider': self.SP_REF},
+ expected_status=201).result
+
+ def sp_ref(self):
+ ref = {
+ 'auth_url': 'https://' + uuid.uuid4().hex + '.com',
+ 'enabled': True,
+ 'description': uuid.uuid4().hex,
+ 'sp_url': 'https://' + uuid.uuid4().hex + '.com',
+ }
+ return ref
+
+ def base_url(self, suffix=None):
+ if suffix is not None:
+ return '/OS-FEDERATION/service_providers/' + str(suffix)
+ return '/OS-FEDERATION/service_providers'
+
+ def test_get_service_provider(self):
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ resp = self.get(url, expected_status=200)
+ self.assertValidEntity(resp.result['service_provider'],
+ keys_to_check=self.SP_KEYS)
+
+ def test_get_service_provider_fail(self):
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ self.get(url, expected_status=404)
+
+ def test_create_service_provider(self):
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ sp = self.sp_ref()
+ resp = self.put(url, body={'service_provider': sp},
+ expected_status=201)
+ self.assertValidEntity(resp.result['service_provider'],
+ keys_to_check=self.SP_KEYS)
+
+ def test_create_service_provider_fail(self):
+ """Try adding SP object with unallowed attribute."""
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ sp = self.sp_ref()
+ sp[uuid.uuid4().hex] = uuid.uuid4().hex
+ self.put(url, body={'service_provider': sp},
+ expected_status=400)
+
+ def test_list_service_providers(self):
+ """Test listing of service provider objects.
+
+ Add two new service providers. List all available service providers.
+ Expect to get list of three service providers (one created by setUp())
+ Test if attributes match.
+
+ """
+ ref_service_providers = {
+ uuid.uuid4().hex: self.sp_ref(),
+ uuid.uuid4().hex: self.sp_ref(),
+ }
+ for id, sp in ref_service_providers.items():
+ url = self.base_url(suffix=id)
+ self.put(url, body={'service_provider': sp}, expected_status=201)
+
+ # Insert ids into service provider object, we will compare it with
+ # responses from server and those include 'id' attribute.
+
+ ref_service_providers[self.SERVICE_PROVIDER_ID] = self.SP_REF
+ for id, sp in ref_service_providers.items():
+ sp['id'] = id
+
+ url = self.base_url()
+ resp = self.get(url)
+ service_providers = resp.result
+ for service_provider in service_providers['service_providers']:
+ id = service_provider['id']
+ self.assertValidEntity(
+ service_provider, ref=ref_service_providers[id],
+ keys_to_check=self.SP_KEYS)
+
+ def test_update_service_provider(self):
+ """Update existing service provider.
+
+ Update default existing service provider and make sure it has been
+ properly changed.
+
+ """
+ new_sp_ref = self.sp_ref()
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ resp = self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=200)
+ patch_result = resp.result
+ new_sp_ref['id'] = self.SERVICE_PROVIDER_ID
+ self.assertValidEntity(patch_result['service_provider'],
+ ref=new_sp_ref,
+ keys_to_check=self.SP_KEYS)
+
+ resp = self.get(url, expected_status=200)
+ get_result = resp.result
+
+ self.assertDictEqual(patch_result['service_provider'],
+ get_result['service_provider'])
+
+ def test_update_service_provider_immutable_parameters(self):
+ """Update immutable attributes in service provider.
+
+ In this particular case the test will try to change ``id`` attribute.
+ The server should return an HTTP 403 error code.
+
+ """
+ new_sp_ref = {'id': uuid.uuid4().hex}
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=400)
+
+ def test_update_service_provider_unknown_parameter(self):
+ new_sp_ref = self.sp_ref()
+ new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=400)
+
+ def test_update_service_provider_404(self):
+ new_sp_ref = self.sp_ref()
+ new_sp_ref['description'] = uuid.uuid4().hex
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ self.patch(url, body={'service_provider': new_sp_ref},
+ expected_status=404)
+
+ def test_delete_service_provider(self):
+ url = self.base_url(suffix=self.SERVICE_PROVIDER_ID)
+ self.delete(url, expected_status=204)
+
+ def test_delete_service_provider_404(self):
+ url = self.base_url(suffix=uuid.uuid4().hex)
+ self.delete(url, expected_status=404)
+
+
+class WebSSOTests(FederatedTokenTests):
+ """A class for testing Web SSO."""
+
+ SSO_URL = '/auth/OS-FEDERATION/websso/'
+ SSO_TEMPLATE_NAME = 'sso_callback_template.html'
+ SSO_TEMPLATE_PATH = os.path.join(core.dirs.etc(), SSO_TEMPLATE_NAME)
+ TRUSTED_DASHBOARD = 'http://horizon.com'
+ ORIGIN = urllib.parse.quote_plus(TRUSTED_DASHBOARD)
+
+ def setUp(self):
+ super(WebSSOTests, self).setUp()
+ self.api = federation_controllers.Auth()
+
+ def config_overrides(self):
+ super(WebSSOTests, self).config_overrides()
+ self.config_fixture.config(
+ group='federation',
+ trusted_dashboard=[self.TRUSTED_DASHBOARD],
+ sso_callback_template=self.SSO_TEMPLATE_PATH,
+ remote_id_attribute=self.REMOTE_ID_ATTR)
+
+ def test_render_callback_template(self):
+ token_id = uuid.uuid4().hex
+ resp = self.api.render_html_response(self.TRUSTED_DASHBOARD, token_id)
+ self.assertIn(token_id, resp.body)
+ self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
+
+ def test_federated_sso_auth(self):
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ context = {'environment': environment}
+ query_string = {'origin': self.ORIGIN}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ resp = self.api.federated_sso_auth(context, self.PROTOCOL)
+ self.assertIn(self.TRUSTED_DASHBOARD, resp.body)
+
+ def test_federated_sso_auth_bad_remote_id(self):
+ environment = {self.REMOTE_ID_ATTR: self.IDP}
+ context = {'environment': environment}
+ query_string = {'origin': self.ORIGIN}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ self.assertRaises(exception.IdentityProviderNotFound,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+ def test_federated_sso_missing_query(self):
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ context = {'environment': environment}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION')
+ self.assertRaises(exception.ValidationError,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+ def test_federated_sso_missing_query_bad_remote_id(self):
+ environment = {self.REMOTE_ID_ATTR: self.IDP}
+ context = {'environment': environment}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION')
+ self.assertRaises(exception.ValidationError,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+ def test_federated_sso_untrusted_dashboard(self):
+ environment = {self.REMOTE_ID_ATTR: self.REMOTE_ID}
+ context = {'environment': environment}
+ query_string = {'origin': uuid.uuid4().hex}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ self.assertRaises(exception.Unauthorized,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+ def test_federated_sso_untrusted_dashboard_bad_remote_id(self):
+ environment = {self.REMOTE_ID_ATTR: self.IDP}
+ context = {'environment': environment}
+ query_string = {'origin': uuid.uuid4().hex}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ self.assertRaises(exception.Unauthorized,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+ def test_federated_sso_missing_remote_id(self):
+ context = {'environment': {}}
+ query_string = {'origin': self.ORIGIN}
+ self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string)
+ self.assertRaises(exception.Unauthorized,
+ self.api.federated_sso_auth,
+ context, self.PROTOCOL)
+
+
+class K2KServiceCatalogTests(FederationTests):
+ SP1 = 'SP1'
+ SP2 = 'SP2'
+ SP3 = 'SP3'
+
+ def setUp(self):
+ super(K2KServiceCatalogTests, self).setUp()
+
+ sp = self.sp_ref()
+ self.federation_api.create_sp(self.SP1, sp)
+ self.sp_alpha = {self.SP1: sp}
+
+ sp = self.sp_ref()
+ self.federation_api.create_sp(self.SP2, sp)
+ self.sp_beta = {self.SP2: sp}
+
+ sp = self.sp_ref()
+ self.federation_api.create_sp(self.SP3, sp)
+ self.sp_gamma = {self.SP3: sp}
+
+ self.token_v3_helper = token_common.V3TokenDataHelper()
+
+ def sp_response(self, id, ref):
+ ref.pop('enabled')
+ ref.pop('description')
+ ref['id'] = id
+ return ref
+
+ def sp_ref(self):
+ ref = {
+ 'auth_url': uuid.uuid4().hex,
+ 'enabled': True,
+ 'description': uuid.uuid4().hex,
+ 'sp_url': uuid.uuid4().hex,
+ }
+ return ref
+
+ def _validate_service_providers(self, token, ref):
+ token_data = token['token']
+ self.assertIn('service_providers', token_data)
+ self.assertIsNotNone(token_data['service_providers'])
+ service_providers = token_data.get('service_providers')
+
+ self.assertEqual(len(ref), len(service_providers))
+ for entity in service_providers:
+ id = entity.get('id')
+ ref_entity = self.sp_response(id, ref.get(id))
+ self.assertDictEqual(ref_entity, entity)
+
+ def test_service_providers_in_token(self):
+ """Check if service providers are listed in service catalog."""
+
+ token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
+ ref = {}
+ for r in (self.sp_alpha, self.sp_beta, self.sp_gamma):
+ ref.update(r)
+ self._validate_service_providers(token, ref)
+
+ def test_service_provides_in_token_disabled_sp(self):
+ """Test behaviour with disabled service providers.
+
+ Disabled service providers should not be listed in the service
+ catalog.
+
+ """
+ # disable service provider ALPHA
+ sp_ref = {'enabled': False}
+ self.federation_api.update_sp(self.SP1, sp_ref)
+
+ token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
+ ref = {}
+ for r in (self.sp_beta, self.sp_gamma):
+ ref.update(r)
+ self._validate_service_providers(token, ref)
+
+ def test_no_service_providers_in_token(self):
+ """Test service catalog with disabled service providers.
+
+ There should be no entry ``service_providers`` in the catalog.
+ Test passes providing no attribute was raised.
+
+ """
+ sp_ref = {'enabled': False}
+ for sp in (self.SP1, self.SP2, self.SP3):
+ self.federation_api.update_sp(sp, sp_ref)
+
+ token = self.token_v3_helper.get_token_data(self.user_id, ['password'])
+ self.assertNotIn('service_providers', token['token'],
+ message=('Expected Service Catalog not to have '
+ 'service_providers'))