diff options
author | Ruan HE <ruan.he@orange.com> | 2016-06-09 08:12:34 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@172.30.200.206> | 2016-06-09 08:12:34 +0000 |
commit | 4bc079a2664f9a407e332291f34d174625a9d5ea (patch) | |
tree | 7481cd5d0a9b3ce37c44c797a1e0d39881221cbe /keystone-moon/keystone/tests/unit/test_middleware.py | |
parent | 2f179c5790fbbf6144205d3c6e5089e6eb5f048a (diff) | |
parent | 2e7b4f2027a1147ca28301e4f88adf8274b39a1f (diff) |
Merge "Update Keystone core to Mitaka."
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_middleware.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_middleware.py | 620 |
1 files changed, 317 insertions, 303 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_middleware.py b/keystone-moon/keystone/tests/unit/test_middleware.py index 0eedb9c6..d33e8c00 100644 --- a/keystone-moon/keystone/tests/unit/test_middleware.py +++ b/keystone-moon/keystone/tests/unit/test_middleware.py @@ -12,17 +12,18 @@ # License for the specific language governing permissions and limitations # under the License. +import copy import hashlib import uuid from oslo_config import cfg from six.moves import http_client -import webob +import webtest from keystone.common import authorization from keystone.common import tokenless_auth -from keystone.contrib.federation import constants as federation_constants from keystone import exception +from keystone.federation import constants as federation_constants from keystone import middleware from keystone.tests import unit from keystone.tests.unit import mapping_fixtures @@ -32,104 +33,158 @@ from keystone.tests.unit import test_backend_sql CONF = cfg.CONF -def make_request(**kwargs): - accept = kwargs.pop('accept', None) - method = kwargs.pop('method', 'GET') - body = kwargs.pop('body', None) - req = webob.Request.blank('/', **kwargs) - req.method = method - if body is not None: - req.body = body - if accept is not None: - req.accept = accept - return req +class MiddlewareRequestTestBase(unit.TestCase): + MIDDLEWARE_CLASS = None # override this in subclasses -def make_response(**kwargs): - body = kwargs.pop('body', None) - return webob.Response(body) + def _application(self): + """A base wsgi application that returns a simple response.""" + def app(environ, start_response): + # WSGI requires the body of the response to be six.binary_type + body = uuid.uuid4().hex.encode('utf-8') + resp_headers = [('Content-Type', 'text/html; charset=utf8'), + ('Content-Length', str(len(body)))] + start_response('200 OK', resp_headers) + return [body] + return app + + def _generate_app_response(self, app, headers=None, method='get', + path='/', **kwargs): + """Given a wsgi application wrap it in webtest and call it.""" + return getattr(webtest.TestApp(app), method)(path, + headers=headers or {}, + **kwargs) + + def _middleware_failure(self, exc, *args, **kwargs): + """Assert that an exception is being thrown from process_request.""" + # NOTE(jamielennox): This is a little ugly. We need to call the webtest + # framework so that the correct RequestClass object is created for when + # we call process_request. However because we go via webtest we only + # see the response object and not the actual exception that is thrown + # by process_request. To get around this we subclass process_request + # with something that checks for the right type of exception being + # thrown so we can test the middle of the request process. + # TODO(jamielennox): Change these tests to test the value of the + # response rather than the error that is raised. + + class _Failing(self.MIDDLEWARE_CLASS): + + _called = False + + def process_request(i_self, *i_args, **i_kwargs): + # i_ to distinguish it from and not clobber the outer vars + e = self.assertRaises(exc, + super(_Failing, i_self).process_request, + *i_args, **i_kwargs) + i_self._called = True + raise e + + # by default the returned status when an uncaught exception is raised + # for validation or caught errors this will likely be 400 + kwargs.setdefault('status', http_client.INTERNAL_SERVER_ERROR) # 500 + + app = _Failing(self._application()) + resp = self._generate_app_response(app, *args, **kwargs) + self.assertTrue(app._called) + return resp + + def _do_middleware_response(self, *args, **kwargs): + """Wrap a middleware around a sample application and call it.""" + app = self.MIDDLEWARE_CLASS(self._application()) + return self._generate_app_response(app, *args, **kwargs) + + def _do_middleware_request(self, *args, **kwargs): + """The request object from a successful middleware call.""" + return self._do_middleware_response(*args, **kwargs).request + + +class TokenAuthMiddlewareTest(MiddlewareRequestTestBase): + + MIDDLEWARE_CLASS = middleware.TokenAuthMiddleware -class TokenAuthMiddlewareTest(unit.TestCase): def test_request(self): - req = make_request() - req.headers[middleware.AUTH_TOKEN_HEADER] = 'MAGIC' - middleware.TokenAuthMiddleware(None).process_request(req) + headers = {middleware.AUTH_TOKEN_HEADER: 'MAGIC'} + req = self._do_middleware_request(headers=headers) context = req.environ[middleware.CONTEXT_ENV] self.assertEqual('MAGIC', context['token_id']) -class AdminTokenAuthMiddlewareTest(unit.TestCase): +class AdminTokenAuthMiddlewareTest(MiddlewareRequestTestBase): + + MIDDLEWARE_CLASS = middleware.AdminTokenAuthMiddleware + + def config_overrides(self): + super(AdminTokenAuthMiddlewareTest, self).config_overrides() + self.config_fixture.config( + admin_token='ADMIN') + def test_request_admin(self): - req = make_request() - req.headers[middleware.AUTH_TOKEN_HEADER] = CONF.admin_token - middleware.AdminTokenAuthMiddleware(None).process_request(req) - context = req.environ[middleware.CONTEXT_ENV] - self.assertTrue(context['is_admin']) + headers = {middleware.AUTH_TOKEN_HEADER: 'ADMIN'} + req = self._do_middleware_request(headers=headers) + self.assertTrue(req.environ[middleware.CONTEXT_ENV]['is_admin']) def test_request_non_admin(self): - req = make_request() - req.headers[middleware.AUTH_TOKEN_HEADER] = 'NOT-ADMIN' - middleware.AdminTokenAuthMiddleware(None).process_request(req) - context = req.environ[middleware.CONTEXT_ENV] - self.assertFalse(context['is_admin']) + headers = {middleware.AUTH_TOKEN_HEADER: 'NOT-ADMIN'} + req = self._do_middleware_request(headers=headers) + self.assertFalse(req.environ[middleware.CONTEXT_ENV]['is_admin']) -class PostParamsMiddlewareTest(unit.TestCase): - def test_request_with_params(self): - req = make_request(body="arg1=one", method='POST') - middleware.PostParamsMiddleware(None).process_request(req) - params = req.environ[middleware.PARAMS_ENV] - self.assertEqual({"arg1": "one"}, params) +class JsonBodyMiddlewareTest(MiddlewareRequestTestBase): + MIDDLEWARE_CLASS = middleware.JsonBodyMiddleware -class JsonBodyMiddlewareTest(unit.TestCase): def test_request_with_params(self): - req = make_request(body='{"arg1": "one", "arg2": ["a"]}', - content_type='application/json', - method='POST') - middleware.JsonBodyMiddleware(None).process_request(req) - params = req.environ[middleware.PARAMS_ENV] - self.assertEqual({"arg1": "one", "arg2": ["a"]}, params) + headers = {'Content-Type': 'application/json'} + params = '{"arg1": "one", "arg2": ["a"]}' + req = self._do_middleware_request(params=params, + headers=headers, + method='post') + self.assertEqual({"arg1": "one", "arg2": ["a"]}, + req.environ[middleware.PARAMS_ENV]) def test_malformed_json(self): - req = make_request(body='{"arg1": "on', - content_type='application/json', - method='POST') - resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(http_client.BAD_REQUEST, resp.status_int) + headers = {'Content-Type': 'application/json'} + self._do_middleware_response(params='{"arg1": "on', + headers=headers, + method='post', + status=http_client.BAD_REQUEST) def test_not_dict_body(self): - req = make_request(body='42', - content_type='application/json', - method='POST') - resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(http_client.BAD_REQUEST, resp.status_int) - self.assertTrue('valid JSON object' in resp.json['error']['message']) + headers = {'Content-Type': 'application/json'} + resp = self._do_middleware_response(params='42', + headers=headers, + method='post', + status=http_client.BAD_REQUEST) + + self.assertIn('valid JSON object', resp.json['error']['message']) def test_no_content_type(self): - req = make_request(body='{"arg1": "one", "arg2": ["a"]}', - method='POST') - middleware.JsonBodyMiddleware(None).process_request(req) - params = req.environ[middleware.PARAMS_ENV] - self.assertEqual({"arg1": "one", "arg2": ["a"]}, params) + headers = {'Content-Type': ''} + params = '{"arg1": "one", "arg2": ["a"]}' + req = self._do_middleware_request(params=params, + headers=headers, + method='post') + self.assertEqual({"arg1": "one", "arg2": ["a"]}, + req.environ[middleware.PARAMS_ENV]) def test_unrecognized_content_type(self): - req = make_request(body='{"arg1": "one", "arg2": ["a"]}', - content_type='text/plain', - method='POST') - resp = middleware.JsonBodyMiddleware(None).process_request(req) - self.assertEqual(http_client.BAD_REQUEST, resp.status_int) + headers = {'Content-Type': 'text/plain'} + self._do_middleware_response(params='{"arg1": "one", "arg2": ["a"]}', + headers=headers, + method='post', + status=http_client.BAD_REQUEST) def test_unrecognized_content_type_without_body(self): - req = make_request(content_type='text/plain', - method='GET') - middleware.JsonBodyMiddleware(None).process_request(req) - params = req.environ.get(middleware.PARAMS_ENV, {}) - self.assertEqual({}, params) + headers = {'Content-Type': 'text/plain'} + req = self._do_middleware_request(headers=headers) + self.assertEqual({}, req.environ.get(middleware.PARAMS_ENV, {})) + +class AuthContextMiddlewareTest(test_backend_sql.SqlTests, + MiddlewareRequestTestBase): -class AuthContextMiddlewareTest(test_backend_sql.SqlTests): + MIDDLEWARE_CLASS = middleware.AuthContextMiddleware def setUp(self): super(AuthContextMiddlewareTest, self).setUp() @@ -139,55 +194,32 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): self.config_fixture.config(group='tokenless_auth', trusted_issuer=[self.trusted_issuer]) - # This idp_id is calculated based on - # sha256(self.client_issuer) - hashed_idp = hashlib.sha256(self.client_issuer) + # client_issuer is encoded because you can't hash + # unicode objects with hashlib. + # This idp_id is calculated based on sha256(self.client_issuer) + hashed_idp = hashlib.sha256(self.client_issuer.encode('utf-8')) self.idp_id = hashed_idp.hexdigest() self._load_sample_data() def _load_sample_data(self): - self.domain_id = uuid.uuid4().hex - self.domain_name = uuid.uuid4().hex - self.project_id = uuid.uuid4().hex - self.project_name = uuid.uuid4().hex - self.user_name = uuid.uuid4().hex - self.user_password = uuid.uuid4().hex - self.user_email = uuid.uuid4().hex self.protocol_id = 'x509' - self.role_id = uuid.uuid4().hex - self.role_name = uuid.uuid4().hex - # for ephemeral user - self.group_name = uuid.uuid4().hex # 1) Create a domain for the user. - self.domain = { - 'description': uuid.uuid4().hex, - 'enabled': True, - 'id': self.domain_id, - 'name': self.domain_name, - } - + self.domain = unit.new_domain_ref() + self.domain_id = self.domain['id'] + self.domain_name = self.domain['name'] self.resource_api.create_domain(self.domain_id, self.domain) # 2) Create a project for the user. - self.project = { - 'description': uuid.uuid4().hex, - 'domain_id': self.domain_id, - 'enabled': True, - 'id': self.project_id, - 'name': self.project_name, - } + self.project = unit.new_project_ref(domain_id=self.domain_id) + self.project_id = self.project['id'] + self.project_name = self.project['name'] self.resource_api.create_project(self.project_id, self.project) # 3) Create a user in new domain. - self.user = { - 'name': self.user_name, - 'domain_id': self.domain_id, - 'project_id': self.project_id, - 'password': self.user_password, - 'email': self.user_email, - } + self.user = unit.new_user_ref(domain_id=self.domain_id, + project_id=self.project_id) self.user = self.identity_api.create_user(self.user) @@ -197,17 +229,13 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): self.idp) # Add a role - self.role = { - 'id': self.role_id, - 'name': self.role_name, - } + self.role = unit.new_role_ref() + self.role_id = self.role['id'] + self.role_name = self.role['name'] self.role_api.create_role(self.role_id, self.role) # Add a group - self.group = { - 'name': self.group_name, - 'domain_id': self.domain_id, - } + self.group = unit.new_group_ref(domain_id=self.domain_id) self.group = self.identity_api.create_group(self.group) # Assign a role to the user on a project @@ -282,7 +310,7 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): :param request: HTTP request :param mapping_ref: A mapping in JSON structure will be setup in the - backend DB for mapping an user or a group. + backend DB for mapping a user or a group. :param exception_expected: Sets to True when an exception is expected to raised based on the given arguments. :returns: context an auth context contains user and role information @@ -300,30 +328,27 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): return context def test_context_already_exists(self): - req = make_request() - token_id = uuid.uuid4().hex - req.environ[authorization.AUTH_CONTEXT_ENV] = {'token_id': token_id} - context = self._create_context(request=req) - self.assertEqual(token_id, context['token_id']) + stub_value = uuid.uuid4().hex + env = {authorization.AUTH_CONTEXT_ENV: stub_value} + req = self._do_middleware_request(extra_environ=env) + self.assertEqual(stub_value, + req.environ.get(authorization.AUTH_CONTEXT_ENV)) def test_not_applicable_to_token_request(self): - env = {} - env['PATH_INFO'] = '/auth/tokens' - env['REQUEST_METHOD'] = 'POST' - req = make_request(environ=env) - context = self._create_context(request=req) + req = self._do_middleware_request(path='/auth/tokens', method='post') + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self.assertIsNone(context) def test_no_tokenless_attributes_request(self): - req = make_request() - context = self._create_context(request=req) + req = self._do_middleware_request() + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self.assertIsNone(context) def test_no_issuer_attribute_request(self): env = {} env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex - req = make_request(environ=env) - context = self._create_context(request=req) + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self.assertIsNone(context) def test_has_only_issuer_and_project_name_request(self): @@ -332,61 +357,51 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): # references to issuer of the client certificate. env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex - req = make_request(environ=env) - context = self._create_context(request=req, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_has_only_issuer_and_project_domain_name_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex - req = make_request(environ=env) - context = self._create_context(request=req, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_has_only_issuer_and_project_domain_id_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_DOMAIN_ID'] = uuid.uuid4().hex - req = make_request(environ=env) - context = self._create_context(request=req, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_missing_both_domain_and_project_request(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer - req = make_request(environ=env) - context = self._create_context(request=req, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_empty_trusted_issuer_list(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex - req = make_request(environ=env) + self.config_fixture.config(group='tokenless_auth', trusted_issuer=[]) - context = self._create_context(request=req) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self.assertIsNone(context) def test_client_issuer_not_trusted(self): env = {} env['SSL_CLIENT_I_DN'] = self.untrusted_client_issuer env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex - req = make_request(environ=env) - context = self._create_context(request=req) + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self.assertIsNone(context) def test_proj_scope_with_proj_id_and_proj_dom_id_success(self): @@ -397,24 +412,28 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): # SSL_CLIENT_USER_NAME and SSL_CLIENT_DOMAIN_NAME are the types # defined in the mapping that will map to the user name and # domain name - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_id_only_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_and_proj_dom_id_success(self): @@ -422,12 +441,14 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_and_proj_dom_name_success(self): @@ -435,28 +456,29 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_proj_scope_with_proj_name_only_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_id - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_mapping_with_userid_and_domainid_success(self): env = {} @@ -465,10 +487,12 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_ID'] = self.user['id'] env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_mapping_with_userid_and_domainname_success(self): @@ -478,10 +502,12 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_ID'] = self.user['id'] env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_mapping_with_username_and_domainid_success(self): @@ -489,12 +515,14 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_only_domain_name_fail(self): @@ -503,14 +531,13 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY) + + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_only_domain_id_fail(self): env = {} @@ -518,29 +545,27 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY) + + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_missing_domain_data_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_ONLY, - exception_expected=True) - self.assertRaises(exception.ValidationError, - context.process_request, - req) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_ONLY) + + self._middleware_failure(exception.ValidationError, + extra_environ=env, + status=400) def test_userid_success(self): env = {} @@ -548,10 +573,10 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_ID'] = self.project_id env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_ID'] = self.user['id'] - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_ONLY) + + self._load_mapping_rules(mapping_fixtures.MAPPING_WITH_USERID_ONLY) + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context) def test_domain_disable_fail(self): @@ -559,37 +584,35 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id - req = make_request(environ=env) + self.domain['enabled'] = False self.domain = self.resource_api.update_domain( self.domain['id'], self.domain) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, - exception_expected=True) - self.assertRaises(exception.Unauthorized, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) + self._middleware_failure(exception.Unauthorized, + extra_environ=env, + status=401) def test_user_disable_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name + env['SSL_CLIENT_USER_NAME'] = self.user['name'] env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id - req = make_request(environ=env) + self.user['enabled'] = False self.user = self.identity_api.update_user(self.user['id'], self.user) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID, - exception_expected=True) - self.assertRaises(AssertionError, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID) + + self._middleware_failure(AssertionError, + extra_environ=env) def test_invalid_user_fail(self): env = {} @@ -598,30 +621,29 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name - req = make_request(environ=env) - context = self._create_context( - request=req, - mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME, - exception_expected=True) - self.assertRaises(exception.UserNotFound, - context.process_request, - req) + + self._load_mapping_rules( + mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME) + + self._middleware_failure(exception.UserNotFound, + extra_environ=env, + status=404) def test_ephemeral_success(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' - mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] - context = self._create_context( - request=req, - mapping_ref=mapping) + self._load_mapping_rules(mapping) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_with_default_user_type_success(self): @@ -629,23 +651,25 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' # this mapping does not have the user type defined # and it should defaults to 'ephemeral' which is # the expected type for the test case. - mapping = mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER.copy() + mapping = copy.deepcopy( + mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] - context = self._create_context( - request=req, - mapping_ref=mapping) + self._load_mapping_rules(mapping) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_any_user_success(self): - """Ephemeral user does not need a specified user + """Verify ephemeral user does not need a specified user. + Keystone is not looking to match the user, but a corresponding group. """ env = {} @@ -653,15 +677,15 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex - req = make_request(environ=env) self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' - mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] - context = self._create_context( - request=req, - mapping_ref=mapping) + self._load_mapping_rules(mapping) + + req = self._do_middleware_request(extra_environ=env) + context = req.environ.get(authorization.AUTH_CONTEXT_ENV) self._assert_tokenless_auth_context(context, ephemeral_user=True) def test_ephemeral_invalid_scope_fail(self): @@ -669,43 +693,37 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' - mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = self.group['id'] - context = self._create_context( - request=req, - mapping_ref=mapping, - exception_expected=True) - self.assertRaises(exception.Unauthorized, - context.process_request, - req) + self._load_mapping_rules(mapping) + + self._middleware_failure(exception.Unauthorized, + extra_environ=env, + status=401) def test_ephemeral_no_group_found_fail(self): env = {} env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] self.config_fixture.config(group='tokenless_auth', protocol='ephemeral') self.protocol_id = 'ephemeral' - mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex - context = self._create_context( - request=req, - mapping_ref=mapping, - exception_expected=True) - self.assertRaises(exception.MappedGroupNotFound, - context.process_request, - req) + self._load_mapping_rules(mapping) + + self._middleware_failure(exception.MappedGroupNotFound, + extra_environ=env) def test_ephemeral_incorrect_mapping_fail(self): - """Ephemeral user picks up the non-ephemeral user mapping. + """Test ephemeral user picking up the non-ephemeral user mapping. + Looking up the mapping with protocol Id 'x509' will load up the non-ephemeral user mapping, results unauthenticated. """ @@ -713,21 +731,17 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests): env['SSL_CLIENT_I_DN'] = self.client_issuer env['HTTP_X_PROJECT_NAME'] = self.project_name env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name - env['SSL_CLIENT_USER_NAME'] = self.user_name - req = make_request(environ=env) + env['SSL_CLIENT_USER_NAME'] = self.user['name'] # This will pick up the incorrect mapping self.config_fixture.config(group='tokenless_auth', protocol='x509') self.protocol_id = 'x509' - mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy() + mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER) mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex - context = self._create_context( - request=req, - mapping_ref=mapping, - exception_expected=True) - self.assertRaises(exception.MappedGroupNotFound, - context.process_request, - req) + self._load_mapping_rules(mapping) + + self._middleware_failure(exception.MappedGroupNotFound, + extra_environ=env) def test_create_idp_id_success(self): env = {} |