diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3_federation.py | 3722 |
1 files changed, 0 insertions, 3722 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py deleted file mode 100644 index f4ec8e51..00000000 --- a/keystone-moon/keystone/tests/unit/test_v3_federation.py +++ /dev/null @@ -1,3722 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import os -import random -from testtools import matchers -import uuid - -import fixtures -from lxml import etree -import mock -from oslo_config import cfg -from oslo_log import versionutils -from oslo_serialization import jsonutils -from oslo_utils import importutils -from oslotest import mockpatch -import saml2 -from saml2 import saml -from saml2 import sigver -from six.moves import http_client -from six.moves import range, urllib, zip -xmldsig = importutils.try_import("saml2.xmldsig") -if not xmldsig: - xmldsig = importutils.try_import("xmldsig") - -from keystone.auth import controllers as auth_controllers -from keystone.common import environment -from keystone.contrib.federation import routers -from keystone import exception -from keystone.federation import controllers as federation_controllers -from keystone.federation import idp as keystone_idp -from keystone import notifications -from keystone.tests import unit -from keystone.tests.unit import core -from keystone.tests.unit import federation_fixtures -from keystone.tests.unit import ksfixtures -from keystone.tests.unit import mapping_fixtures -from keystone.tests.unit import test_v3 -from keystone.tests.unit import utils -from keystone.token.providers import common as token_common - - -subprocess = environment.subprocess - -CONF = cfg.CONF -ROOTDIR = os.path.dirname(os.path.abspath(__file__)) -XMLDIR = os.path.join(ROOTDIR, 'saml2/') - - -def dummy_validator(*args, **kwargs): - pass - - -class FederationTests(test_v3.RestfulTestCase): - - @mock.patch.object(versionutils, 'report_deprecated_feature') - def test_exception_happens(self, mock_deprecator): - routers.FederationExtension(mock.ANY) - mock_deprecator.assert_called_once_with(mock.ANY, mock.ANY) - args, _kwargs = mock_deprecator.call_args - self.assertIn("Remove federation_extension from", args[1]) - - -class FederatedSetupMixin(object): - - ACTION = 'authenticate' - IDP = 'ORG_IDP' - PROTOCOL = 'saml2' - AUTH_METHOD = 'saml2' - USER = 'user@ORGANIZATION' - ASSERTION_PREFIX = 'PREFIX_' - IDP_WITH_REMOTE = 'ORG_IDP_REMOTE' - REMOTE_IDS = ['entityID_IDP1', 'entityID_IDP2'] - REMOTE_ID_ATTR = uuid.uuid4().hex - - UNSCOPED_V3_SAML2_REQ = { - "identity": { - "methods": [AUTH_METHOD], - AUTH_METHOD: { - "identity_provider": IDP, - "protocol": PROTOCOL - } - } - } - - def _check_domains_are_valid(self, token): - self.assertEqual('Federated', token['user']['domain']['id']) - self.assertEqual('Federated', token['user']['domain']['name']) - - def _project(self, project): - return (project['id'], project['name']) - - def _roles(self, roles): - return set([(r['id'], r['name']) for r in roles]) - - def _check_projects_and_roles(self, token, roles, projects): - """Check whether the projects and the roles match.""" - token_roles = token.get('roles') - if token_roles is None: - raise AssertionError('Roles not found in the token') - token_roles = self._roles(token_roles) - roles_ref = self._roles(roles) - self.assertEqual(token_roles, roles_ref) - - token_projects = token.get('project') - if token_projects is None: - raise AssertionError('Projects not found in the token') - token_projects = self._project(token_projects) - projects_ref = self._project(projects) - self.assertEqual(token_projects, projects_ref) - - def _check_scoped_token_attributes(self, token): - - for obj in ('user', 'catalog', 'expires_at', 'issued_at', - 'methods', 'roles'): - self.assertIn(obj, token) - - os_federation = token['user']['OS-FEDERATION'] - - self.assertIn('groups', os_federation) - self.assertIn('identity_provider', os_federation) - self.assertIn('protocol', os_federation) - self.assertThat(os_federation, matchers.HasLength(3)) - - self.assertEqual(self.IDP, os_federation['identity_provider']['id']) - self.assertEqual(self.PROTOCOL, os_federation['protocol']['id']) - - def _check_project_scoped_token_attributes(self, token, project_id): - self.assertEqual(project_id, token['project']['id']) - self._check_scoped_token_attributes(token) - - def _check_domain_scoped_token_attributes(self, token, domain_id): - self.assertEqual(domain_id, token['domain']['id']) - self._check_scoped_token_attributes(token) - - def assertValidMappedUser(self, token): - """Check if user object meets all the criteria.""" - user = token['user'] - self.assertIn('id', user) - self.assertIn('name', user) - self.assertIn('domain', user) - - self.assertIn('groups', user['OS-FEDERATION']) - self.assertIn('identity_provider', user['OS-FEDERATION']) - self.assertIn('protocol', user['OS-FEDERATION']) - - # Make sure user_id is url safe - self.assertEqual(urllib.parse.quote(user['name']), user['id']) - - def _issue_unscoped_token(self, - idp=None, - assertion='EMPLOYEE_ASSERTION', - environment=None): - api = federation_controllers.Auth() - context = {'environment': environment or {}} - self._inject_assertion(context, assertion) - if idp is None: - idp = self.IDP - r = api.federated_authentication(context, idp, self.PROTOCOL) - return r - - def idp_ref(self, id=None): - idp = { - 'id': id or uuid.uuid4().hex, - 'enabled': True, - 'description': uuid.uuid4().hex - } - return idp - - def proto_ref(self, mapping_id=None): - proto = { - 'id': uuid.uuid4().hex, - 'mapping_id': mapping_id or uuid.uuid4().hex - } - return proto - - def mapping_ref(self, rules=None): - return { - 'id': uuid.uuid4().hex, - 'rules': rules or self.rules['rules'] - } - - def _scope_request(self, unscoped_token_id, scope, scope_id): - return { - 'auth': { - 'identity': { - 'methods': [ - self.AUTH_METHOD - ], - self.AUTH_METHOD: { - 'id': unscoped_token_id - } - }, - 'scope': { - scope: { - 'id': scope_id - } - } - } - } - - def _inject_assertion(self, context, variant, query_string=None): - assertion = getattr(mapping_fixtures, variant) - context['environment'].update(assertion) - context['query_string'] = query_string or [] - - def load_federation_sample_data(self): - """Inject additional data.""" - # Create and add domains - self.domainA = unit.new_domain_ref() - self.resource_api.create_domain(self.domainA['id'], - self.domainA) - - self.domainB = unit.new_domain_ref() - self.resource_api.create_domain(self.domainB['id'], - self.domainB) - - self.domainC = unit.new_domain_ref() - self.resource_api.create_domain(self.domainC['id'], - self.domainC) - - self.domainD = unit.new_domain_ref() - self.resource_api.create_domain(self.domainD['id'], - self.domainD) - - # Create and add projects - self.proj_employees = unit.new_project_ref( - domain_id=self.domainA['id']) - self.resource_api.create_project(self.proj_employees['id'], - self.proj_employees) - self.proj_customers = unit.new_project_ref( - domain_id=self.domainA['id']) - self.resource_api.create_project(self.proj_customers['id'], - self.proj_customers) - - self.project_all = unit.new_project_ref( - domain_id=self.domainA['id']) - self.resource_api.create_project(self.project_all['id'], - self.project_all) - - self.project_inherited = unit.new_project_ref( - domain_id=self.domainD['id']) - self.resource_api.create_project(self.project_inherited['id'], - self.project_inherited) - - # Create and add groups - self.group_employees = unit.new_group_ref(domain_id=self.domainA['id']) - self.group_employees = ( - self.identity_api.create_group(self.group_employees)) - - self.group_customers = unit.new_group_ref(domain_id=self.domainA['id']) - self.group_customers = ( - self.identity_api.create_group(self.group_customers)) - - self.group_admins = unit.new_group_ref(domain_id=self.domainA['id']) - self.group_admins = self.identity_api.create_group(self.group_admins) - - # Create and add roles - self.role_employee = unit.new_role_ref() - self.role_api.create_role(self.role_employee['id'], self.role_employee) - self.role_customer = unit.new_role_ref() - self.role_api.create_role(self.role_customer['id'], self.role_customer) - - self.role_admin = unit.new_role_ref() - self.role_api.create_role(self.role_admin['id'], self.role_admin) - - # Employees can access - # * proj_employees - # * project_all - self.assignment_api.create_grant(self.role_employee['id'], - group_id=self.group_employees['id'], - project_id=self.proj_employees['id']) - self.assignment_api.create_grant(self.role_employee['id'], - group_id=self.group_employees['id'], - project_id=self.project_all['id']) - # Customers can access - # * proj_customers - self.assignment_api.create_grant(self.role_customer['id'], - group_id=self.group_customers['id'], - project_id=self.proj_customers['id']) - - # Admins can access: - # * proj_customers - # * proj_employees - # * project_all - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - project_id=self.proj_customers['id']) - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - project_id=self.proj_employees['id']) - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - project_id=self.project_all['id']) - - self.assignment_api.create_grant(self.role_customer['id'], - group_id=self.group_customers['id'], - domain_id=self.domainA['id']) - - # Customers can access: - # * domain A - self.assignment_api.create_grant(self.role_customer['id'], - group_id=self.group_customers['id'], - domain_id=self.domainA['id']) - - # Customers can access projects via inheritance: - # * domain D - self.assignment_api.create_grant(self.role_customer['id'], - group_id=self.group_customers['id'], - domain_id=self.domainD['id'], - inherited_to_projects=True) - - # Employees can access: - # * domain A - # * domain B - - self.assignment_api.create_grant(self.role_employee['id'], - group_id=self.group_employees['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.role_employee['id'], - group_id=self.group_employees['id'], - domain_id=self.domainB['id']) - - # Admins can access: - # * domain A - # * domain B - # * domain C - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - domain_id=self.domainA['id']) - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - domain_id=self.domainB['id']) - - self.assignment_api.create_grant(self.role_admin['id'], - group_id=self.group_admins['id'], - domain_id=self.domainC['id']) - self.rules = { - 'rules': [ - { - 'local': [ - { - 'group': { - 'id': self.group_employees['id'] - } - }, - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': 'UserName' - }, - { - 'type': 'Email', - }, - { - 'type': 'orgPersonType', - 'any_one_of': [ - 'Employee' - ] - } - ] - }, - { - 'local': [ - { - 'group': { - 'id': self.group_employees['id'] - } - }, - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': self.ASSERTION_PREFIX + 'UserName' - }, - { - 'type': self.ASSERTION_PREFIX + 'Email', - }, - { - 'type': self.ASSERTION_PREFIX + 'orgPersonType', - 'any_one_of': [ - 'SuperEmployee' - ] - } - ] - }, - { - 'local': [ - { - 'group': { - 'id': self.group_customers['id'] - } - }, - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': 'UserName' - }, - { - 'type': 'Email' - }, - { - 'type': 'orgPersonType', - 'any_one_of': [ - 'Customer' - ] - } - ] - }, - { - 'local': [ - { - 'group': { - 'id': self.group_admins['id'] - } - }, - { - 'group': { - 'id': self.group_employees['id'] - } - }, - { - 'group': { - 'id': self.group_customers['id'] - } - }, - - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': 'UserName' - }, - { - 'type': 'Email' - }, - { - 'type': 'orgPersonType', - 'any_one_of': [ - 'Admin', - 'Chief' - ] - } - ] - }, - { - 'local': [ - { - 'group': { - 'id': uuid.uuid4().hex - } - }, - { - 'group': { - 'id': self.group_customers['id'] - } - }, - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': 'UserName', - }, - { - 'type': 'Email', - }, - { - 'type': 'FirstName', - 'any_one_of': [ - 'Jill' - ] - }, - { - 'type': 'LastName', - 'any_one_of': [ - 'Smith' - ] - } - ] - }, - { - 'local': [ - { - 'group': { - 'id': 'this_group_no_longer_exists' - } - }, - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - } - ], - 'remote': [ - { - 'type': 'UserName', - }, - { - 'type': 'Email', - }, - { - 'type': 'Email', - 'any_one_of': [ - 'testacct@example.com' - ] - }, - { - 'type': 'orgPersonType', - 'any_one_of': [ - 'Tester' - ] - } - ] - }, - # rules with local group names - { - "local": [ - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - }, - { - "group": { - "name": self.group_customers['name'], - "domain": { - "name": self.domainA['name'] - } - } - } - ], - "remote": [ - { - 'type': 'UserName', - }, - { - 'type': 'Email', - }, - { - "type": "orgPersonType", - "any_one_of": [ - "CEO", - "CTO" - ], - } - ] - }, - { - "local": [ - { - 'user': { - 'name': '{0}', - 'id': '{1}' - } - }, - { - "group": { - "name": self.group_admins['name'], - "domain": { - "id": self.domainA['id'] - } - } - } - ], - "remote": [ - { - "type": "UserName", - }, - { - "type": "Email", - }, - { - "type": "orgPersonType", - "any_one_of": [ - "Managers" - ] - } - ] - }, - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{1}" - } - }, - { - "group": { - "name": "NON_EXISTING", - "domain": { - "id": self.domainA['id'] - } - } - } - ], - "remote": [ - { - "type": "UserName", - }, - { - "type": "Email", - }, - { - "type": "UserName", - "any_one_of": [ - "IamTester" - ] - } - ] - }, - { - "local": [ - { - "user": { - "type": "local", - "name": self.user['name'], - "domain": { - "id": self.user['domain_id'] - } - } - }, - { - "group": { - "id": self.group_customers['id'] - } - } - ], - "remote": [ - { - "type": "UserType", - "any_one_of": [ - "random" - ] - } - ] - }, - { - "local": [ - { - "user": { - "type": "local", - "name": self.user['name'], - "domain": { - "id": uuid.uuid4().hex - } - } - } - ], - "remote": [ - { - "type": "Position", - "any_one_of": [ - "DirectorGeneral" - ] - } - ] - } - ] - } - - # Add IDP - self.idp = self.idp_ref(id=self.IDP) - self.federation_api.create_idp(self.idp['id'], - self.idp) - # Add IDP with remote - self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE) - self.idp_with_remote['remote_ids'] = self.REMOTE_IDS - self.federation_api.create_idp(self.idp_with_remote['id'], - self.idp_with_remote) - # Add a mapping - self.mapping = self.mapping_ref() - self.federation_api.create_mapping(self.mapping['id'], - self.mapping) - # Add protocols - self.proto_saml = self.proto_ref(mapping_id=self.mapping['id']) - self.proto_saml['id'] = self.PROTOCOL - self.federation_api.create_protocol(self.idp['id'], - self.proto_saml['id'], - self.proto_saml) - # Add protocols IDP with remote - self.federation_api.create_protocol(self.idp_with_remote['id'], - self.proto_saml['id'], - self.proto_saml) - # Generate fake tokens - context = {'environment': {}} - - self.tokens = {} - VARIANTS = ('EMPLOYEE_ASSERTION', 'CUSTOMER_ASSERTION', - 'ADMIN_ASSERTION') - api = auth_controllers.Auth() - for variant in VARIANTS: - self._inject_assertion(context, variant) - r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ) - self.tokens[variant] = r.headers.get('X-Subject-Token') - - self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN = self._scope_request( - uuid.uuid4().hex, 'project', self.proj_customers['id']) - - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE = self._scope_request( - self.tokens['EMPLOYEE_ASSERTION'], 'project', - self.proj_employees['id']) - - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN = self._scope_request( - self.tokens['ADMIN_ASSERTION'], 'project', - self.proj_employees['id']) - - self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN = self._scope_request( - self.tokens['ADMIN_ASSERTION'], 'project', - self.proj_customers['id']) - - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER = self._scope_request( - self.tokens['CUSTOMER_ASSERTION'], 'project', - self.proj_employees['id']) - - self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER = self._scope_request( - self.tokens['CUSTOMER_ASSERTION'], 'project', - self.project_inherited['id']) - - self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER = self._scope_request( - self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainA['id']) - - self.TOKEN_SCOPE_DOMAIN_B_FROM_CUSTOMER = self._scope_request( - self.tokens['CUSTOMER_ASSERTION'], 'domain', - self.domainB['id']) - - self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER = self._scope_request( - self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainD['id']) - - self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN = self._scope_request( - self.tokens['ADMIN_ASSERTION'], 'domain', self.domainA['id']) - - self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN = self._scope_request( - self.tokens['ADMIN_ASSERTION'], 'domain', self.domainB['id']) - - self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN = self._scope_request( - self.tokens['ADMIN_ASSERTION'], 'domain', - self.domainC['id']) - - -class FederatedIdentityProviderTests(test_v3.RestfulTestCase): - """A test class for Identity Providers.""" - - idp_keys = ['description', 'enabled'] - - default_body = {'description': None, 'enabled': True} - - def base_url(self, suffix=None): - if suffix is not None: - return '/OS-FEDERATION/identity_providers/' + str(suffix) - return '/OS-FEDERATION/identity_providers' - - def _fetch_attribute_from_response(self, resp, parameter, - assert_is_not_none=True): - """Fetch single attribute from TestResponse object.""" - result = resp.result.get(parameter) - if assert_is_not_none: - self.assertIsNotNone(result) - return result - - def _create_and_decapsulate_response(self, body=None): - """Create IdP and fetch it's random id along with entity.""" - default_resp = self._create_default_idp(body=body) - idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - self.assertIsNotNone(idp) - idp_id = idp.get('id') - return (idp_id, idp) - - def _get_idp(self, idp_id): - """Fetch IdP entity based on its id.""" - url = self.base_url(suffix=idp_id) - resp = self.get(url) - return resp - - def _create_default_idp(self, body=None): - """Create default IdP.""" - url = self.base_url(suffix=uuid.uuid4().hex) - if body is None: - body = self._http_idp_input() - resp = self.put(url, body={'identity_provider': body}, - expected_status=http_client.CREATED) - return resp - - def _http_idp_input(self, **kwargs): - """Create default input for IdP data.""" - body = None - if 'body' not in kwargs: - body = self.default_body.copy() - body['description'] = uuid.uuid4().hex - else: - body = kwargs['body'] - return body - - def _assign_protocol_to_idp(self, idp_id=None, proto=None, url=None, - mapping_id=None, validate=True, **kwargs): - if url is None: - url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - if idp_id is None: - idp_id, _ = self._create_and_decapsulate_response() - if proto is None: - proto = uuid.uuid4().hex - if mapping_id is None: - mapping_id = uuid.uuid4().hex - body = {'mapping_id': mapping_id} - url = url % {'idp_id': idp_id, 'protocol_id': proto} - resp = self.put(url, body={'protocol': body}, **kwargs) - if validate: - self.assertValidResponse(resp, 'protocol', dummy_validator, - keys_to_check=['id', 'mapping_id'], - ref={'id': proto, - 'mapping_id': mapping_id}) - return (resp, idp_id, proto) - - def _get_protocol(self, idp_id, protocol_id): - url = "%s/protocols/%s" % (idp_id, protocol_id) - url = self.base_url(suffix=url) - r = self.get(url) - return r - - def test_create_idp(self): - """Creates the IdentityProvider entity associated to remote_ids.""" - keys_to_check = list(self.idp_keys) - body = self.default_body.copy() - body['description'] = uuid.uuid4().hex - resp = self._create_default_idp(body=body) - self.assertValidResponse(resp, 'identity_provider', dummy_validator, - keys_to_check=keys_to_check, - ref=body) - - def test_create_idp_remote(self): - """Creates the IdentityProvider entity associated to remote_ids.""" - keys_to_check = list(self.idp_keys) - keys_to_check.append('remote_ids') - body = self.default_body.copy() - body['description'] = uuid.uuid4().hex - body['remote_ids'] = [uuid.uuid4().hex, - uuid.uuid4().hex, - uuid.uuid4().hex] - resp = self._create_default_idp(body=body) - self.assertValidResponse(resp, 'identity_provider', dummy_validator, - keys_to_check=keys_to_check, - ref=body) - - def test_create_idp_remote_repeated(self): - """Creates two IdentityProvider entities with some remote_ids - - A remote_id is the same for both so the second IdP is not - created because of the uniqueness of the remote_ids - - Expect HTTP 409 Conflict code for the latter call. - - """ - body = self.default_body.copy() - repeated_remote_id = uuid.uuid4().hex - body['remote_ids'] = [uuid.uuid4().hex, - uuid.uuid4().hex, - uuid.uuid4().hex, - repeated_remote_id] - self._create_default_idp(body=body) - - url = self.base_url(suffix=uuid.uuid4().hex) - body['remote_ids'] = [uuid.uuid4().hex, - repeated_remote_id] - resp = self.put(url, body={'identity_provider': body}, - expected_status=http_client.CONFLICT) - - resp_data = jsonutils.loads(resp.body) - self.assertIn('Duplicate remote ID', - resp_data.get('error', {}).get('message')) - - def test_create_idp_remote_empty(self): - """Creates an IdP with empty remote_ids.""" - keys_to_check = list(self.idp_keys) - keys_to_check.append('remote_ids') - body = self.default_body.copy() - body['description'] = uuid.uuid4().hex - body['remote_ids'] = [] - resp = self._create_default_idp(body=body) - self.assertValidResponse(resp, 'identity_provider', dummy_validator, - keys_to_check=keys_to_check, - ref=body) - - def test_create_idp_remote_none(self): - """Creates an IdP with a None remote_ids.""" - keys_to_check = list(self.idp_keys) - keys_to_check.append('remote_ids') - body = self.default_body.copy() - body['description'] = uuid.uuid4().hex - body['remote_ids'] = None - resp = self._create_default_idp(body=body) - expected = body.copy() - expected['remote_ids'] = [] - self.assertValidResponse(resp, 'identity_provider', dummy_validator, - keys_to_check=keys_to_check, - ref=expected) - - def test_update_idp_remote_ids(self): - """Update IdP's remote_ids parameter.""" - body = self.default_body.copy() - body['remote_ids'] = [uuid.uuid4().hex] - default_resp = self._create_default_idp(body=body) - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - url = self.base_url(suffix=idp_id) - self.assertIsNotNone(idp_id) - - body['remote_ids'] = [uuid.uuid4().hex, uuid.uuid4().hex] - - body = {'identity_provider': body} - resp = self.patch(url, body=body) - updated_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - body = body['identity_provider'] - self.assertEqual(sorted(body['remote_ids']), - sorted(updated_idp.get('remote_ids'))) - - resp = self.get(url) - returned_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - self.assertEqual(sorted(body['remote_ids']), - sorted(returned_idp.get('remote_ids'))) - - def test_update_idp_clean_remote_ids(self): - """Update IdP's remote_ids parameter with an empty list.""" - body = self.default_body.copy() - body['remote_ids'] = [uuid.uuid4().hex] - default_resp = self._create_default_idp(body=body) - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - url = self.base_url(suffix=idp_id) - self.assertIsNotNone(idp_id) - - body['remote_ids'] = [] - - body = {'identity_provider': body} - resp = self.patch(url, body=body) - updated_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - body = body['identity_provider'] - self.assertEqual(sorted(body['remote_ids']), - sorted(updated_idp.get('remote_ids'))) - - resp = self.get(url) - returned_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - self.assertEqual(sorted(body['remote_ids']), - sorted(returned_idp.get('remote_ids'))) - - def test_update_idp_remote_repeated(self): - """Update an IdentityProvider entity reusing a remote_id. - - A remote_id is the same for both so the second IdP is not - updated because of the uniqueness of the remote_ids. - - Expect HTTP 409 Conflict code for the latter call. - - """ - # Create first identity provider - body = self.default_body.copy() - repeated_remote_id = uuid.uuid4().hex - body['remote_ids'] = [uuid.uuid4().hex, - repeated_remote_id] - self._create_default_idp(body=body) - - # Create second identity provider (without remote_ids) - body = self.default_body.copy() - default_resp = self._create_default_idp(body=body) - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - url = self.base_url(suffix=idp_id) - - body['remote_ids'] = [repeated_remote_id] - resp = self.patch(url, body={'identity_provider': body}, - expected_status=http_client.CONFLICT) - resp_data = jsonutils.loads(resp.body) - self.assertIn('Duplicate remote ID', - resp_data['error']['message']) - - def test_list_idps(self, iterations=5): - """Lists all available IdentityProviders. - - This test collects ids of created IdPs and - intersects it with the list of all available IdPs. - List of all IdPs can be a superset of IdPs created in this test, - because other tests also create IdPs. - - """ - def get_id(resp): - r = self._fetch_attribute_from_response(resp, - 'identity_provider') - return r.get('id') - - ids = [] - for _ in range(iterations): - id = get_id(self._create_default_idp()) - ids.append(id) - ids = set(ids) - - keys_to_check = self.idp_keys - url = self.base_url() - resp = self.get(url) - self.assertValidListResponse(resp, 'identity_providers', - dummy_validator, - keys_to_check=keys_to_check) - entities = self._fetch_attribute_from_response(resp, - 'identity_providers') - entities_ids = set([e['id'] for e in entities]) - ids_intersection = entities_ids.intersection(ids) - self.assertEqual(ids_intersection, ids) - - def test_filter_list_idp_by_id(self): - def get_id(resp): - r = self._fetch_attribute_from_response(resp, - 'identity_provider') - return r.get('id') - - idp1_id = get_id(self._create_default_idp()) - idp2_id = get_id(self._create_default_idp()) - - # list the IdP, should get two IdP. - url = self.base_url() - resp = self.get(url) - entities = self._fetch_attribute_from_response(resp, - 'identity_providers') - entities_ids = [e['id'] for e in entities] - self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) - - # filter the IdP by ID. - url = self.base_url() + '?id=' + idp1_id - resp = self.get(url) - filtered_service_list = resp.json['identity_providers'] - self.assertThat(filtered_service_list, matchers.HasLength(1)) - self.assertEqual(idp1_id, filtered_service_list[0].get('id')) - - def test_filter_list_idp_by_enabled(self): - def get_id(resp): - r = self._fetch_attribute_from_response(resp, - 'identity_provider') - return r.get('id') - - idp1_id = get_id(self._create_default_idp()) - - body = self.default_body.copy() - body['enabled'] = False - idp2_id = get_id(self._create_default_idp(body=body)) - - # list the IdP, should get two IdP. - url = self.base_url() - resp = self.get(url) - entities = self._fetch_attribute_from_response(resp, - 'identity_providers') - entities_ids = [e['id'] for e in entities] - self.assertItemsEqual(entities_ids, [idp1_id, idp2_id]) - - # filter the IdP by 'enabled'. - url = self.base_url() + '?enabled=True' - resp = self.get(url) - filtered_service_list = resp.json['identity_providers'] - self.assertThat(filtered_service_list, matchers.HasLength(1)) - self.assertEqual(idp1_id, filtered_service_list[0].get('id')) - - def test_check_idp_uniqueness(self): - """Add same IdP twice. - - Expect HTTP 409 Conflict code for the latter call. - - """ - url = self.base_url(suffix=uuid.uuid4().hex) - body = self._http_idp_input() - self.put(url, body={'identity_provider': body}, - expected_status=http_client.CREATED) - resp = self.put(url, body={'identity_provider': body}, - expected_status=http_client.CONFLICT) - - resp_data = jsonutils.loads(resp.body) - self.assertIn('Duplicate entry', - resp_data.get('error', {}).get('message')) - - def test_get_idp(self): - """Create and later fetch IdP.""" - body = self._http_idp_input() - default_resp = self._create_default_idp(body=body) - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - url = self.base_url(suffix=idp_id) - resp = self.get(url) - self.assertValidResponse(resp, 'identity_provider', - dummy_validator, keys_to_check=body.keys(), - ref=body) - - def test_get_nonexisting_idp(self): - """Fetch nonexisting IdP entity. - - Expected HTTP 404 Not Found status code. - - """ - idp_id = uuid.uuid4().hex - self.assertIsNotNone(idp_id) - - url = self.base_url(suffix=idp_id) - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_delete_existing_idp(self): - """Create and later delete IdP. - - Expect HTTP 404 Not Found for the GET IdP call. - """ - default_resp = self._create_default_idp() - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - self.assertIsNotNone(idp_id) - url = self.base_url(suffix=idp_id) - self.delete(url) - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_delete_idp_also_deletes_assigned_protocols(self): - """Deleting an IdP will delete its assigned protocol.""" - # create default IdP - default_resp = self._create_default_idp() - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp['id'] - protocol_id = uuid.uuid4().hex - - url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - idp_url = self.base_url(suffix=idp_id) - - # assign protocol to IdP - kwargs = {'expected_status': http_client.CREATED} - resp, idp_id, proto = self._assign_protocol_to_idp( - url=url, - idp_id=idp_id, - proto=protocol_id, - **kwargs) - - # removing IdP will remove the assigned protocol as well - self.assertEqual(1, len(self.federation_api.list_protocols(idp_id))) - self.delete(idp_url) - self.get(idp_url, expected_status=http_client.NOT_FOUND) - self.assertEqual(0, len(self.federation_api.list_protocols(idp_id))) - - def test_delete_nonexisting_idp(self): - """Delete nonexisting IdP. - - Expect HTTP 404 Not Found for the GET IdP call. - """ - idp_id = uuid.uuid4().hex - url = self.base_url(suffix=idp_id) - self.delete(url, expected_status=http_client.NOT_FOUND) - - def test_update_idp_mutable_attributes(self): - """Update IdP's mutable parameters.""" - default_resp = self._create_default_idp() - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - url = self.base_url(suffix=idp_id) - self.assertIsNotNone(idp_id) - - _enabled = not default_idp.get('enabled') - body = {'remote_ids': [uuid.uuid4().hex, uuid.uuid4().hex], - 'description': uuid.uuid4().hex, - 'enabled': _enabled} - - body = {'identity_provider': body} - resp = self.patch(url, body=body) - updated_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - body = body['identity_provider'] - for key in body.keys(): - if isinstance(body[key], list): - self.assertEqual(sorted(body[key]), - sorted(updated_idp.get(key))) - else: - self.assertEqual(body[key], updated_idp.get(key)) - - resp = self.get(url) - updated_idp = self._fetch_attribute_from_response(resp, - 'identity_provider') - for key in body.keys(): - if isinstance(body[key], list): - self.assertEqual(sorted(body[key]), - sorted(updated_idp.get(key))) - else: - self.assertEqual(body[key], updated_idp.get(key)) - - def test_update_idp_immutable_attributes(self): - """Update IdP's immutable parameters. - - Expect HTTP BAD REQUEST. - - """ - default_resp = self._create_default_idp() - default_idp = self._fetch_attribute_from_response(default_resp, - 'identity_provider') - idp_id = default_idp.get('id') - self.assertIsNotNone(idp_id) - - body = self._http_idp_input() - body['id'] = uuid.uuid4().hex - body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] - - url = self.base_url(suffix=idp_id) - self.patch(url, body={'identity_provider': body}, - expected_status=http_client.BAD_REQUEST) - - def test_update_nonexistent_idp(self): - """Update nonexistent IdP - - Expect HTTP 404 Not Found code. - - """ - idp_id = uuid.uuid4().hex - url = self.base_url(suffix=idp_id) - body = self._http_idp_input() - body['enabled'] = False - body = {'identity_provider': body} - - self.patch(url, body=body, expected_status=http_client.NOT_FOUND) - - def test_assign_protocol_to_idp(self): - """Assign a protocol to existing IdP.""" - self._assign_protocol_to_idp(expected_status=http_client.CREATED) - - def test_protocol_composite_pk(self): - """Test that Keystone can add two entities. - - The entities have identical names, however, attached to different - IdPs. - - 1. Add IdP and assign it protocol with predefined name - 2. Add another IdP and assign it a protocol with same name. - - Expect HTTP 201 code - - """ - url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - - kwargs = {'expected_status': http_client.CREATED} - self._assign_protocol_to_idp(proto='saml2', - url=url, **kwargs) - - self._assign_protocol_to_idp(proto='saml2', - url=url, **kwargs) - - def test_protocol_idp_pk_uniqueness(self): - """Test whether Keystone checks for unique idp/protocol values. - - Add same protocol twice, expect Keystone to reject a latter call and - return HTTP 409 Conflict code. - - """ - url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') - - kwargs = {'expected_status': http_client.CREATED} - resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', - url=url, **kwargs) - kwargs = {'expected_status': http_client.CONFLICT} - resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id, - proto='saml2', - validate=False, - url=url, **kwargs) - - def test_assign_protocol_to_nonexistent_idp(self): - """Assign protocol to IdP that doesn't exist. - - Expect HTTP 404 Not Found code. - - """ - idp_id = uuid.uuid4().hex - kwargs = {'expected_status': http_client.NOT_FOUND} - self._assign_protocol_to_idp(proto='saml2', - idp_id=idp_id, - validate=False, - **kwargs) - - def test_get_protocol(self): - """Create and later fetch protocol tied to IdP.""" - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) - proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] - url = "%s/protocols/%s" % (idp_id, proto_id) - url = self.base_url(suffix=url) - - resp = self.get(url) - - reference = {'id': proto_id} - self.assertValidResponse(resp, 'protocol', - dummy_validator, - keys_to_check=reference.keys(), - ref=reference) - - def test_list_protocols(self): - """Create set of protocols and later list them. - - Compare input and output id sets. - - """ - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) - iterations = random.randint(0, 16) - protocol_ids = [] - for _ in range(iterations): - resp, _, proto = self._assign_protocol_to_idp( - idp_id=idp_id, - expected_status=http_client.CREATED) - proto_id = self._fetch_attribute_from_response(resp, 'protocol') - proto_id = proto_id['id'] - protocol_ids.append(proto_id) - - url = "%s/protocols" % idp_id - url = self.base_url(suffix=url) - resp = self.get(url) - self.assertValidListResponse(resp, 'protocols', - dummy_validator, - keys_to_check=['id']) - entities = self._fetch_attribute_from_response(resp, 'protocols') - entities = set([entity['id'] for entity in entities]) - protocols_intersection = entities.intersection(protocol_ids) - self.assertEqual(protocols_intersection, set(protocol_ids)) - - def test_update_protocols_attribute(self): - """Update protocol's attribute.""" - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) - new_mapping_id = uuid.uuid4().hex - - url = "%s/protocols/%s" % (idp_id, proto) - url = self.base_url(suffix=url) - body = {'mapping_id': new_mapping_id} - resp = self.patch(url, body={'protocol': body}) - self.assertValidResponse(resp, 'protocol', dummy_validator, - keys_to_check=['id', 'mapping_id'], - ref={'id': proto, - 'mapping_id': new_mapping_id} - ) - - def test_delete_protocol(self): - """Delete protocol. - - Expect HTTP 404 Not Found code for the GET call after the protocol is - deleted. - - """ - url = self.base_url(suffix='/%(idp_id)s/' - 'protocols/%(protocol_id)s') - resp, idp_id, proto = self._assign_protocol_to_idp( - expected_status=http_client.CREATED) - url = url % {'idp_id': idp_id, - 'protocol_id': proto} - self.delete(url) - self.get(url, expected_status=http_client.NOT_FOUND) - - -class MappingCRUDTests(test_v3.RestfulTestCase): - """A class for testing CRUD operations for Mappings.""" - - MAPPING_URL = '/OS-FEDERATION/mappings/' - - def assertValidMappingListResponse(self, resp, *args, **kwargs): - return self.assertValidListResponse( - resp, - 'mappings', - self.assertValidMapping, - keys_to_check=[], - *args, - **kwargs) - - def assertValidMappingResponse(self, resp, *args, **kwargs): - return self.assertValidResponse( - resp, - 'mapping', - self.assertValidMapping, - keys_to_check=[], - *args, - **kwargs) - - def assertValidMapping(self, entity, ref=None): - self.assertIsNotNone(entity.get('id')) - self.assertIsNotNone(entity.get('rules')) - if ref: - self.assertEqual(entity['rules'], ref['rules']) - return entity - - def _create_default_mapping_entry(self): - url = self.MAPPING_URL + uuid.uuid4().hex - resp = self.put(url, - body={'mapping': mapping_fixtures.MAPPING_LARGE}, - expected_status=http_client.CREATED) - return resp - - def _get_id_from_response(self, resp): - r = resp.result.get('mapping') - return r.get('id') - - def test_mapping_create(self): - resp = self._create_default_mapping_entry() - self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) - - def test_mapping_list(self): - url = self.MAPPING_URL - self._create_default_mapping_entry() - resp = self.get(url) - entities = resp.result.get('mappings') - self.assertIsNotNone(entities) - self.assertResponseStatus(resp, http_client.OK) - self.assertValidListLinks(resp.result.get('links')) - self.assertEqual(1, len(entities)) - - def test_mapping_delete(self): - url = self.MAPPING_URL + '%(mapping_id)s' - resp = self._create_default_mapping_entry() - mapping_id = self._get_id_from_response(resp) - url = url % {'mapping_id': str(mapping_id)} - resp = self.delete(url) - self.assertResponseStatus(resp, http_client.NO_CONTENT) - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_mapping_get(self): - url = self.MAPPING_URL + '%(mapping_id)s' - resp = self._create_default_mapping_entry() - mapping_id = self._get_id_from_response(resp) - url = url % {'mapping_id': mapping_id} - resp = self.get(url) - self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) - - def test_mapping_update(self): - url = self.MAPPING_URL + '%(mapping_id)s' - resp = self._create_default_mapping_entry() - mapping_id = self._get_id_from_response(resp) - url = url % {'mapping_id': mapping_id} - resp = self.patch(url, - body={'mapping': mapping_fixtures.MAPPING_SMALL}) - self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) - resp = self.get(url) - self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) - - def test_delete_mapping_dne(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.delete(url, expected_status=http_client.NOT_FOUND) - - def test_get_mapping_dne(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_create_mapping_bad_requirements(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) - - def test_create_mapping_no_rules(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) - - def test_create_mapping_no_remote_objects(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) - - def test_create_mapping_bad_value(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) - - def test_create_mapping_missing_local(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) - - def test_create_mapping_missing_type(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) - - def test_create_mapping_wrong_type(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) - - def test_create_mapping_extra_remote_properties_not_any_of(self): - url = self.MAPPING_URL + uuid.uuid4().hex - mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping}) - - def test_create_mapping_extra_remote_properties_any_one_of(self): - url = self.MAPPING_URL + uuid.uuid4().hex - mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping}) - - def test_create_mapping_extra_remote_properties_just_type(self): - url = self.MAPPING_URL + uuid.uuid4().hex - mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping}) - - def test_create_mapping_empty_map(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': {}}) - - def test_create_mapping_extra_rules_properties(self): - url = self.MAPPING_URL + uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) - - def test_create_mapping_with_blacklist_and_whitelist(self): - """Test for adding whitelist and blacklist in the rule - - Server should respond with HTTP 400 Bad Request error upon discovering - both ``whitelist`` and ``blacklist`` keywords in the same rule. - - """ - url = self.MAPPING_URL + uuid.uuid4().hex - mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': mapping}) - - def test_create_mapping_with_local_user_and_local_domain(self): - url = self.MAPPING_URL + uuid.uuid4().hex - resp = self.put( - url, - body={ - 'mapping': mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN - }, - expected_status=http_client.CREATED) - self.assertValidMappingResponse( - resp, mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN) - - def test_create_mapping_with_ephemeral(self): - url = self.MAPPING_URL + uuid.uuid4().hex - resp = self.put( - url, - body={'mapping': mapping_fixtures.MAPPING_EPHEMERAL_USER}, - expected_status=http_client.CREATED) - self.assertValidMappingResponse( - resp, mapping_fixtures.MAPPING_EPHEMERAL_USER) - - def test_create_mapping_with_bad_user_type(self): - url = self.MAPPING_URL + uuid.uuid4().hex - # get a copy of a known good map - bad_mapping = copy.deepcopy(mapping_fixtures.MAPPING_EPHEMERAL_USER) - # now sabotage the user type - bad_mapping['rules'][0]['local'][0]['user']['type'] = uuid.uuid4().hex - self.put(url, expected_status=http_client.BAD_REQUEST, - body={'mapping': bad_mapping}) - - -class FederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin): - - def auth_plugin_config_override(self): - methods = ['saml2'] - super(FederatedTokenTests, self).auth_plugin_config_override(methods) - - def setUp(self): - super(FederatedTokenTests, self).setUp() - self._notifications = [] - - def fake_saml_notify(action, context, user_id, group_ids, - identity_provider, protocol, token_id, outcome): - note = { - 'action': action, - 'user_id': user_id, - 'identity_provider': identity_provider, - 'protocol': protocol, - 'send_notification_called': True} - self._notifications.append(note) - - self.useFixture(mockpatch.PatchObject( - notifications, - 'send_saml_audit_notification', - fake_saml_notify)) - - def _assert_last_notify(self, action, identity_provider, protocol, - user_id=None): - self.assertTrue(self._notifications) - note = self._notifications[-1] - if user_id: - self.assertEqual(note['user_id'], user_id) - self.assertEqual(note['action'], action) - self.assertEqual(note['identity_provider'], identity_provider) - self.assertEqual(note['protocol'], protocol) - self.assertTrue(note['send_notification_called']) - - def load_fixtures(self, fixtures): - super(FederatedTokenTests, self).load_fixtures(fixtures) - self.load_federation_sample_data() - - def test_issue_unscoped_token_notify(self): - self._issue_unscoped_token() - self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL) - - def test_issue_unscoped_token(self): - r = self._issue_unscoped_token() - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - self.assertValidMappedUser(r.json['token']) - - def test_issue_unscoped_token_disabled_idp(self): - """Checks if authentication works with disabled identity providers. - - Test plan: - 1) Disable default IdP - 2) Try issuing unscoped token for that IdP - 3) Expect server to forbid authentication - - """ - enabled_false = {'enabled': False} - self.federation_api.update_idp(self.IDP, enabled_false) - self.assertRaises(exception.Forbidden, - self._issue_unscoped_token) - - def test_issue_unscoped_token_group_names_in_mapping(self): - r = self._issue_unscoped_token(assertion='ANOTHER_CUSTOMER_ASSERTION') - ref_groups = set([self.group_customers['id'], self.group_admins['id']]) - token_resp = r.json_body - token_groups = token_resp['token']['user']['OS-FEDERATION']['groups'] - token_groups = set([group['id'] for group in token_groups]) - self.assertEqual(ref_groups, token_groups) - - def test_issue_unscoped_tokens_nonexisting_group(self): - self.assertRaises(exception.MissingGroups, - self._issue_unscoped_token, - assertion='ANOTHER_TESTER_ASSERTION') - - def test_issue_unscoped_token_with_remote_no_attribute(self): - r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, - environment={ - self.REMOTE_ID_ATTR: - self.REMOTE_IDS[0] - }) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_issue_unscoped_token_with_remote(self): - self.config_fixture.config(group='federation', - remote_id_attribute=self.REMOTE_ID_ATTR) - r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, - environment={ - self.REMOTE_ID_ATTR: - self.REMOTE_IDS[0] - }) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_issue_unscoped_token_with_saml2_remote(self): - self.config_fixture.config(group='saml2', - remote_id_attribute=self.REMOTE_ID_ATTR) - r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, - environment={ - self.REMOTE_ID_ATTR: - self.REMOTE_IDS[0] - }) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_issue_unscoped_token_with_remote_different(self): - self.config_fixture.config(group='federation', - remote_id_attribute=self.REMOTE_ID_ATTR) - self.assertRaises(exception.Forbidden, - self._issue_unscoped_token, - idp=self.IDP_WITH_REMOTE, - environment={ - self.REMOTE_ID_ATTR: uuid.uuid4().hex - }) - - def test_issue_unscoped_token_with_remote_default_overwritten(self): - """Test that protocol remote_id_attribute has higher priority. - - Make sure the parameter stored under ``protocol`` section has higher - priority over parameter from default ``federation`` configuration - section. - - """ - self.config_fixture.config(group='saml2', - remote_id_attribute=self.REMOTE_ID_ATTR) - self.config_fixture.config(group='federation', - remote_id_attribute=uuid.uuid4().hex) - r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, - environment={ - self.REMOTE_ID_ATTR: - self.REMOTE_IDS[0] - }) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_issue_unscoped_token_with_remote_unavailable(self): - self.config_fixture.config(group='federation', - remote_id_attribute=self.REMOTE_ID_ATTR) - self.assertRaises(exception.Unauthorized, - self._issue_unscoped_token, - idp=self.IDP_WITH_REMOTE, - environment={ - uuid.uuid4().hex: uuid.uuid4().hex - }) - - def test_issue_unscoped_token_with_remote_user_as_empty_string(self): - # make sure that REMOTE_USER set as the empty string won't interfere - r = self._issue_unscoped_token(environment={'REMOTE_USER': ''}) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_issue_unscoped_token_no_groups(self): - self.assertRaises(exception.Unauthorized, - self._issue_unscoped_token, - assertion='BAD_TESTER_ASSERTION') - - def test_issue_unscoped_token_malformed_environment(self): - """Test whether non string objects are filtered out. - - Put non string objects into the environment, inject - correct assertion and try to get an unscoped token. - Expect server not to fail on using split() method on - non string objects and return token id in the HTTP header. - - """ - api = auth_controllers.Auth() - context = { - 'environment': { - 'malformed_object': object(), - 'another_bad_idea': tuple(range(10)), - 'yet_another_bad_param': dict(zip(uuid.uuid4().hex, - range(32))) - } - } - self._inject_assertion(context, 'EMPLOYEE_ASSERTION') - r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ) - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_scope_to_project_once_notify(self): - r = self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) - user_id = r.json['token']['user']['id'] - self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id) - - def test_scope_to_project_once(self): - r = self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) - token_resp = r.result['token'] - project_id = token_resp['project']['id'] - self._check_project_scoped_token_attributes(token_resp, project_id) - roles_ref = [self.role_employee] - - projects_ref = self.proj_employees - self._check_projects_and_roles(token_resp, roles_ref, projects_ref) - self.assertValidMappedUser(token_resp) - - def test_scope_token_with_idp_disabled(self): - """Scope token issued by disabled IdP. - - Try scoping the token issued by an IdP which is disabled now. Expect - server to refuse scoping operation. - - This test confirms correct behaviour when IdP was enabled and unscoped - token was issued, but disabled before user tries to scope the token. - Here we assume the unscoped token was already issued and start from - the moment where IdP is being disabled and unscoped token is being - used. - - Test plan: - 1) Disable IdP - 2) Try scoping unscoped token - - """ - enabled_false = {'enabled': False} - self.federation_api.update_idp(self.IDP, enabled_false) - self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=http_client.FORBIDDEN) - - def test_scope_to_bad_project(self): - """Scope unscoped token with a project we don't have access to.""" - self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, - expected_status=http_client.UNAUTHORIZED) - - def test_scope_to_project_multiple_times(self): - """Try to scope the unscoped token multiple times. - - The new tokens should be scoped to: - - * Customers' project - * Employees' project - - """ - bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN, - self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN) - project_ids = (self.proj_employees['id'], - self.proj_customers['id']) - for body, project_id_ref in zip(bodies, project_ids): - r = self.v3_create_token(body) - token_resp = r.result['token'] - self._check_project_scoped_token_attributes(token_resp, - project_id_ref) - - def test_scope_to_project_with_only_inherited_roles(self): - """Try to scope token whose only roles are inherited.""" - self.config_fixture.config(group='os_inherit', enabled=True) - r = self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER) - token_resp = r.result['token'] - self._check_project_scoped_token_attributes( - token_resp, self.project_inherited['id']) - roles_ref = [self.role_customer] - projects_ref = self.project_inherited - self._check_projects_and_roles(token_resp, roles_ref, projects_ref) - self.assertValidMappedUser(token_resp) - - def test_scope_token_from_nonexistent_unscoped_token(self): - """Try to scope token from non-existent unscoped token.""" - self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, - expected_status=http_client.NOT_FOUND) - - def test_issue_token_from_rules_without_user(self): - api = auth_controllers.Auth() - context = {'environment': {}} - self._inject_assertion(context, 'BAD_TESTER_ASSERTION') - self.assertRaises(exception.Unauthorized, - api.authenticate_for_token, - context, self.UNSCOPED_V3_SAML2_REQ) - - def test_issue_token_with_nonexistent_group(self): - """Inject assertion that matches rule issuing bad group id. - - Expect server to find out that some groups are missing in the - backend and raise exception.MappedGroupNotFound exception. - - """ - self.assertRaises(exception.MappedGroupNotFound, - self._issue_unscoped_token, - assertion='CONTRACTOR_ASSERTION') - - def test_scope_to_domain_once(self): - r = self.v3_create_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER) - token_resp = r.result['token'] - self._check_domain_scoped_token_attributes(token_resp, - self.domainA['id']) - - def test_scope_to_domain_multiple_tokens(self): - """Issue multiple tokens scoping to different domains. - - The new tokens should be scoped to: - - * domainA - * domainB - * domainC - - """ - bodies = (self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN, - self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN, - self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN) - domain_ids = (self.domainA['id'], - self.domainB['id'], - self.domainC['id']) - - for body, domain_id_ref in zip(bodies, domain_ids): - r = self.v3_create_token(body) - token_resp = r.result['token'] - self._check_domain_scoped_token_attributes(token_resp, - domain_id_ref) - - def test_scope_to_domain_with_only_inherited_roles_fails(self): - """Try to scope to a domain that has no direct roles.""" - self.v3_create_token( - self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, - expected_status=http_client.UNAUTHORIZED) - - def test_list_projects(self): - urls = ('/OS-FEDERATION/projects', '/auth/projects') - - token = (self.tokens['CUSTOMER_ASSERTION'], - self.tokens['EMPLOYEE_ASSERTION'], - self.tokens['ADMIN_ASSERTION']) - - self.config_fixture.config(group='os_inherit', enabled=True) - projects_refs = (set([self.proj_customers['id'], - self.project_inherited['id']]), - set([self.proj_employees['id'], - self.project_all['id']]), - set([self.proj_employees['id'], - self.project_all['id'], - self.proj_customers['id'], - self.project_inherited['id']])) - - for token, projects_ref in zip(token, projects_refs): - for url in urls: - r = self.get(url, token=token) - projects_resp = r.result['projects'] - projects = set(p['id'] for p in projects_resp) - self.assertEqual(projects_ref, projects, - 'match failed for url %s' % url) - - # TODO(samueldmq): Create another test class for role inheritance tests. - # The advantage would be to reduce the complexity of this test class and - # have tests specific to this functionality grouped, easing readability and - # maintenability. - def test_list_projects_for_inherited_project_assignment(self): - # Enable os_inherit extension - self.config_fixture.config(group='os_inherit', enabled=True) - - # Create a subproject - subproject_inherited = unit.new_project_ref( - domain_id=self.domainD['id'], - parent_id=self.project_inherited['id']) - self.resource_api.create_project(subproject_inherited['id'], - subproject_inherited) - - # Create an inherited role assignment - self.assignment_api.create_grant( - role_id=self.role_employee['id'], - group_id=self.group_employees['id'], - project_id=self.project_inherited['id'], - inherited_to_projects=True) - - # Define expected projects from employee assertion, which contain - # the created subproject - expected_project_ids = [self.project_all['id'], - self.proj_employees['id'], - subproject_inherited['id']] - - # Assert expected projects for both available URLs - for url in ('/OS-FEDERATION/projects', '/auth/projects'): - r = self.get(url, token=self.tokens['EMPLOYEE_ASSERTION']) - project_ids = [project['id'] for project in r.result['projects']] - - self.assertEqual(len(expected_project_ids), len(project_ids)) - for expected_project_id in expected_project_ids: - self.assertIn(expected_project_id, project_ids, - 'Projects match failed for url %s' % url) - - def test_list_domains(self): - urls = ('/OS-FEDERATION/domains', '/auth/domains') - - tokens = (self.tokens['CUSTOMER_ASSERTION'], - self.tokens['EMPLOYEE_ASSERTION'], - self.tokens['ADMIN_ASSERTION']) - - # NOTE(henry-nash): domain D does not appear in the expected results - # since it only had inherited roles (which only apply to projects - # within the domain) - - domain_refs = (set([self.domainA['id']]), - set([self.domainA['id'], - self.domainB['id']]), - set([self.domainA['id'], - self.domainB['id'], - self.domainC['id']])) - - for token, domains_ref in zip(tokens, domain_refs): - for url in urls: - r = self.get(url, token=token) - domains_resp = r.result['domains'] - domains = set(p['id'] for p in domains_resp) - self.assertEqual(domains_ref, domains, - 'match failed for url %s' % url) - - @utils.wip('This will fail because of bug #1501032. The returned method' - 'list should contain "saml2". This is documented in bug ' - '1501032.') - def test_full_workflow(self): - """Test 'standard' workflow for granting access tokens. - - * Issue unscoped token - * List available projects based on groups - * Scope token to one of available projects - - """ - r = self._issue_unscoped_token() - token_resp = r.json_body['token'] - # NOTE(lbragstad): Ensure only 'saml2' is in the method list. - self.assertListEqual(['saml2'], token_resp['methods']) - self.assertValidMappedUser(token_resp) - employee_unscoped_token_id = r.headers.get('X-Subject-Token') - r = self.get('/auth/projects', token=employee_unscoped_token_id) - projects = r.result['projects'] - random_project = random.randint(0, len(projects)) - 1 - project = projects[random_project] - - v3_scope_request = self._scope_request(employee_unscoped_token_id, - 'project', project['id']) - - r = self.v3_create_token(v3_scope_request) - token_resp = r.result['token'] - # FIXME(lbragstad): 'token' should be in the list of methods returned - # but it isn't. This is documented in bug 1501032. - self.assertIn('token', token_resp['methods']) - self.assertIn('saml2', token_resp['methods']) - self._check_project_scoped_token_attributes(token_resp, project['id']) - - def test_workflow_with_groups_deletion(self): - """Test full workflow with groups deletion before token scoping. - - The test scenario is as follows: - - Create group ``group`` - - Create and assign roles to ``group`` and ``project_all`` - - Patch mapping rules for existing IdP so it issues group id - - Issue unscoped token with ``group``'s id - - Delete group ``group`` - - Scope token to ``project_all`` - - Expect HTTP 500 response - - """ - # create group and role - group = unit.new_group_ref(domain_id=self.domainA['id']) - group = self.identity_api.create_group(group) - role = unit.new_role_ref() - self.role_api.create_role(role['id'], role) - - # assign role to group and project_admins - self.assignment_api.create_grant(role['id'], - group_id=group['id'], - project_id=self.project_all['id']) - - rules = { - 'rules': [ - { - 'local': [ - { - 'group': { - 'id': group['id'] - } - }, - { - 'user': { - 'name': '{0}' - } - } - ], - 'remote': [ - { - 'type': 'UserName' - }, - { - 'type': 'LastName', - 'any_one_of': [ - 'Account' - ] - } - ] - } - ] - } - - self.federation_api.update_mapping(self.mapping['id'], rules) - - r = self._issue_unscoped_token(assertion='TESTER_ASSERTION') - token_id = r.headers.get('X-Subject-Token') - - # delete group - self.identity_api.delete_group(group['id']) - - # scope token to project_all, expect HTTP 500 - scoped_token = self._scope_request( - token_id, 'project', - self.project_all['id']) - - self.v3_create_token( - scoped_token, expected_status=http_client.INTERNAL_SERVER_ERROR) - - def test_lists_with_missing_group_in_backend(self): - """Test a mapping that points to a group that does not exist - - For explicit mappings, we expect the group to exist in the backend, - but for lists, specifically blacklists, a missing group is expected - as many groups will be specified by the IdP that are not Keystone - groups. - - The test scenario is as follows: - - Create group ``EXISTS`` - - Set mapping rules for existing IdP with a blacklist - that passes through as REMOTE_USER_GROUPS - - Issue unscoped token with on group ``EXISTS`` id in it - - """ - domain_id = self.domainA['id'] - domain_name = self.domainA['name'] - group = unit.new_group_ref(domain_id=domain_id, name='EXISTS') - group = self.identity_api.create_group(group) - rules = { - 'rules': [ - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{0}" - } - } - ], - "remote": [ - { - "type": "REMOTE_USER" - } - ] - }, - { - "local": [ - { - "groups": "{0}", - "domain": {"name": domain_name} - } - ], - "remote": [ - { - "type": "REMOTE_USER_GROUPS", - } - ] - } - ] - } - self.federation_api.update_mapping(self.mapping['id'], rules) - - def test_empty_blacklist_passess_all_values(self): - """Test a mapping with empty blacklist specified - - Not adding a ``blacklist`` keyword to the mapping rules has the same - effect as adding an empty ``blacklist``. - In both cases, the mapping engine will not discard any groups that are - associated with apache environment variables. - - This test checks scenario where an empty blacklist was specified. - Expected result is to allow any value. - - The test scenario is as follows: - - Create group ``EXISTS`` - - Create group ``NO_EXISTS`` - - Set mapping rules for existing IdP with a blacklist - that passes through as REMOTE_USER_GROUPS - - Issue unscoped token with groups ``EXISTS`` and ``NO_EXISTS`` - assigned - - """ - domain_id = self.domainA['id'] - domain_name = self.domainA['name'] - - # Add a group "EXISTS" - group_exists = unit.new_group_ref(domain_id=domain_id, name='EXISTS') - group_exists = self.identity_api.create_group(group_exists) - - # Add a group "NO_EXISTS" - group_no_exists = unit.new_group_ref(domain_id=domain_id, - name='NO_EXISTS') - group_no_exists = self.identity_api.create_group(group_no_exists) - - group_ids = set([group_exists['id'], group_no_exists['id']]) - - rules = { - 'rules': [ - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{0}" - } - } - ], - "remote": [ - { - "type": "REMOTE_USER" - } - ] - }, - { - "local": [ - { - "groups": "{0}", - "domain": {"name": domain_name} - } - ], - "remote": [ - { - "type": "REMOTE_USER_GROUPS", - "blacklist": [] - } - ] - } - ] - } - self.federation_api.update_mapping(self.mapping['id'], rules) - r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') - assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] - self.assertEqual(len(group_ids), len(assigned_group_ids)) - for group in assigned_group_ids: - self.assertIn(group['id'], group_ids) - - def test_not_adding_blacklist_passess_all_values(self): - """Test a mapping without blacklist specified. - - Not adding a ``blacklist`` keyword to the mapping rules has the same - effect as adding an empty ``blacklist``. In both cases all values will - be accepted and passed. - - This test checks scenario where an blacklist was not specified. - Expected result is to allow any value. - - The test scenario is as follows: - - Create group ``EXISTS`` - - Create group ``NO_EXISTS`` - - Set mapping rules for existing IdP with a blacklist - that passes through as REMOTE_USER_GROUPS - - Issue unscoped token with on groups ``EXISTS`` and ``NO_EXISTS`` - assigned - - """ - domain_id = self.domainA['id'] - domain_name = self.domainA['name'] - - # Add a group "EXISTS" - group_exists = unit.new_group_ref(domain_id=domain_id, - name='EXISTS') - group_exists = self.identity_api.create_group(group_exists) - - # Add a group "NO_EXISTS" - group_no_exists = unit.new_group_ref(domain_id=domain_id, - name='NO_EXISTS') - group_no_exists = self.identity_api.create_group(group_no_exists) - - group_ids = set([group_exists['id'], group_no_exists['id']]) - - rules = { - 'rules': [ - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{0}" - } - } - ], - "remote": [ - { - "type": "REMOTE_USER" - } - ] - }, - { - "local": [ - { - "groups": "{0}", - "domain": {"name": domain_name} - } - ], - "remote": [ - { - "type": "REMOTE_USER_GROUPS", - } - ] - } - ] - } - self.federation_api.update_mapping(self.mapping['id'], rules) - r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') - assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] - self.assertEqual(len(group_ids), len(assigned_group_ids)) - for group in assigned_group_ids: - self.assertIn(group['id'], group_ids) - - def test_empty_whitelist_discards_all_values(self): - """Test that empty whitelist blocks all the values - - Not adding a ``whitelist`` keyword to the mapping value is different - than adding empty whitelist. The former case will simply pass all the - values, whereas the latter would discard all the values. - - This test checks scenario where an empty whitelist was specified. - The expected result is that no groups are matched. - - The test scenario is as follows: - - Create group ``EXISTS`` - - Set mapping rules for existing IdP with an empty whitelist - that whould discard any values from the assertion - - Try issuing unscoped token, expect server to raise - ``exception.MissingGroups`` as no groups were matched and ephemeral - user does not have any group assigned. - - """ - domain_id = self.domainA['id'] - domain_name = self.domainA['name'] - group = unit.new_group_ref(domain_id=domain_id, name='EXISTS') - group = self.identity_api.create_group(group) - rules = { - 'rules': [ - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{0}" - } - } - ], - "remote": [ - { - "type": "REMOTE_USER" - } - ] - }, - { - "local": [ - { - "groups": "{0}", - "domain": {"name": domain_name} - } - ], - "remote": [ - { - "type": "REMOTE_USER_GROUPS", - "whitelist": [] - } - ] - } - ] - } - self.federation_api.update_mapping(self.mapping['id'], rules) - - self.assertRaises(exception.MissingGroups, - self._issue_unscoped_token, - assertion='UNMATCHED_GROUP_ASSERTION') - - def test_not_setting_whitelist_accepts_all_values(self): - """Test that not setting whitelist passes - - Not adding a ``whitelist`` keyword to the mapping value is different - than adding empty whitelist. The former case will simply pass all the - values, whereas the latter would discard all the values. - - This test checks a scenario where a ``whitelist`` was not specified. - Expected result is that no groups are ignored. - - The test scenario is as follows: - - Create group ``EXISTS`` - - Set mapping rules for existing IdP with an empty whitelist - that whould discard any values from the assertion - - Issue an unscoped token and make sure ephemeral user is a member of - two groups. - - """ - domain_id = self.domainA['id'] - domain_name = self.domainA['name'] - - # Add a group "EXISTS" - group_exists = unit.new_group_ref(domain_id=domain_id, - name='EXISTS') - group_exists = self.identity_api.create_group(group_exists) - - # Add a group "NO_EXISTS" - group_no_exists = unit.new_group_ref(domain_id=domain_id, - name='NO_EXISTS') - group_no_exists = self.identity_api.create_group(group_no_exists) - - group_ids = set([group_exists['id'], group_no_exists['id']]) - - rules = { - 'rules': [ - { - "local": [ - { - "user": { - "name": "{0}", - "id": "{0}" - } - } - ], - "remote": [ - { - "type": "REMOTE_USER" - } - ] - }, - { - "local": [ - { - "groups": "{0}", - "domain": {"name": domain_name} - } - ], - "remote": [ - { - "type": "REMOTE_USER_GROUPS", - } - ] - } - ] - } - self.federation_api.update_mapping(self.mapping['id'], rules) - r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') - assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] - self.assertEqual(len(group_ids), len(assigned_group_ids)) - for group in assigned_group_ids: - self.assertIn(group['id'], group_ids) - - def test_assertion_prefix_parameter(self): - """Test parameters filtering based on the prefix. - - With ``assertion_prefix`` set to fixed, non default value, - issue an unscoped token from assertion EMPLOYEE_ASSERTION_PREFIXED. - Expect server to return unscoped token. - - """ - self.config_fixture.config(group='federation', - assertion_prefix=self.ASSERTION_PREFIX) - r = self._issue_unscoped_token(assertion='EMPLOYEE_ASSERTION_PREFIXED') - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - - def test_assertion_prefix_parameter_expect_fail(self): - """Test parameters filtering based on the prefix. - - With ``assertion_prefix`` default value set to empty string - issue an unscoped token from assertion EMPLOYEE_ASSERTION. - Next, configure ``assertion_prefix`` to value ``UserName``. - Try issuing unscoped token with EMPLOYEE_ASSERTION. - Expect server to raise exception.Unathorized exception. - - """ - r = self._issue_unscoped_token() - self.assertIsNotNone(r.headers.get('X-Subject-Token')) - self.config_fixture.config(group='federation', - assertion_prefix='UserName') - - self.assertRaises(exception.Unauthorized, - self._issue_unscoped_token) - - def test_v2_auth_with_federation_token_fails(self): - """Test that using a federation token with v2 auth fails. - - If an admin sets up a federated Keystone environment, and a user - incorrectly configures a service (like Nova) to only use v2 auth, the - returned message should be informative. - - """ - r = self._issue_unscoped_token() - token_id = r.headers.get('X-Subject-Token') - self.assertRaises(exception.Unauthorized, - self.token_provider_api.validate_v2_token, - token_id=token_id) - - def test_unscoped_token_has_user_domain(self): - r = self._issue_unscoped_token() - self._check_domains_are_valid(r.json_body['token']) - - def test_scoped_token_has_user_domain(self): - r = self.v3_create_token( - self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) - self._check_domains_are_valid(r.result['token']) - - def test_issue_unscoped_token_for_local_user(self): - r = self._issue_unscoped_token(assertion='LOCAL_USER_ASSERTION') - token_resp = r.json_body['token'] - self.assertListEqual(['saml2'], token_resp['methods']) - self.assertEqual(self.user['id'], token_resp['user']['id']) - self.assertEqual(self.user['name'], token_resp['user']['name']) - self.assertEqual(self.domain['id'], token_resp['user']['domain']['id']) - # Make sure the token is not scoped - self.assertNotIn('project', token_resp) - self.assertNotIn('domain', token_resp) - - def test_issue_token_for_local_user_user_not_found(self): - self.assertRaises(exception.Unauthorized, - self._issue_unscoped_token, - assertion='ANOTHER_LOCAL_USER_ASSERTION') - - -class FernetFederatedTokenTests(test_v3.RestfulTestCase, FederatedSetupMixin): - AUTH_METHOD = 'token' - - def load_fixtures(self, fixtures): - super(FernetFederatedTokenTests, self).load_fixtures(fixtures) - self.load_federation_sample_data() - - def config_overrides(self): - super(FernetFederatedTokenTests, self).config_overrides() - self.config_fixture.config(group='token', provider='fernet') - self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) - - def auth_plugin_config_override(self): - methods = ['saml2', 'token', 'password'] - super(FernetFederatedTokenTests, - self).auth_plugin_config_override(methods) - - def test_federated_unscoped_token(self): - resp = self._issue_unscoped_token() - self.assertEqual(204, len(resp.headers['X-Subject-Token'])) - self.assertValidMappedUser(resp.json_body['token']) - - def test_federated_unscoped_token_with_multiple_groups(self): - assertion = 'ANOTHER_CUSTOMER_ASSERTION' - resp = self._issue_unscoped_token(assertion=assertion) - self.assertEqual(226, len(resp.headers['X-Subject-Token'])) - self.assertValidMappedUser(resp.json_body['token']) - - def test_validate_federated_unscoped_token(self): - resp = self._issue_unscoped_token() - unscoped_token = resp.headers.get('X-Subject-Token') - # assert that the token we received is valid - self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token}) - - def test_fernet_full_workflow(self): - """Test 'standard' workflow for granting Fernet access tokens. - - * Issue unscoped token - * List available projects based on groups - * Scope token to one of available projects - - """ - resp = self._issue_unscoped_token() - self.assertValidMappedUser(resp.json_body['token']) - unscoped_token = resp.headers.get('X-Subject-Token') - resp = self.get('/auth/projects', token=unscoped_token) - projects = resp.result['projects'] - random_project = random.randint(0, len(projects)) - 1 - project = projects[random_project] - - v3_scope_request = self._scope_request(unscoped_token, - 'project', project['id']) - - resp = self.v3_create_token(v3_scope_request) - token_resp = resp.result['token'] - self._check_project_scoped_token_attributes(token_resp, project['id']) - - -class FederatedTokenTestsMethodToken(FederatedTokenTests): - """Test federation operation with unified scoping auth method. - - Test all the operations with auth method set to ``token`` as a new, unified - way for scoping all the tokens. - - """ - - AUTH_METHOD = 'token' - - def auth_plugin_config_override(self): - methods = ['saml2', 'token'] - super(FederatedTokenTests, - self).auth_plugin_config_override(methods) - - @utils.wip('This will fail because of bug #1501032. The returned method' - 'list should contain "saml2". This is documented in bug ' - '1501032.') - def test_full_workflow(self): - """Test 'standard' workflow for granting access tokens. - - * Issue unscoped token - * List available projects based on groups - * Scope token to one of available projects - - """ - r = self._issue_unscoped_token() - token_resp = r.json_body['token'] - # NOTE(lbragstad): Ensure only 'saml2' is in the method list. - self.assertListEqual(['saml2'], token_resp['methods']) - self.assertValidMappedUser(token_resp) - employee_unscoped_token_id = r.headers.get('X-Subject-Token') - r = self.get('/auth/projects', token=employee_unscoped_token_id) - projects = r.result['projects'] - random_project = random.randint(0, len(projects)) - 1 - project = projects[random_project] - - v3_scope_request = self._scope_request(employee_unscoped_token_id, - 'project', project['id']) - - r = self.v3_authenticate_token(v3_scope_request) - token_resp = r.result['token'] - self.assertIn('token', token_resp['methods']) - self.assertIn('saml2', token_resp['methods']) - self._check_project_scoped_token_attributes(token_resp, project['id']) - - -class FederatedUserTests(test_v3.RestfulTestCase, FederatedSetupMixin): - """Tests for federated users - - Tests new shadow users functionality - - """ - - def auth_plugin_config_override(self): - methods = ['saml2'] - super(FederatedUserTests, self).auth_plugin_config_override(methods) - - def setUp(self): - super(FederatedUserTests, self).setUp() - - def load_fixtures(self, fixtures): - super(FederatedUserTests, self).load_fixtures(fixtures) - self.load_federation_sample_data() - - def test_user_id_persistense(self): - """Ensure user_id is persistend for multiple federated authn calls.""" - r = self._issue_unscoped_token() - user_id = r.json_body['token']['user']['id'] - - r = self._issue_unscoped_token() - user_id2 = r.json_body['token']['user']['id'] - self.assertEqual(user_id, user_id2) - - -class JsonHomeTests(test_v3.RestfulTestCase, test_v3.JsonHomeTestMixin): - JSON_HOME_DATA = { - 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/' - '1.0/rel/identity_provider': { - 'href-template': '/OS-FEDERATION/identity_providers/{idp_id}', - 'href-vars': { - 'idp_id': 'http://docs.openstack.org/api/openstack-identity/3/' - 'ext/OS-FEDERATION/1.0/param/idp_id' - }, - }, - } - - -def _is_xmlsec1_installed(): - p = subprocess.Popen( - ['which', 'xmlsec1'], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - - # invert the return code - return not bool(p.wait()) - - -def _load_xml(filename): - with open(os.path.join(XMLDIR, filename), 'r') as xml: - return xml.read() - - -class SAMLGenerationTests(test_v3.RestfulTestCase): - - SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers' - '/BETA/protocols/saml2/auth') - - ASSERTION_FILE = 'signed_saml2_assertion.xml' - - # The values of the following variables match the attributes values found - # in ASSERTION_FILE - ISSUER = 'https://acme.com/FIM/sps/openstack/saml20' - RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST' - SUBJECT = 'test_user' - SUBJECT_DOMAIN = 'user_domain' - ROLES = ['admin', 'member'] - PROJECT = 'development' - PROJECT_DOMAIN = 'project_domain' - SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2' - ECP_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2/ecp' - ASSERTION_VERSION = "2.0" - SERVICE_PROVDIER_ID = 'ACME' - - def sp_ref(self): - ref = { - 'auth_url': self.SP_AUTH_URL, - 'enabled': True, - 'description': uuid.uuid4().hex, - 'sp_url': self.RECIPIENT, - 'relay_state_prefix': CONF.saml.relay_state_prefix, - - } - return ref - - def setUp(self): - super(SAMLGenerationTests, self).setUp() - self.signed_assertion = saml2.create_class_from_xml_string( - saml.Assertion, _load_xml(self.ASSERTION_FILE)) - self.sp = self.sp_ref() - url = '/OS-FEDERATION/service_providers/' + self.SERVICE_PROVDIER_ID - self.put(url, body={'service_provider': self.sp}, - expected_status=http_client.CREATED) - - def test_samlize_token_values(self): - """Test the SAML generator produces a SAML object. - - Test the SAML generator directly by passing known arguments, the result - should be a SAML object that consistently includes attributes based on - the known arguments that were passed in. - - """ - with mock.patch.object(keystone_idp, '_sign_assertion', - return_value=self.signed_assertion): - generator = keystone_idp.SAMLGenerator() - response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, - self.SUBJECT_DOMAIN, - self.ROLES, self.PROJECT, - self.PROJECT_DOMAIN) - - assertion = response.assertion - self.assertIsNotNone(assertion) - self.assertIsInstance(assertion, saml.Assertion) - issuer = response.issuer - self.assertEqual(self.RECIPIENT, response.destination) - self.assertEqual(self.ISSUER, issuer.text) - - user_attribute = assertion.attribute_statement[0].attribute[0] - self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text) - - user_domain_attribute = ( - assertion.attribute_statement[0].attribute[1]) - self.assertEqual(self.SUBJECT_DOMAIN, - user_domain_attribute.attribute_value[0].text) - - role_attribute = assertion.attribute_statement[0].attribute[2] - for attribute_value in role_attribute.attribute_value: - self.assertIn(attribute_value.text, self.ROLES) - - project_attribute = assertion.attribute_statement[0].attribute[3] - self.assertEqual(self.PROJECT, - project_attribute.attribute_value[0].text) - - project_domain_attribute = ( - assertion.attribute_statement[0].attribute[4]) - self.assertEqual(self.PROJECT_DOMAIN, - project_domain_attribute.attribute_value[0].text) - - def test_verify_assertion_object(self): - """Test that the Assertion object is built properly. - - The Assertion doesn't need to be signed in this test, so - _sign_assertion method is patched and doesn't alter the assertion. - - """ - with mock.patch.object(keystone_idp, '_sign_assertion', - side_effect=lambda x: x): - generator = keystone_idp.SAMLGenerator() - response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, - self.SUBJECT_DOMAIN, - self.ROLES, self.PROJECT, - self.PROJECT_DOMAIN) - assertion = response.assertion - self.assertEqual(self.ASSERTION_VERSION, assertion.version) - - def test_valid_saml_xml(self): - """Test the generated SAML object can become valid XML. - - Test the generator directly by passing known arguments, the result - should be a SAML object that consistently includes attributes based on - the known arguments that were passed in. - - """ - with mock.patch.object(keystone_idp, '_sign_assertion', - return_value=self.signed_assertion): - generator = keystone_idp.SAMLGenerator() - response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, - self.SUBJECT_DOMAIN, - self.ROLES, self.PROJECT, - self.PROJECT_DOMAIN) - - saml_str = response.to_string() - response = etree.fromstring(saml_str) - issuer = response[0] - assertion = response[2] - - self.assertEqual(self.RECIPIENT, response.get('Destination')) - self.assertEqual(self.ISSUER, issuer.text) - - user_attribute = assertion[4][0] - self.assertEqual(self.SUBJECT, user_attribute[0].text) - - user_domain_attribute = assertion[4][1] - self.assertEqual(self.SUBJECT_DOMAIN, user_domain_attribute[0].text) - - role_attribute = assertion[4][2] - for attribute_value in role_attribute: - self.assertIn(attribute_value.text, self.ROLES) - - project_attribute = assertion[4][3] - self.assertEqual(self.PROJECT, project_attribute[0].text) - - project_domain_attribute = assertion[4][4] - self.assertEqual(self.PROJECT_DOMAIN, project_domain_attribute[0].text) - - def test_assertion_using_explicit_namespace_prefixes(self): - def mocked_subprocess_check_output(*popenargs, **kwargs): - # the last option is the assertion file to be signed - filename = popenargs[0][-1] - with open(filename, 'r') as f: - assertion_content = f.read() - # since we are not testing the signature itself, we can return - # the assertion as is without signing it - return assertion_content - - with mock.patch.object(subprocess, 'check_output', - side_effect=mocked_subprocess_check_output): - generator = keystone_idp.SAMLGenerator() - response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, - self.SUBJECT_DOMAIN, - self.ROLES, self.PROJECT, - self.PROJECT_DOMAIN) - assertion_xml = response.assertion.to_string() - # make sure we have the proper tag and prefix for the assertion - # namespace - self.assertIn('<saml:Assertion', assertion_xml) - self.assertIn('xmlns:saml="' + saml2.NAMESPACE + '"', - assertion_xml) - self.assertIn('xmlns:xmldsig="' + xmldsig.NAMESPACE + '"', - assertion_xml) - - def test_saml_signing(self): - """Test that the SAML generator produces a SAML object. - - Test the SAML generator directly by passing known arguments, the result - should be a SAML object that consistently includes attributes based on - the known arguments that were passed in. - - """ - if not _is_xmlsec1_installed(): - self.skipTest('xmlsec1 is not installed') - - generator = keystone_idp.SAMLGenerator() - response = generator.samlize_token(self.ISSUER, self.RECIPIENT, - self.SUBJECT, self.SUBJECT_DOMAIN, - self.ROLES, self.PROJECT, - self.PROJECT_DOMAIN) - - signature = response.assertion.signature - self.assertIsNotNone(signature) - self.assertIsInstance(signature, xmldsig.Signature) - - idp_public_key = sigver.read_cert_from_file(CONF.saml.certfile, 'pem') - cert_text = signature.key_info.x509_data[0].x509_certificate.text - # NOTE(stevemar): Rather than one line of text, the certificate is - # printed with newlines for readability, we remove these so we can - # match it with the key that we used. - cert_text = cert_text.replace(os.linesep, '') - self.assertEqual(idp_public_key, cert_text) - - def _create_generate_saml_request(self, token_id, sp_id): - return { - "auth": { - "identity": { - "methods": [ - "token" - ], - "token": { - "id": token_id - } - }, - "scope": { - "service_provider": { - "id": sp_id - } - } - } - } - - def _fetch_valid_token(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - project_id=self.project['id']) - resp = self.v3_create_token(auth_data) - token_id = resp.headers.get('X-Subject-Token') - return token_id - - def _fetch_domain_scoped_token(self): - auth_data = self.build_authentication_request( - user_id=self.user['id'], - password=self.user['password'], - user_domain_id=self.domain['id']) - resp = self.v3_create_token(auth_data) - token_id = resp.headers.get('X-Subject-Token') - return token_id - - def test_not_project_scoped_token(self): - """Ensure SAML generation fails when passing domain-scoped tokens. - - The server should return a 403 Forbidden Action. - - """ - self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) - token_id = self._fetch_domain_scoped_token() - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - with mock.patch.object(keystone_idp, '_sign_assertion', - return_value=self.signed_assertion): - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.FORBIDDEN) - - def test_generate_saml_route(self): - """Test that the SAML generation endpoint produces XML. - - The SAML endpoint /v3/auth/OS-FEDERATION/saml2 should take as input, - a scoped token ID, and a Service Provider ID. - The controller should fetch details about the user from the token, - and details about the service provider from its ID. - This should be enough information to invoke the SAML generator and - provide a valid SAML (XML) document back. - - """ - self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) - token_id = self._fetch_valid_token() - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - - with mock.patch.object(keystone_idp, '_sign_assertion', - return_value=self.signed_assertion): - http_response = self.post(self.SAML_GENERATION_ROUTE, body=body, - response_content_type='text/xml', - expected_status=http_client.OK) - - response = etree.fromstring(http_response.result) - issuer = response[0] - assertion = response[2] - - self.assertEqual(self.RECIPIENT, response.get('Destination')) - self.assertEqual(self.ISSUER, issuer.text) - - # NOTE(stevemar): We should test this against expected values, - # but the self.xyz attribute names are uuids, and we mock out - # the result. Ideally we should update the mocked result with - # some known data, and create the roles/project/user before - # these tests run. - user_attribute = assertion[4][0] - self.assertIsInstance(user_attribute[0].text, str) - - user_domain_attribute = assertion[4][1] - self.assertIsInstance(user_domain_attribute[0].text, str) - - role_attribute = assertion[4][2] - self.assertIsInstance(role_attribute[0].text, str) - - project_attribute = assertion[4][3] - self.assertIsInstance(project_attribute[0].text, str) - - project_domain_attribute = assertion[4][4] - self.assertIsInstance(project_domain_attribute[0].text, str) - - def test_invalid_scope_body(self): - """Test that missing the scope in request body raises an exception. - - Raises exception.SchemaValidationError() - error 400 Bad Request - - """ - token_id = uuid.uuid4().hex - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - del body['auth']['scope'] - - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.BAD_REQUEST) - - def test_invalid_token_body(self): - """Test that missing the token in request body raises an exception. - - Raises exception.SchemaValidationError() - error 400 Bad Request - - """ - token_id = uuid.uuid4().hex - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - del body['auth']['identity']['token'] - - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.BAD_REQUEST) - - def test_sp_not_found(self): - """Test SAML generation with an invalid service provider ID. - - Raises exception.ServiceProviderNotFound() - error Not Found 404 - - """ - sp_id = uuid.uuid4().hex - token_id = self._fetch_valid_token() - body = self._create_generate_saml_request(token_id, sp_id) - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.NOT_FOUND) - - def test_sp_disabled(self): - """Try generating assertion for disabled Service Provider.""" - # Disable Service Provider - sp_ref = {'enabled': False} - self.federation_api.update_sp(self.SERVICE_PROVDIER_ID, sp_ref) - - token_id = self._fetch_valid_token() - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.FORBIDDEN) - - def test_token_not_found(self): - """Test that an invalid token in the request body raises an exception. - - Raises exception.TokenNotFound() - error Not Found 404 - - """ - token_id = uuid.uuid4().hex - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - self.post(self.SAML_GENERATION_ROUTE, body=body, - expected_status=http_client.NOT_FOUND) - - def test_generate_ecp_route(self): - """Test that the ECP generation endpoint produces XML. - - The ECP endpoint /v3/auth/OS-FEDERATION/saml2/ecp should take the same - input as the SAML generation endpoint (scoped token ID + Service - Provider ID). - The controller should return a SAML assertion that is wrapped in a - SOAP envelope. - """ - self.config_fixture.config(group='saml', idp_entity_id=self.ISSUER) - token_id = self._fetch_valid_token() - body = self._create_generate_saml_request(token_id, - self.SERVICE_PROVDIER_ID) - - with mock.patch.object(keystone_idp, '_sign_assertion', - return_value=self.signed_assertion): - http_response = self.post(self.ECP_GENERATION_ROUTE, body=body, - response_content_type='text/xml', - expected_status=http_client.OK) - - env_response = etree.fromstring(http_response.result) - header = env_response[0] - - # Verify the relay state starts with 'ss:mem' - prefix = CONF.saml.relay_state_prefix - self.assertThat(header[0].text, matchers.StartsWith(prefix)) - - # Verify that the content in the body matches the expected assertion - body = env_response[1] - response = body[0] - issuer = response[0] - assertion = response[2] - - self.assertEqual(self.RECIPIENT, response.get('Destination')) - self.assertEqual(self.ISSUER, issuer.text) - - user_attribute = assertion[4][0] - self.assertIsInstance(user_attribute[0].text, str) - - user_domain_attribute = assertion[4][1] - self.assertIsInstance(user_domain_attribute[0].text, str) - - role_attribute = assertion[4][2] - self.assertIsInstance(role_attribute[0].text, str) - - project_attribute = assertion[4][3] - self.assertIsInstance(project_attribute[0].text, str) - - project_domain_attribute = assertion[4][4] - self.assertIsInstance(project_domain_attribute[0].text, str) - - @mock.patch('saml2.create_class_from_xml_string') - @mock.patch('oslo_utils.fileutils.write_to_tempfile') - @mock.patch.object(subprocess, 'check_output') - def test__sign_assertion(self, check_output_mock, - write_to_tempfile_mock, create_class_mock): - write_to_tempfile_mock.return_value = 'tmp_path' - check_output_mock.return_value = 'fakeoutput' - - keystone_idp._sign_assertion(self.signed_assertion) - - create_class_mock.assert_called_with(saml.Assertion, 'fakeoutput') - - @mock.patch('oslo_utils.fileutils.write_to_tempfile') - @mock.patch.object(subprocess, 'check_output') - def test__sign_assertion_exc(self, check_output_mock, - write_to_tempfile_mock): - # If the command fails the command output is logged. - - write_to_tempfile_mock.return_value = 'tmp_path' - - sample_returncode = 1 - sample_output = self.getUniqueString() - check_output_mock.side_effect = subprocess.CalledProcessError( - returncode=sample_returncode, cmd=CONF.saml.xmlsec1_binary, - output=sample_output) - - logger_fixture = self.useFixture(fixtures.LoggerFixture()) - self.assertRaises(exception.SAMLSigningError, - keystone_idp._sign_assertion, - self.signed_assertion) - expected_log = ( - "Error when signing assertion, reason: Command '%s' returned " - "non-zero exit status %s %s\n" % - (CONF.saml.xmlsec1_binary, sample_returncode, sample_output)) - self.assertEqual(expected_log, logger_fixture.output) - - @mock.patch('oslo_utils.fileutils.write_to_tempfile') - def test__sign_assertion_fileutils_exc(self, write_to_tempfile_mock): - exception_msg = 'fake' - write_to_tempfile_mock.side_effect = Exception(exception_msg) - - logger_fixture = self.useFixture(fixtures.LoggerFixture()) - self.assertRaises(exception.SAMLSigningError, - keystone_idp._sign_assertion, - self.signed_assertion) - expected_log = ( - 'Error when signing assertion, reason: %s\n' % exception_msg) - self.assertEqual(expected_log, logger_fixture.output) - - -class IdPMetadataGenerationTests(test_v3.RestfulTestCase): - """A class for testing Identity Provider Metadata generation.""" - - METADATA_URL = '/OS-FEDERATION/saml2/metadata' - - def setUp(self): - super(IdPMetadataGenerationTests, self).setUp() - self.generator = keystone_idp.MetadataGenerator() - - def config_overrides(self): - super(IdPMetadataGenerationTests, self).config_overrides() - self.config_fixture.config( - group='saml', - idp_entity_id=federation_fixtures.IDP_ENTITY_ID, - idp_sso_endpoint=federation_fixtures.IDP_SSO_ENDPOINT, - idp_organization_name=federation_fixtures.IDP_ORGANIZATION_NAME, - idp_organization_display_name=( - federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME), - idp_organization_url=federation_fixtures.IDP_ORGANIZATION_URL, - idp_contact_company=federation_fixtures.IDP_CONTACT_COMPANY, - idp_contact_name=federation_fixtures.IDP_CONTACT_GIVEN_NAME, - idp_contact_surname=federation_fixtures.IDP_CONTACT_SURNAME, - idp_contact_email=federation_fixtures.IDP_CONTACT_EMAIL, - idp_contact_telephone=( - federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER), - idp_contact_type=federation_fixtures.IDP_CONTACT_TYPE) - - def test_check_entity_id(self): - metadata = self.generator.generate_metadata() - self.assertEqual(federation_fixtures.IDP_ENTITY_ID, metadata.entity_id) - - def test_metadata_validity(self): - """Call md.EntityDescriptor method that does internal verification.""" - self.generator.generate_metadata().verify() - - def test_serialize_metadata_object(self): - """Check whether serialization doesn't raise any exceptions.""" - self.generator.generate_metadata().to_string() - # TODO(marek-denis): Check values here - - def test_check_idp_sso(self): - metadata = self.generator.generate_metadata() - idpsso_descriptor = metadata.idpsso_descriptor - self.assertIsNotNone(metadata.idpsso_descriptor) - self.assertEqual(federation_fixtures.IDP_SSO_ENDPOINT, - idpsso_descriptor.single_sign_on_service.location) - - self.assertIsNotNone(idpsso_descriptor.organization) - organization = idpsso_descriptor.organization - self.assertEqual(federation_fixtures.IDP_ORGANIZATION_DISPLAY_NAME, - organization.organization_display_name.text) - self.assertEqual(federation_fixtures.IDP_ORGANIZATION_NAME, - organization.organization_name.text) - self.assertEqual(federation_fixtures.IDP_ORGANIZATION_URL, - organization.organization_url.text) - - self.assertIsNotNone(idpsso_descriptor.contact_person) - contact_person = idpsso_descriptor.contact_person - - self.assertEqual(federation_fixtures.IDP_CONTACT_GIVEN_NAME, - contact_person.given_name.text) - self.assertEqual(federation_fixtures.IDP_CONTACT_SURNAME, - contact_person.sur_name.text) - self.assertEqual(federation_fixtures.IDP_CONTACT_EMAIL, - contact_person.email_address.text) - self.assertEqual(federation_fixtures.IDP_CONTACT_TELEPHONE_NUMBER, - contact_person.telephone_number.text) - self.assertEqual(federation_fixtures.IDP_CONTACT_TYPE, - contact_person.contact_type) - - def test_metadata_no_organization(self): - self.config_fixture.config( - group='saml', - idp_organization_display_name=None, - idp_organization_url=None, - idp_organization_name=None) - metadata = self.generator.generate_metadata() - idpsso_descriptor = metadata.idpsso_descriptor - self.assertIsNotNone(metadata.idpsso_descriptor) - self.assertIsNone(idpsso_descriptor.organization) - self.assertIsNotNone(idpsso_descriptor.contact_person) - - def test_metadata_no_contact_person(self): - self.config_fixture.config( - group='saml', - idp_contact_name=None, - idp_contact_surname=None, - idp_contact_email=None, - idp_contact_telephone=None) - metadata = self.generator.generate_metadata() - idpsso_descriptor = metadata.idpsso_descriptor - self.assertIsNotNone(metadata.idpsso_descriptor) - self.assertIsNotNone(idpsso_descriptor.organization) - self.assertEqual([], idpsso_descriptor.contact_person) - - def test_metadata_invalid_contact_type(self): - self.config_fixture.config( - group='saml', - idp_contact_type="invalid") - self.assertRaises(exception.ValidationError, - self.generator.generate_metadata) - - def test_metadata_invalid_idp_sso_endpoint(self): - self.config_fixture.config( - group='saml', - idp_sso_endpoint=None) - self.assertRaises(exception.ValidationError, - self.generator.generate_metadata) - - def test_metadata_invalid_idp_entity_id(self): - self.config_fixture.config( - group='saml', - idp_entity_id=None) - self.assertRaises(exception.ValidationError, - self.generator.generate_metadata) - - def test_get_metadata_with_no_metadata_file_configured(self): - self.get(self.METADATA_URL, - expected_status=http_client.INTERNAL_SERVER_ERROR) - - def test_get_metadata(self): - self.config_fixture.config( - group='saml', idp_metadata_path=XMLDIR + '/idp_saml2_metadata.xml') - r = self.get(self.METADATA_URL, response_content_type='text/xml') - self.assertEqual('text/xml', r.headers.get('Content-Type')) - - reference_file = _load_xml('idp_saml2_metadata.xml') - self.assertEqual(reference_file, r.result) - - -class ServiceProviderTests(test_v3.RestfulTestCase): - """A test class for Service Providers.""" - - MEMBER_NAME = 'service_provider' - COLLECTION_NAME = 'service_providers' - SERVICE_PROVIDER_ID = 'ACME' - SP_KEYS = ['auth_url', 'id', 'enabled', 'description', - 'relay_state_prefix', 'sp_url'] - - def setUp(self): - super(ServiceProviderTests, self).setUp() - # Add a Service Provider - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.SP_REF = self.sp_ref() - self.SERVICE_PROVIDER = self.put( - url, body={'service_provider': self.SP_REF}, - expected_status=http_client.CREATED).result - - def sp_ref(self): - ref = { - 'auth_url': 'https://' + uuid.uuid4().hex + '.com', - 'enabled': True, - 'description': uuid.uuid4().hex, - 'sp_url': 'https://' + uuid.uuid4().hex + '.com', - 'relay_state_prefix': CONF.saml.relay_state_prefix - } - return ref - - def base_url(self, suffix=None): - if suffix is not None: - return '/OS-FEDERATION/service_providers/' + str(suffix) - return '/OS-FEDERATION/service_providers' - - def _create_default_sp(self, body=None): - """Create default Service Provider.""" - url = self.base_url(suffix=uuid.uuid4().hex) - if body is None: - body = self.sp_ref() - resp = self.put(url, body={'service_provider': body}, - expected_status=http_client.CREATED) - return resp - - def test_get_service_provider(self): - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.get(url) - self.assertValidEntity(resp.result['service_provider'], - keys_to_check=self.SP_KEYS) - - def test_get_service_provider_fail(self): - url = self.base_url(suffix=uuid.uuid4().hex) - self.get(url, expected_status=http_client.NOT_FOUND) - - def test_create_service_provider(self): - url = self.base_url(suffix=uuid.uuid4().hex) - sp = self.sp_ref() - resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) - self.assertValidEntity(resp.result['service_provider'], - keys_to_check=self.SP_KEYS) - - def test_create_sp_relay_state_default(self): - """Create an SP without relay state, should default to `ss:mem`.""" - url = self.base_url(suffix=uuid.uuid4().hex) - sp = self.sp_ref() - del sp['relay_state_prefix'] - resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) - sp_result = resp.result['service_provider'] - self.assertEqual(CONF.saml.relay_state_prefix, - sp_result['relay_state_prefix']) - - def test_create_sp_relay_state_non_default(self): - """Create an SP with custom relay state.""" - url = self.base_url(suffix=uuid.uuid4().hex) - sp = self.sp_ref() - non_default_prefix = uuid.uuid4().hex - sp['relay_state_prefix'] = non_default_prefix - resp = self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) - sp_result = resp.result['service_provider'] - self.assertEqual(non_default_prefix, - sp_result['relay_state_prefix']) - - def test_create_service_provider_fail(self): - """Try adding SP object with unallowed attribute.""" - url = self.base_url(suffix=uuid.uuid4().hex) - sp = self.sp_ref() - sp[uuid.uuid4().hex] = uuid.uuid4().hex - self.put(url, body={'service_provider': sp}, - expected_status=http_client.BAD_REQUEST) - - def test_list_service_providers(self): - """Test listing of service provider objects. - - Add two new service providers. List all available service providers. - Expect to get list of three service providers (one created by setUp()) - Test if attributes match. - - """ - ref_service_providers = { - uuid.uuid4().hex: self.sp_ref(), - uuid.uuid4().hex: self.sp_ref(), - } - for id, sp in ref_service_providers.items(): - url = self.base_url(suffix=id) - self.put(url, body={'service_provider': sp}, - expected_status=http_client.CREATED) - - # Insert ids into service provider object, we will compare it with - # responses from server and those include 'id' attribute. - - ref_service_providers[self.SERVICE_PROVIDER_ID] = self.SP_REF - for id, sp in ref_service_providers.items(): - sp['id'] = id - - url = self.base_url() - resp = self.get(url) - service_providers = resp.result - for service_provider in service_providers['service_providers']: - id = service_provider['id'] - self.assertValidEntity( - service_provider, ref=ref_service_providers[id], - keys_to_check=self.SP_KEYS) - - def test_update_service_provider(self): - """Update existing service provider. - - Update default existing service provider and make sure it has been - properly changed. - - """ - new_sp_ref = self.sp_ref() - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}) - patch_result = resp.result - new_sp_ref['id'] = self.SERVICE_PROVIDER_ID - self.assertValidEntity(patch_result['service_provider'], - ref=new_sp_ref, - keys_to_check=self.SP_KEYS) - - resp = self.get(url) - get_result = resp.result - - self.assertDictEqual(patch_result['service_provider'], - get_result['service_provider']) - - def test_update_service_provider_immutable_parameters(self): - """Update immutable attributes in service provider. - - In this particular case the test will try to change ``id`` attribute. - The server should return an HTTP 403 Forbidden error code. - - """ - new_sp_ref = {'id': uuid.uuid4().hex} - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=http_client.BAD_REQUEST) - - def test_update_service_provider_unknown_parameter(self): - new_sp_ref = self.sp_ref() - new_sp_ref[uuid.uuid4().hex] = uuid.uuid4().hex - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=http_client.BAD_REQUEST) - - def test_update_service_provider_returns_not_found(self): - new_sp_ref = self.sp_ref() - new_sp_ref['description'] = uuid.uuid4().hex - url = self.base_url(suffix=uuid.uuid4().hex) - self.patch(url, body={'service_provider': new_sp_ref}, - expected_status=http_client.NOT_FOUND) - - def test_update_sp_relay_state(self): - """Update an SP with custom relay state.""" - new_sp_ref = self.sp_ref() - non_default_prefix = uuid.uuid4().hex - new_sp_ref['relay_state_prefix'] = non_default_prefix - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - resp = self.patch(url, body={'service_provider': new_sp_ref}) - sp_result = resp.result['service_provider'] - self.assertEqual(non_default_prefix, - sp_result['relay_state_prefix']) - - def test_delete_service_provider(self): - url = self.base_url(suffix=self.SERVICE_PROVIDER_ID) - self.delete(url) - - def test_delete_service_provider_returns_not_found(self): - url = self.base_url(suffix=uuid.uuid4().hex) - self.delete(url, expected_status=http_client.NOT_FOUND) - - def test_filter_list_sp_by_id(self): - def get_id(resp): - sp = resp.result.get('service_provider') - return sp.get('id') - - sp1_id = get_id(self._create_default_sp()) - sp2_id = get_id(self._create_default_sp()) - - # list the SP, should get SPs. - url = self.base_url() - resp = self.get(url) - sps = resp.result.get('service_providers') - entities_ids = [e['id'] for e in sps] - self.assertIn(sp1_id, entities_ids) - self.assertIn(sp2_id, entities_ids) - - # filter the SP by 'id'. Only SP1 should appear. - url = self.base_url() + '?id=' + sp1_id - resp = self.get(url) - sps = resp.result.get('service_providers') - entities_ids = [e['id'] for e in sps] - self.assertIn(sp1_id, entities_ids) - self.assertNotIn(sp2_id, entities_ids) - - def test_filter_list_sp_by_enabled(self): - def get_id(resp): - sp = resp.result.get('service_provider') - return sp.get('id') - - sp1_id = get_id(self._create_default_sp()) - sp2_ref = self.sp_ref() - sp2_ref['enabled'] = False - sp2_id = get_id(self._create_default_sp(body=sp2_ref)) - - # list the SP, should get two SPs. - url = self.base_url() - resp = self.get(url) - sps = resp.result.get('service_providers') - entities_ids = [e['id'] for e in sps] - self.assertIn(sp1_id, entities_ids) - self.assertIn(sp2_id, entities_ids) - - # filter the SP by 'enabled'. Only SP1 should appear. - url = self.base_url() + '?enabled=True' - resp = self.get(url) - sps = resp.result.get('service_providers') - entities_ids = [e['id'] for e in sps] - self.assertIn(sp1_id, entities_ids) - self.assertNotIn(sp2_id, entities_ids) - - -class WebSSOTests(FederatedTokenTests): - """A class for testing Web SSO.""" - - SSO_URL = '/auth/OS-FEDERATION/websso/' - SSO_TEMPLATE_NAME = 'sso_callback_template.html' - SSO_TEMPLATE_PATH = os.path.join(core.dirs.etc(), SSO_TEMPLATE_NAME) - TRUSTED_DASHBOARD = 'http://horizon.com' - ORIGIN = urllib.parse.quote_plus(TRUSTED_DASHBOARD) - PROTOCOL_REMOTE_ID_ATTR = uuid.uuid4().hex - - def setUp(self): - super(WebSSOTests, self).setUp() - self.api = federation_controllers.Auth() - - def config_overrides(self): - super(WebSSOTests, self).config_overrides() - self.config_fixture.config( - group='federation', - trusted_dashboard=[self.TRUSTED_DASHBOARD], - sso_callback_template=self.SSO_TEMPLATE_PATH, - remote_id_attribute=self.REMOTE_ID_ATTR) - - def test_render_callback_template(self): - token_id = uuid.uuid4().hex - resp = self.api.render_html_response(self.TRUSTED_DASHBOARD, token_id) - self.assertIn(token_id, resp.body) - self.assertIn(self.TRUSTED_DASHBOARD, resp.body) - - def test_federated_sso_auth(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} - context = {'environment': environment} - query_string = {'origin': self.ORIGIN} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - resp = self.api.federated_sso_auth(context, self.PROTOCOL) - self.assertIn(self.TRUSTED_DASHBOARD, resp.body) - - def test_get_sso_origin_host_case_insensitive(self): - # test lowercase hostname in trusted_dashboard - context = { - 'query_string': { - 'origin': "http://horizon.com", - }, - } - host = self.api._get_sso_origin_host(context) - self.assertEqual("http://horizon.com", host) - # test uppercase hostname in trusted_dashboard - self.config_fixture.config(group='federation', - trusted_dashboard=['http://Horizon.com']) - host = self.api._get_sso_origin_host(context) - self.assertEqual("http://horizon.com", host) - - def test_federated_sso_auth_with_protocol_specific_remote_id(self): - self.config_fixture.config( - group=self.PROTOCOL, - remote_id_attribute=self.PROTOCOL_REMOTE_ID_ATTR) - - environment = {self.PROTOCOL_REMOTE_ID_ATTR: self.REMOTE_IDS[0]} - context = {'environment': environment} - query_string = {'origin': self.ORIGIN} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - resp = self.api.federated_sso_auth(context, self.PROTOCOL) - self.assertIn(self.TRUSTED_DASHBOARD, resp.body) - - def test_federated_sso_auth_bad_remote_id(self): - environment = {self.REMOTE_ID_ATTR: self.IDP} - context = {'environment': environment} - query_string = {'origin': self.ORIGIN} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - self.assertRaises(exception.IdentityProviderNotFound, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_federated_sso_missing_query(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} - context = {'environment': environment} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION') - self.assertRaises(exception.ValidationError, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_federated_sso_missing_query_bad_remote_id(self): - environment = {self.REMOTE_ID_ATTR: self.IDP} - context = {'environment': environment} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION') - self.assertRaises(exception.ValidationError, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_federated_sso_untrusted_dashboard(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} - context = {'environment': environment} - query_string = {'origin': uuid.uuid4().hex} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - self.assertRaises(exception.Unauthorized, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_federated_sso_untrusted_dashboard_bad_remote_id(self): - environment = {self.REMOTE_ID_ATTR: self.IDP} - context = {'environment': environment} - query_string = {'origin': uuid.uuid4().hex} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - self.assertRaises(exception.Unauthorized, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_federated_sso_missing_remote_id(self): - context = {'environment': {}} - query_string = {'origin': self.ORIGIN} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - self.assertRaises(exception.Unauthorized, - self.api.federated_sso_auth, - context, self.PROTOCOL) - - def test_identity_provider_specific_federated_authentication(self): - environment = {self.REMOTE_ID_ATTR: self.REMOTE_IDS[0]} - context = {'environment': environment} - query_string = {'origin': self.ORIGIN} - self._inject_assertion(context, 'EMPLOYEE_ASSERTION', query_string) - resp = self.api.federated_idp_specific_sso_auth(context, - self.idp['id'], - self.PROTOCOL) - self.assertIn(self.TRUSTED_DASHBOARD, resp.body) - - -class K2KServiceCatalogTests(test_v3.RestfulTestCase): - SP1 = 'SP1' - SP2 = 'SP2' - SP3 = 'SP3' - - def setUp(self): - super(K2KServiceCatalogTests, self).setUp() - - sp = self.sp_ref() - self.federation_api.create_sp(self.SP1, sp) - self.sp_alpha = {self.SP1: sp} - - sp = self.sp_ref() - self.federation_api.create_sp(self.SP2, sp) - self.sp_beta = {self.SP2: sp} - - sp = self.sp_ref() - self.federation_api.create_sp(self.SP3, sp) - self.sp_gamma = {self.SP3: sp} - - self.token_v3_helper = token_common.V3TokenDataHelper() - - def sp_response(self, id, ref): - ref.pop('enabled') - ref.pop('description') - ref.pop('relay_state_prefix') - ref['id'] = id - return ref - - def sp_ref(self): - ref = { - 'auth_url': uuid.uuid4().hex, - 'enabled': True, - 'description': uuid.uuid4().hex, - 'sp_url': uuid.uuid4().hex, - 'relay_state_prefix': CONF.saml.relay_state_prefix, - } - return ref - - def _validate_service_providers(self, token, ref): - token_data = token['token'] - self.assertIn('service_providers', token_data) - self.assertIsNotNone(token_data['service_providers']) - service_providers = token_data.get('service_providers') - - self.assertEqual(len(ref), len(service_providers)) - for entity in service_providers: - id = entity.get('id') - ref_entity = self.sp_response(id, ref.get(id)) - self.assertDictEqual(entity, ref_entity) - - def test_service_providers_in_token(self): - """Check if service providers are listed in service catalog.""" - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) - ref = {} - for r in (self.sp_alpha, self.sp_beta, self.sp_gamma): - ref.update(r) - self._validate_service_providers(token, ref) - - def test_service_provides_in_token_disabled_sp(self): - """Test behaviour with disabled service providers. - - Disabled service providers should not be listed in the service - catalog. - - """ - # disable service provider ALPHA - sp_ref = {'enabled': False} - self.federation_api.update_sp(self.SP1, sp_ref) - - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) - ref = {} - for r in (self.sp_beta, self.sp_gamma): - ref.update(r) - self._validate_service_providers(token, ref) - - def test_no_service_providers_in_token(self): - """Test service catalog with disabled service providers. - - There should be no entry ``service_providers`` in the catalog. - Test passes providing no attribute was raised. - - """ - sp_ref = {'enabled': False} - for sp in (self.SP1, self.SP2, self.SP3): - self.federation_api.update_sp(sp, sp_ref) - - token = self.token_v3_helper.get_token_data(self.user_id, ['password']) - self.assertNotIn('service_providers', token['token'], - message=('Expected Service Catalog not to have ' - 'service_providers')) |