From 0be7a3d4e0647dc0d94a34e4fc2f8c364de46602 Mon Sep 17 00:00:00 2001 From: asteroide Date: Thu, 24 Sep 2015 14:39:09 +0200 Subject: Update code from KeystoneMiddleware Github repository (Master). Change-Id: Id28c5bf48b3dbb6c8a08e66411b5785029f6857d --- .../tests/unit/auth_token/test_base_middleware.py | 202 +++++++++++++++++++++ 1 file changed, 202 insertions(+) create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py new file mode 100644 index 00000000..b213f546 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py @@ -0,0 +1,202 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from keystoneclient import fixture +import mock +import six +import testtools +import webob + +from keystonemiddleware import auth_token +from keystonemiddleware.auth_token import _request + + +class FakeApp(object): + + @webob.dec.wsgify + def __call__(self, req): + return webob.Response() + + +class FetchingMiddleware(auth_token._BaseAuthProtocol): + + def __init__(self, app, token_dict={}, **kwargs): + super(FetchingMiddleware, self).__init__(app, **kwargs) + self.token_dict = token_dict + + def _fetch_token(self, token): + try: + return self.token_dict[token] + except KeyError: + raise auth_token.InvalidToken() + + +class BaseAuthProtocolTests(testtools.TestCase): + + @mock.patch.multiple(auth_token._BaseAuthProtocol, + process_request=mock.DEFAULT, + process_response=mock.DEFAULT) + def test_process_flow(self, process_request, process_response): + m = auth_token._BaseAuthProtocol(FakeApp()) + + process_request.return_value = None + process_response.side_effect = lambda x: x + + req = webob.Request.blank('/', method='GET') + resp = req.get_response(m) + + self.assertEqual(200, resp.status_code) + + self.assertEqual(1, process_request.call_count) + self.assertIsInstance(process_request.call_args[0][0], + _request._AuthTokenRequest) + + self.assertEqual(1, process_response.call_count) + self.assertIsInstance(process_response.call_args[0][0], webob.Response) + + @classmethod + def call(cls, middleware, method='GET', path='/', headers=None): + req = webob.Request.blank(path) + req.method = method + + for k, v in six.iteritems(headers or {}): + req.headers[k] = v + + resp = req.get_response(middleware) + resp.request = req + return resp + + def test_good_v3_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Auth-Token']) + + self.assertEqual('Confirmed', req.headers['X-Identity-Status']) + self.assertNotIn('X-Service-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertTrue(p.has_user_token) + self.assertFalse(p.has_service_token) + + self.assertEqual(t.project_id, p.user.project_id) + self.assertEqual(t.project_domain_id, p.user.project_domain_id) + self.assertEqual(t.user_id, p.user.user_id) + self.assertEqual(t.user_domain_id, p.user.user_domain_id) + self.assertIn(role['name'], p.user.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_invalid_user_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_expired_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_good_v3_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Service-Token']) + + self.assertEqual('Confirmed', + req.headers['X-Service-Identity-Status']) + self.assertNotIn('X-Auth-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertFalse(p.has_user_token) + self.assertTrue(p.has_service_token) + + self.assertEqual(t.project_id, p.service.project_id) + self.assertEqual(t.project_domain_id, p.service.project_domain_id) + self.assertEqual(t.user_id, p.service.user_id) + self.assertEqual(t.user_domain_id, p.service.user_domain_id) + self.assertIn(role['name'], p.service.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_invalid_service_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_expired_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Service-Token': token_id}) -- cgit 1.2.3-korg