From b8c756ecdd7cced1db4300935484e8c83701c82e Mon Sep 17 00:00:00 2001 From: WuKong Date: Tue, 30 Jun 2015 18:47:29 +0200 Subject: migrate moon code from github to opnfv Change-Id: Ice53e368fd1114d56a75271aa9f2e598e3eba604 Signed-off-by: WuKong --- .../keystone/tests/unit/test_v3_federation.py | 3296 ++++++++++++++++++++ 1 file changed, 3296 insertions(+) create mode 100644 keystone-moon/keystone/tests/unit/test_v3_federation.py (limited to 'keystone-moon/keystone/tests/unit/test_v3_federation.py') diff --git a/keystone-moon/keystone/tests/unit/test_v3_federation.py b/keystone-moon/keystone/tests/unit/test_v3_federation.py new file mode 100644 index 00000000..3b6f4d8b --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v3_federation.py @@ -0,0 +1,3296 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import random +import subprocess +import uuid + +from lxml import etree +import mock +from oslo_config import cfg +from oslo_log import log +from oslo_serialization import jsonutils +from oslotest import mockpatch +import saml2 +from saml2 import saml +from saml2 import sigver +from six.moves import urllib +import xmldsig + +from keystone.auth import controllers as auth_controllers +from keystone.auth.plugins import mapped +from keystone.contrib import federation +from keystone.contrib.federation import controllers as federation_controllers +from keystone.contrib.federation import idp as keystone_idp +from keystone.contrib.federation import utils as mapping_utils +from keystone import exception +from keystone import notifications +from keystone.tests.unit import core +from keystone.tests.unit import federation_fixtures +from keystone.tests.unit import ksfixtures +from keystone.tests.unit import mapping_fixtures +from keystone.tests.unit import test_v3 +from keystone.token.providers import common as token_common + + +CONF = cfg.CONF +LOG = log.getLogger(__name__) +ROOTDIR = os.path.dirname(os.path.abspath(__file__)) +XMLDIR = os.path.join(ROOTDIR, 'saml2/') + + +def dummy_validator(*args, **kwargs): + pass + + +class FederationTests(test_v3.RestfulTestCase): + + EXTENSION_NAME = 'federation' + EXTENSION_TO_ADD = 'federation_extension' + + +class FederatedSetupMixin(object): + + ACTION = 'authenticate' + IDP = 'ORG_IDP' + PROTOCOL = 'saml2' + AUTH_METHOD = 'saml2' + USER = 'user@ORGANIZATION' + ASSERTION_PREFIX = 'PREFIX_' + IDP_WITH_REMOTE = 'ORG_IDP_REMOTE' + REMOTE_ID = 'entityID_IDP' + REMOTE_ID_ATTR = uuid.uuid4().hex + + UNSCOPED_V3_SAML2_REQ = { + "identity": { + "methods": [AUTH_METHOD], + AUTH_METHOD: { + "identity_provider": IDP, + "protocol": PROTOCOL + } + } + } + + def _check_domains_are_valid(self, token): + self.assertEqual('Federated', token['user']['domain']['id']) + self.assertEqual('Federated', token['user']['domain']['name']) + + def _project(self, project): + return (project['id'], project['name']) + + def _roles(self, roles): + return set([(r['id'], r['name']) for r in roles]) + + def _check_projects_and_roles(self, token, roles, projects): + """Check whether the projects and the roles match.""" + token_roles = token.get('roles') + if token_roles is None: + raise AssertionError('Roles not found in the token') + token_roles = self._roles(token_roles) + roles_ref = self._roles(roles) + self.assertEqual(token_roles, roles_ref) + + token_projects = token.get('project') + if token_projects is None: + raise AssertionError('Projects not found in the token') + token_projects = self._project(token_projects) + projects_ref = self._project(projects) + self.assertEqual(token_projects, projects_ref) + + def _check_scoped_token_attributes(self, token): + def xor_project_domain(iterable): + return sum(('project' in iterable, 'domain' in iterable)) % 2 + + for obj in ('user', 'catalog', 'expires_at', 'issued_at', + 'methods', 'roles'): + self.assertIn(obj, token) + # Check for either project or domain + if not xor_project_domain(token.keys()): + raise AssertionError("You must specify either" + "project or domain.") + + self.assertIn('OS-FEDERATION', token['user']) + os_federation = token['user']['OS-FEDERATION'] + self.assertEqual(self.IDP, os_federation['identity_provider']['id']) + self.assertEqual(self.PROTOCOL, os_federation['protocol']['id']) + + def _issue_unscoped_token(self, + idp=None, + assertion='EMPLOYEE_ASSERTION', + environment=None): + api = federation_controllers.Auth() + context = {'environment': environment or {}} + self._inject_assertion(context, assertion) + if idp is None: + idp = self.IDP + r = api.federated_authentication(context, idp, self.PROTOCOL) + return r + + def idp_ref(self, id=None): + idp = { + 'id': id or uuid.uuid4().hex, + 'enabled': True, + 'description': uuid.uuid4().hex + } + return idp + + def proto_ref(self, mapping_id=None): + proto = { + 'id': uuid.uuid4().hex, + 'mapping_id': mapping_id or uuid.uuid4().hex + } + return proto + + def mapping_ref(self, rules=None): + return { + 'id': uuid.uuid4().hex, + 'rules': rules or self.rules['rules'] + } + + def _scope_request(self, unscoped_token_id, scope, scope_id): + return { + 'auth': { + 'identity': { + 'methods': [ + self.AUTH_METHOD + ], + self.AUTH_METHOD: { + 'id': unscoped_token_id + } + }, + 'scope': { + scope: { + 'id': scope_id + } + } + } + } + + def _inject_assertion(self, context, variant, query_string=None): + assertion = getattr(mapping_fixtures, variant) + context['environment'].update(assertion) + context['query_string'] = query_string or [] + + def load_federation_sample_data(self): + """Inject additional data.""" + + # Create and add domains + self.domainA = self.new_domain_ref() + self.resource_api.create_domain(self.domainA['id'], + self.domainA) + + self.domainB = self.new_domain_ref() + self.resource_api.create_domain(self.domainB['id'], + self.domainB) + + self.domainC = self.new_domain_ref() + self.resource_api.create_domain(self.domainC['id'], + self.domainC) + + self.domainD = self.new_domain_ref() + self.resource_api.create_domain(self.domainD['id'], + self.domainD) + + # Create and add projects + self.proj_employees = self.new_project_ref( + domain_id=self.domainA['id']) + self.resource_api.create_project(self.proj_employees['id'], + self.proj_employees) + self.proj_customers = self.new_project_ref( + domain_id=self.domainA['id']) + self.resource_api.create_project(self.proj_customers['id'], + self.proj_customers) + + self.project_all = self.new_project_ref( + domain_id=self.domainA['id']) + self.resource_api.create_project(self.project_all['id'], + self.project_all) + + self.project_inherited = self.new_project_ref( + domain_id=self.domainD['id']) + self.resource_api.create_project(self.project_inherited['id'], + self.project_inherited) + + # Create and add groups + self.group_employees = self.new_group_ref( + domain_id=self.domainA['id']) + self.group_employees = ( + self.identity_api.create_group(self.group_employees)) + + self.group_customers = self.new_group_ref( + domain_id=self.domainA['id']) + self.group_customers = ( + self.identity_api.create_group(self.group_customers)) + + self.group_admins = self.new_group_ref( + domain_id=self.domainA['id']) + self.group_admins = self.identity_api.create_group(self.group_admins) + + # Create and add roles + self.role_employee = self.new_role_ref() + self.role_api.create_role(self.role_employee['id'], self.role_employee) + self.role_customer = self.new_role_ref() + self.role_api.create_role(self.role_customer['id'], self.role_customer) + + self.role_admin = self.new_role_ref() + self.role_api.create_role(self.role_admin['id'], self.role_admin) + + # Employees can access + # * proj_employees + # * project_all + self.assignment_api.create_grant(self.role_employee['id'], + group_id=self.group_employees['id'], + project_id=self.proj_employees['id']) + self.assignment_api.create_grant(self.role_employee['id'], + group_id=self.group_employees['id'], + project_id=self.project_all['id']) + # Customers can access + # * proj_customers + self.assignment_api.create_grant(self.role_customer['id'], + group_id=self.group_customers['id'], + project_id=self.proj_customers['id']) + + # Admins can access: + # * proj_customers + # * proj_employees + # * project_all + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + project_id=self.proj_customers['id']) + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + project_id=self.proj_employees['id']) + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + project_id=self.project_all['id']) + + self.assignment_api.create_grant(self.role_customer['id'], + group_id=self.group_customers['id'], + domain_id=self.domainA['id']) + + # Customers can access: + # * domain A + self.assignment_api.create_grant(self.role_customer['id'], + group_id=self.group_customers['id'], + domain_id=self.domainA['id']) + + # Customers can access projects via inheritance: + # * domain D + self.assignment_api.create_grant(self.role_customer['id'], + group_id=self.group_customers['id'], + domain_id=self.domainD['id'], + inherited_to_projects=True) + + # Employees can access: + # * domain A + # * domain B + + self.assignment_api.create_grant(self.role_employee['id'], + group_id=self.group_employees['id'], + domain_id=self.domainA['id']) + self.assignment_api.create_grant(self.role_employee['id'], + group_id=self.group_employees['id'], + domain_id=self.domainB['id']) + + # Admins can access: + # * domain A + # * domain B + # * domain C + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + domain_id=self.domainA['id']) + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + domain_id=self.domainB['id']) + + self.assignment_api.create_grant(self.role_admin['id'], + group_id=self.group_admins['id'], + domain_id=self.domainC['id']) + self.rules = { + 'rules': [ + { + 'local': [ + { + 'group': { + 'id': self.group_employees['id'] + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName' + }, + { + 'type': 'orgPersonType', + 'any_one_of': [ + 'Employee' + ] + } + ] + }, + { + 'local': [ + { + 'group': { + 'id': self.group_employees['id'] + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': self.ASSERTION_PREFIX + 'UserName' + }, + { + 'type': self.ASSERTION_PREFIX + 'orgPersonType', + 'any_one_of': [ + 'SuperEmployee' + ] + } + ] + }, + { + 'local': [ + { + 'group': { + 'id': self.group_customers['id'] + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName' + }, + { + 'type': 'orgPersonType', + 'any_one_of': [ + 'Customer' + ] + } + ] + }, + { + 'local': [ + { + 'group': { + 'id': self.group_admins['id'] + } + }, + { + 'group': { + 'id': self.group_employees['id'] + } + }, + { + 'group': { + 'id': self.group_customers['id'] + } + }, + + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName' + }, + { + 'type': 'orgPersonType', + 'any_one_of': [ + 'Admin', + 'Chief' + ] + } + ] + }, + { + 'local': [ + { + 'group': { + 'id': uuid.uuid4().hex + } + }, + { + 'group': { + 'id': self.group_customers['id'] + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName', + }, + { + 'type': 'FirstName', + 'any_one_of': [ + 'Jill' + ] + }, + { + 'type': 'LastName', + 'any_one_of': [ + 'Smith' + ] + } + ] + }, + { + 'local': [ + { + 'group': { + 'id': 'this_group_no_longer_exists' + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName', + }, + { + 'type': 'Email', + 'any_one_of': [ + 'testacct@example.com' + ] + }, + { + 'type': 'orgPersonType', + 'any_one_of': [ + 'Tester' + ] + } + ] + }, + # rules with local group names + { + "local": [ + { + 'user': { + 'name': '{0}' + } + }, + { + "group": { + "name": self.group_customers['name'], + "domain": { + "name": self.domainA['name'] + } + } + } + ], + "remote": [ + { + 'type': 'UserName', + }, + { + "type": "orgPersonType", + "any_one_of": [ + "CEO", + "CTO" + ], + } + ] + }, + { + "local": [ + { + 'user': { + 'name': '{0}' + } + }, + { + "group": { + "name": self.group_admins['name'], + "domain": { + "id": self.domainA['id'] + } + } + } + ], + "remote": [ + { + "type": "UserName", + }, + { + "type": "orgPersonType", + "any_one_of": [ + "Managers" + ] + } + ] + }, + { + "local": [ + { + "user": { + "name": "{0}" + } + }, + { + "group": { + "name": "NON_EXISTING", + "domain": { + "id": self.domainA['id'] + } + } + } + ], + "remote": [ + { + "type": "UserName", + }, + { + "type": "UserName", + "any_one_of": [ + "IamTester" + ] + } + ] + }, + { + "local": [ + { + "user": { + "type": "local", + "name": self.user['name'], + "domain": { + "id": self.user['domain_id'] + } + } + }, + { + "group": { + "id": self.group_customers['id'] + } + } + ], + "remote": [ + { + "type": "UserType", + "any_one_of": [ + "random" + ] + } + ] + }, + { + "local": [ + { + "user": { + "type": "local", + "name": self.user['name'], + "domain": { + "id": uuid.uuid4().hex + } + } + } + ], + "remote": [ + { + "type": "Position", + "any_one_of": [ + "DirectorGeneral" + ] + } + ] + } + ] + } + + # Add IDP + self.idp = self.idp_ref(id=self.IDP) + self.federation_api.create_idp(self.idp['id'], + self.idp) + # Add IDP with remote + self.idp_with_remote = self.idp_ref(id=self.IDP_WITH_REMOTE) + self.idp_with_remote['remote_id'] = self.REMOTE_ID + self.federation_api.create_idp(self.idp_with_remote['id'], + self.idp_with_remote) + # Add a mapping + self.mapping = self.mapping_ref() + self.federation_api.create_mapping(self.mapping['id'], + self.mapping) + # Add protocols + self.proto_saml = self.proto_ref(mapping_id=self.mapping['id']) + self.proto_saml['id'] = self.PROTOCOL + self.federation_api.create_protocol(self.idp['id'], + self.proto_saml['id'], + self.proto_saml) + # Add protocols IDP with remote + self.federation_api.create_protocol(self.idp_with_remote['id'], + self.proto_saml['id'], + self.proto_saml) + # Generate fake tokens + context = {'environment': {}} + + self.tokens = {} + VARIANTS = ('EMPLOYEE_ASSERTION', 'CUSTOMER_ASSERTION', + 'ADMIN_ASSERTION') + api = auth_controllers.Auth() + for variant in VARIANTS: + self._inject_assertion(context, variant) + r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ) + self.tokens[variant] = r.headers.get('X-Subject-Token') + + self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN = self._scope_request( + uuid.uuid4().hex, 'project', self.proj_customers['id']) + + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE = self._scope_request( + self.tokens['EMPLOYEE_ASSERTION'], 'project', + self.proj_employees['id']) + + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN = self._scope_request( + self.tokens['ADMIN_ASSERTION'], 'project', + self.proj_employees['id']) + + self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN = self._scope_request( + self.tokens['ADMIN_ASSERTION'], 'project', + self.proj_customers['id']) + + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER = self._scope_request( + self.tokens['CUSTOMER_ASSERTION'], 'project', + self.proj_employees['id']) + + self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER = self._scope_request( + self.tokens['CUSTOMER_ASSERTION'], 'project', + self.project_inherited['id']) + + self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER = self._scope_request( + self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainA['id']) + + self.TOKEN_SCOPE_DOMAIN_B_FROM_CUSTOMER = self._scope_request( + self.tokens['CUSTOMER_ASSERTION'], 'domain', + self.domainB['id']) + + self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER = self._scope_request( + self.tokens['CUSTOMER_ASSERTION'], 'domain', self.domainD['id']) + + self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN = self._scope_request( + self.tokens['ADMIN_ASSERTION'], 'domain', self.domainA['id']) + + self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN = self._scope_request( + self.tokens['ADMIN_ASSERTION'], 'domain', self.domainB['id']) + + self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN = self._scope_request( + self.tokens['ADMIN_ASSERTION'], 'domain', + self.domainC['id']) + + +class FederatedIdentityProviderTests(FederationTests): + """A test class for Identity Providers.""" + + idp_keys = ['description', 'enabled'] + + default_body = {'description': None, 'enabled': True} + + def base_url(self, suffix=None): + if suffix is not None: + return '/OS-FEDERATION/identity_providers/' + str(suffix) + return '/OS-FEDERATION/identity_providers' + + def _fetch_attribute_from_response(self, resp, parameter, + assert_is_not_none=True): + """Fetch single attribute from TestResponse object.""" + result = resp.result.get(parameter) + if assert_is_not_none: + self.assertIsNotNone(result) + return result + + def _create_and_decapsulate_response(self, body=None): + """Create IdP and fetch it's random id along with entity.""" + default_resp = self._create_default_idp(body=body) + idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + self.assertIsNotNone(idp) + idp_id = idp.get('id') + return (idp_id, idp) + + def _get_idp(self, idp_id): + """Fetch IdP entity based on its id.""" + url = self.base_url(suffix=idp_id) + resp = self.get(url) + return resp + + def _create_default_idp(self, body=None): + """Create default IdP.""" + url = self.base_url(suffix=uuid.uuid4().hex) + if body is None: + body = self._http_idp_input() + resp = self.put(url, body={'identity_provider': body}, + expected_status=201) + return resp + + def _http_idp_input(self, **kwargs): + """Create default input for IdP data.""" + body = None + if 'body' not in kwargs: + body = self.default_body.copy() + body['description'] = uuid.uuid4().hex + else: + body = kwargs['body'] + return body + + def _assign_protocol_to_idp(self, idp_id=None, proto=None, url=None, + mapping_id=None, validate=True, **kwargs): + if url is None: + url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') + if idp_id is None: + idp_id, _ = self._create_and_decapsulate_response() + if proto is None: + proto = uuid.uuid4().hex + if mapping_id is None: + mapping_id = uuid.uuid4().hex + body = {'mapping_id': mapping_id} + url = url % {'idp_id': idp_id, 'protocol_id': proto} + resp = self.put(url, body={'protocol': body}, **kwargs) + if validate: + self.assertValidResponse(resp, 'protocol', dummy_validator, + keys_to_check=['id', 'mapping_id'], + ref={'id': proto, + 'mapping_id': mapping_id}) + return (resp, idp_id, proto) + + def _get_protocol(self, idp_id, protocol_id): + url = "%s/protocols/%s" % (idp_id, protocol_id) + url = self.base_url(suffix=url) + r = self.get(url) + return r + + def test_create_idp(self): + """Creates the IdentityProvider entity.""" + + keys_to_check = self.idp_keys + body = self._http_idp_input() + resp = self._create_default_idp(body=body) + self.assertValidResponse(resp, 'identity_provider', dummy_validator, + keys_to_check=keys_to_check, + ref=body) + + def test_create_idp_remote(self): + """Creates the IdentityProvider entity associated to a remote_id.""" + + keys_to_check = list(self.idp_keys) + keys_to_check.append('remote_id') + body = self.default_body.copy() + body['description'] = uuid.uuid4().hex + body['remote_id'] = uuid.uuid4().hex + resp = self._create_default_idp(body=body) + self.assertValidResponse(resp, 'identity_provider', dummy_validator, + keys_to_check=keys_to_check, + ref=body) + + def test_list_idps(self, iterations=5): + """Lists all available IdentityProviders. + + This test collects ids of created IdPs and + intersects it with the list of all available IdPs. + List of all IdPs can be a superset of IdPs created in this test, + because other tests also create IdPs. + + """ + def get_id(resp): + r = self._fetch_attribute_from_response(resp, + 'identity_provider') + return r.get('id') + + ids = [] + for _ in range(iterations): + id = get_id(self._create_default_idp()) + ids.append(id) + ids = set(ids) + + keys_to_check = self.idp_keys + url = self.base_url() + resp = self.get(url) + self.assertValidListResponse(resp, 'identity_providers', + dummy_validator, + keys_to_check=keys_to_check) + entities = self._fetch_attribute_from_response(resp, + 'identity_providers') + entities_ids = set([e['id'] for e in entities]) + ids_intersection = entities_ids.intersection(ids) + self.assertEqual(ids_intersection, ids) + + def test_check_idp_uniqueness(self): + """Add same IdP twice. + + Expect HTTP 409 code for the latter call. + + """ + url = self.base_url(suffix=uuid.uuid4().hex) + body = self._http_idp_input() + self.put(url, body={'identity_provider': body}, + expected_status=201) + self.put(url, body={'identity_provider': body}, + expected_status=409) + + def test_get_idp(self): + """Create and later fetch IdP.""" + body = self._http_idp_input() + default_resp = self._create_default_idp(body=body) + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + url = self.base_url(suffix=idp_id) + resp = self.get(url) + self.assertValidResponse(resp, 'identity_provider', + dummy_validator, keys_to_check=body.keys(), + ref=body) + + def test_get_nonexisting_idp(self): + """Fetch nonexisting IdP entity. + + Expected HTTP 404 status code. + + """ + idp_id = uuid.uuid4().hex + self.assertIsNotNone(idp_id) + + url = self.base_url(suffix=idp_id) + self.get(url, expected_status=404) + + def test_delete_existing_idp(self): + """Create and later delete IdP. + + Expect HTTP 404 for the GET IdP call. + """ + default_resp = self._create_default_idp() + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + self.assertIsNotNone(idp_id) + url = self.base_url(suffix=idp_id) + self.delete(url) + self.get(url, expected_status=404) + + def test_delete_nonexisting_idp(self): + """Delete nonexisting IdP. + + Expect HTTP 404 for the GET IdP call. + """ + idp_id = uuid.uuid4().hex + url = self.base_url(suffix=idp_id) + self.delete(url, expected_status=404) + + def test_update_idp_mutable_attributes(self): + """Update IdP's mutable parameters.""" + default_resp = self._create_default_idp() + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + url = self.base_url(suffix=idp_id) + self.assertIsNotNone(idp_id) + + _enabled = not default_idp.get('enabled') + body = {'remote_id': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': _enabled} + + body = {'identity_provider': body} + resp = self.patch(url, body=body) + updated_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + body = body['identity_provider'] + for key in body.keys(): + self.assertEqual(body[key], updated_idp.get(key)) + + resp = self.get(url) + updated_idp = self._fetch_attribute_from_response(resp, + 'identity_provider') + for key in body.keys(): + self.assertEqual(body[key], updated_idp.get(key)) + + def test_update_idp_immutable_attributes(self): + """Update IdP's immutable parameters. + + Expect HTTP 403 code. + + """ + default_resp = self._create_default_idp() + default_idp = self._fetch_attribute_from_response(default_resp, + 'identity_provider') + idp_id = default_idp.get('id') + self.assertIsNotNone(idp_id) + + body = self._http_idp_input() + body['id'] = uuid.uuid4().hex + body['protocols'] = [uuid.uuid4().hex, uuid.uuid4().hex] + + url = self.base_url(suffix=idp_id) + self.patch(url, body={'identity_provider': body}, expected_status=403) + + def test_update_nonexistent_idp(self): + """Update nonexistent IdP + + Expect HTTP 404 code. + + """ + idp_id = uuid.uuid4().hex + url = self.base_url(suffix=idp_id) + body = self._http_idp_input() + body['enabled'] = False + body = {'identity_provider': body} + + self.patch(url, body=body, expected_status=404) + + def test_assign_protocol_to_idp(self): + """Assign a protocol to existing IdP.""" + + self._assign_protocol_to_idp(expected_status=201) + + def test_protocol_composite_pk(self): + """Test whether Keystone let's add two entities with identical + names, however attached to different IdPs. + + 1. Add IdP and assign it protocol with predefined name + 2. Add another IdP and assign it a protocol with same name. + + Expect HTTP 201 code + + """ + url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') + + kwargs = {'expected_status': 201} + self._assign_protocol_to_idp(proto='saml2', + url=url, **kwargs) + + self._assign_protocol_to_idp(proto='saml2', + url=url, **kwargs) + + def test_protocol_idp_pk_uniqueness(self): + """Test whether Keystone checks for unique idp/protocol values. + + Add same protocol twice, expect Keystone to reject a latter call and + return HTTP 409 code. + + """ + url = self.base_url(suffix='%(idp_id)s/protocols/%(protocol_id)s') + + kwargs = {'expected_status': 201} + resp, idp_id, proto = self._assign_protocol_to_idp(proto='saml2', + url=url, **kwargs) + kwargs = {'expected_status': 409} + resp, idp_id, proto = self._assign_protocol_to_idp(idp_id=idp_id, + proto='saml2', + validate=False, + url=url, **kwargs) + + def test_assign_protocol_to_nonexistent_idp(self): + """Assign protocol to IdP that doesn't exist. + + Expect HTTP 404 code. + + """ + + idp_id = uuid.uuid4().hex + kwargs = {'expected_status': 404} + self._assign_protocol_to_idp(proto='saml2', + idp_id=idp_id, + validate=False, + **kwargs) + + def test_get_protocol(self): + """Create and later fetch protocol tied to IdP.""" + + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + proto_id = self._fetch_attribute_from_response(resp, 'protocol')['id'] + url = "%s/protocols/%s" % (idp_id, proto_id) + url = self.base_url(suffix=url) + + resp = self.get(url) + + reference = {'id': proto_id} + self.assertValidResponse(resp, 'protocol', + dummy_validator, + keys_to_check=reference.keys(), + ref=reference) + + def test_list_protocols(self): + """Create set of protocols and later list them. + + Compare input and output id sets. + + """ + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + iterations = random.randint(0, 16) + protocol_ids = [] + for _ in range(iterations): + resp, _, proto = self._assign_protocol_to_idp(idp_id=idp_id, + expected_status=201) + proto_id = self._fetch_attribute_from_response(resp, 'protocol') + proto_id = proto_id['id'] + protocol_ids.append(proto_id) + + url = "%s/protocols" % idp_id + url = self.base_url(suffix=url) + resp = self.get(url) + self.assertValidListResponse(resp, 'protocols', + dummy_validator, + keys_to_check=['id']) + entities = self._fetch_attribute_from_response(resp, 'protocols') + entities = set([entity['id'] for entity in entities]) + protocols_intersection = entities.intersection(protocol_ids) + self.assertEqual(protocols_intersection, set(protocol_ids)) + + def test_update_protocols_attribute(self): + """Update protocol's attribute.""" + + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + new_mapping_id = uuid.uuid4().hex + + url = "%s/protocols/%s" % (idp_id, proto) + url = self.base_url(suffix=url) + body = {'mapping_id': new_mapping_id} + resp = self.patch(url, body={'protocol': body}) + self.assertValidResponse(resp, 'protocol', dummy_validator, + keys_to_check=['id', 'mapping_id'], + ref={'id': proto, + 'mapping_id': new_mapping_id} + ) + + def test_delete_protocol(self): + """Delete protocol. + + Expect HTTP 404 code for the GET call after the protocol is deleted. + + """ + url = self.base_url(suffix='/%(idp_id)s/' + 'protocols/%(protocol_id)s') + resp, idp_id, proto = self._assign_protocol_to_idp(expected_status=201) + url = url % {'idp_id': idp_id, + 'protocol_id': proto} + self.delete(url) + self.get(url, expected_status=404) + + +class MappingCRUDTests(FederationTests): + """A class for testing CRUD operations for Mappings.""" + + MAPPING_URL = '/OS-FEDERATION/mappings/' + + def assertValidMappingListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'mappings', + self.assertValidMapping, + keys_to_check=[], + *args, + **kwargs) + + def assertValidMappingResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'mapping', + self.assertValidMapping, + keys_to_check=[], + *args, + **kwargs) + + def assertValidMapping(self, entity, ref=None): + self.assertIsNotNone(entity.get('id')) + self.assertIsNotNone(entity.get('rules')) + if ref: + self.assertEqual(jsonutils.loads(entity['rules']), ref['rules']) + return entity + + def _create_default_mapping_entry(self): + url = self.MAPPING_URL + uuid.uuid4().hex + resp = self.put(url, + body={'mapping': mapping_fixtures.MAPPING_LARGE}, + expected_status=201) + return resp + + def _get_id_from_response(self, resp): + r = resp.result.get('mapping') + return r.get('id') + + def test_mapping_create(self): + resp = self._create_default_mapping_entry() + self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) + + def test_mapping_list(self): + url = self.MAPPING_URL + self._create_default_mapping_entry() + resp = self.get(url) + entities = resp.result.get('mappings') + self.assertIsNotNone(entities) + self.assertResponseStatus(resp, 200) + self.assertValidListLinks(resp.result.get('links')) + self.assertEqual(1, len(entities)) + + def test_mapping_delete(self): + url = self.MAPPING_URL + '%(mapping_id)s' + resp = self._create_default_mapping_entry() + mapping_id = self._get_id_from_response(resp) + url = url % {'mapping_id': str(mapping_id)} + resp = self.delete(url) + self.assertResponseStatus(resp, 204) + self.get(url, expected_status=404) + + def test_mapping_get(self): + url = self.MAPPING_URL + '%(mapping_id)s' + resp = self._create_default_mapping_entry() + mapping_id = self._get_id_from_response(resp) + url = url % {'mapping_id': mapping_id} + resp = self.get(url) + self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_LARGE) + + def test_mapping_update(self): + url = self.MAPPING_URL + '%(mapping_id)s' + resp = self._create_default_mapping_entry() + mapping_id = self._get_id_from_response(resp) + url = url % {'mapping_id': mapping_id} + resp = self.patch(url, + body={'mapping': mapping_fixtures.MAPPING_SMALL}) + self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) + resp = self.get(url) + self.assertValidMappingResponse(resp, mapping_fixtures.MAPPING_SMALL) + + def test_delete_mapping_dne(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.delete(url, expected_status=404) + + def test_get_mapping_dne(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.get(url, expected_status=404) + + def test_create_mapping_bad_requirements(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_BAD_REQ}) + + def test_create_mapping_no_rules(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_NO_RULES}) + + def test_create_mapping_no_remote_objects(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_NO_REMOTE}) + + def test_create_mapping_bad_value(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_BAD_VALUE}) + + def test_create_mapping_missing_local(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_MISSING_LOCAL}) + + def test_create_mapping_missing_type(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_MISSING_TYPE}) + + def test_create_mapping_wrong_type(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_WRONG_TYPE}) + + def test_create_mapping_extra_remote_properties_not_any_of(self): + url = self.MAPPING_URL + uuid.uuid4().hex + mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_NOT_ANY_OF + self.put(url, expected_status=400, body={'mapping': mapping}) + + def test_create_mapping_extra_remote_properties_any_one_of(self): + url = self.MAPPING_URL + uuid.uuid4().hex + mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_ANY_ONE_OF + self.put(url, expected_status=400, body={'mapping': mapping}) + + def test_create_mapping_extra_remote_properties_just_type(self): + url = self.MAPPING_URL + uuid.uuid4().hex + mapping = mapping_fixtures.MAPPING_EXTRA_REMOTE_PROPS_JUST_TYPE + self.put(url, expected_status=400, body={'mapping': mapping}) + + def test_create_mapping_empty_map(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': {}}) + + def test_create_mapping_extra_rules_properties(self): + url = self.MAPPING_URL + uuid.uuid4().hex + self.put(url, expected_status=400, + body={'mapping': mapping_fixtures.MAPPING_EXTRA_RULES_PROPS}) + + def test_create_mapping_with_blacklist_and_whitelist(self): + """Test for adding whitelist and blacklist in the rule + + Server should respond with HTTP 400 error upon discovering both + ``whitelist`` and ``blacklist`` keywords in the same rule. + + """ + url = self.MAPPING_URL + uuid.uuid4().hex + mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_AND_BLACKLIST + self.put(url, expected_status=400, body={'mapping': mapping}) + + +class MappingRuleEngineTests(FederationTests): + """A class for testing the mapping rule engine.""" + + def assertValidMappedUserObject(self, mapped_properties, + user_type='ephemeral', + domain_id=None): + """Check whether mapped properties object has 'user' within. + + According to today's rules, RuleProcessor does not have to issue user's + id or name. What's actually required is user's type and for ephemeral + users that would be service domain named 'Federated'. + """ + self.assertIn('user', mapped_properties, + message='Missing user object in mapped properties') + user = mapped_properties['user'] + self.assertIn('type', user) + self.assertEqual(user_type, user['type']) + self.assertIn('domain', user) + domain = user['domain'] + domain_name_or_id = domain.get('id') or domain.get('name') + domain_ref = domain_id or federation.FEDERATED_DOMAIN_KEYWORD + self.assertEqual(domain_ref, domain_name_or_id) + + def test_rule_engine_any_one_of_and_direct_mapping(self): + """Should return user's name and group id EMPLOYEE_GROUP_ID. + + The ADMIN_ASSERTION should successfully have a match in MAPPING_LARGE. + They will test the case where `any_one_of` is valid, and there is + a direct mapping for the users name. + + """ + + mapping = mapping_fixtures.MAPPING_LARGE + assertion = mapping_fixtures.ADMIN_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + fn = assertion.get('FirstName') + ln = assertion.get('LastName') + full_name = '%s %s' % (fn, ln) + group_ids = values.get('group_ids') + user_name = values.get('user', {}).get('name') + + self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) + self.assertEqual(full_name, user_name) + + def test_rule_engine_no_regex_match(self): + """Should deny authorization, the email of the tester won't match. + + This will not match since the email in the assertion will fail + the regex test. It is set to match any @example.com address. + But the incoming value is set to eviltester@example.org. + RuleProcessor should return list of empty group_ids. + + """ + + mapping = mapping_fixtures.MAPPING_LARGE + assertion = mapping_fixtures.BAD_TESTER_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + + self.assertValidMappedUserObject(mapped_properties) + self.assertIsNone(mapped_properties['user'].get('name')) + self.assertListEqual(list(), mapped_properties['group_ids']) + + def test_rule_engine_regex_many_groups(self): + """Should return group CONTRACTOR_GROUP_ID. + + The TESTER_ASSERTION should successfully have a match in + MAPPING_TESTER_REGEX. This will test the case where many groups + are in the assertion, and a regex value is used to try and find + a match. + + """ + + mapping = mapping_fixtures.MAPPING_TESTER_REGEX + assertion = mapping_fixtures.TESTER_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + self.assertValidMappedUserObject(values) + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertEqual(user_name, name) + self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids) + + def test_rule_engine_any_one_of_many_rules(self): + """Should return group CONTRACTOR_GROUP_ID. + + The CONTRACTOR_ASSERTION should successfully have a match in + MAPPING_SMALL. This will test the case where many rules + must be matched, including an `any_one_of`, and a direct + mapping. + + """ + + mapping = mapping_fixtures.MAPPING_SMALL + assertion = mapping_fixtures.CONTRACTOR_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + self.assertValidMappedUserObject(values) + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertEqual(user_name, name) + self.assertIn(mapping_fixtures.CONTRACTOR_GROUP_ID, group_ids) + + def test_rule_engine_not_any_of_and_direct_mapping(self): + """Should return user's name and email. + + The CUSTOMER_ASSERTION should successfully have a match in + MAPPING_LARGE. This will test the case where a requirement + has `not_any_of`, and direct mapping to a username, no group. + + """ + + mapping = mapping_fixtures.MAPPING_LARGE + assertion = mapping_fixtures.CUSTOMER_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + self.assertValidMappedUserObject(values) + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertEqual(user_name, name) + self.assertEqual([], group_ids,) + + def test_rule_engine_not_any_of_many_rules(self): + """Should return group EMPLOYEE_GROUP_ID. + + The EMPLOYEE_ASSERTION should successfully have a match in + MAPPING_SMALL. This will test the case where many remote + rules must be matched, including a `not_any_of`. + + """ + + mapping = mapping_fixtures.MAPPING_SMALL + assertion = mapping_fixtures.EMPLOYEE_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + self.assertValidMappedUserObject(values) + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertEqual(user_name, name) + self.assertIn(mapping_fixtures.EMPLOYEE_GROUP_ID, group_ids) + + def test_rule_engine_not_any_of_regex_verify_pass(self): + """Should return group DEVELOPER_GROUP_ID. + + The DEVELOPER_ASSERTION should successfully have a match in + MAPPING_DEVELOPER_REGEX. This will test the case where many + remote rules must be matched, including a `not_any_of`, with + regex set to True. + + """ + + mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX + assertion = mapping_fixtures.DEVELOPER_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + self.assertValidMappedUserObject(values) + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertEqual(user_name, name) + self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids) + + def test_rule_engine_not_any_of_regex_verify_fail(self): + """Should deny authorization. + + The email in the assertion will fail the regex test. + It is set to reject any @example.org address, but the + incoming value is set to evildeveloper@example.org. + RuleProcessor should return list of empty group_ids. + + """ + + mapping = mapping_fixtures.MAPPING_DEVELOPER_REGEX + assertion = mapping_fixtures.BAD_DEVELOPER_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + + self.assertValidMappedUserObject(mapped_properties) + self.assertIsNone(mapped_properties['user'].get('name')) + self.assertListEqual(list(), mapped_properties['group_ids']) + + def _rule_engine_regex_match_and_many_groups(self, assertion): + """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID. + + A helper function injecting assertion passed as an argument. + Expect DEVELOPER_GROUP_ID and TESTER_GROUP_ID in the results. + + """ + + mapping = mapping_fixtures.MAPPING_LARGE + rp = mapping_utils.RuleProcessor(mapping['rules']) + values = rp.process(assertion) + + user_name = assertion.get('UserName') + group_ids = values.get('group_ids') + name = values.get('user', {}).get('name') + + self.assertValidMappedUserObject(values) + self.assertEqual(user_name, name) + self.assertIn(mapping_fixtures.DEVELOPER_GROUP_ID, group_ids) + self.assertIn(mapping_fixtures.TESTER_GROUP_ID, group_ids) + + def test_rule_engine_regex_match_and_many_groups(self): + """Should return group DEVELOPER_GROUP_ID and TESTER_GROUP_ID. + + The TESTER_ASSERTION should successfully have a match in + MAPPING_LARGE. This will test a successful regex match + for an `any_one_of` evaluation type, and will have many + groups returned. + + """ + self._rule_engine_regex_match_and_many_groups( + mapping_fixtures.TESTER_ASSERTION) + + def test_rule_engine_discards_nonstring_objects(self): + """Check whether RuleProcessor discards non string objects. + + Despite the fact that assertion is malformed and contains + non string objects, RuleProcessor should correctly discard them and + successfully have a match in MAPPING_LARGE. + + """ + self._rule_engine_regex_match_and_many_groups( + mapping_fixtures.MALFORMED_TESTER_ASSERTION) + + def test_rule_engine_fails_after_discarding_nonstring(self): + """Check whether RuleProcessor discards non string objects. + + Expect RuleProcessor to discard non string object, which + is required for a correct rule match. RuleProcessor will result with + empty list of groups. + + """ + mapping = mapping_fixtures.MAPPING_SMALL + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CONTRACTOR_MALFORMED_ASSERTION + mapped_properties = rp.process(assertion) + self.assertValidMappedUserObject(mapped_properties) + self.assertIsNone(mapped_properties['user'].get('name')) + self.assertListEqual(list(), mapped_properties['group_ids']) + + def test_rule_engine_returns_group_names(self): + """Check whether RuleProcessor returns group names with their domains. + + RuleProcessor should return 'group_names' entry with a list of + dictionaries with two entries 'name' and 'domain' identifying group by + its name and domain. + + """ + mapping = mapping_fixtures.MAPPING_GROUP_NAMES + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.EMPLOYEE_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + reference = { + mapping_fixtures.DEVELOPER_GROUP_NAME: + { + "name": mapping_fixtures.DEVELOPER_GROUP_NAME, + "domain": { + "name": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_NAME + } + }, + mapping_fixtures.TESTER_GROUP_NAME: + { + "name": mapping_fixtures.TESTER_GROUP_NAME, + "domain": { + "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID + } + } + } + for rule in mapped_properties['group_names']: + self.assertDictEqual(reference.get(rule.get('name')), rule) + + def test_rule_engine_whitelist_and_direct_groups_mapping(self): + """Should return user's groups Developer and Contractor. + + The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match + in MAPPING_GROUPS_WHITELIST. It will test the case where 'whitelist' + correctly filters out Manager and only allows Developer and Contractor. + + """ + + mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST + assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + + reference = { + mapping_fixtures.DEVELOPER_GROUP_NAME: + { + "name": mapping_fixtures.DEVELOPER_GROUP_NAME, + "domain": { + "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID + } + }, + mapping_fixtures.CONTRACTOR_GROUP_NAME: + { + "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, + "domain": { + "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID + } + } + } + for rule in mapped_properties['group_names']: + self.assertDictEqual(reference.get(rule.get('name')), rule) + + self.assertEqual('tbo', mapped_properties['user']['name']) + self.assertEqual([], mapped_properties['group_ids']) + + def test_rule_engine_blacklist_and_direct_groups_mapping(self): + """Should return user's group Developer. + + The EMPLOYEE_ASSERTION_MULTIPLE_GROUPS should successfully have a match + in MAPPING_GROUPS_BLACKLIST. It will test the case where 'blacklist' + correctly filters out Manager and Developer and only allows Contractor. + + """ + + mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST + assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + + reference = { + mapping_fixtures.CONTRACTOR_GROUP_NAME: + { + "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, + "domain": { + "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID + } + } + } + for rule in mapped_properties['group_names']: + self.assertDictEqual(reference.get(rule.get('name')), rule) + self.assertEqual('tbo', mapped_properties['user']['name']) + self.assertEqual([], mapped_properties['group_ids']) + + def test_rule_engine_blacklist_and_direct_groups_mapping_multiples(self): + """Tests matching multiple values before the blacklist. + + Verifies that the local indexes are correct when matching multiple + remote values for a field when the field occurs before the blacklist + entry in the remote rules. + + """ + + mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MULTIPLES + assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + + reference = { + mapping_fixtures.CONTRACTOR_GROUP_NAME: + { + "name": mapping_fixtures.CONTRACTOR_GROUP_NAME, + "domain": { + "id": mapping_fixtures.DEVELOPER_GROUP_DOMAIN_ID + } + } + } + for rule in mapped_properties['group_names']: + self.assertDictEqual(reference.get(rule.get('name')), rule) + self.assertEqual('tbo', mapped_properties['user']['name']) + self.assertEqual([], mapped_properties['group_ids']) + + def test_rule_engine_whitelist_direct_group_mapping_missing_domain(self): + """Test if the local rule is rejected upon missing domain value + + This is a variation with a ``whitelist`` filter. + + """ + mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST_MISSING_DOMAIN + assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS + rp = mapping_utils.RuleProcessor(mapping['rules']) + self.assertRaises(exception.ValidationError, rp.process, assertion) + + def test_rule_engine_blacklist_direct_group_mapping_missing_domain(self): + """Test if the local rule is rejected upon missing domain value + + This is a variation with a ``blacklist`` filter. + + """ + mapping = mapping_fixtures.MAPPING_GROUPS_BLACKLIST_MISSING_DOMAIN + assertion = mapping_fixtures.EMPLOYEE_ASSERTION_MULTIPLE_GROUPS + rp = mapping_utils.RuleProcessor(mapping['rules']) + self.assertRaises(exception.ValidationError, rp.process, assertion) + + def test_rule_engine_no_groups_allowed(self): + """Should return user mapped to no groups. + + The EMPLOYEE_ASSERTION should successfully have a match + in MAPPING_GROUPS_WHITELIST, but 'whitelist' should filter out + the group values from the assertion and thus map to no groups. + + """ + mapping = mapping_fixtures.MAPPING_GROUPS_WHITELIST + assertion = mapping_fixtures.EMPLOYEE_ASSERTION + rp = mapping_utils.RuleProcessor(mapping['rules']) + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertListEqual(mapped_properties['group_names'], []) + self.assertListEqual(mapped_properties['group_ids'], []) + self.assertEqual('tbo', mapped_properties['user']['name']) + + def test_mapping_federated_domain_specified(self): + """Test mapping engine when domain 'ephemeral' is explicitely set. + + For that, we use mapping rule MAPPING_EPHEMERAL_USER and assertion + EMPLOYEE_ASSERTION + + """ + mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.EMPLOYEE_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + + def test_create_user_object_with_bad_mapping(self): + """Test if user object is created even with bad mapping. + + User objects will be created by mapping engine always as long as there + is corresponding local rule. This test shows, that even with assertion + where no group names nor ids are matched, but there is 'blind' rule for + mapping user, such object will be created. + + In this test MAPPING_EHPEMERAL_USER expects UserName set to jsmith + whereas value from assertion is 'tbo'. + + """ + mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CONTRACTOR_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + + self.assertNotIn('id', mapped_properties['user']) + self.assertNotIn('name', mapped_properties['user']) + + def test_set_ephemeral_domain_to_ephemeral_users(self): + """Test auto assigning service domain to ephemeral users. + + Test that ephemeral users will always become members of federated + service domain. The check depends on ``type`` value which must be set + to ``ephemeral`` in case of ephemeral user. + + """ + mapping = mapping_fixtures.MAPPING_EPHEMERAL_USER_LOCAL_DOMAIN + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CONTRACTOR_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + + def test_local_user_local_domain(self): + """Test that local users can have non-service domains assigned.""" + mapping = mapping_fixtures.MAPPING_LOCAL_USER_LOCAL_DOMAIN + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CONTRACTOR_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject( + mapped_properties, user_type='local', + domain_id=mapping_fixtures.LOCAL_DOMAIN) + + def test_user_identifications_name(self): + """Test varius mapping options and how users are identified. + + This test calls mapped.setup_username() for propagating user object. + + Test plan: + - Check if the user has proper domain ('federated') set + - Check if the user has property type set ('ephemeral') + - Check if user's name is properly mapped from the assertion + - Check if user's id is properly set and equal to name, as it was not + explicitely specified in the mapping. + + """ + mapping = mapping_fixtures.MAPPING_USER_IDS + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CONTRACTOR_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + mapped.setup_username({}, mapped_properties) + self.assertEqual('jsmith', mapped_properties['user']['id']) + self.assertEqual('jsmith', mapped_properties['user']['name']) + + def test_user_identifications_name_and_federated_domain(self): + """Test varius mapping options and how users are identified. + + This test calls mapped.setup_username() for propagating user object. + + Test plan: + - Check if the user has proper domain ('federated') set + - Check if the user has propert type set ('ephemeral') + - Check if user's name is properly mapped from the assertion + - Check if user's id is properly set and equal to name, as it was not + explicitely specified in the mapping. + + """ + mapping = mapping_fixtures.MAPPING_USER_IDS + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.EMPLOYEE_ASSERTION + mapped_properties = rp.process(assertion) + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + mapped.setup_username({}, mapped_properties) + self.assertEqual('tbo', mapped_properties['user']['name']) + self.assertEqual('tbo', mapped_properties['user']['id']) + + def test_user_identification_id(self): + """Test varius mapping options and how users are identified. + + This test calls mapped.setup_username() for propagating user object. + + Test plan: + - Check if the user has proper domain ('federated') set + - Check if the user has propert type set ('ephemeral') + - Check if user's id is properly mapped from the assertion + - Check if user's name is properly set and equal to id, as it was not + explicitely specified in the mapping. + + """ + mapping = mapping_fixtures.MAPPING_USER_IDS + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.ADMIN_ASSERTION + mapped_properties = rp.process(assertion) + context = {'environment': {}} + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + mapped.setup_username(context, mapped_properties) + self.assertEqual('bob', mapped_properties['user']['name']) + self.assertEqual('bob', mapped_properties['user']['id']) + + def test_user_identification_id_and_name(self): + """Test varius mapping options and how users are identified. + + This test calls mapped.setup_username() for propagating user object. + + Test plan: + - Check if the user has proper domain ('federated') set + - Check if the user has proper type set ('ephemeral') + - Check if user's name is properly mapped from the assertion + - Check if user's id is properly set and and equal to value hardcoded + in the mapping + + """ + mapping = mapping_fixtures.MAPPING_USER_IDS + rp = mapping_utils.RuleProcessor(mapping['rules']) + assertion = mapping_fixtures.CUSTOMER_ASSERTION + mapped_properties = rp.process(assertion) + context = {'environment': {}} + self.assertIsNotNone(mapped_properties) + self.assertValidMappedUserObject(mapped_properties) + mapped.setup_username(context, mapped_properties) + self.assertEqual('bwilliams', mapped_properties['user']['name']) + self.assertEqual('abc123', mapped_properties['user']['id']) + + +class FederatedTokenTests(FederationTests, FederatedSetupMixin): + + def auth_plugin_config_override(self): + methods = ['saml2'] + method_classes = {'saml2': 'keystone.auth.plugins.saml2.Saml2'} + super(FederatedTokenTests, self).auth_plugin_config_override( + methods, **method_classes) + + def setUp(self): + super(FederatedTokenTests, self).setUp() + self._notifications = [] + + def fake_saml_notify(action, context, user_id, group_ids, + identity_provider, protocol, token_id, outcome): + note = { + 'action': action, + 'user_id': user_id, + 'identity_provider': identity_provider, + 'protocol': protocol, + 'send_notification_called': True} + self._notifications.append(note) + + self.useFixture(mockpatch.PatchObject( + notifications, + 'send_saml_audit_notification', + fake_saml_notify)) + + def _assert_last_notify(self, action, identity_provider, protocol, + user_id=None): + self.assertTrue(self._notifications) + note = self._notifications[-1] + if user_id: + self.assertEqual(note['user_id'], user_id) + self.assertEqual(note['action'], action) + self.assertEqual(note['identity_provider'], identity_provider) + self.assertEqual(note['protocol'], protocol) + self.assertTrue(note['send_notification_called']) + + def load_fixtures(self, fixtures): + super(FederationTests, self).load_fixtures(fixtures) + self.load_federation_sample_data() + + def test_issue_unscoped_token_notify(self): + self._issue_unscoped_token() + self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL) + + def test_issue_unscoped_token(self): + r = self._issue_unscoped_token() + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_issue_unscoped_token_disabled_idp(self): + """Checks if authentication works with disabled identity providers. + + Test plan: + 1) Disable default IdP + 2) Try issuing unscoped token for that IdP + 3) Expect server to forbid authentication + + """ + enabled_false = {'enabled': False} + self.federation_api.update_idp(self.IDP, enabled_false) + self.assertRaises(exception.Forbidden, + self._issue_unscoped_token) + + def test_issue_unscoped_token_group_names_in_mapping(self): + r = self._issue_unscoped_token(assertion='ANOTHER_CUSTOMER_ASSERTION') + ref_groups = set([self.group_customers['id'], self.group_admins['id']]) + token_resp = r.json_body + token_groups = token_resp['token']['user']['OS-FEDERATION']['groups'] + token_groups = set([group['id'] for group in token_groups]) + self.assertEqual(ref_groups, token_groups) + + def test_issue_unscoped_tokens_nonexisting_group(self): + self.assertRaises(exception.MissingGroups, + self._issue_unscoped_token, + assertion='ANOTHER_TESTER_ASSERTION') + + def test_issue_unscoped_token_with_remote_no_attribute(self): + r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, + environment={ + self.REMOTE_ID_ATTR: self.REMOTE_ID + }) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_issue_unscoped_token_with_remote(self): + self.config_fixture.config(group='federation', + remote_id_attribute=self.REMOTE_ID_ATTR) + r = self._issue_unscoped_token(idp=self.IDP_WITH_REMOTE, + environment={ + self.REMOTE_ID_ATTR: self.REMOTE_ID + }) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_issue_unscoped_token_with_remote_different(self): + self.config_fixture.config(group='federation', + remote_id_attribute=self.REMOTE_ID_ATTR) + self.assertRaises(exception.Forbidden, + self._issue_unscoped_token, + idp=self.IDP_WITH_REMOTE, + environment={ + self.REMOTE_ID_ATTR: uuid.uuid4().hex + }) + + def test_issue_unscoped_token_with_remote_unavailable(self): + self.config_fixture.config(group='federation', + remote_id_attribute=self.REMOTE_ID_ATTR) + self.assertRaises(exception.ValidationError, + self._issue_unscoped_token, + idp=self.IDP_WITH_REMOTE, + environment={ + uuid.uuid4().hex: uuid.uuid4().hex + }) + + def test_issue_unscoped_token_with_remote_user_as_empty_string(self): + # make sure that REMOTE_USER set as the empty string won't interfere + r = self._issue_unscoped_token(environment={'REMOTE_USER': ''}) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_issue_unscoped_token_no_groups(self): + self.assertRaises(exception.Unauthorized, + self._issue_unscoped_token, + assertion='BAD_TESTER_ASSERTION') + + def test_issue_unscoped_token_malformed_environment(self): + """Test whether non string objects are filtered out. + + Put non string objects into the environment, inject + correct assertion and try to get an unscoped token. + Expect server not to fail on using split() method on + non string objects and return token id in the HTTP header. + + """ + api = auth_controllers.Auth() + context = { + 'environment': { + 'malformed_object': object(), + 'another_bad_idea': tuple(xrange(10)), + 'yet_another_bad_param': dict(zip(uuid.uuid4().hex, + range(32))) + } + } + self._inject_assertion(context, 'EMPLOYEE_ASSERTION') + r = api.authenticate_for_token(context, self.UNSCOPED_V3_SAML2_REQ) + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_scope_to_project_once_notify(self): + r = self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) + user_id = r.json['token']['user']['id'] + self._assert_last_notify(self.ACTION, self.IDP, self.PROTOCOL, user_id) + + def test_scope_to_project_once(self): + r = self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) + token_resp = r.result['token'] + project_id = token_resp['project']['id'] + self.assertEqual(project_id, self.proj_employees['id']) + self._check_scoped_token_attributes(token_resp) + roles_ref = [self.role_employee] + projects_ref = self.proj_employees + self._check_projects_and_roles(token_resp, roles_ref, projects_ref) + + def test_scope_token_with_idp_disabled(self): + """Scope token issued by disabled IdP. + + Try scoping the token issued by an IdP which is disabled now. Expect + server to refuse scoping operation. + + This test confirms correct behaviour when IdP was enabled and unscoped + token was issued, but disabled before user tries to scope the token. + Here we assume the unscoped token was already issued and start from + the moment where IdP is being disabled and unscoped token is being + used. + + Test plan: + 1) Disable IdP + 2) Try scoping unscoped token + + """ + enabled_false = {'enabled': False} + self.federation_api.update_idp(self.IDP, enabled_false) + self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, + expected_status=403) + + def test_scope_to_bad_project(self): + """Scope unscoped token with a project we don't have access to.""" + + self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_CUSTOMER, + expected_status=401) + + def test_scope_to_project_multiple_times(self): + """Try to scope the unscoped token multiple times. + + The new tokens should be scoped to: + + * Customers' project + * Employees' project + + """ + + bodies = (self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_ADMIN, + self.TOKEN_SCOPE_PROJECT_CUSTOMER_FROM_ADMIN) + project_ids = (self.proj_employees['id'], + self.proj_customers['id']) + for body, project_id_ref in zip(bodies, project_ids): + r = self.v3_authenticate_token(body) + token_resp = r.result['token'] + project_id = token_resp['project']['id'] + self.assertEqual(project_id, project_id_ref) + self._check_scoped_token_attributes(token_resp) + + def test_scope_to_project_with_only_inherited_roles(self): + """Try to scope token whose only roles are inherited.""" + self.config_fixture.config(group='os_inherit', enabled=True) + r = self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_INHERITED_FROM_CUSTOMER) + token_resp = r.result['token'] + project_id = token_resp['project']['id'] + self.assertEqual(project_id, self.project_inherited['id']) + self._check_scoped_token_attributes(token_resp) + roles_ref = [self.role_customer] + projects_ref = self.project_inherited + self._check_projects_and_roles(token_resp, roles_ref, projects_ref) + + def test_scope_token_from_nonexistent_unscoped_token(self): + """Try to scope token from non-existent unscoped token.""" + self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_FROM_NONEXISTENT_TOKEN, + expected_status=404) + + def test_issue_token_from_rules_without_user(self): + api = auth_controllers.Auth() + context = {'environment': {}} + self._inject_assertion(context, 'BAD_TESTER_ASSERTION') + self.assertRaises(exception.Unauthorized, + api.authenticate_for_token, + context, self.UNSCOPED_V3_SAML2_REQ) + + def test_issue_token_with_nonexistent_group(self): + """Inject assertion that matches rule issuing bad group id. + + Expect server to find out that some groups are missing in the + backend and raise exception.MappedGroupNotFound exception. + + """ + self.assertRaises(exception.MappedGroupNotFound, + self._issue_unscoped_token, + assertion='CONTRACTOR_ASSERTION') + + def test_scope_to_domain_once(self): + r = self.v3_authenticate_token(self.TOKEN_SCOPE_DOMAIN_A_FROM_CUSTOMER) + token_resp = r.result['token'] + domain_id = token_resp['domain']['id'] + self.assertEqual(self.domainA['id'], domain_id) + self._check_scoped_token_attributes(token_resp) + + def test_scope_to_domain_multiple_tokens(self): + """Issue multiple tokens scoping to different domains. + + The new tokens should be scoped to: + + * domainA + * domainB + * domainC + + """ + bodies = (self.TOKEN_SCOPE_DOMAIN_A_FROM_ADMIN, + self.TOKEN_SCOPE_DOMAIN_B_FROM_ADMIN, + self.TOKEN_SCOPE_DOMAIN_C_FROM_ADMIN) + domain_ids = (self.domainA['id'], + self.domainB['id'], + self.domainC['id']) + + for body, domain_id_ref in zip(bodies, domain_ids): + r = self.v3_authenticate_token(body) + token_resp = r.result['token'] + domain_id = token_resp['domain']['id'] + self.assertEqual(domain_id_ref, domain_id) + self._check_scoped_token_attributes(token_resp) + + def test_scope_to_domain_with_only_inherited_roles_fails(self): + """Try to scope to a domain that has no direct roles.""" + self.v3_authenticate_token( + self.TOKEN_SCOPE_DOMAIN_D_FROM_CUSTOMER, + expected_status=401) + + def test_list_projects(self): + urls = ('/OS-FEDERATION/projects', '/auth/projects') + + token = (self.tokens['CUSTOMER_ASSERTION'], + self.tokens['EMPLOYEE_ASSERTION'], + self.tokens['ADMIN_ASSERTION']) + + self.config_fixture.config(group='os_inherit', enabled=True) + projects_refs = (set([self.proj_customers['id'], + self.project_inherited['id']]), + set([self.proj_employees['id'], + self.project_all['id']]), + set([self.proj_employees['id'], + self.project_all['id'], + self.proj_customers['id'], + self.project_inherited['id']])) + + for token, projects_ref in zip(token, projects_refs): + for url in urls: + r = self.get(url, token=token) + projects_resp = r.result['projects'] + projects = set(p['id'] for p in projects_resp) + self.assertEqual(projects_ref, projects, + 'match failed for url %s' % url) + + def test_list_domains(self): + urls = ('/OS-FEDERATION/domains', '/auth/domains') + + tokens = (self.tokens['CUSTOMER_ASSERTION'], + self.tokens['EMPLOYEE_ASSERTION'], + self.tokens['ADMIN_ASSERTION']) + + # NOTE(henry-nash): domain D does not appear in the expected results + # since it only had inherited roles (which only apply to projects + # within the domain) + + domain_refs = (set([self.domainA['id']]), + set([self.domainA['id'], + self.domainB['id']]), + set([self.domainA['id'], + self.domainB['id'], + self.domainC['id']])) + + for token, domains_ref in zip(tokens, domain_refs): + for url in urls: + r = self.get(url, token=token) + domains_resp = r.result['domains'] + domains = set(p['id'] for p in domains_resp) + self.assertEqual(domains_ref, domains, + 'match failed for url %s' % url) + + def test_full_workflow(self): + """Test 'standard' workflow for granting access tokens. + + * Issue unscoped token + * List available projects based on groups + * Scope token to one of available projects + + """ + + r = self._issue_unscoped_token() + employee_unscoped_token_id = r.headers.get('X-Subject-Token') + r = self.get('/OS-FEDERATION/projects', + token=employee_unscoped_token_id) + projects = r.result['projects'] + random_project = random.randint(0, len(projects)) - 1 + project = projects[random_project] + + v3_scope_request = self._scope_request(employee_unscoped_token_id, + 'project', project['id']) + + r = self.v3_authenticate_token(v3_scope_request) + token_resp = r.result['token'] + project_id = token_resp['project']['id'] + self.assertEqual(project['id'], project_id) + self._check_scoped_token_attributes(token_resp) + + def test_workflow_with_groups_deletion(self): + """Test full workflow with groups deletion before token scoping. + + The test scenario is as follows: + - Create group ``group`` + - Create and assign roles to ``group`` and ``project_all`` + - Patch mapping rules for existing IdP so it issues group id + - Issue unscoped token with ``group``'s id + - Delete group ``group`` + - Scope token to ``project_all`` + - Expect HTTP 500 response + + """ + # create group and role + group = self.new_group_ref( + domain_id=self.domainA['id']) + group = self.identity_api.create_group(group) + role = self.new_role_ref() + self.role_api.create_role(role['id'], role) + + # assign role to group and project_admins + self.assignment_api.create_grant(role['id'], + group_id=group['id'], + project_id=self.project_all['id']) + + rules = { + 'rules': [ + { + 'local': [ + { + 'group': { + 'id': group['id'] + } + }, + { + 'user': { + 'name': '{0}' + } + } + ], + 'remote': [ + { + 'type': 'UserName' + }, + { + 'type': 'LastName', + 'any_one_of': [ + 'Account' + ] + } + ] + } + ] + } + + self.federation_api.update_mapping(self.mapping['id'], rules) + + r = self._issue_unscoped_token(assertion='TESTER_ASSERTION') + token_id = r.headers.get('X-Subject-Token') + + # delete group + self.identity_api.delete_group(group['id']) + + # scope token to project_all, expect HTTP 500 + scoped_token = self._scope_request( + token_id, 'project', + self.project_all['id']) + + self.v3_authenticate_token(scoped_token, expected_status=500) + + def test_lists_with_missing_group_in_backend(self): + """Test a mapping that points to a group that does not exist + + For explicit mappings, we expect the group to exist in the backend, + but for lists, specifically blacklists, a missing group is expected + as many groups will be specified by the IdP that are not Keystone + groups. + + The test scenario is as follows: + - Create group ``EXISTS`` + - Set mapping rules for existing IdP with a blacklist + that passes through as REMOTE_USER_GROUPS + - Issue unscoped token with on group ``EXISTS`` id in it + + """ + domain_id = self.domainA['id'] + domain_name = self.domainA['name'] + group = self.new_group_ref(domain_id=domain_id) + group['name'] = 'EXISTS' + group = self.identity_api.create_group(group) + rules = { + 'rules': [ + { + "local": [ + { + "user": { + "name": "{0}", + "id": "{0}" + } + } + ], + "remote": [ + { + "type": "REMOTE_USER" + } + ] + }, + { + "local": [ + { + "groups": "{0}", + "domain": {"name": domain_name} + } + ], + "remote": [ + { + "type": "REMOTE_USER_GROUPS", + "blacklist": ["noblacklist"] + } + ] + } + ] + } + self.federation_api.update_mapping(self.mapping['id'], rules) + + r = self._issue_unscoped_token(assertion='UNMATCHED_GROUP_ASSERTION') + assigned_group_ids = r.json['token']['user']['OS-FEDERATION']['groups'] + self.assertEqual(1, len(assigned_group_ids)) + self.assertEqual(group['id'], assigned_group_ids[0]['id']) + + def test_assertion_prefix_parameter(self): + """Test parameters filtering based on the prefix. + + With ``assertion_prefix`` set to fixed, non default value, + issue an unscoped token from assertion EMPLOYEE_ASSERTION_PREFIXED. + Expect server to return unscoped token. + + """ + self.config_fixture.config(group='federation', + assertion_prefix=self.ASSERTION_PREFIX) + r = self._issue_unscoped_token(assertion='EMPLOYEE_ASSERTION_PREFIXED') + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + + def test_assertion_prefix_parameter_expect_fail(self): + """Test parameters filtering based on the prefix. + + With ``assertion_prefix`` default value set to empty string + issue an unscoped token from assertion EMPLOYEE_ASSERTION. + Next, configure ``assertion_prefix`` to value ``UserName``. + Try issuing unscoped token with EMPLOYEE_ASSERTION. + Expect server to raise exception.Unathorized exception. + + """ + r = self._issue_unscoped_token() + self.assertIsNotNone(r.headers.get('X-Subject-Token')) + self.config_fixture.config(group='federation', + assertion_prefix='UserName') + + self.assertRaises(exception.Unauthorized, + self._issue_unscoped_token) + + def test_v2_auth_with_federation_token_fails(self): + """Test that using a federation token with v2 auth fails. + + If an admin sets up a federated Keystone environment, and a user + incorrectly configures a service (like Nova) to only use v2 auth, the + returned message should be informative. + + """ + r = self._issue_unscoped_token() + token_id = r.headers.get('X-Subject-Token') + self.assertRaises(exception.Unauthorized, + self.token_provider_api.validate_v2_token, + token_id=token_id) + + def test_unscoped_token_has_user_domain(self): + r = self._issue_unscoped_token() + self._check_domains_are_valid(r.json_body['token']) + + def test_scoped_token_has_user_domain(self): + r = self.v3_authenticate_token( + self.TOKEN_SCOPE_PROJECT_EMPLOYEE_FROM_EMPLOYEE) + self._check_domains_are_valid(r.result['token']) + + def test_issue_unscoped_token_for_local_user(self): + r = self._issue_unscoped_token(assertion='LOCAL_USER_ASSERTION') + token_resp = r.json_body['token'] + self.assertListEqual(['saml2'], token_resp['methods']) + self.assertEqual(self.user['id'], token_resp['user']['id']) + self.assertEqual(self.user['name'], token_resp['user']['name']) + self.assertEqual(self.domain['id'], token_resp['user']['domain']['id']) + # Make sure the token is not scoped + self.assertNotIn('project', token_resp) + self.assertNotIn('domain', token_resp) + + def test_issue_token_for_local_user_user_not_found(self): + self.assertRaises(exception.Unauthorized, + self._issue_unscoped_token, + assertion='ANOTHER_LOCAL_USER_ASSERTION') + + +class FernetFederatedTokenTests(FederationTests, FederatedSetupMixin): + AUTH_METHOD = 'token' + + def load_fixtures(self, fixtures): + super(FernetFederatedTokenTests, self).load_fixtures(fixtures) + self.load_federation_sample_data() + + def auth_plugin_config_override(self): + methods = ['saml2', 'token', 'password'] + method_classes = dict( + password='keystone.auth.plugins.password.Password', + token='keystone.auth.plugins.token.Token', + saml2='keystone.auth.plugins.saml2.Saml2') + super(FernetFederatedTokenTests, + self).auth_plugin_config_override(methods, **method_classes) + self.config_fixture.config( + group='token', + provider='keystone.token.providers.fernet.Provider') + self.useFixture(ksfixtures.KeyRepository(self.config_fixture)) + + def test_federated_unscoped_token(self): + resp = self._issue_unscoped_token() + self.assertEqual(186, len(resp.headers['X-Subject-Token'])) + + def test_federated_unscoped_token_with_multiple_groups(self): + assertion = 'ANOTHER_CUSTOMER_ASSERTION' + resp = self._issue_unscoped_token(assertion=assertion) + self.assertEqual(204, len(resp.headers['X-Subject-Token'])) + + def test_validate_federated_unscoped_token(self): + resp = self._issue_unscoped_token() + unscoped_token = resp.headers.get('X-Subject-Token') + # assert that the token we received is valid + self.get('/auth/tokens/', headers={'X-Subject-Token': unscoped_token}) + + def test_fernet_full_workflow(self): + """Test 'standard' workflow for granting Fernet access tokens. + + * Issue unscoped token + * List available projects based on groups + * Scope token to one of available projects + + """ + resp = self._issue_unscoped_token() + unscoped_token = resp.headers.get('X-Subject-Token') + resp = self.get('/OS-FEDERATION/projects', + token=unscoped_token) + projects = resp.result['projects'] + random_project = random.randint(0, len(projects)) - 1 + project = projects[random_project] + + v3_scope_request = self._scope_request(unscoped_token, + 'project', project['id']) + + resp = self.v3_authenticate_token(v3_scope_request) + token_resp = resp.result['token'] + project_id = token_resp['project']['id'] + self.assertEqual(project['id'], project_id) + self._check_scoped_token_attributes(token_resp) + + +class FederatedTokenTestsMethodToken(FederatedTokenTests): + """Test federation operation with unified scoping auth method. + + Test all the operations with auth method set to ``token`` as a new, unified + way for scoping all the tokens. + + """ + AUTH_METHOD = 'token' + + def auth_plugin_config_override(self): + methods = ['saml2', 'token'] + method_classes = dict( + token='keystone.auth.plugins.token.Token', + saml2='keystone.auth.plugins.saml2.Saml2') + super(FederatedTokenTests, + self).auth_plugin_config_override(methods, **method_classes) + + +class JsonHomeTests(FederationTests, test_v3.JsonHomeTestMixin): + JSON_HOME_DATA = { + 'http://docs.openstack.org/api/openstack-identity/3/ext/OS-FEDERATION/' + '1.0/rel/identity_provider': { + 'href-template': '/OS-FEDERATION/identity_providers/{idp_id}', + 'href-vars': { + 'idp_id': 'http://docs.openstack.org/api/openstack-identity/3/' + 'ext/OS-FEDERATION/1.0/param/idp_id' + }, + }, + } + + +def _is_xmlsec1_installed(): + p = subprocess.Popen( + ['which', 'xmlsec1'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + + # invert the return code + return not bool(p.wait()) + + +def _load_xml(filename): + with open(os.path.join(XMLDIR, filename), 'r') as xml: + return xml.read() + + +class SAMLGenerationTests(FederationTests): + + SP_AUTH_URL = ('http://beta.com:5000/v3/OS-FEDERATION/identity_providers' + '/BETA/protocols/saml2/auth') + ISSUER = 'https://acme.com/FIM/sps/openstack/saml20' + RECIPIENT = 'http://beta.com/Shibboleth.sso/SAML2/POST' + SUBJECT = 'test_user' + ROLES = ['admin', 'member'] + PROJECT = 'development' + SAML_GENERATION_ROUTE = '/auth/OS-FEDERATION/saml2' + ASSERTION_VERSION = "2.0" + SERVICE_PROVDIER_ID = 'ACME' + + def sp_ref(self): + ref = { + 'auth_url': self.SP_AUTH_URL, + 'enabled': True, + 'description': uuid.uuid4().hex, + 'sp_url': self.RECIPIENT, + + } + return ref + + def setUp(self): + super(SAMLGenerationTests, self).setUp() + self.signed_assertion = saml2.create_class_from_xml_string( + saml.Assertion, _load_xml('signed_saml2_assertion.xml')) + self.sp = self.sp_ref() + self.federation_api.create_sp(self.SERVICE_PROVDIER_ID, self.sp) + + def test_samlize_token_values(self): + """Test the SAML generator produces a SAML object. + + Test the SAML generator directly by passing known arguments, the result + should be a SAML object that consistently includes attributes based on + the known arguments that were passed in. + + """ + with mock.patch.object(keystone_idp, '_sign_assertion', + return_value=self.signed_assertion): + generator = keystone_idp.SAMLGenerator() + response = generator.samlize_token(self.ISSUER, self.RECIPIENT, + self.SUBJECT, self.ROLES, + self.PROJECT) + + assertion = response.assertion + self.assertIsNotNone(assertion) + self.assertIsInstance(assertion, saml.Assertion) + issuer = response.issuer + self.assertEqual(self.RECIPIENT, response.destination) + self.assertEqual(self.ISSUER, issuer.text) + + user_attribute = assertion.attribute_statement[0].attribute[0] + self.assertEqual(self.SUBJECT, user_attribute.attribute_value[0].text) + + role_attribute = assertion.attribute_statement[0].attribute[1] + for attribute_value in role_attribute.attribute_value: + self.assertIn(attribute_value.text, self.ROLES) + + project_attribute = assertion.attribute_statement[0].attribute[2] + self.assertEqual(self.PROJECT, + project_attribute.attribute_value[0].text) + + def test_verify_assertion_object(self): + """Test that the Assertion object is built properly. + + The Assertion doesn't need to be signed in this test, so + _sign_assertion method is patched and doesn't alter the assertion. + + """ + with mock.patch.object(keystone_idp, '_sign_assertion', + side_effect=lambda x: x): + generator = keystone_idp.SAMLGenerator() + response = generator.samlize_token(self.ISSUER, self.RECIPIENT, + self.SUBJECT, self.ROLES, + self.PROJECT) + assertion = response.assertion + self.assertEqual(self.ASSERTION_VERSION, assertion.version) + + def test_valid_saml_xml(self): + """Test the generated SAML object can become valid XML. + + Test the generator directly by passing known arguments, the result + should be a SAML object that consistently includes attributes based on + the known arguments that were passed in. + + """ + with mock.patch.object(keystone_idp, '_sign_assertion', + return_value=self.signed_assertion): + generator = keystone_idp.SAMLGenerator() + response = generator.samlize_token(self.ISSUER, self.RECIPIENT, + self.SUBJECT, self.ROLES, + self.PROJECT) + + saml_str = response.to_string() + response = etree.fromstring(saml_str) + issuer = response[0] + assertion = response[2] + + self.assertEqual(self.RECIPIENT, response.get('Destination')) + self.assertEqual(self.ISSUER, issuer.text) + + user_attribute = assertion[4][0] + self.assertEqual(self.SUBJECT, user_attribute[0].text) + + role_attribute = assertion[4][1] + for attribute_value in role_attribute: + self.assertIn(attribute_value.text, self.ROLES) + + project_attribute = assertion[4][2] + self.assertEqual(self.PROJECT, project_attribute[0].text) + + def test_assertion_using_explicit_namespace_prefixes(self): + def mocked_subprocess_check_output(*popenargs, **kwargs): + # the last option is the assertion file to be signed + filename = popenargs[0][-1] + with open(filename, 'r') as f: + assertion_content = f.read() + # since we are not testing the signature itself, we can return + # the assertion as is without signing it + return assertion_content + + with mock.patch('subprocess.check_output', + side_effect=mocked_subprocess_check_output): + generator = keystone_idp.SAMLGenerator() + response = generator.samlize_token(self.ISSUER, self.RECIPIENT, + self.SUBJECT, self.ROLES, + self.PROJECT) + assertion_xml = response.assertion.to_string() + # make sure we have the proper tag and prefix for the assertion + # namespace + self.assertIn('