aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_wsgi.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_wsgi.py586
1 files changed, 0 insertions, 586 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_wsgi.py b/keystone-moon/keystone/tests/unit/test_wsgi.py
deleted file mode 100644
index 564d7406..00000000
--- a/keystone-moon/keystone/tests/unit/test_wsgi.py
+++ /dev/null
@@ -1,586 +0,0 @@
-# encoding: utf-8
-#
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import gettext
-import socket
-import uuid
-
-import eventlet
-import mock
-import oslo_i18n
-from oslo_serialization import jsonutils
-import six
-from six.moves import http_client
-from testtools import matchers
-import webob
-
-from keystone.common import environment
-from keystone.common import wsgi
-from keystone import exception
-from keystone.tests import unit
-
-
-class FakeApp(wsgi.Application):
- def index(self, context):
- return {'a': 'b'}
-
-
-class FakeAttributeCheckerApp(wsgi.Application):
- def index(self, context):
- return context['query_string']
-
- def assert_attribute(self, body, attr):
- """Asserts that the given request has a certain attribute."""
- ref = jsonutils.loads(body)
- self._require_attribute(ref, attr)
-
- def assert_attributes(self, body, attr):
- """Asserts that the given request has a certain set attributes."""
- ref = jsonutils.loads(body)
- self._require_attributes(ref, attr)
-
-
-class RouterTest(unit.TestCase):
- def setUp(self):
- self.router = wsgi.RoutersBase()
- super(RouterTest, self).setUp()
-
- def test_invalid_status(self):
- fake_mapper = uuid.uuid4().hex
- fake_controller = uuid.uuid4().hex
- fake_path = uuid.uuid4().hex
- fake_rel = uuid.uuid4().hex
- self.assertRaises(exception.Error,
- self.router._add_resource,
- fake_mapper, fake_controller, fake_path, fake_rel,
- status=uuid.uuid4().hex)
-
-
-class BaseWSGITest(unit.TestCase):
- def setUp(self):
- self.app = FakeApp()
- super(BaseWSGITest, self).setUp()
-
- def _make_request(self, url='/'):
- req = webob.Request.blank(url)
- args = {'action': 'index', 'controller': None}
- req.environ['wsgiorg.routing_args'] = [None, args]
- return req
-
-
-class ApplicationTest(BaseWSGITest):
- def test_response_content_type(self):
- req = self._make_request()
- resp = req.get_response(self.app)
- self.assertEqual('application/json', resp.content_type)
-
- def test_query_string_available(self):
- class FakeApp(wsgi.Application):
- def index(self, context):
- return context['query_string']
- req = self._make_request(url='/?1=2')
- resp = req.get_response(FakeApp())
- self.assertEqual({'1': '2'}, jsonutils.loads(resp.body))
-
- def test_headers_available(self):
- class FakeApp(wsgi.Application):
- def index(self, context):
- return context['headers']
-
- app = FakeApp()
- req = self._make_request(url='/?1=2')
- req.headers['X-Foo'] = "bar"
- resp = req.get_response(app)
- self.assertIn('X-Foo', eval(resp.body))
-
- def test_render_response(self):
- data = {'attribute': 'value'}
- body = b'{"attribute": "value"}'
-
- resp = wsgi.render_response(body=data)
- self.assertEqual('200 OK', resp.status)
- self.assertEqual(http_client.OK, resp.status_int)
- self.assertEqual(body, resp.body)
- self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
- self.assertEqual(str(len(body)), resp.headers.get('Content-Length'))
-
- def test_render_response_custom_status(self):
- resp = wsgi.render_response(
- status=(http_client.NOT_IMPLEMENTED, 'Not Implemented'))
- self.assertEqual('501 Not Implemented', resp.status)
- self.assertEqual(http_client.NOT_IMPLEMENTED, resp.status_int)
-
- def test_successful_require_attribute(self):
- app = FakeAttributeCheckerApp()
- req = self._make_request(url='/?1=2')
- resp = req.get_response(app)
- app.assert_attribute(resp.body, '1')
-
- def test_require_attribute_fail_if_attribute_not_present(self):
- app = FakeAttributeCheckerApp()
- req = self._make_request(url='/?1=2')
- resp = req.get_response(app)
- self.assertRaises(exception.ValidationError,
- app.assert_attribute, resp.body, 'a')
-
- def test_successful_require_multiple_attributes(self):
- app = FakeAttributeCheckerApp()
- req = self._make_request(url='/?a=1&b=2')
- resp = req.get_response(app)
- app.assert_attributes(resp.body, ['a', 'b'])
-
- def test_attribute_missing_from_request(self):
- app = FakeAttributeCheckerApp()
- req = self._make_request(url='/?a=1&b=2')
- resp = req.get_response(app)
- ex = self.assertRaises(exception.ValidationError,
- app.assert_attributes,
- resp.body, ['a', 'missing_attribute'])
- self.assertThat(six.text_type(ex),
- matchers.Contains('missing_attribute'))
-
- def test_no_required_attributes_present(self):
- app = FakeAttributeCheckerApp()
- req = self._make_request(url='/')
- resp = req.get_response(app)
-
- ex = self.assertRaises(exception.ValidationError,
- app.assert_attributes, resp.body,
- ['missing_attribute1', 'missing_attribute2'])
- self.assertThat(six.text_type(ex),
- matchers.Contains('missing_attribute1'))
- self.assertThat(six.text_type(ex),
- matchers.Contains('missing_attribute2'))
-
- def test_render_response_custom_headers(self):
- resp = wsgi.render_response(headers=[('Custom-Header', 'Some-Value')])
- self.assertEqual('Some-Value', resp.headers.get('Custom-Header'))
- self.assertEqual('X-Auth-Token', resp.headers.get('Vary'))
-
- def test_render_response_non_str_headers_converted(self):
- resp = wsgi.render_response(
- headers=[('Byte-Header', 'Byte-Value'),
- (u'Unicode-Header', u'Unicode-Value')])
- # assert that all headers are identified.
- self.assertThat(resp.headers, matchers.HasLength(4))
- self.assertEqual('Unicode-Value', resp.headers.get('Unicode-Header'))
- # assert that unicode value is converted, the expected type is str
- # on both python2 and python3.
- self.assertEqual(str,
- type(resp.headers.get('Unicode-Header')))
-
- def test_render_response_no_body(self):
- resp = wsgi.render_response()
- self.assertEqual('204 No Content', resp.status)
- self.assertEqual(http_client.NO_CONTENT, resp.status_int)
- self.assertEqual(b'', resp.body)
- self.assertEqual('0', resp.headers.get('Content-Length'))
- self.assertIsNone(resp.headers.get('Content-Type'))
-
- def test_render_response_head_with_body(self):
- resp = wsgi.render_response({'id': uuid.uuid4().hex}, method='HEAD')
- self.assertEqual(http_client.OK, resp.status_int)
- self.assertEqual(b'', resp.body)
- self.assertNotEqual('0', resp.headers.get('Content-Length'))
- self.assertEqual('application/json', resp.headers.get('Content-Type'))
-
- def test_application_local_config(self):
- class FakeApp(wsgi.Application):
- def __init__(self, *args, **kwargs):
- self.kwargs = kwargs
-
- app = FakeApp.factory({}, testkey="test")
- self.assertIn("testkey", app.kwargs)
- self.assertEqual("test", app.kwargs["testkey"])
-
- def test_render_exception(self):
- e = exception.Unauthorized(message=u'\u7f51\u7edc')
- resp = wsgi.render_exception(e)
- self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
-
- def test_render_exception_host(self):
- e = exception.Unauthorized(message=u'\u7f51\u7edc')
- req = self._make_request(url='/')
- context = {'host_url': 'http://%s:5000' % uuid.uuid4().hex,
- 'environment': req.environ}
- resp = wsgi.render_exception(e, context=context)
-
- self.assertEqual(http_client.UNAUTHORIZED, resp.status_int)
-
- def test_improperly_encoded_params(self):
- class FakeApp(wsgi.Application):
- def index(self, context):
- return context['query_string']
- # this is high bit set ASCII, copy & pasted from Windows.
- # aka code page 1252. It is not valid UTF8.
- req = self._make_request(url='/?name=nonexit%E8nt')
- self.assertRaises(exception.ValidationError, req.get_response,
- FakeApp())
-
- def test_properly_encoded_params(self):
- class FakeApp(wsgi.Application):
- def index(self, context):
- return context['query_string']
- # nonexitènt encoded as UTF-8
- req = self._make_request(url='/?name=nonexit%C3%A8nt')
- resp = req.get_response(FakeApp())
- self.assertEqual({'name': u'nonexit\xe8nt'},
- jsonutils.loads(resp.body))
-
- def test_base_url(self):
- class FakeApp(wsgi.Application):
- def index(self, context):
- return self.base_url(context, 'public')
- req = self._make_request(url='/')
- # NOTE(gyee): according to wsgiref, if HTTP_HOST is present in the
- # request environment, it will be used to construct the base url.
- # SERVER_NAME and SERVER_PORT will be ignored. These are standard
- # WSGI environment variables populated by the webserver.
- req.environ.update({
- 'SCRIPT_NAME': '/identity',
- 'SERVER_NAME': '1.2.3.4',
- 'wsgi.url_scheme': 'http',
- 'SERVER_PORT': '80',
- 'HTTP_HOST': '1.2.3.4',
- })
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://1.2.3.4/identity", resp.body)
-
- # if HTTP_HOST is absent, SERVER_NAME and SERVER_PORT will be used
- req = self._make_request(url='/')
- del req.environ['HTTP_HOST']
- req.environ.update({
- 'SCRIPT_NAME': '/identity',
- 'SERVER_NAME': '1.1.1.1',
- 'wsgi.url_scheme': 'http',
- 'SERVER_PORT': '1234',
- })
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://1.1.1.1:1234/identity", resp.body)
-
- # make sure keystone normalize the standard HTTP port 80 by stripping
- # it
- req = self._make_request(url='/')
- req.environ.update({'HTTP_HOST': 'foo:80',
- 'SCRIPT_NAME': '/identity'})
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://foo/identity", resp.body)
-
- # make sure keystone normalize the standard HTTPS port 443 by stripping
- # it
- req = self._make_request(url='/')
- req.environ.update({'HTTP_HOST': 'foo:443',
- 'SCRIPT_NAME': '/identity',
- 'wsgi.url_scheme': 'https'})
- resp = req.get_response(FakeApp())
- self.assertEqual(b"https://foo/identity", resp.body)
-
- # make sure non-standard port is preserved
- req = self._make_request(url='/')
- req.environ.update({'HTTP_HOST': 'foo:1234',
- 'SCRIPT_NAME': '/identity'})
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://foo:1234/identity", resp.body)
-
- # make sure version portion of the SCRIPT_NAME, '/v2.0', is stripped
- # from base url
- req = self._make_request(url='/')
- req.environ.update({'HTTP_HOST': 'foo:80',
- 'SCRIPT_NAME': '/bar/identity/v2.0'})
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://foo/bar/identity", resp.body)
-
- # make sure version portion of the SCRIPT_NAME, '/v3' is stripped from
- # base url
- req = self._make_request(url='/')
- req.environ.update({'HTTP_HOST': 'foo:80',
- 'SCRIPT_NAME': '/identity/v3'})
- resp = req.get_response(FakeApp())
- self.assertEqual(b"http://foo/identity", resp.body)
-
-
-class ExtensionRouterTest(BaseWSGITest):
- def test_extensionrouter_local_config(self):
- class FakeRouter(wsgi.ExtensionRouter):
- def __init__(self, *args, **kwargs):
- self.kwargs = kwargs
-
- factory = FakeRouter.factory({}, testkey="test")
- app = factory(self.app)
- self.assertIn("testkey", app.kwargs)
- self.assertEqual("test", app.kwargs["testkey"])
-
-
-class MiddlewareTest(BaseWSGITest):
- def test_middleware_request(self):
- class FakeMiddleware(wsgi.Middleware):
- def process_request(self, req):
- req.environ['fake_request'] = True
- return req
- req = self._make_request()
- resp = FakeMiddleware(None)(req)
- self.assertIn('fake_request', resp.environ)
-
- def test_middleware_response(self):
- class FakeMiddleware(wsgi.Middleware):
- def process_response(self, request, response):
- response.environ = {}
- response.environ['fake_response'] = True
- return response
- req = self._make_request()
- resp = FakeMiddleware(self.app)(req)
- self.assertIn('fake_response', resp.environ)
-
- def test_middleware_bad_request(self):
- class FakeMiddleware(wsgi.Middleware):
- def process_response(self, request, response):
- raise exception.Unauthorized()
-
- req = self._make_request()
- req.environ['REMOTE_ADDR'] = '127.0.0.1'
- resp = FakeMiddleware(self.app)(req)
- self.assertEqual(exception.Unauthorized.code, resp.status_int)
-
- def test_middleware_type_error(self):
- class FakeMiddleware(wsgi.Middleware):
- def process_response(self, request, response):
- raise TypeError()
-
- req = self._make_request()
- req.environ['REMOTE_ADDR'] = '127.0.0.1'
- resp = FakeMiddleware(self.app)(req)
- # This is a validationerror type
- self.assertEqual(exception.ValidationError.code, resp.status_int)
-
- def test_middleware_exception_error(self):
-
- exception_str = b'EXCEPTIONERROR'
-
- class FakeMiddleware(wsgi.Middleware):
- def process_response(self, request, response):
- raise exception.UnexpectedError(exception_str)
-
- def do_request():
- req = self._make_request()
- resp = FakeMiddleware(self.app)(req)
- self.assertEqual(exception.UnexpectedError.code, resp.status_int)
- return resp
-
- # Exception data should not be in the message when insecure_debug is
- # False
- self.config_fixture.config(debug=False, insecure_debug=False)
- self.assertNotIn(exception_str, do_request().body)
-
- # Exception data should be in the message when insecure_debug is True
- self.config_fixture.config(debug=True, insecure_debug=True)
- self.assertIn(exception_str, do_request().body)
-
-
-class LocalizedResponseTest(unit.TestCase):
- def test_request_match_default(self):
- # The default language if no Accept-Language is provided is None
- req = webob.Request.blank('/')
- self.assertIsNone(wsgi.best_match_language(req))
-
- @mock.patch.object(oslo_i18n, 'get_available_languages')
- def test_request_match_language_expected(self, mock_gal):
- # If Accept-Language is a supported language, best_match_language()
- # returns it.
-
- language = uuid.uuid4().hex
- mock_gal.return_value = [language]
-
- req = webob.Request.blank('/', headers={'Accept-Language': language})
- self.assertEqual(language, wsgi.best_match_language(req))
-
- @mock.patch.object(oslo_i18n, 'get_available_languages')
- def test_request_match_language_unexpected(self, mock_gal):
- # If Accept-Language is a language we do not support,
- # best_match_language() returns None.
-
- supported_language = uuid.uuid4().hex
- mock_gal.return_value = [supported_language]
-
- request_language = uuid.uuid4().hex
- req = webob.Request.blank(
- '/', headers={'Accept-Language': request_language})
- self.assertIsNone(wsgi.best_match_language(req))
-
- def test_static_translated_string_is_lazy_translatable(self):
- # Statically created message strings are an object that can get
- # lazy-translated rather than a regular string.
- self.assertNotEqual(six.text_type,
- type(exception.Unauthorized.message_format))
-
- @mock.patch.object(oslo_i18n, 'get_available_languages')
- def test_get_localized_response(self, mock_gal):
- # If the request has the Accept-Language set to a supported language
- # and an exception is raised by the application that is translatable
- # then the response will have the translated message.
-
- language = uuid.uuid4().hex
- mock_gal.return_value = [language]
-
- # The arguments for the xlated message format have to match the args
- # for the chosen exception (exception.NotFound)
- xlated_msg_fmt = "Xlated NotFound, %(target)s."
-
- # Fake out gettext.translation() to return a translator for our
- # expected language and a passthrough translator for other langs.
-
- def fake_translation(*args, **kwargs):
- class IdentityTranslator(object):
- def ugettext(self, msgid):
- return msgid
-
- gettext = ugettext
-
- class LangTranslator(object):
- def ugettext(self, msgid):
- if msgid == exception.NotFound.message_format:
- return xlated_msg_fmt
- return msgid
-
- gettext = ugettext
-
- if language in kwargs.get('languages', []):
- return LangTranslator()
- return IdentityTranslator()
-
- with mock.patch.object(gettext, 'translation',
- side_effect=fake_translation) as xlation_mock:
- target = uuid.uuid4().hex
-
- # Fake app raises NotFound exception to simulate Keystone raising.
-
- class FakeApp(wsgi.Application):
- def index(self, context):
- raise exception.NotFound(target=target)
-
- # Make the request with Accept-Language on the app, expect an error
- # response with the translated message.
-
- req = webob.Request.blank('/')
- args = {'action': 'index', 'controller': None}
- req.environ['wsgiorg.routing_args'] = [None, args]
- req.headers['Accept-Language'] = language
- resp = req.get_response(FakeApp())
-
- # Assert that the translated message appears in the response.
-
- exp_msg = xlated_msg_fmt % dict(target=target)
- self.assertThat(resp.json['error']['message'],
- matchers.Equals(exp_msg))
- self.assertThat(xlation_mock.called, matchers.Equals(True))
-
-
-class ServerTest(unit.TestCase):
-
- def setUp(self):
- super(ServerTest, self).setUp()
- self.host = '127.0.0.1'
- self.port = '1234'
-
- @mock.patch('eventlet.listen')
- @mock.patch('socket.getaddrinfo')
- def test_keepalive_unset(self, mock_getaddrinfo, mock_listen):
- mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
- mock_sock_dup = mock_listen.return_value.dup.return_value
-
- server = environment.Server(mock.MagicMock(), host=self.host,
- port=self.port)
- server.start()
- self.addCleanup(server.stop)
- self.assertTrue(mock_listen.called)
- self.assertFalse(mock_sock_dup.setsockopt.called)
-
- @mock.patch('eventlet.listen')
- @mock.patch('socket.getaddrinfo')
- def test_keepalive_set(self, mock_getaddrinfo, mock_listen):
- mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
- mock_sock_dup = mock_listen.return_value.dup.return_value
-
- server = environment.Server(mock.MagicMock(), host=self.host,
- port=self.port, keepalive=True)
- server.start()
- self.addCleanup(server.stop)
- mock_sock_dup.setsockopt.assert_called_once_with(socket.SOL_SOCKET,
- socket.SO_KEEPALIVE,
- 1)
- self.assertTrue(mock_listen.called)
-
- @mock.patch('eventlet.listen')
- @mock.patch('socket.getaddrinfo')
- def test_keepalive_and_keepidle_set(self, mock_getaddrinfo, mock_listen):
- mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
- mock_sock_dup = mock_listen.return_value.dup.return_value
-
- server = environment.Server(mock.MagicMock(), host=self.host,
- port=self.port, keepalive=True,
- keepidle=1)
- server.start()
- self.addCleanup(server.stop)
-
- if hasattr(socket, 'TCP_KEEPIDLE'):
- self.assertEqual(2, mock_sock_dup.setsockopt.call_count)
- # Test the last set of call args i.e. for the keepidle
- mock_sock_dup.setsockopt.assert_called_with(socket.IPPROTO_TCP,
- socket.TCP_KEEPIDLE,
- 1)
- else:
- self.assertEqual(1, mock_sock_dup.setsockopt.call_count)
-
- self.assertTrue(mock_listen.called)
-
- def test_client_socket_timeout(self):
- # mocking server method of eventlet.wsgi to check it is called with
- # configured 'client_socket_timeout' value.
- for socket_timeout in range(1, 10):
- self.config_fixture.config(group='eventlet_server',
- client_socket_timeout=socket_timeout)
- server = environment.Server(mock.MagicMock(), host=self.host,
- port=self.port)
- with mock.patch.object(eventlet.wsgi, 'server') as mock_server:
- fake_application = uuid.uuid4().hex
- fake_socket = uuid.uuid4().hex
- server._run(fake_application, fake_socket)
- mock_server.assert_called_once_with(
- fake_socket,
- fake_application,
- debug=mock.ANY,
- socket_timeout=socket_timeout,
- log=mock.ANY,
- keepalive=mock.ANY)
-
- def test_wsgi_keep_alive(self):
- # mocking server method of eventlet.wsgi to check it is called with
- # configured 'wsgi_keep_alive' value.
- wsgi_keepalive = False
- self.config_fixture.config(group='eventlet_server',
- wsgi_keep_alive=wsgi_keepalive)
-
- server = environment.Server(mock.MagicMock(), host=self.host,
- port=self.port)
- with mock.patch.object(eventlet.wsgi, 'server') as mock_server:
- fake_application = uuid.uuid4().hex
- fake_socket = uuid.uuid4().hex
- server._run(fake_application, fake_socket)
- mock_server.assert_called_once_with(fake_socket,
- fake_application,
- debug=mock.ANY,
- socket_timeout=mock.ANY,
- log=mock.ANY,
- keepalive=wsgi_keepalive)