diff options
author | asteroide <thomas.duval@orange.com> | 2015-09-24 14:39:09 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-09-24 14:39:09 +0200 |
commit | 0be7a3d4e0647dc0d94a34e4fc2f8c364de46602 (patch) | |
tree | 14214bb0bbf2430b6ee0df387ddbdbf13c4c4d63 /keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py | |
parent | e35decd4e989773c96a9abb263257291bd51ae1e (diff) |
Update code from KeystoneMiddleware Github repository (Master).
Change-Id: Id28c5bf48b3dbb6c8a08e66411b5785029f6857d
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py')
-rw-r--r-- | keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py | 202 |
1 files changed, 202 insertions, 0 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py new file mode 100644 index 00000000..b213f546 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py @@ -0,0 +1,202 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from keystoneclient import fixture +import mock +import six +import testtools +import webob + +from keystonemiddleware import auth_token +from keystonemiddleware.auth_token import _request + + +class FakeApp(object): + + @webob.dec.wsgify + def __call__(self, req): + return webob.Response() + + +class FetchingMiddleware(auth_token._BaseAuthProtocol): + + def __init__(self, app, token_dict={}, **kwargs): + super(FetchingMiddleware, self).__init__(app, **kwargs) + self.token_dict = token_dict + + def _fetch_token(self, token): + try: + return self.token_dict[token] + except KeyError: + raise auth_token.InvalidToken() + + +class BaseAuthProtocolTests(testtools.TestCase): + + @mock.patch.multiple(auth_token._BaseAuthProtocol, + process_request=mock.DEFAULT, + process_response=mock.DEFAULT) + def test_process_flow(self, process_request, process_response): + m = auth_token._BaseAuthProtocol(FakeApp()) + + process_request.return_value = None + process_response.side_effect = lambda x: x + + req = webob.Request.blank('/', method='GET') + resp = req.get_response(m) + + self.assertEqual(200, resp.status_code) + + self.assertEqual(1, process_request.call_count) + self.assertIsInstance(process_request.call_args[0][0], + _request._AuthTokenRequest) + + self.assertEqual(1, process_response.call_count) + self.assertIsInstance(process_response.call_args[0][0], webob.Response) + + @classmethod + def call(cls, middleware, method='GET', path='/', headers=None): + req = webob.Request.blank(path) + req.method = method + + for k, v in six.iteritems(headers or {}): + req.headers[k] = v + + resp = req.get_response(middleware) + resp.request = req + return resp + + def test_good_v3_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Auth-Token']) + + self.assertEqual('Confirmed', req.headers['X-Identity-Status']) + self.assertNotIn('X-Service-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertTrue(p.has_user_token) + self.assertFalse(p.has_service_token) + + self.assertEqual(t.project_id, p.user.project_id) + self.assertEqual(t.project_domain_id, p.user.project_domain_id) + self.assertEqual(t.user_id, p.user.user_id) + self.assertEqual(t.user_domain_id, p.user.user_domain_id) + self.assertIn(role['name'], p.user.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_invalid_user_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_expired_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_good_v3_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Service-Token']) + + self.assertEqual('Confirmed', + req.headers['X-Service-Identity-Status']) + self.assertNotIn('X-Auth-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertFalse(p.has_user_token) + self.assertTrue(p.has_service_token) + + self.assertEqual(t.project_id, p.service.project_id) + self.assertEqual(t.project_domain_id, p.service.project_domain_id) + self.assertEqual(t.user_id, p.service.user_id) + self.assertEqual(t.user_domain_id, p.service.user_domain_id) + self.assertIn(role['name'], p.service.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_invalid_service_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_expired_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Service-Token': token_id}) |