From 92d11d139e9f76d4fd76859aea78643fc32ef36b Mon Sep 17 00:00:00 2001 From: asteroide Date: Thu, 24 Sep 2015 16:27:16 +0200 Subject: Update Keystone code from repository. Change-Id: Ib3d0a06b10902fcc6d520f58e85aa617bc326d00 --- .../keystone/tests/unit/test_middleware.py | 647 ++++++++++++++++++++- 1 file changed, 639 insertions(+), 8 deletions(-) (limited to 'keystone-moon/keystone/tests/unit/test_middleware.py') diff --git a/keystone-moon/keystone/tests/unit/test_middleware.py b/keystone-moon/keystone/tests/unit/test_middleware.py index 3a26dd24..0eedb9c6 100644 --- a/keystone-moon/keystone/tests/unit/test_middleware.py +++ b/keystone-moon/keystone/tests/unit/test_middleware.py @@ -12,11 +12,21 @@ # License for the specific language governing permissions and limitations # under the License. +import hashlib +import uuid + from oslo_config import cfg +from six.moves import http_client import webob +from keystone.common import authorization +from keystone.common import tokenless_auth +from keystone.contrib.federation import constants as federation_constants +from keystone import exception from keystone import middleware -from keystone.tests import unit as tests +from keystone.tests import unit +from keystone.tests.unit import mapping_fixtures +from keystone.tests.unit import test_backend_sql CONF = cfg.CONF @@ -40,7 +50,7 @@ def make_response(**kwargs): return webob.Response(body) -class TokenAuthMiddlewareTest(tests.TestCase): +class TokenAuthMiddlewareTest(unit.TestCase): def test_request(self): req = make_request() req.headers[middleware.AUTH_TOKEN_HEADER] = 'MAGIC' @@ -49,7 +59,7 @@ class TokenAuthMiddlewareTest(tests.TestCase): self.assertEqual('MAGIC', context['token_id']) -class AdminTokenAuthMiddlewareTest(tests.TestCase): +class AdminTokenAuthMiddlewareTest(unit.TestCase): def test_request_admin(self): req = make_request() req.headers[middleware.AUTH_TOKEN_HEADER] = CONF.admin_token @@ -65,7 +75,7 @@ class AdminTokenAuthMiddlewareTest(tests.TestCase): self.assertFalse(context['is_admin']) -class PostParamsMiddlewareTest(tests.TestCase): +class PostParamsMiddlewareTest(unit.TestCase): def test_request_with_params(self): req = make_request(body="arg1=one", method='POST') middleware.PostParamsMiddleware(None).process_request(req) @@ -73,7 +83,7 @@ class PostParamsMiddlewareTest(tests.TestCase): self.assertEqual({"arg1": "one"}, params) -class JsonBodyMiddlewareTest(tests.TestCase): +class JsonBodyMiddlewareTest(unit.TestCase): def test_request_with_params(self): req = make_request(body='{"arg1": "one", "arg2": ["a"]}', content_type='application/json', @@ -87,14 +97,14 @@ class JsonBodyMiddlewareTest(tests.TestCase): content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) def test_not_dict_body(self): req = make_request(body='42', content_type='application/json', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) self.assertTrue('valid JSON object' in resp.json['error']['message']) def test_no_content_type(self): @@ -109,7 +119,7 @@ class JsonBodyMiddlewareTest(tests.TestCase): content_type='text/plain', method='POST') resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(400, resp.status_int) + self.assertEqual(http_client.BAD_REQUEST, resp.status_int) def test_unrecognized_content_type_without_body(self): req = make_request(content_type='text/plain', @@ -117,3 +127,624 @@ class JsonBodyMiddlewareTest(tests.TestCase): middleware.JsonBodyMiddleware(None).process_request(req) params = req.environ.get(middleware.PARAMS_ENV, {}) self.assertEqual({}, params) + + +class AuthContextMiddlewareTest(test_backend_sql.SqlTests): + + def setUp(self): + super(AuthContextMiddlewareTest, self).setUp() + self.client_issuer = uuid.uuid4().hex + self.untrusted_client_issuer = uuid.uuid4().hex + self.trusted_issuer = self.client_issuer + self.config_fixture.config(group='tokenless_auth', + trusted_issuer=[self.trusted_issuer]) + + # This idp_id is calculated based on + # sha256(self.client_issuer) + hashed_idp = hashlib.sha256(self.client_issuer) + self.idp_id = hashed_idp.hexdigest() + self._load_sample_data() + + def _load_sample_data(self): + self.domain_id = uuid.uuid4().hex + self.domain_name = uuid.uuid4().hex + self.project_id = uuid.uuid4().hex + self.project_name = uuid.uuid4().hex + self.user_name = uuid.uuid4().hex + self.user_password = uuid.uuid4().hex + self.user_email = uuid.uuid4().hex + self.protocol_id = 'x509' + self.role_id = uuid.uuid4().hex + self.role_name = uuid.uuid4().hex + # for ephemeral user + self.group_name = uuid.uuid4().hex + + # 1) Create a domain for the user. + self.domain = { + 'description': uuid.uuid4().hex, + 'enabled': True, + 'id': self.domain_id, + 'name': self.domain_name, + } + + self.resource_api.create_domain(self.domain_id, self.domain) + + # 2) Create a project for the user. + self.project = { + 'description': uuid.uuid4().hex, + 'domain_id': self.domain_id, + 'enabled': True, + 'id': self.project_id, + 'name': self.project_name, + } + + self.resource_api.create_project(self.project_id, self.project) + + # 3) Create a user in new domain. + self.user = { + 'name': self.user_name, + 'domain_id': self.domain_id, + 'project_id': self.project_id, + 'password': self.user_password, + 'email': self.user_email, + } + + self.user = self.identity_api.create_user(self.user) + + # Add IDP + self.idp = self._idp_ref(id=self.idp_id) + self.federation_api.create_idp(self.idp['id'], + self.idp) + + # Add a role + self.role = { + 'id': self.role_id, + 'name': self.role_name, + } + self.role_api.create_role(self.role_id, self.role) + + # Add a group + self.group = { + 'name': self.group_name, + 'domain_id': self.domain_id, + } + self.group = self.identity_api.create_group(self.group) + + # Assign a role to the user on a project + self.assignment_api.add_role_to_user_and_project( + user_id=self.user['id'], + tenant_id=self.project_id, + role_id=self.role_id) + + # Assign a role to the group on a project + self.assignment_api.create_grant( + role_id=self.role_id, + group_id=self.group['id'], + project_id=self.project_id) + + def _load_mapping_rules(self, rules): + # Add a mapping + self.mapping = self._mapping_ref(rules=rules) + self.federation_api.create_mapping(self.mapping['id'], + self.mapping) + # Add protocols + self.proto_x509 = self._proto_ref(mapping_id=self.mapping['id']) + self.proto_x509['id'] = self.protocol_id + self.federation_api.create_protocol(self.idp['id'], + self.proto_x509['id'], + self.proto_x509) + + def _idp_ref(self, id=None): + idp = { + 'id': id or uuid.uuid4().hex, + 'enabled': True, + 'description': uuid.uuid4().hex + } + return idp + + def _proto_ref(self, mapping_id=None): + proto = { + 'id': uuid.uuid4().hex, + 'mapping_id': mapping_id or uuid.uuid4().hex + } + return proto + + def _mapping_ref(self, rules=None): + if rules is None: + mapped_rules = {} + else: + mapped_rules = rules.get('rules', {}) + return { + 'id': uuid.uuid4().hex, + 'rules': mapped_rules + } + + def _assert_tokenless_auth_context(self, context, ephemeral_user=False): + self.assertIsNotNone(context) + self.assertEqual(self.project_id, context['project_id']) + self.assertIn(self.role_name, context['roles']) + if ephemeral_user: + self.assertEqual(self.group['id'], context['group_ids'][0]) + self.assertEqual('ephemeral', + context[federation_constants.PROTOCOL]) + self.assertEqual(self.idp_id, + context[federation_constants.IDENTITY_PROVIDER]) + else: + self.assertEqual(self.user['id'], context['user_id']) + + def _create_context(self, request, mapping_ref=None, + exception_expected=False): + """Builds the auth context from the given arguments. + + auth context will be returned from the AuthContextMiddleware based on + what is being passed in the given request and what mapping is being + setup in the backend DB. + + :param request: HTTP request + :param mapping_ref: A mapping in JSON structure will be setup in the + backend DB for mapping an user or a group. + :param exception_expected: Sets to True when an exception is expected + to raised based on the given arguments. + :returns: context an auth context contains user and role information + :rtype: dict + """ + if mapping_ref: + self._load_mapping_rules(mapping_ref) + + if not exception_expected: + (middleware.AuthContextMiddleware('Tokenless_auth_test'). + process_request(request)) + context = request.environ.get(authorization.AUTH_CONTEXT_ENV) + else: + context = middleware.AuthContextMiddleware('Tokenless_auth_test') + return context + + def test_context_already_exists(self): + req = make_request() + token_id = uuid.uuid4().hex + req.environ[authorization.AUTH_CONTEXT_ENV] = {'token_id': token_id} + context = self._create_context(request=req) + self.assertEqual(token_id, context['token_id']) + + def test_not_applicable_to_token_request(self): + env = {} + env['PATH_INFO'] = '/auth/tokens' + env['REQUEST_METHOD'] = 'POST' + req = make_request(environ=env) + context = self._create_context(request=req) + self.assertIsNone(context) + + def test_no_tokenless_attributes_request(self): + req = make_request() + context = self._create_context(request=req) + self.assertIsNone(context) + + def test_no_issuer_attribute_request(self): + env = {} + env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex + req = make_request(environ=env) + context = self._create_context(request=req) + self.assertIsNone(context) + + def test_has_only_issuer_and_project_name_request(self): + env = {} + # SSL_CLIENT_I_DN is the attribute name that wsgi env + # references to issuer of the client certificate. + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex + req = make_request(environ=env) + context = self._create_context(request=req, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_has_only_issuer_and_project_domain_name_request(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex + req = make_request(environ=env) + context = self._create_context(request=req, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_has_only_issuer_and_project_domain_id_request(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_DOMAIN_ID'] = uuid.uuid4().hex + req = make_request(environ=env) + context = self._create_context(request=req, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_missing_both_domain_and_project_request(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + req = make_request(environ=env) + context = self._create_context(request=req, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_empty_trusted_issuer_list(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + trusted_issuer=[]) + context = self._create_context(request=req) + self.assertIsNone(context) + + def test_client_issuer_not_trusted(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.untrusted_client_issuer + env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex + req = make_request(environ=env) + context = self._create_context(request=req) + self.assertIsNone(context) + + def test_proj_scope_with_proj_id_and_proj_dom_id_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + # SSL_CLIENT_USER_NAME and SSL_CLIENT_DOMAIN_NAME are the types + # defined in the mapping that will map to the user name and + # domain name + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + self._assert_tokenless_auth_context(context) + + def test_proj_scope_with_proj_id_only_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + self._assert_tokenless_auth_context(context) + + def test_proj_scope_with_proj_name_and_proj_dom_id_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + self._assert_tokenless_auth_context(context) + + def test_proj_scope_with_proj_name_and_proj_dom_name_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + self._assert_tokenless_auth_context(context) + + def test_proj_scope_with_proj_name_only_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_id + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_mapping_with_userid_and_domainid_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_ID'] = self.user['id'] + env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID) + self._assert_tokenless_auth_context(context) + + def test_mapping_with_userid_and_domainname_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_ID'] = self.user['id'] + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME) + self._assert_tokenless_auth_context(context) + + def test_mapping_with_username_and_domainid_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) + self._assert_tokenless_auth_context(context) + + def test_only_domain_name_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_only_domain_id_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_missing_domain_data_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_ONLY, + exception_expected=True) + self.assertRaises(exception.ValidationError, + context.process_request, + req) + + def test_userid_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_USER_ID'] = self.user['id'] + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_ONLY) + self._assert_tokenless_auth_context(context) + + def test_domain_disable_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id + req = make_request(environ=env) + self.domain['enabled'] = False + self.domain = self.resource_api.update_domain( + self.domain['id'], self.domain) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, + exception_expected=True) + self.assertRaises(exception.Unauthorized, + context.process_request, + req) + + def test_user_disable_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id + req = make_request(environ=env) + self.user['enabled'] = False + self.user = self.identity_api.update_user(self.user['id'], self.user) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, + exception_expected=True) + self.assertRaises(AssertionError, + context.process_request, + req) + + def test_invalid_user_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_ID'] = self.project_id + env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id + env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex + env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name + req = make_request(environ=env) + context = self._create_context( + request=req, + mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, + exception_expected=True) + self.assertRaises(exception.UserNotFound, + context.process_request, + req) + + def test_ephemeral_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + protocol='ephemeral') + self.protocol_id = 'ephemeral' + mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] + context = self._create_context( + request=req, + mapping_ref=mapping) + self._assert_tokenless_auth_context(context, ephemeral_user=True) + + def test_ephemeral_with_default_user_type_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + protocol='ephemeral') + self.protocol_id = 'ephemeral' + # this mapping does not have the user type defined + # and it should defaults to 'ephemeral' which is + # the expected type for the test case. + mapping = mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] + context = self._create_context( + request=req, + mapping_ref=mapping) + self._assert_tokenless_auth_context(context, ephemeral_user=True) + + def test_ephemeral_any_user_success(self): + """Ephemeral user does not need a specified user + Keystone is not looking to match the user, but a corresponding group. + """ + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + protocol='ephemeral') + self.protocol_id = 'ephemeral' + mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] + context = self._create_context( + request=req, + mapping_ref=mapping) + self._assert_tokenless_auth_context(context, ephemeral_user=True) + + def test_ephemeral_invalid_scope_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex + env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + protocol='ephemeral') + self.protocol_id = 'ephemeral' + mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] + context = self._create_context( + request=req, + mapping_ref=mapping, + exception_expected=True) + self.assertRaises(exception.Unauthorized, + context.process_request, + req) + + def test_ephemeral_no_group_found_fail(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', + protocol='ephemeral') + self.protocol_id = 'ephemeral' + mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex + context = self._create_context( + request=req, + mapping_ref=mapping, + exception_expected=True) + self.assertRaises(exception.MappedGroupNotFound, + context.process_request, + req) + + def test_ephemeral_incorrect_mapping_fail(self): + """Ephemeral user picks up the non-ephemeral user mapping. + Looking up the mapping with protocol Id 'x509' will load up + the non-ephemeral user mapping, results unauthenticated. + """ + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + env['HTTP_X_PROJECT_NAME'] = self.project_name + env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name + env['SSL_CLIENT_USER_NAME'] = self.user_name + req = make_request(environ=env) + # This will pick up the incorrect mapping + self.config_fixture.config(group='tokenless_auth', + protocol='x509') + self.protocol_id = 'x509' + mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex + context = self._create_context( + request=req, + mapping_ref=mapping, + exception_expected=True) + self.assertRaises(exception.MappedGroupNotFound, + context.process_request, + req) + + def test_create_idp_id_success(self): + env = {} + env['SSL_CLIENT_I_DN'] = self.client_issuer + auth = tokenless_auth.TokenlessAuthHelper(env) + idp_id = auth._build_idp_id() + self.assertEqual(self.idp_id, idp_id) + + def test_create_idp_id_attri_not_found_fail(self): + env = {} + env[uuid.uuid4().hex] = self.client_issuer + auth = tokenless_auth.TokenlessAuthHelper(env) + expected_msg = ('Could not determine Identity Provider ID. The ' + 'configuration option %s was not found in the ' + 'request environment.' % + CONF.tokenless_auth.issuer_attribute) + # Check the content of the exception message as well + self.assertRaisesRegexp(exception.TokenlessAuthConfigError, + expected_msg, + auth._build_idp_id) -- cgit 1.2.3-korg