aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_middleware.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_middleware.py620
1 files changed, 317 insertions, 303 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_middleware.py b/keystone-moon/keystone/tests/unit/test_middleware.py
index 0eedb9c6..d33e8c00 100644
--- a/keystone-moon/keystone/tests/unit/test_middleware.py
+++ b/keystone-moon/keystone/tests/unit/test_middleware.py
@@ -12,17 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import hashlib
import uuid
from oslo_config import cfg
from six.moves import http_client
-import webob
+import webtest
from keystone.common import authorization
from keystone.common import tokenless_auth
-from keystone.contrib.federation import constants as federation_constants
from keystone import exception
+from keystone.federation import constants as federation_constants
from keystone import middleware
from keystone.tests import unit
from keystone.tests.unit import mapping_fixtures
@@ -32,104 +33,158 @@ from keystone.tests.unit import test_backend_sql
CONF = cfg.CONF
-def make_request(**kwargs):
- accept = kwargs.pop('accept', None)
- method = kwargs.pop('method', 'GET')
- body = kwargs.pop('body', None)
- req = webob.Request.blank('/', **kwargs)
- req.method = method
- if body is not None:
- req.body = body
- if accept is not None:
- req.accept = accept
- return req
+class MiddlewareRequestTestBase(unit.TestCase):
+ MIDDLEWARE_CLASS = None # override this in subclasses
-def make_response(**kwargs):
- body = kwargs.pop('body', None)
- return webob.Response(body)
+ def _application(self):
+ """A base wsgi application that returns a simple response."""
+ def app(environ, start_response):
+ # WSGI requires the body of the response to be six.binary_type
+ body = uuid.uuid4().hex.encode('utf-8')
+ resp_headers = [('Content-Type', 'text/html; charset=utf8'),
+ ('Content-Length', str(len(body)))]
+ start_response('200 OK', resp_headers)
+ return [body]
+ return app
+
+ def _generate_app_response(self, app, headers=None, method='get',
+ path='/', **kwargs):
+ """Given a wsgi application wrap it in webtest and call it."""
+ return getattr(webtest.TestApp(app), method)(path,
+ headers=headers or {},
+ **kwargs)
+
+ def _middleware_failure(self, exc, *args, **kwargs):
+ """Assert that an exception is being thrown from process_request."""
+ # NOTE(jamielennox): This is a little ugly. We need to call the webtest
+ # framework so that the correct RequestClass object is created for when
+ # we call process_request. However because we go via webtest we only
+ # see the response object and not the actual exception that is thrown
+ # by process_request. To get around this we subclass process_request
+ # with something that checks for the right type of exception being
+ # thrown so we can test the middle of the request process.
+ # TODO(jamielennox): Change these tests to test the value of the
+ # response rather than the error that is raised.
+
+ class _Failing(self.MIDDLEWARE_CLASS):
+
+ _called = False
+
+ def process_request(i_self, *i_args, **i_kwargs):
+ # i_ to distinguish it from and not clobber the outer vars
+ e = self.assertRaises(exc,
+ super(_Failing, i_self).process_request,
+ *i_args, **i_kwargs)
+ i_self._called = True
+ raise e
+
+ # by default the returned status when an uncaught exception is raised
+ # for validation or caught errors this will likely be 400
+ kwargs.setdefault('status', http_client.INTERNAL_SERVER_ERROR) # 500
+
+ app = _Failing(self._application())
+ resp = self._generate_app_response(app, *args, **kwargs)
+ self.assertTrue(app._called)
+ return resp
+
+ def _do_middleware_response(self, *args, **kwargs):
+ """Wrap a middleware around a sample application and call it."""
+ app = self.MIDDLEWARE_CLASS(self._application())
+ return self._generate_app_response(app, *args, **kwargs)
+
+ def _do_middleware_request(self, *args, **kwargs):
+ """The request object from a successful middleware call."""
+ return self._do_middleware_response(*args, **kwargs).request
+
+
+class TokenAuthMiddlewareTest(MiddlewareRequestTestBase):
+
+ MIDDLEWARE_CLASS = middleware.TokenAuthMiddleware
-class TokenAuthMiddlewareTest(unit.TestCase):
def test_request(self):
- req = make_request()
- req.headers[middleware.AUTH_TOKEN_HEADER] = 'MAGIC'
- middleware.TokenAuthMiddleware(None).process_request(req)
+ headers = {middleware.AUTH_TOKEN_HEADER: 'MAGIC'}
+ req = self._do_middleware_request(headers=headers)
context = req.environ[middleware.CONTEXT_ENV]
self.assertEqual('MAGIC', context['token_id'])
-class AdminTokenAuthMiddlewareTest(unit.TestCase):
+class AdminTokenAuthMiddlewareTest(MiddlewareRequestTestBase):
+
+ MIDDLEWARE_CLASS = middleware.AdminTokenAuthMiddleware
+
+ def config_overrides(self):
+ super(AdminTokenAuthMiddlewareTest, self).config_overrides()
+ self.config_fixture.config(
+ admin_token='ADMIN')
+
def test_request_admin(self):
- req = make_request()
- req.headers[middleware.AUTH_TOKEN_HEADER] = CONF.admin_token
- middleware.AdminTokenAuthMiddleware(None).process_request(req)
- context = req.environ[middleware.CONTEXT_ENV]
- self.assertTrue(context['is_admin'])
+ headers = {middleware.AUTH_TOKEN_HEADER: 'ADMIN'}
+ req = self._do_middleware_request(headers=headers)
+ self.assertTrue(req.environ[middleware.CONTEXT_ENV]['is_admin'])
def test_request_non_admin(self):
- req = make_request()
- req.headers[middleware.AUTH_TOKEN_HEADER] = 'NOT-ADMIN'
- middleware.AdminTokenAuthMiddleware(None).process_request(req)
- context = req.environ[middleware.CONTEXT_ENV]
- self.assertFalse(context['is_admin'])
+ headers = {middleware.AUTH_TOKEN_HEADER: 'NOT-ADMIN'}
+ req = self._do_middleware_request(headers=headers)
+ self.assertFalse(req.environ[middleware.CONTEXT_ENV]['is_admin'])
-class PostParamsMiddlewareTest(unit.TestCase):
- def test_request_with_params(self):
- req = make_request(body="arg1=one", method='POST')
- middleware.PostParamsMiddleware(None).process_request(req)
- params = req.environ[middleware.PARAMS_ENV]
- self.assertEqual({"arg1": "one"}, params)
+class JsonBodyMiddlewareTest(MiddlewareRequestTestBase):
+ MIDDLEWARE_CLASS = middleware.JsonBodyMiddleware
-class JsonBodyMiddlewareTest(unit.TestCase):
def test_request_with_params(self):
- req = make_request(body='{"arg1": "one", "arg2": ["a"]}',
- content_type='application/json',
- method='POST')
- middleware.JsonBodyMiddleware(None).process_request(req)
- params = req.environ[middleware.PARAMS_ENV]
- self.assertEqual({"arg1": "one", "arg2": ["a"]}, params)
+ headers = {'Content-Type': 'application/json'}
+ params = '{"arg1": "one", "arg2": ["a"]}'
+ req = self._do_middleware_request(params=params,
+ headers=headers,
+ method='post')
+ self.assertEqual({"arg1": "one", "arg2": ["a"]},
+ req.environ[middleware.PARAMS_ENV])
def test_malformed_json(self):
- req = make_request(body='{"arg1": "on',
- content_type='application/json',
- method='POST')
- resp = middleware.JsonBodyMiddleware(None).process_request(req)
- self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
+ headers = {'Content-Type': 'application/json'}
+ self._do_middleware_response(params='{"arg1": "on',
+ headers=headers,
+ method='post',
+ status=http_client.BAD_REQUEST)
def test_not_dict_body(self):
- req = make_request(body='42',
- content_type='application/json',
- method='POST')
- resp = middleware.JsonBodyMiddleware(None).process_request(req)
- self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
- self.assertTrue('valid JSON object' in resp.json['error']['message'])
+ headers = {'Content-Type': 'application/json'}
+ resp = self._do_middleware_response(params='42',
+ headers=headers,
+ method='post',
+ status=http_client.BAD_REQUEST)
+
+ self.assertIn('valid JSON object', resp.json['error']['message'])
def test_no_content_type(self):
- req = make_request(body='{"arg1": "one", "arg2": ["a"]}',
- method='POST')
- middleware.JsonBodyMiddleware(None).process_request(req)
- params = req.environ[middleware.PARAMS_ENV]
- self.assertEqual({"arg1": "one", "arg2": ["a"]}, params)
+ headers = {'Content-Type': ''}
+ params = '{"arg1": "one", "arg2": ["a"]}'
+ req = self._do_middleware_request(params=params,
+ headers=headers,
+ method='post')
+ self.assertEqual({"arg1": "one", "arg2": ["a"]},
+ req.environ[middleware.PARAMS_ENV])
def test_unrecognized_content_type(self):
- req = make_request(body='{"arg1": "one", "arg2": ["a"]}',
- content_type='text/plain',
- method='POST')
- resp = middleware.JsonBodyMiddleware(None).process_request(req)
- self.assertEqual(http_client.BAD_REQUEST, resp.status_int)
+ headers = {'Content-Type': 'text/plain'}
+ self._do_middleware_response(params='{"arg1": "one", "arg2": ["a"]}',
+ headers=headers,
+ method='post',
+ status=http_client.BAD_REQUEST)
def test_unrecognized_content_type_without_body(self):
- req = make_request(content_type='text/plain',
- method='GET')
- middleware.JsonBodyMiddleware(None).process_request(req)
- params = req.environ.get(middleware.PARAMS_ENV, {})
- self.assertEqual({}, params)
+ headers = {'Content-Type': 'text/plain'}
+ req = self._do_middleware_request(headers=headers)
+ self.assertEqual({}, req.environ.get(middleware.PARAMS_ENV, {}))
+
+class AuthContextMiddlewareTest(test_backend_sql.SqlTests,
+ MiddlewareRequestTestBase):
-class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
+ MIDDLEWARE_CLASS = middleware.AuthContextMiddleware
def setUp(self):
super(AuthContextMiddlewareTest, self).setUp()
@@ -139,55 +194,32 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
self.config_fixture.config(group='tokenless_auth',
trusted_issuer=[self.trusted_issuer])
- # This idp_id is calculated based on
- # sha256(self.client_issuer)
- hashed_idp = hashlib.sha256(self.client_issuer)
+ # client_issuer is encoded because you can't hash
+ # unicode objects with hashlib.
+ # This idp_id is calculated based on sha256(self.client_issuer)
+ hashed_idp = hashlib.sha256(self.client_issuer.encode('utf-8'))
self.idp_id = hashed_idp.hexdigest()
self._load_sample_data()
def _load_sample_data(self):
- self.domain_id = uuid.uuid4().hex
- self.domain_name = uuid.uuid4().hex
- self.project_id = uuid.uuid4().hex
- self.project_name = uuid.uuid4().hex
- self.user_name = uuid.uuid4().hex
- self.user_password = uuid.uuid4().hex
- self.user_email = uuid.uuid4().hex
self.protocol_id = 'x509'
- self.role_id = uuid.uuid4().hex
- self.role_name = uuid.uuid4().hex
- # for ephemeral user
- self.group_name = uuid.uuid4().hex
# 1) Create a domain for the user.
- self.domain = {
- 'description': uuid.uuid4().hex,
- 'enabled': True,
- 'id': self.domain_id,
- 'name': self.domain_name,
- }
-
+ self.domain = unit.new_domain_ref()
+ self.domain_id = self.domain['id']
+ self.domain_name = self.domain['name']
self.resource_api.create_domain(self.domain_id, self.domain)
# 2) Create a project for the user.
- self.project = {
- 'description': uuid.uuid4().hex,
- 'domain_id': self.domain_id,
- 'enabled': True,
- 'id': self.project_id,
- 'name': self.project_name,
- }
+ self.project = unit.new_project_ref(domain_id=self.domain_id)
+ self.project_id = self.project['id']
+ self.project_name = self.project['name']
self.resource_api.create_project(self.project_id, self.project)
# 3) Create a user in new domain.
- self.user = {
- 'name': self.user_name,
- 'domain_id': self.domain_id,
- 'project_id': self.project_id,
- 'password': self.user_password,
- 'email': self.user_email,
- }
+ self.user = unit.new_user_ref(domain_id=self.domain_id,
+ project_id=self.project_id)
self.user = self.identity_api.create_user(self.user)
@@ -197,17 +229,13 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
self.idp)
# Add a role
- self.role = {
- 'id': self.role_id,
- 'name': self.role_name,
- }
+ self.role = unit.new_role_ref()
+ self.role_id = self.role['id']
+ self.role_name = self.role['name']
self.role_api.create_role(self.role_id, self.role)
# Add a group
- self.group = {
- 'name': self.group_name,
- 'domain_id': self.domain_id,
- }
+ self.group = unit.new_group_ref(domain_id=self.domain_id)
self.group = self.identity_api.create_group(self.group)
# Assign a role to the user on a project
@@ -282,7 +310,7 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
:param request: HTTP request
:param mapping_ref: A mapping in JSON structure will be setup in the
- backend DB for mapping an user or a group.
+ backend DB for mapping a user or a group.
:param exception_expected: Sets to True when an exception is expected
to raised based on the given arguments.
:returns: context an auth context contains user and role information
@@ -300,30 +328,27 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
return context
def test_context_already_exists(self):
- req = make_request()
- token_id = uuid.uuid4().hex
- req.environ[authorization.AUTH_CONTEXT_ENV] = {'token_id': token_id}
- context = self._create_context(request=req)
- self.assertEqual(token_id, context['token_id'])
+ stub_value = uuid.uuid4().hex
+ env = {authorization.AUTH_CONTEXT_ENV: stub_value}
+ req = self._do_middleware_request(extra_environ=env)
+ self.assertEqual(stub_value,
+ req.environ.get(authorization.AUTH_CONTEXT_ENV))
def test_not_applicable_to_token_request(self):
- env = {}
- env['PATH_INFO'] = '/auth/tokens'
- env['REQUEST_METHOD'] = 'POST'
- req = make_request(environ=env)
- context = self._create_context(request=req)
+ req = self._do_middleware_request(path='/auth/tokens', method='post')
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self.assertIsNone(context)
def test_no_tokenless_attributes_request(self):
- req = make_request()
- context = self._create_context(request=req)
+ req = self._do_middleware_request()
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self.assertIsNone(context)
def test_no_issuer_attribute_request(self):
env = {}
env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex
- req = make_request(environ=env)
- context = self._create_context(request=req)
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self.assertIsNone(context)
def test_has_only_issuer_and_project_name_request(self):
@@ -332,61 +357,51 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
# references to issuer of the client certificate.
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex
- req = make_request(environ=env)
- context = self._create_context(request=req,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_has_only_issuer_and_project_domain_name_request(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex
- req = make_request(environ=env)
- context = self._create_context(request=req,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_has_only_issuer_and_project_domain_id_request(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_DOMAIN_ID'] = uuid.uuid4().hex
- req = make_request(environ=env)
- context = self._create_context(request=req,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_missing_both_domain_and_project_request(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
- req = make_request(environ=env)
- context = self._create_context(request=req,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_empty_trusted_issuer_list(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex
- req = make_request(environ=env)
+
self.config_fixture.config(group='tokenless_auth',
trusted_issuer=[])
- context = self._create_context(request=req)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self.assertIsNone(context)
def test_client_issuer_not_trusted(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.untrusted_client_issuer
env['HTTP_X_PROJECT_ID'] = uuid.uuid4().hex
- req = make_request(environ=env)
- context = self._create_context(request=req)
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self.assertIsNone(context)
def test_proj_scope_with_proj_id_and_proj_dom_id_success(self):
@@ -397,24 +412,28 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
# SSL_CLIENT_USER_NAME and SSL_CLIENT_DOMAIN_NAME are the types
# defined in the mapping that will map to the user name and
# domain name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_proj_scope_with_proj_id_only_success(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_ID'] = self.project_id
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_proj_scope_with_proj_name_and_proj_dom_id_success(self):
@@ -422,12 +441,14 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_proj_scope_with_proj_name_and_proj_dom_name_success(self):
@@ -435,28 +456,29 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_proj_scope_with_proj_name_only_fail(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_id
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_mapping_with_userid_and_domainid_success(self):
env = {}
@@ -465,10 +487,12 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
env['SSL_CLIENT_USER_ID'] = self.user['id']
env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINID)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_mapping_with_userid_and_domainname_success(self):
@@ -478,10 +502,12 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
env['SSL_CLIENT_USER_ID'] = self.user['id']
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERID_AND_DOMAINNAME)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_mapping_with_username_and_domainid_success(self):
@@ -489,12 +515,14 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_only_domain_name_fail(self):
@@ -503,14 +531,13 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_ID'] = self.project_id
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_DOMAINNAME_ONLY)
+
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_only_domain_id_fail(self):
env = {}
@@ -518,29 +545,27 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_ID'] = self.project_id
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_DOMAINID_ONLY)
+
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_missing_domain_data_fail(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_ID'] = self.project_id
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_ONLY,
- exception_expected=True)
- self.assertRaises(exception.ValidationError,
- context.process_request,
- req)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_ONLY)
+
+ self._middleware_failure(exception.ValidationError,
+ extra_environ=env,
+ status=400)
def test_userid_success(self):
env = {}
@@ -548,10 +573,10 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_ID'] = self.project_id
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
env['SSL_CLIENT_USER_ID'] = self.user['id']
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERID_ONLY)
+
+ self._load_mapping_rules(mapping_fixtures.MAPPING_WITH_USERID_ONLY)
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context)
def test_domain_disable_fail(self):
@@ -559,37 +584,35 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id
- req = make_request(environ=env)
+
self.domain['enabled'] = False
self.domain = self.resource_api.update_domain(
self.domain['id'], self.domain)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID,
- exception_expected=True)
- self.assertRaises(exception.Unauthorized,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID)
+ self._middleware_failure(exception.Unauthorized,
+ extra_environ=env,
+ status=401)
def test_user_disable_fail(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
env['SSL_CLIENT_DOMAIN_ID'] = self.domain_id
- req = make_request(environ=env)
+
self.user['enabled'] = False
self.user = self.identity_api.update_user(self.user['id'], self.user)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID,
- exception_expected=True)
- self.assertRaises(AssertionError,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINID)
+
+ self._middleware_failure(AssertionError,
+ extra_environ=env)
def test_invalid_user_fail(self):
env = {}
@@ -598,30 +621,29 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_DOMAIN_ID'] = self.domain_id
env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex
env['SSL_CLIENT_DOMAIN_NAME'] = self.domain_name
- req = make_request(environ=env)
- context = self._create_context(
- request=req,
- mapping_ref=mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME,
- exception_expected=True)
- self.assertRaises(exception.UserNotFound,
- context.process_request,
- req)
+
+ self._load_mapping_rules(
+ mapping_fixtures.MAPPING_WITH_USERNAME_AND_DOMAINNAME)
+
+ self._middleware_failure(exception.UserNotFound,
+ extra_environ=env,
+ status=404)
def test_ephemeral_success(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
self.config_fixture.config(group='tokenless_auth',
protocol='ephemeral')
self.protocol_id = 'ephemeral'
- mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = self.group['id']
- context = self._create_context(
- request=req,
- mapping_ref=mapping)
+ self._load_mapping_rules(mapping)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context, ephemeral_user=True)
def test_ephemeral_with_default_user_type_success(self):
@@ -629,23 +651,25 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
self.config_fixture.config(group='tokenless_auth',
protocol='ephemeral')
self.protocol_id = 'ephemeral'
# this mapping does not have the user type defined
# and it should defaults to 'ephemeral' which is
# the expected type for the test case.
- mapping = mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(
+ mapping_fixtures.MAPPING_FOR_DEFAULT_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = self.group['id']
- context = self._create_context(
- request=req,
- mapping_ref=mapping)
+ self._load_mapping_rules(mapping)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context, ephemeral_user=True)
def test_ephemeral_any_user_success(self):
- """Ephemeral user does not need a specified user
+ """Verify ephemeral user does not need a specified user.
+
Keystone is not looking to match the user, but a corresponding group.
"""
env = {}
@@ -653,15 +677,15 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
env['SSL_CLIENT_USER_NAME'] = uuid.uuid4().hex
- req = make_request(environ=env)
self.config_fixture.config(group='tokenless_auth',
protocol='ephemeral')
self.protocol_id = 'ephemeral'
- mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = self.group['id']
- context = self._create_context(
- request=req,
- mapping_ref=mapping)
+ self._load_mapping_rules(mapping)
+
+ req = self._do_middleware_request(extra_environ=env)
+ context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
self._assert_tokenless_auth_context(context, ephemeral_user=True)
def test_ephemeral_invalid_scope_fail(self):
@@ -669,43 +693,37 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = uuid.uuid4().hex
env['HTTP_X_PROJECT_DOMAIN_NAME'] = uuid.uuid4().hex
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
self.config_fixture.config(group='tokenless_auth',
protocol='ephemeral')
self.protocol_id = 'ephemeral'
- mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = self.group['id']
- context = self._create_context(
- request=req,
- mapping_ref=mapping,
- exception_expected=True)
- self.assertRaises(exception.Unauthorized,
- context.process_request,
- req)
+ self._load_mapping_rules(mapping)
+
+ self._middleware_failure(exception.Unauthorized,
+ extra_environ=env,
+ status=401)
def test_ephemeral_no_group_found_fail(self):
env = {}
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
self.config_fixture.config(group='tokenless_auth',
protocol='ephemeral')
self.protocol_id = 'ephemeral'
- mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex
- context = self._create_context(
- request=req,
- mapping_ref=mapping,
- exception_expected=True)
- self.assertRaises(exception.MappedGroupNotFound,
- context.process_request,
- req)
+ self._load_mapping_rules(mapping)
+
+ self._middleware_failure(exception.MappedGroupNotFound,
+ extra_environ=env)
def test_ephemeral_incorrect_mapping_fail(self):
- """Ephemeral user picks up the non-ephemeral user mapping.
+ """Test ephemeral user picking up the non-ephemeral user mapping.
+
Looking up the mapping with protocol Id 'x509' will load up
the non-ephemeral user mapping, results unauthenticated.
"""
@@ -713,21 +731,17 @@ class AuthContextMiddlewareTest(test_backend_sql.SqlTests):
env['SSL_CLIENT_I_DN'] = self.client_issuer
env['HTTP_X_PROJECT_NAME'] = self.project_name
env['HTTP_X_PROJECT_DOMAIN_NAME'] = self.domain_name
- env['SSL_CLIENT_USER_NAME'] = self.user_name
- req = make_request(environ=env)
+ env['SSL_CLIENT_USER_NAME'] = self.user['name']
# This will pick up the incorrect mapping
self.config_fixture.config(group='tokenless_auth',
protocol='x509')
self.protocol_id = 'x509'
- mapping = mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER.copy()
+ mapping = copy.deepcopy(mapping_fixtures.MAPPING_FOR_EPHEMERAL_USER)
mapping['rules'][0]['local'][0]['group']['id'] = uuid.uuid4().hex
- context = self._create_context(
- request=req,
- mapping_ref=mapping,
- exception_expected=True)
- self.assertRaises(exception.MappedGroupNotFound,
- context.process_request,
- req)
+ self._load_mapping_rules(mapping)
+
+ self._middleware_failure(exception.MappedGroupNotFound,
+ extra_environ=env)
def test_create_idp_id_success(self):
env = {}