aboutsummaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py202
1 files changed, 202 insertions, 0 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
new file mode 100644
index 00000000..b213f546
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
@@ -0,0 +1,202 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import uuid
+
+from keystoneclient import fixture
+import mock
+import six
+import testtools
+import webob
+
+from keystonemiddleware import auth_token
+from keystonemiddleware.auth_token import _request
+
+
+class FakeApp(object):
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ return webob.Response()
+
+
+class FetchingMiddleware(auth_token._BaseAuthProtocol):
+
+ def __init__(self, app, token_dict={}, **kwargs):
+ super(FetchingMiddleware, self).__init__(app, **kwargs)
+ self.token_dict = token_dict
+
+ def _fetch_token(self, token):
+ try:
+ return self.token_dict[token]
+ except KeyError:
+ raise auth_token.InvalidToken()
+
+
+class BaseAuthProtocolTests(testtools.TestCase):
+
+ @mock.patch.multiple(auth_token._BaseAuthProtocol,
+ process_request=mock.DEFAULT,
+ process_response=mock.DEFAULT)
+ def test_process_flow(self, process_request, process_response):
+ m = auth_token._BaseAuthProtocol(FakeApp())
+
+ process_request.return_value = None
+ process_response.side_effect = lambda x: x
+
+ req = webob.Request.blank('/', method='GET')
+ resp = req.get_response(m)
+
+ self.assertEqual(200, resp.status_code)
+
+ self.assertEqual(1, process_request.call_count)
+ self.assertIsInstance(process_request.call_args[0][0],
+ _request._AuthTokenRequest)
+
+ self.assertEqual(1, process_response.call_count)
+ self.assertIsInstance(process_response.call_args[0][0], webob.Response)
+
+ @classmethod
+ def call(cls, middleware, method='GET', path='/', headers=None):
+ req = webob.Request.blank(path)
+ req.method = method
+
+ for k, v in six.iteritems(headers or {}):
+ req.headers[k] = v
+
+ resp = req.get_response(middleware)
+ resp.request = req
+ return resp
+
+ def test_good_v3_user_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ role = t.add_role()
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+
+ self.assertEqual('Confirmed', req.headers['X-Identity-Status'])
+ self.assertNotIn('X-Service-Token', req.headers)
+
+ p = req.environ['keystone.token_auth']
+
+ self.assertTrue(p.has_user_token)
+ self.assertFalse(p.has_service_token)
+
+ self.assertEqual(t.project_id, p.user.project_id)
+ self.assertEqual(t.project_domain_id, p.user.project_domain_id)
+ self.assertEqual(t.user_id, p.user.user_id)
+ self.assertEqual(t.user_domain_id, p.user.user_domain_id)
+ self.assertIn(role['name'], p.user.role_names)
+
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_invalid_user_token(self):
+ token_id = uuid.uuid4().hex
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid', req.headers['X-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_expired_user_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid', req.headers['X-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict=token_dict)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_good_v3_service_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ role = t.add_role()
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+
+ self.assertEqual('Confirmed',
+ req.headers['X-Service-Identity-Status'])
+ self.assertNotIn('X-Auth-Token', req.headers)
+
+ p = req.environ['keystone.token_auth']
+
+ self.assertFalse(p.has_user_token)
+ self.assertTrue(p.has_service_token)
+
+ self.assertEqual(t.project_id, p.service.project_id)
+ self.assertEqual(t.project_domain_id, p.service.project_domain_id)
+ self.assertEqual(t.user_id, p.service.user_id)
+ self.assertEqual(t.user_domain_id, p.service.user_domain_id)
+ self.assertIn(role['name'], p.service.role_names)
+
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict)
+ self.call(m, headers={'X-Service-Token': token_id})
+
+ def test_invalid_service_token(self):
+ token_id = uuid.uuid4().hex
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid',
+ req.headers['X-Service-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb)
+ self.call(m, headers={'X-Service-Token': token_id})
+
+ def test_expired_service_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid',
+ req.headers['X-Service-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict=token_dict)
+ self.call(m, headers={'X-Service-Token': token_id})