summaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py73
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py10
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py1310
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py202
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py4
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py253
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py4
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py5
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py195
9 files changed, 1296 insertions, 760 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py
new file mode 100644
index 00000000..d76572a8
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py
@@ -0,0 +1,73 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+
+import fixtures
+from oslo_config import cfg
+from oslo_config import fixture as cfg_fixture
+from requests_mock.contrib import fixture as rm_fixture
+import six
+import webob.dec
+
+from keystonemiddleware import auth_token
+from keystonemiddleware.tests.unit import utils
+
+
+class BaseAuthTokenTestCase(utils.BaseTestCase):
+
+ def setUp(self):
+ super(BaseAuthTokenTestCase, self).setUp()
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
+ self.logger = fixtures.FakeLogger(level=logging.DEBUG)
+ self.cfg = self.useFixture(cfg_fixture.Config(conf=cfg.ConfigOpts()))
+
+ def create_middleware(self, cb, conf=None, use_global_conf=False):
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ return cb(req)
+
+ if use_global_conf:
+ opts = conf or {}
+ else:
+ opts = {
+ 'oslo_config_project': 'keystonemiddleware',
+ 'oslo_config_config': self.cfg.conf,
+ }
+ opts.update(conf or {})
+
+ return auth_token.AuthProtocol(_do_cb, opts)
+
+ def create_simple_middleware(self,
+ status='200 OK',
+ body='',
+ headers=None,
+ **kwargs):
+ def cb(req):
+ resp = webob.Response(body, status)
+ resp.headers.update(headers or {})
+ return resp
+
+ return self.create_middleware(cb, **kwargs)
+
+ @classmethod
+ def call(cls, middleware, method='GET', path='/', headers=None):
+ req = webob.Request.blank(path)
+ req.method = method
+
+ for k, v in six.iteritems(headers or {}):
+ req.headers[k] = v
+
+ resp = req.get_response(middleware)
+ resp.request = req
+ return resp
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py
index 517d597b..d6ebc9a0 100644
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py
@@ -18,12 +18,12 @@ from keystoneclient import fixture
from keystoneclient import session
from requests_mock.contrib import fixture as rm_fixture
import six
-import testtools
from keystonemiddleware.auth_token import _auth
+from keystonemiddleware.tests.unit import utils
-class DefaultAuthPluginTests(testtools.TestCase):
+class DefaultAuthPluginTests(utils.BaseTestCase):
def new_plugin(self, auth_host=None, auth_port=None, auth_protocol=None,
auth_admin_prefix=None, admin_user=None,
@@ -50,7 +50,7 @@ class DefaultAuthPluginTests(testtools.TestCase):
self.stream = six.StringIO()
self.logger = logging.getLogger(__name__)
self.session = session.Session()
- self.requests = self.useFixture(rm_fixture.Fixture())
+ self.requests_mock = self.useFixture(rm_fixture.Fixture())
def test_auth_uri_from_fragments(self):
auth_protocol = 'http'
@@ -91,8 +91,8 @@ class DefaultAuthPluginTests(testtools.TestCase):
token = fixture.V2Token()
admin_tenant_name = uuid.uuid4().hex
- self.requests.post(base_uri + '/v2.0/tokens',
- json=token)
+ self.requests_mock.post(base_uri + '/v2.0/tokens',
+ json=token)
plugin = self.new_plugin(identity_uri=base_uri,
admin_user=uuid.uuid4().hex,
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
index 97fcc557..bb572aa3 100644
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
@@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import calendar
import datetime
import json
import logging
@@ -24,17 +23,16 @@ import time
import uuid
import fixtures
-from keystoneclient import access
from keystoneclient import auth
from keystoneclient.common import cms
from keystoneclient import exceptions
from keystoneclient import fixture
from keystoneclient import session
import mock
-from oslo_config import fixture as cfg_fixture
+from oslo_config import cfg
from oslo_serialization import jsonutils
from oslo_utils import timeutils
-from requests_mock.contrib import fixture as rm_fixture
+from oslotest import createfile
import six
import testresources
import testtools
@@ -47,6 +45,7 @@ from keystonemiddleware.auth_token import _base
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.openstack.common import memorycache
+from keystonemiddleware.tests.unit.auth_token import base
from keystonemiddleware.tests.unit import client_fixtures
from keystonemiddleware.tests.unit import utils
@@ -134,6 +133,11 @@ def cleanup_revoked_file(filename):
pass
+def strtime(at=None):
+ at = at or timeutils.utcnow()
+ return at.strftime(timeutils.PERFECT_TIME_FORMAT)
+
+
class TimezoneFixture(fixtures.Fixture):
@staticmethod
def supported():
@@ -192,29 +196,30 @@ class FakeApp(object):
self.need_service_token = need_service_token
- def __call__(self, env, start_response):
+ @webob.dec.wsgify
+ def __call__(self, req):
for k, v in self.expected_env.items():
- assert env[k] == v, '%s != %s' % (env[k], v)
+ assert req.environ[k] == v, '%s != %s' % (req.environ[k], v)
resp = webob.Response()
- if (env.get('HTTP_X_IDENTITY_STATUS') == 'Invalid'
- and env['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'):
+ if (req.environ.get('HTTP_X_IDENTITY_STATUS') == 'Invalid' and
+ req.environ['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'):
# Simulate delayed auth forbidding access with arbitrary status
# code to differentiate checking this code path
resp.status = 419
resp.body = FakeApp.FORBIDDEN
- elif env.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid':
+ elif req.environ.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid':
# Simulate delayed auth forbidding access with arbitrary status
# code to differentiate checking this code path
resp.status = 420
resp.body = FakeApp.FORBIDDEN
- elif env['HTTP_X_IDENTITY_STATUS'] == 'Invalid':
+ elif req.environ['HTTP_X_IDENTITY_STATUS'] == 'Invalid':
# Simulate delayed auth forbidding access
resp.status = 403
resp.body = FakeApp.FORBIDDEN
elif (self.need_service_token is True and
- env.get('HTTP_X_SERVICE_TOKEN') is None):
+ req.environ.get('HTTP_X_SERVICE_TOKEN') is None):
# Simulate requiring composite auth
# Arbitrary value to allow checking this code path
resp.status = 418
@@ -222,7 +227,7 @@ class FakeApp(object):
else:
resp.body = FakeApp.SUCCESS
- return resp(env, start_response)
+ return resp
class v3FakeApp(FakeApp):
@@ -274,23 +279,7 @@ class v3CompositeFakeApp(CompositeBase, v3FakeApp):
v3_default_service_env_additions)
-def new_app(status, body, headers={}):
-
- class _App(object):
-
- def __init__(self, expected_env=None):
- self.expected_env = expected_env
-
- @webob.dec.wsgify
- def __call__(self, req):
- resp = webob.Response(body, status)
- resp.headers.update(headers)
- return resp
-
- return _App
-
-
-class BaseAuthTokenMiddlewareTest(testtools.TestCase):
+class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
"""Base test class for auth_token middleware.
All the tests allow for running with auth_token
@@ -309,7 +298,6 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.expected_env = expected_env or dict()
self.fake_app = fake_app or FakeApp
self.middleware = None
- self.requests = self.useFixture(rm_fixture.Fixture())
signing_dir = self._setup_signing_directory()
@@ -325,6 +313,9 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.response_status = None
self.response_headers = None
+ def call_middleware(self, **kwargs):
+ return self.call(self.middleware, **kwargs)
+
def _setup_signing_directory(self):
directory_name = self.useFixture(fixtures.TempDir()).path
@@ -366,15 +357,12 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
for key in six.iterkeys(self.service_token_expected_env):
del self.middleware._app.expected_env[key]
- def start_fake_response(self, status, headers, exc_info=None):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
-
def assertLastPath(self, path):
if path:
- self.assertEqual(BASE_URI + path, self.requests.last_request.url)
+ self.assertEqual(BASE_URI + path,
+ self.requests_mock.last_request.url)
else:
- self.assertIsNone(self.requests.last_request)
+ self.assertIsNone(self.requests_mock.last_request)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@@ -395,27 +383,25 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
super(DiabloAuthTokenMiddlewareTest, self).setUp(
expected_env=expected_env)
- self.requests.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v2,
+ status_code=300)
- self.requests.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
- self.requests.get(url, text=token_response)
+ self.requests_mock.get(url, text=token_response)
self.set_middleware()
def test_valid_diablo_response(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_id
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertIn('keystone.token_info', req.environ)
+ resp = self.call_middleware(headers={'X-Auth-Token': self.token_id})
+ self.assertEqual(200, resp.status_int)
+ self.assertIn('keystone.token_info', resp.request.environ)
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
@@ -521,6 +507,21 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
token_response = self.examples.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v3(token_response))
+ def test_fixed_cache_key_length(self):
+ self.set_middleware()
+ short_string = uuid.uuid4().hex
+ long_string = 8 * uuid.uuid4().hex
+
+ token_cache = self.middleware._token_cache
+ hashed_short_string_key, context_ = token_cache._get_cache_key(
+ short_string)
+ hashed_long_string_key, context_ = token_cache._get_cache_key(
+ long_string)
+
+ # The hash keys should always match in length
+ self.assertThat(hashed_short_string_key,
+ matchers.HasLength(len(hashed_long_string_key)))
+
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_encrypt_cache_data(self):
conf = {
@@ -530,13 +531,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
}
self.set_middleware(conf=conf)
token = b'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = timeutils.strtime(some_time_later)
- data = ('this_data', expires)
+ data = 'this_data'
token_cache = self.middleware._token_cache
token_cache.initialize({})
token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
+ self.assertEqual(token_cache.get(token), data)
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_sign_cache_data(self):
@@ -547,13 +546,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
}
self.set_middleware(conf=conf)
token = b'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = timeutils.strtime(some_time_later)
- data = ('this_data', expires)
+ data = 'this_data'
token_cache = self.middleware._token_cache
token_cache.initialize({})
token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
+ self.assertEqual(token_cache.get(token), data)
@testtools.skipUnless(memcached_available(), 'memcached not available')
def test_no_memcache_protection(self):
@@ -563,13 +560,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
}
self.set_middleware(conf=conf)
token = 'my_token'
- some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4)
- expires = timeutils.strtime(some_time_later)
- data = ('this_data', expires)
+ data = 'this_data'
token_cache = self.middleware._token_cache
token_cache.initialize({})
token_cache._cache_store(token, data)
- self.assertEqual(token_cache._cache_get(token), data[0])
+ self.assertEqual(token_cache.get(token), data)
def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key
@@ -648,6 +643,64 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertRaises(exc.ConfigurationError,
auth_token.AuthProtocol, self.fake_app, conf)
+ def test_auth_region_name(self):
+ token = fixture.V3Token()
+
+ auth_url = 'http://keystone-auth.example.com:5000'
+ east_url = 'http://keystone-east.example.com:5000'
+ west_url = 'http://keystone-west.example.com:5000'
+
+ auth_versions = fixture.DiscoveryList(href=auth_url)
+ east_versions = fixture.DiscoveryList(href=east_url)
+ west_versions = fixture.DiscoveryList(href=west_url)
+
+ s = token.add_service('identity')
+ s.add_endpoint(interface='admin', url=east_url, region='east')
+ s.add_endpoint(interface='admin', url=west_url, region='west')
+
+ self.requests_mock.get(auth_url, json=auth_versions)
+ self.requests_mock.get(east_url, json=east_versions)
+ self.requests_mock.get(west_url, json=west_versions)
+
+ self.requests_mock.post(
+ '%s/v3/auth/tokens' % auth_url,
+ headers={'X-Subject-Token': uuid.uuid4().hex},
+ json=token)
+
+ east_mock = self.requests_mock.get(
+ '%s/v3/auth/tokens' % east_url,
+ headers={'X-Subject-Token': uuid.uuid4().hex},
+ json=fixture.V3Token())
+
+ west_mock = self.requests_mock.get(
+ '%s/v3/auth/tokens' % west_url,
+ headers={'X-Subject-Token': uuid.uuid4().hex},
+ json=fixture.V3Token())
+
+ conf = {'auth_uri': auth_url,
+ 'auth_url': auth_url + '/v3',
+ 'auth_plugin': 'v3password',
+ 'username': 'user',
+ 'password': 'pass'}
+
+ self.assertEqual(0, east_mock.call_count)
+ self.assertEqual(0, west_mock.call_count)
+
+ east_app = self.create_simple_middleware(conf=dict(region_name='east',
+ **conf))
+ self.call(east_app, headers={'X-Auth-Token': uuid.uuid4().hex})
+
+ self.assertEqual(1, east_mock.call_count)
+ self.assertEqual(0, west_mock.call_count)
+
+ west_app = self.create_simple_middleware(conf=dict(region_name='west',
+ **conf))
+
+ self.call(west_app, headers={'X-Auth-Token': uuid.uuid4().hex})
+
+ self.assertEqual(1, east_mock.call_count)
+ self.assertEqual(1, west_mock.call_count)
+
class CommonAuthTokenMiddlewareTest(object):
"""These tests are run once using v2 tokens and again using v3 tokens."""
@@ -656,15 +709,14 @@ class CommonAuthTokenMiddlewareTest(object):
conf = {
'revocation_cache_time': '1'
}
- self.set_middleware(conf=conf)
+ self.create_simple_middleware(conf=conf)
self.assertLastPath(None)
def test_auth_with_no_token_does_not_call_http(self):
- self.set_middleware()
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
+ middleware = self.create_simple_middleware()
+ resp = self.call(middleware)
self.assertLastPath(None)
- self.assertEqual(401, self.response_status)
+ self.assertEqual(401, resp.status_int)
def test_init_by_ipv6Addr_auth_host(self):
del self.conf['identity_uri']
@@ -675,23 +727,20 @@ class CommonAuthTokenMiddlewareTest(object):
'auth_uri': None,
'auth_version': 'v3.0',
}
- self.set_middleware(conf=conf)
- expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234'
- self.assertEqual(expected_auth_uri,
- self.middleware._auth_uri)
+ middleware = self.create_simple_middleware(conf=conf)
+ self.assertEqual('http://[2001:2013:1:f101::1]:1234',
+ middleware._auth_uri)
def assert_valid_request_200(self, token, with_catalog=True):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
if with_catalog:
- self.assertTrue(req.headers.get('X-Service-Catalog'))
+ self.assertTrue(resp.request.headers.get('X-Service-Catalog'))
else:
- self.assertNotIn('X-Service-Catalog', req.headers)
- self.assertEqual(body, [FakeApp.SUCCESS])
- self.assertIn('keystone.token_info', req.environ)
- return req
+ self.assertNotIn('X-Service-Catalog', resp.request.headers)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
+ self.assertIn('keystone.token_info', resp.request.environ)
+ return resp.request
def test_valid_uuid_request(self):
for _ in range(2): # Do it twice because first result was cached.
@@ -713,18 +762,16 @@ class CommonAuthTokenMiddlewareTest(object):
# When the token is cached and revoked, 401 is returned.
self.middleware._check_revocations_for_cached = True
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
-
# Token should be cached as ok after this.
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
# Put it in revocation list.
self.middleware._revocations._list = self.get_revocation_list_json(
token_ids=[revoked_form or token])
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
+
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
def test_cached_revoked_uuid(self):
# When the UUID token is cached and revoked, 401 is returned.
@@ -746,20 +793,21 @@ class CommonAuthTokenMiddlewareTest(object):
def test_revoked_token_receives_401(self):
self.middleware._revocations._list = (
self.get_revocation_list_json())
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
+
+ token = self.token_dict['revoked_token']
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+
+ self.assertEqual(401, resp.status_int)
def test_revoked_token_receives_401_sha256(self):
self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json(mode='sha256'))
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
+
+ token = self.token_dict['revoked_token']
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
def test_cached_revoked_pki(self):
# When the PKI token is cached and revoked, 401 is returned.
@@ -781,10 +829,10 @@ class CommonAuthTokenMiddlewareTest(object):
self.set_middleware()
self.middleware._revocations._list = (
self.get_revocation_list_json())
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['revoked_token']
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
+
+ token = self.token_dict['revoked_token']
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
def _test_revoked_hashed_token(self, token_name):
# If hash_algorithms is set as ['sha256', 'md5'],
@@ -806,17 +854,14 @@ class CommonAuthTokenMiddlewareTest(object):
# First, request is using the hashed token, is valid so goes in
# cache using the given hash.
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token_hashed
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
+ resp = self.call_middleware(headers={'X-Auth-Token': token_hashed})
+ self.assertEqual(200, resp.status_int)
# This time use the PKI(Z) token
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
# Should find the token in the cache and revocation list.
- self.assertEqual(401, self.response_status)
+ self.assertEqual(401, resp.status_int)
def test_revoked_hashed_pki_token(self):
self._test_revoked_hashed_token('signed_token_scoped')
@@ -973,8 +1018,7 @@ class CommonAuthTokenMiddlewareTest(object):
in_memory_list)
def test_invalid_revocation_list_raises_error(self):
- self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={})
-
+ self.requests_mock.get(self.revocation_url, json={})
self.assertRaises(exc.RevocationListError,
self.middleware._revocations._fetch)
@@ -988,127 +1032,66 @@ class CommonAuthTokenMiddlewareTest(object):
# remember because we are testing the middleware we stub the connection
# to the keystone server, but this is not what gets returned
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests.get(invalid_uri, status_code=404)
+ self.requests_mock.get(invalid_uri, status_code=404)
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = 'invalid-token'
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
+ resp = self.call_middleware(headers={'X-Auth-Token': 'invalid-token'})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
+ resp.headers['WWW-Authenticate'])
def test_request_invalid_signed_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
+ token = self.examples.INVALID_SIGNED_TOKEN
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- self.response_headers['WWW-Authenticate'])
+ resp.headers['WWW-Authenticate'])
def test_request_invalid_signed_pkiz_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_PKIZ_TOKEN
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
+ token = self.examples.INVALID_SIGNED_PKIZ_TOKEN
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- self.response_headers['WWW-Authenticate'])
+ resp.headers['WWW-Authenticate'])
def test_request_no_token(self):
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
-
- def test_request_no_token_log_message(self):
- class FakeLog(object):
- def __init__(self):
- self.msg = None
- self.debugmsg = None
-
- def warn(self, msg=None, *args, **kwargs):
- self.msg = msg
-
- def debug(self, msg=None, *args, **kwargs):
- self.debugmsg = msg
-
- self.middleware._LOG = FakeLog()
- self.middleware._delay_auth_decision = False
- self.assertRaises(exc.InvalidToken,
- self.middleware._get_user_token_from_header, {})
- self.assertIsNotNone(self.middleware._LOG.msg)
- self.assertIsNotNone(self.middleware._LOG.debugmsg)
+ resp = self.call_middleware()
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
+ resp.headers['WWW-Authenticate'])
def test_request_no_token_http(self):
- req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
- self.set_middleware()
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
- self.assertEqual(body, [''])
+ resp = self.call_middleware(method='HEAD')
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
+ resp.headers['WWW-Authenticate'])
def test_request_blank_token(self):
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ''
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
+ resp = self.call_middleware(headers={'X-Auth-Token': ''})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
+ resp.headers['WWW-Authenticate'])
def _get_cached_token(self, token, mode='md5'):
token_id = cms.cms_hash_token(token, mode=mode)
- return self.middleware._token_cache._cache_get(token_id)
+ return self.middleware._token_cache.get(token_id)
def test_memcache(self):
- req = webob.Request.blank('/')
token = self.token_dict['signed_token_scoped']
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
+ self.call_middleware(headers={'X-Auth-Token': token})
self.assertIsNotNone(self._get_cached_token(token))
def test_expired(self):
- req = webob.Request.blank('/')
token = self.token_dict['signed_token_scoped_expired']
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
def test_memcache_set_invalid_uuid(self):
invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests.get(invalid_uri, status_code=404)
+ self.requests_mock.get(invalid_uri, status_code=404)
- req = webob.Request.blank('/')
token = 'invalid-token'
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(exc.InvalidToken,
- self._get_cached_token, token)
-
- def _test_memcache_set_invalid_signed(self, hash_algorithms=None,
- exp_mode='md5'):
- req = webob.Request.blank('/')
- token = self.token_dict['signed_token_scoped_expired']
- req.headers['X-Auth-Token'] = token
- if hash_algorithms:
- self.conf['hash_algorithms'] = ','.join(hash_algorithms)
- self.set_middleware()
- self.middleware(req.environ, self.start_fake_response)
- self.assertRaises(exc.InvalidToken,
- self._get_cached_token, token, mode=exp_mode)
-
- def test_memcache_set_invalid_signed(self):
- self._test_memcache_set_invalid_signed()
-
- def test_memcache_set_invalid_signed_sha256_md5(self):
- hash_algorithms = ['sha256', 'md5']
- self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms,
- exp_mode='sha256')
-
- def test_memcache_set_invalid_signed_sha256(self):
- hash_algorithms = ['sha256']
- self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms,
- exp_mode='sha256')
+ self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertRaises(exc.InvalidToken, self._get_cached_token, token)
def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
token_cache_time = 10
@@ -1117,14 +1100,17 @@ class CommonAuthTokenMiddlewareTest(object):
}
conf.update(extra_conf)
self.set_middleware(conf=conf)
- req = webob.Request.blank('/')
+
token = self.token_dict['signed_token_scoped']
+ self.call_middleware(headers={'X-Auth-Token': token})
+
+ req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
req.environ.update(extra_environ)
now = datetime.datetime.utcnow()
self.useFixture(TimeFixture(now))
- self.middleware(req.environ, self.start_fake_response)
+ req.get_response(self.middleware)
self.assertIsNotNone(self._get_cached_token(token))
timeutils.advance_time_seconds(token_cache_time)
@@ -1141,24 +1127,19 @@ class CommonAuthTokenMiddlewareTest(object):
We use UUID tokens since they are the easiest one to reach
get_http_connection.
"""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ERROR_TOKEN
self.middleware._http_request_max_retries = 0
- self.middleware(req.environ, self.start_fake_response)
+ self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN})
self.assertIsNone(self._get_cached_token(ERROR_TOKEN))
self.assert_valid_last_url(ERROR_TOKEN)
def test_http_request_max_retries(self):
times_retry = 10
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = ERROR_TOKEN
-
conf = {'http_request_max_retries': '%s' % times_retry}
self.set_middleware(conf=conf)
with mock.patch('time.sleep') as mock_obj:
- self.middleware(req.environ, self.start_fake_response)
+ self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN})
self.assertEqual(mock_obj.call_count, times_retry)
@@ -1189,18 +1170,17 @@ class CommonAuthTokenMiddlewareTest(object):
req.environ['AUTH_TYPE'] = 'Negotiate'
- body = self.middleware(req.environ, self.start_fake_response)
+ resp = req.get_response(self.middleware)
if success:
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, [FakeApp.SUCCESS])
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
self.assertIn('keystone.token_info', req.environ)
self.assert_valid_last_url(token)
else:
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'"
- )
+ self.assertEqual(401, resp.status_int)
+ msg = "Keystone uri='https://keystone.example.com:1234'"
+ self.assertEqual(msg, resp.headers['WWW-Authenticate'])
def test_uuid_bind_token_disabled_with_kerb_user(self):
for use_kerberos in [True, False]:
@@ -1348,17 +1328,13 @@ class CommonAuthTokenMiddlewareTest(object):
token = self.token_dict['signed_token_scoped']
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
self.assertThat(1, matchers.Equals(cache.set.call_count))
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
# Assert that the token wasn't cached again.
self.assertThat(1, matchers.Equals(cache.set.call_count))
@@ -1367,17 +1343,16 @@ class CommonAuthTokenMiddlewareTest(object):
for service_url in (self.examples.UNVERSIONED_SERVICE_URL,
self.examples.SERVICE_URL):
- self.requests.get(service_url,
- json=VERSION_LIST_v3,
- status_code=300)
+ self.requests_mock.get(service_url,
+ json=VERSION_LIST_v3,
+ status_code=300)
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
- self.assertEqual([FakeApp.SUCCESS], body)
+ token = self.token_dict['uuid_token_default']
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
- token_auth = req.environ['keystone.token_auth']
+ token_auth = resp.request.environ['keystone.token_auth']
endpoint_filter = {'service_type': self.examples.SERVICE_TYPE,
'version': 3}
@@ -1388,6 +1363,29 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertFalse(token_auth.has_service_token)
self.assertIsNone(token_auth.service)
+ def test_doesnt_auto_set_content_type(self):
+ # webob will set content_type = 'text/html' by default if nothing is
+ # provided. We don't want our middleware messing with the content type
+ # of the underlying applications.
+
+ text = uuid.uuid4().hex
+
+ def _middleware(environ, start_response):
+ start_response(200, [])
+ return text
+
+ def _start_response(status_code, headerlist, exc_info=None):
+ self.assertIn('200', status_code) # will be '200 OK'
+ self.assertEqual([], headerlist)
+
+ m = auth_token.AuthProtocol(_middleware, self.conf)
+
+ env = {'REQUEST_METHOD': 'GET',
+ 'HTTP_X_AUTH_TOKEN': self.token_dict['uuid_token_default']}
+
+ r = m(env, _start_response)
+ self.assertEqual(text, r)
+
class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
@@ -1414,10 +1412,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': self.auth_version,
}
- self.requests.register_uri('GET',
- BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v3,
+ status_code=300)
self.set_middleware(conf=conf)
@@ -1427,10 +1424,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_request_no_token_dummy(self):
cms._ensure_subprocess()
- self.requests.get('%s%s' % (BASE_URI, self.ca_path),
- status_code=404)
- self.requests.get('%s%s' % (BASE_URI, self.signing_path),
- status_code=404)
+ self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path),
+ status_code=404)
+ self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path),
+ status_code=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware._verify_signed_token,
self.examples.SIGNED_TOKEN_SCOPED,
@@ -1439,7 +1436,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_fetch_signing_cert(self):
data = 'FAKE CERT'
url = "%s%s" % (BASE_URI, self.signing_path)
- self.requests.get(url, text=data)
+ self.requests_mock.get(url, text=data)
self.middleware._fetch_signing_cert()
signing_cert_path = self.middleware._signing_directory.calc_path(
@@ -1447,12 +1444,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
with open(signing_cert_path, 'r') as f:
self.assertEqual(f.read(), data)
- self.assertEqual(url, self.requests.last_request.url)
+ self.assertEqual(url, self.requests_mock.last_request.url)
def test_fetch_signing_ca(self):
data = 'FAKE CA'
url = "%s%s" % (BASE_URI, self.ca_path)
- self.requests.get(url, text=data)
+ self.requests_mock.get(url, text=data)
self.middleware._fetch_ca_cert()
ca_file_path = self.middleware._signing_directory.calc_path(
@@ -1460,7 +1457,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
with open(ca_file_path, 'r') as f:
self.assertEqual(f.read(), data)
- self.assertEqual(url, self.requests.last_request.url)
+ self.assertEqual(url, self.requests_mock.last_request.url)
def test_prefix_trailing_slash(self):
del self.conf['identity_uri']
@@ -1473,19 +1470,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
ca_url = "%s%s" % (base_url, self.ca_path)
signing_url = "%s%s" % (base_url, self.signing_path)
- self.requests.get(base_url,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests.get(ca_url, text='FAKECA')
- self.requests.get(signing_url, text='FAKECERT')
+ self.requests_mock.get(base_url,
+ json=VERSION_LIST_v3,
+ status_code=300)
+ self.requests_mock.get(ca_url, text='FAKECA')
+ self.requests_mock.get(signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests.last_request.url)
+ self.assertEqual(ca_url, self.requests_mock.last_request.url)
self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests.last_request.url)
+ self.assertEqual(signing_url, self.requests_mock.last_request.url)
def test_without_prefix(self):
del self.conf['identity_uri']
@@ -1497,19 +1494,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
ca_url = "%s%s" % (BASE_HOST, self.ca_path)
signing_url = "%s%s" % (BASE_HOST, self.signing_path)
- self.requests.get(BASE_HOST,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests.get(ca_url, text='FAKECA')
- self.requests.get(signing_url, text='FAKECERT')
+ self.requests_mock.get(BASE_HOST,
+ json=VERSION_LIST_v3,
+ status_code=300)
+ self.requests_mock.get(ca_url, text='FAKECA')
+ self.requests_mock.get(signing_url, text='FAKECERT')
self.set_middleware(conf=self.conf)
self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests.last_request.url)
+ self.assertEqual(ca_url, self.requests_mock.last_request.url)
self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests.last_request.url)
+ self.assertEqual(signing_url, self.requests_mock.last_request.url)
class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
@@ -1523,7 +1520,7 @@ class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
def network_error_response(request, context):
- raise exceptions.ConnectionError("Network connection error.")
+ raise exceptions.ConnectionRefused("Network connection refused.")
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@@ -1571,15 +1568,16 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_TOKEN_HASH_SHA256,
}
- self.requests.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v2,
+ status_code=300)
- self.requests.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
- self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
+ self.revocation_url = '%s/v2.0/tokens/revoked' % BASE_URI
+ self.requests_mock.get(self.revocation_url,
+ text=self.examples.SIGNED_REVOCATION_LIST)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
@@ -1590,10 +1588,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests.get(url, text=text)
+ self.requests_mock.get(url, text=text)
url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
- self.requests.get(url, text=network_error_response)
+ self.requests_mock.get(url, text=network_error_response)
self.set_middleware()
@@ -1603,12 +1601,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
The implied scope is the user's tenant ID.
"""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(body, [FakeApp.SUCCESS])
- self.assertIn('keystone.token_info', req.environ)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
+ self.assertIn('keystone.token_info', resp.request.environ)
def assert_valid_last_url(self, token_id):
self.assertLastPath("/v2.0/tokens/%s" % token_id)
@@ -1623,12 +1619,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected."""
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 401)
- self.assertEqual(self.response_headers['WWW-Authenticate'],
- "Keystone uri='https://keystone.example.com:1234'")
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
+ resp.headers['WWW-Authenticate'])
def test_unscoped_uuid_token_receives_401(self):
self.assert_unscoped_token_receives_401(
@@ -1639,28 +1633,26 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.SIGNED_TOKEN_UNSCOPED)
def test_request_prevent_service_catalog_injection(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
- req.headers['X-Auth-Token'] = (
- self.examples.UUID_TOKEN_NO_SERVICE_CATALOG)
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertFalse(req.headers.get('X-Service-Catalog'))
- self.assertEqual(body, [FakeApp.SUCCESS])
+ token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
+ resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
+ 'X-Auth-Token': token})
+
+ self.assertEqual(200, resp.status_int)
+ self.assertFalse(resp.request.headers.get('X-Service-Catalog'))
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
def test_user_plugin_token_properties(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
token = self.examples.UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual([FakeApp.SUCCESS], body)
+ resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
+ 'X-Auth-Token': token,
+ 'X-Service-Token': token})
+
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
- token_auth = req.environ['keystone.token_auth']
+ token_auth = resp.request.environ['keystone.token_auth']
self.assertTrue(token_auth.has_user_token)
self.assertTrue(token_auth.has_service_token)
@@ -1697,27 +1689,25 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'auth_version': 'v2.0'
}
- self.requests.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v3,
+ status_code=300)
- self.requests.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
token = self.examples.UUID_TOKEN_DEFAULT
url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests.get(url, text=text)
+ self.requests_mock.get(url, text=text)
self.set_middleware(conf=conf)
# This tests will only work is auth_token has chosen to use the
# lower, v2, api version
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual(url, self.requests.last_request.url)
+ resp = self.call_middleware(headers={'X-Auth-Token': token})
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(url, self.requests_mock.last_request.url)
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
@@ -1778,21 +1768,22 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
}
- self.requests.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v3,
+ status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
- self.requests.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
- # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
- self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
+ self.revocation_url = '%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI
+ self.requests_mock.get(self.revocation_url,
+ text=self.examples.SIGNED_REVOCATION_LIST)
- self.requests.get('%s/v3/auth/tokens' % BASE_URI,
- text=self.token_response)
+ self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
+ text=self.token_response,
+ headers={'X-Subject-Token': uuid.uuid4().hex})
self.set_middleware()
@@ -1802,7 +1793,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
if token_id == ERROR_TOKEN:
- raise exceptions.ConnectionError("Network connection error.")
+ raise exceptions.ConnectionRefused("Network connection refused.")
try:
response = self.examples.JSON_TOKEN_RESPONSES[token_id]
@@ -1866,43 +1857,37 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertIn('adminURL', endpoint)
def test_fallback_to_online_validation_with_signing_error(self):
- self.requests.register_uri(
- 'GET',
- '%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
- status_code=404)
+ self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
+ status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
def test_fallback_to_online_validation_with_ca_error(self):
- self.requests.register_uri('GET',
- '%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
- status_code=404)
+ self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
+ status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
def test_fallback_to_online_validation_with_revocation_list_error(self):
- self.requests.register_uri('GET',
- '%s/v2.0/tokens/revoked' % BASE_URI,
- status_code=404)
+ self.requests_mock.get(self.revocation_url, status_code=404)
self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
self.assert_valid_request_200(
self.token_dict['signed_token_scoped_pkiz'])
def test_user_plugin_token_properties(self):
- req = webob.Request.blank('/')
- req.headers['X-Service-Catalog'] = '[]'
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_data = self.examples.TOKEN_RESPONSES[token]
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
- self.assertEqual([FakeApp.SUCCESS], body)
+ resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
+ 'X-Auth-Token': token,
+ 'X-Service-Token': token})
+
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
- token_auth = req.environ['keystone.token_auth']
+ token_auth = resp.request.environ['keystone.token_auth']
self.assertTrue(token_auth.has_user_token)
self.assertTrue(token_auth.has_service_token)
@@ -1919,280 +1904,18 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
self.assertIsNone(t.trust_id)
-
-class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
- def setUp(self):
- super(TokenExpirationTest, self).setUp()
- self.now = timeutils.utcnow()
- self.delta = datetime.timedelta(hours=1)
- self.one_hour_ago = timeutils.isotime(self.now - self.delta,
- subsecond=True)
- self.one_hour_earlier = timeutils.isotime(self.now + self.delta,
- subsecond=True)
-
- def create_v2_token_fixture(self, expires=None):
- v2_fixture = {
- 'access': {
- 'token': {
- 'id': 'blah',
- 'expires': expires or self.one_hour_earlier,
- 'tenant': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- },
- },
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'roles': [
- {'name': 'role1'},
- {'name': 'role2'},
- ],
- },
- 'serviceCatalog': {}
- },
- }
-
- return v2_fixture
-
- def create_v3_token_fixture(self, expires=None):
-
- v3_fixture = {
- 'token': {
- 'expires_at': expires or self.one_hour_earlier,
- 'user': {
- 'id': 'user_id1',
- 'name': 'user_name1',
- 'domain': {
- 'id': 'domain_id1',
- 'name': 'domain_name1'
- }
- },
- 'project': {
- 'id': 'tenant_id1',
- 'name': 'tenant_name1',
- 'domain': {
- 'id': 'domain_id1',
- 'name': 'domain_name1'
- }
- },
- 'roles': [
- {'name': 'role1', 'id': 'Role1'},
- {'name': 'role2', 'id': 'Role2'},
- ],
- 'catalog': {}
- }
- }
-
- return v3_fixture
-
- def test_no_data(self):
- data = {}
- self.assertRaises(exc.InvalidToken,
- auth_token._get_token_expiration,
- data)
-
- def test_bad_data(self):
- data = {'my_happy_token_dict': 'woo'}
- self.assertRaises(exc.InvalidToken,
- auth_token._get_token_expiration,
- data)
-
- def test_v2_token_get_token_expiration_return_isotime(self):
- data = self.create_v2_token_fixture()
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(self.one_hour_earlier, actual_expires)
-
- def test_v2_token_not_expired(self):
- data = self.create_v2_token_fixture()
- expected_expires = data['access']['token']['expires']
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v2_token_expired(self):
- data = self.create_v2_token_fixture(expires=self.one_hour_ago)
- expires = auth_token._get_token_expiration(data)
- self.assertRaises(exc.InvalidToken,
- auth_token._confirm_token_not_expired,
- expires)
-
- def test_v2_token_with_timezone_offset_not_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v2_token_fixture(
- expires='2000-01-01T05:05:10.000123Z')
- expected_expires = '2000-01-01T05:05:10.000123Z'
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v2_token_with_timezone_offset_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v2_token_fixture(
- expires='1999-12-31T19:05:10Z')
- expires = auth_token._get_token_expiration(data)
- self.assertRaises(exc.InvalidToken,
- auth_token._confirm_token_not_expired,
- expires)
-
- def test_v3_token_get_token_expiration_return_isotime(self):
- data = self.create_v3_token_fixture()
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(self.one_hour_earlier, actual_expires)
-
- def test_v3_token_not_expired(self):
- data = self.create_v3_token_fixture()
- expected_expires = data['token']['expires_at']
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v3_token_expired(self):
- data = self.create_v3_token_fixture(expires=self.one_hour_ago)
- expires = auth_token._get_token_expiration(data)
- self.assertRaises(exc.InvalidToken,
- auth_token._confirm_token_not_expired,
- expires)
-
- def test_v3_token_with_timezone_offset_not_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v3_token_fixture(
- expires='2000-01-01T05:05:10.000123Z')
- expected_expires = '2000-01-01T05:05:10.000123Z'
-
- actual_expires = auth_token._get_token_expiration(data)
- self.assertEqual(actual_expires, expected_expires)
-
- def test_v3_token_with_timezone_offset_expired(self):
- self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z'))
- data = self.create_v3_token_fixture(
- expires='1999-12-31T19:05:10Z')
- expires = auth_token._get_token_expiration(data)
- self.assertRaises(exc.InvalidToken,
- auth_token._confirm_token_not_expired,
- expires)
-
- def test_cached_token_not_expired(self):
+ def test_expire_stored_in_cache(self):
+ # tests the upgrade path from storing a tuple vs just the data in the
+ # cache. Can be removed in the future.
token = 'mytoken'
data = 'this_data'
self.set_middleware()
self.middleware._token_cache.initialize({})
- some_time_later = timeutils.strtime(at=(self.now + self.delta))
- expires = some_time_later
- self.middleware._token_cache.store(token, data, expires)
- self.assertEqual(self.middleware._token_cache._cache_get(token), data)
-
- def test_cached_token_not_expired_with_old_style_nix_timestamp(self):
- """Ensure we cannot retrieve a token from the cache.
-
- Getting a token from the cache should return None when the token data
- in the cache stores the expires time as a \*nix style timestamp.
-
- """
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- some_time_later = self.now + self.delta
- # Store a unix timestamp in the cache.
- expires = calendar.timegm(some_time_later.timetuple())
- token_cache.store(token, data, expires)
- self.assertIsNone(token_cache._cache_get(token))
-
- def test_cached_token_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- some_time_earlier = timeutils.strtime(at=(self.now - self.delta))
- expires = some_time_earlier
- self.middleware._token_cache.store(token, data, expires)
- self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(exc.InvalidToken))
-
- def test_cached_token_with_timezone_offset_not_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- timezone_offset = datetime.timedelta(hours=2)
- some_time_later = self.now - timezone_offset + self.delta
- expires = timeutils.strtime(some_time_later) + '-02:00'
- self.middleware._token_cache.store(token, data, expires)
- self.assertEqual(self.middleware._token_cache._cache_get(token), data)
-
- def test_cached_token_with_timezone_offset_expired(self):
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- timezone_offset = datetime.timedelta(hours=2)
- some_time_earlier = self.now - timezone_offset - self.delta
- expires = timeutils.strtime(some_time_earlier) + '-02:00'
- self.middleware._token_cache.store(token, data, expires)
- self.assertThat(lambda: self.middleware._token_cache._cache_get(token),
- matchers.raises(exc.InvalidToken))
-
-
-class CatalogConversionTests(BaseAuthTokenMiddlewareTest):
-
- PUBLIC_URL = 'http://server:5000/v2.0'
- ADMIN_URL = 'http://admin:35357/v2.0'
- INTERNAL_URL = 'http://internal:5000/v2.0'
-
- REGION_ONE = 'RegionOne'
- REGION_TWO = 'RegionTwo'
- REGION_THREE = 'RegionThree'
-
- def test_basic_convert(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
- s.add_standard_endpoints(public=self.PUBLIC_URL,
- admin=self.ADMIN_URL,
- internal=self.INTERNAL_URL,
- region=self.REGION_ONE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = auth_token._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
- self.assertEqual(1, len(service['endpoints']))
- endpoints = service['endpoints'][0]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(4, len(endpoints))
- self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
- self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
- self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
- self.assertEqual(self.REGION_ONE, endpoints['region'])
-
- def test_multi_region(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
-
- s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
- s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
- s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = auth_token._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
-
- # the 3 regions will come through as 3 separate endpoints
- expected = [{'internalURL': self.INTERNAL_URL,
- 'region': self.REGION_ONE},
- {'publicURL': self.PUBLIC_URL,
- 'region': self.REGION_TWO},
- {'adminURL': self.ADMIN_URL,
- 'region': self.REGION_THREE}]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(3, len(service['endpoints']))
- for e in expected:
- self.assertIn(e, expected)
+ now = datetime.datetime.utcnow()
+ delta = datetime.timedelta(hours=1)
+ expires = strtime(at=(now + delta))
+ self.middleware._token_cache.store(token, (data, expires))
+ self.assertEqual(self.middleware._token_cache.get(token), data)
class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
@@ -2204,51 +1927,49 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
'auth_version': 'v3.0',
'auth_uri': auth_uri}
- self.fake_app = new_app('401 Unauthorized', body)
- self.set_middleware(conf=conf)
+ middleware = self.create_simple_middleware(status='401 Unauthorized',
+ body=body,
+ conf=conf)
+ resp = self.call(middleware)
+ self.assertEqual(six.b(body), resp.body)
- req = webob.Request.blank('/')
- resp = self.middleware(req.environ, self.start_fake_response)
-
- self.assertEqual([six.b(body)], resp)
-
- self.assertEqual(401, self.response_status)
+ self.assertEqual(401, resp.status_int)
self.assertEqual("Keystone uri='%s'" % auth_uri,
- self.response_headers['WWW-Authenticate'])
+ resp.headers['WWW-Authenticate'])
def test_delayed_auth_values(self):
- fake_app = new_app('401 Unauthorized', uuid.uuid4().hex)
- middleware = auth_token.AuthProtocol(fake_app,
- {'auth_uri': 'http://local.test'})
+ conf = {'auth_uri': 'http://local.test'}
+ status = '401 Unauthorized'
+
+ middleware = self.create_simple_middleware(status=status, conf=conf)
self.assertFalse(middleware._delay_auth_decision)
for v in ('True', '1', 'on', 'yes'):
conf = {'delay_auth_decision': v,
'auth_uri': 'http://local.test'}
- middleware = auth_token.AuthProtocol(fake_app, conf)
+ middleware = self.create_simple_middleware(status=status,
+ conf=conf)
self.assertTrue(middleware._delay_auth_decision)
for v in ('False', '0', 'no'):
conf = {'delay_auth_decision': v,
'auth_uri': 'http://local.test'}
- middleware = auth_token.AuthProtocol(fake_app, conf)
+ middleware = self.create_simple_middleware(status=status,
+ conf=conf)
self.assertFalse(middleware._delay_auth_decision)
def test_auth_plugin_with_no_tokens(self):
body = uuid.uuid4().hex
auth_uri = 'http://local.test'
conf = {'delay_auth_decision': True, 'auth_uri': auth_uri}
- self.fake_app = new_app('200 OK', body)
- self.set_middleware(conf=conf)
-
- req = webob.Request.blank('/')
- resp = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual([six.b(body)], resp)
+ middleware = self.create_simple_middleware(body=body, conf=conf)
+ resp = self.call(middleware)
+ self.assertEqual(six.b(body), resp.body)
- token_auth = req.environ['keystone.token_auth']
+ token_auth = resp.request.environ['keystone.token_auth']
self.assertFalse(token_auth.has_user_token)
self.assertIsNone(token_auth.user)
@@ -2263,42 +1984,44 @@ class CommonCompositeAuthTests(object):
"""
def test_composite_auth_ok(self):
- req = webob.Request.blank('/')
token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
fake_logger = fixtures.FakeLogger(level=logging.DEBUG)
self.middleware.logger = self.useFixture(fake_logger)
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
- self.assertEqual([FakeApp.SUCCESS], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
expected_env.update(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
- self.assertIn('Received request from user: '
- 'user_id %(HTTP_X_USER_ID)s, '
+
+ # role list may get reordered, check for string pieces individually
+ self.assertIn('Received request from user: ', fake_logger.output)
+ self.assertIn('user_id %(HTTP_X_USER_ID)s, '
'project_id %(HTTP_X_TENANT_ID)s, '
- 'roles %(HTTP_X_ROLES)s '
- 'service: user_id %(HTTP_X_SERVICE_USER_ID)s, '
+ 'roles ' % expected_env, fake_logger.output)
+ self.assertIn('service: user_id %(HTTP_X_SERVICE_USER_ID)s, '
'project_id %(HTTP_X_SERVICE_PROJECT_ID)s, '
- 'roles %(HTTP_X_SERVICE_ROLES)s' % expected_env,
- fake_logger.output)
+ 'roles ' % expected_env, fake_logger.output)
+
+ roles = ','.join([expected_env['HTTP_X_SERVICE_ROLES'],
+ expected_env['HTTP_X_ROLES']])
+
+ for r in roles.split(','):
+ self.assertIn(r, fake_logger.output)
def test_composite_auth_invalid_service_token(self):
- req = webob.Request.blank('/')
token = self.token_dict['uuid_token_default']
service_token = 'invalid-service-token'
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
- self.assertEqual([b'Authentication required'], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual(b'Authentication required', resp.body)
def test_composite_auth_no_service_token(self):
self.purge_service_token_expected_env()
req = webob.Request.blank('/')
- token = self.token_dict['uuid_token_default']
- req.headers['X-Auth-Token'] = token
+ req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
# Ensure injection of service headers is not possible
for key, value in six.iteritems(self.service_token_expected_env):
@@ -2306,42 +2029,36 @@ class CommonCompositeAuthTests(object):
req.headers[header_key] = value
# Check arbitrary headers not removed
req.headers['X-Foo'] = 'Bar'
- body = self.middleware(req.environ, self.start_fake_response)
+ resp = req.get_response(self.middleware)
for key in six.iterkeys(self.service_token_expected_env):
header_key = key[len('HTTP_'):].replace('_', '-')
self.assertFalse(req.headers.get(header_key))
self.assertEqual('Bar', req.headers.get('X-Foo'))
- self.assertEqual(418, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ self.assertEqual(418, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
def test_composite_auth_invalid_user_token(self):
- req = webob.Request.blank('/')
token = 'invalid-token'
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
- self.assertEqual([b'Authentication required'], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual(b'Authentication required', resp.body)
def test_composite_auth_no_user_token(self):
- req = webob.Request.blank('/')
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(401, self.response_status)
- self.assertEqual([b'Authentication required'], body)
+ resp = self.call_middleware(headers={'X-Service-Token': service_token})
+ self.assertEqual(401, resp.status_int)
+ self.assertEqual(b'Authentication required', resp.body)
def test_composite_auth_delay_ok(self):
self.middleware._delay_auth_decision = True
- req = webob.Request.blank('/')
token = self.token_dict['uuid_token_default']
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(200, self.response_status)
- self.assertEqual([FakeApp.SUCCESS], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(200, resp.status_int)
+ self.assertEqual(FakeApp.SUCCESS, resp.body)
def test_composite_auth_delay_invalid_service_token(self):
self.middleware._delay_auth_decision = True
@@ -2351,14 +2068,12 @@ class CommonCompositeAuthTests(object):
}
self.update_expected_env(expected_env)
- req = webob.Request.blank('/')
token = self.token_dict['uuid_token_default']
service_token = 'invalid-service-token'
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(420, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(420, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
def test_composite_auth_delay_invalid_service_and_user_tokens(self):
self.middleware._delay_auth_decision = True
@@ -2370,22 +2085,19 @@ class CommonCompositeAuthTests(object):
}
self.update_expected_env(expected_env)
- req = webob.Request.blank('/')
token = 'invalid-user-token'
service_token = 'invalid-service-token'
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(419, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(419, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
def test_composite_auth_delay_no_service_token(self):
self.middleware._delay_auth_decision = True
self.purge_service_token_expected_env()
req = webob.Request.blank('/')
- token = self.token_dict['uuid_token_default']
- req.headers['X-Auth-Token'] = token
+ req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
# Ensure injection of service headers is not possible
for key, value in six.iteritems(self.service_token_expected_env):
@@ -2393,13 +2105,13 @@ class CommonCompositeAuthTests(object):
req.headers[header_key] = value
# Check arbitrary headers not removed
req.headers['X-Foo'] = 'Bar'
- body = self.middleware(req.environ, self.start_fake_response)
+ resp = req.get_response(self.middleware)
for key in six.iterkeys(self.service_token_expected_env):
header_key = key[len('HTTP_'):].replace('_', '-')
self.assertFalse(req.headers.get(header_key))
self.assertEqual('Bar', req.headers.get('X-Foo'))
- self.assertEqual(418, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ self.assertEqual(418, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
def test_composite_auth_delay_invalid_user_token(self):
self.middleware._delay_auth_decision = True
@@ -2409,14 +2121,12 @@ class CommonCompositeAuthTests(object):
}
self.update_expected_env(expected_env)
- req = webob.Request.blank('/')
token = 'invalid-token'
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Auth-Token'] = token
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(403, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ resp = self.call_middleware(headers={'X-Auth-Token': token,
+ 'X-Service-Token': service_token})
+ self.assertEqual(403, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
def test_composite_auth_delay_no_user_token(self):
self.middleware._delay_auth_decision = True
@@ -2426,12 +2136,10 @@ class CommonCompositeAuthTests(object):
}
self.update_expected_env(expected_env)
- req = webob.Request.blank('/')
service_token = self.token_dict['uuid_service_token_default']
- req.headers['X-Service-Token'] = service_token
- body = self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(403, self.response_status)
- self.assertEqual([FakeApp.FORBIDDEN], body)
+ resp = self.call_middleware(headers={'X-Service-Token': service_token})
+ self.assertEqual(403, resp.status_int)
+ self.assertEqual(FakeApp.FORBIDDEN, resp.body)
class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
@@ -2458,25 +2166,26 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_service_token_default,
}
- self.requests.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
+ self.requests_mock.get(BASE_URI,
+ json=VERSION_LIST_v2,
+ status_code=300)
- self.requests.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
- self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST,
- status_code=200)
+ self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
+ text=self.examples.SIGNED_REVOCATION_LIST,
+ status_code=200)
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
- self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
- text=self.examples.JSON_TOKEN_RESPONSES[token])
+ text = self.examples.JSON_TOKEN_RESPONSES[token]
+ self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
+ text=text)
for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
"%s/v2.0/tokens/invalid-service-token" % BASE_URI):
- self.requests.get(invalid_uri, text='', status_code=404)
+ self.requests_mock.get(invalid_uri, text='', status_code=404)
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.service_token_expected_env = dict(
@@ -2508,19 +2217,19 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
'uuid_service_token_default': uuid_serv_token_default,
}
- self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
+ self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
# TODO(jamielennox): auth_token middleware uses a v2 admin token
# regardless of the auth_version that is set.
- self.requests.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
+ self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
+ text=FAKE_ADMIN_TOKEN)
- # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
- self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
+ self.requests_mock.get('%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI,
+ text=self.examples.SIGNED_REVOCATION_LIST)
- self.requests.get('%s/v3/auth/tokens' % BASE_URI,
- text=self.token_response)
+ self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
+ text=self.token_response,
+ headers={'X-Subject-Token': uuid.uuid4().hex})
self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
@@ -2539,7 +2248,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
response = ""
if token_id == ERROR_TOKEN:
- raise exceptions.ConnectionError("Network connection error.")
+ raise exceptions.ConnectionRefused("Network connection refused.")
try:
response = self.examples.JSON_TOKEN_RESPONSES[token_id]
@@ -2555,18 +2264,15 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(OtherTests, self).setUp()
self.logger = self.useFixture(fixtures.FakeLogger())
- self.cfg = self.useFixture(cfg_fixture.Config())
def test_unknown_server_versions(self):
versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI)
self.set_middleware()
- self.requests.get(BASE_URI, json=versions, status_code=300)
+ self.requests_mock.get(BASE_URI, json=versions, status_code=300)
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = uuid.uuid4().hex
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(503, self.response_status)
+ resp = self.call_middleware(headers={'X-Auth-Token': uuid.uuid4().hex})
+ self.assertEqual(503, resp.status_int)
self.assertIn('versions [v3.0, v2.0]', self.logger.output)
@@ -2589,11 +2295,11 @@ class OtherTests(BaseAuthTokenMiddlewareTest):
def test_default_auth_version(self):
# VERSION_LIST_v3 contains both v2 and v3 version elements
- self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
+ self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
self._assert_auth_version(None, (3, 0))
# VERSION_LIST_v2 contains only v2 version elements
- self.requests.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
+ self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
self._assert_auth_version(None, (2, 0))
def test_unsupported_auth_version(self):
@@ -2615,20 +2321,19 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(AuthProtocolLoadingTests, self).setUp()
- self.cfg = self.useFixture(cfg_fixture.Config())
self.project_id = uuid.uuid4().hex
# first touch is to discover the available versions at the auth_url
- self.requests.get(self.AUTH_URL,
- json=fixture.DiscoveryList(href=self.DISC_URL),
- status_code=300)
+ self.requests_mock.get(self.AUTH_URL,
+ json=fixture.DiscoveryList(href=self.DISC_URL),
+ status_code=300)
# then we do discovery on the URL from the service catalog. In practice
# this is mostly the same URL as before but test the full range.
- self.requests.get(self.KEYSTONE_BASE_URL + '/',
- json=fixture.DiscoveryList(href=self.CRUD_URL),
- status_code=300)
+ self.requests_mock.get(self.KEYSTONE_BASE_URL + '/',
+ json=fixture.DiscoveryList(href=self.CRUD_URL),
+ status_code=300)
def good_request(self, app):
# admin_token is the token that the service will get back from auth
@@ -2637,9 +2342,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
s = admin_token.add_service('identity', name='keystone')
s.add_standard_endpoints(admin=self.KEYSTONE_URL)
- self.requests.post(self.DISC_URL + '/v3/auth/tokens',
- json=admin_token,
- headers={'X-Subject-Token': admin_token_id})
+ self.requests_mock.post(self.DISC_URL + '/v3/auth/tokens',
+ json=admin_token,
+ headers={'X-Subject-Token': admin_token_id})
# user_token is the data from the user's inputted token
user_token_id = uuid.uuid4().hex
@@ -2649,15 +2354,13 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
request_headers = {'X-Subject-Token': user_token_id,
'X-Auth-Token': admin_token_id}
- self.requests.get(self.CRUD_URL + '/v3/auth/tokens',
- request_headers=request_headers,
- json=user_token)
+ self.requests_mock.get(self.CRUD_URL + '/v3/auth/tokens',
+ request_headers=request_headers,
+ json=user_token,
+ headers={'X-Subject-Token': uuid.uuid4().hex})
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = user_token_id
- resp = app(req.environ, self.start_fake_response)
-
- self.assertEqual(200, self.response_status)
+ resp = self.call(app, headers={'X-Auth-Token': user_token_id})
+ self.assertEqual(200, resp.status_int)
return resp
def test_loading_password_plugin(self):
@@ -2668,6 +2371,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
project_id = uuid.uuid4().hex
+ # Register the authentication options
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
+
# configure the authentication options
self.cfg.config(auth_plugin='password',
username='testuser',
@@ -2678,22 +2384,23 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
group=_base.AUTHTOKEN_GROUP)
body = uuid.uuid4().hex
- app = auth_token.AuthProtocol(new_app('200 OK', body)(), {})
+ app = self.create_simple_middleware(body=body)
resp = self.good_request(app)
- self.assertEqual(six.b(body), resp[0])
+ self.assertEqual(six.b(body), resp.body)
@staticmethod
def get_plugin(app):
return app._identity_server._adapter.auth
- def test_invalid_plugin_fails_to_intialize(self):
+ def test_invalid_plugin_fails_to_initialize(self):
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
self.cfg.config(auth_plugin=uuid.uuid4().hex,
group=_base.AUTHTOKEN_GROUP)
self.assertRaises(
exceptions.NoMatchingPlugin,
- lambda: auth_token.AuthProtocol(new_app('200 OK', '')(), {}))
+ self.create_simple_middleware)
def test_plugin_loading_mixed_opts(self):
# some options via override and some via conf
@@ -2703,6 +2410,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
username = 'testuser'
password = 'testpass'
+ # Register the authentication options
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
+
# configure the authentication options
self.cfg.config(auth_plugin='password',
password=password,
@@ -2713,10 +2423,10 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
conf = {'username': username, 'auth_url': self.AUTH_URL}
body = uuid.uuid4().hex
- app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf)
+ app = self.create_simple_middleware(body=body, conf=conf)
resp = self.good_request(app)
- self.assertEqual(six.b(body), resp[0])
+ self.assertEqual(six.b(body), resp.body)
plugin = self.get_plugin(app)
@@ -2735,6 +2445,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
opts = auth.get_plugin_options('password')
self.cfg.register_opts(opts, group=section)
+ # Register the authentication options
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
+
# configure the authentication options
self.cfg.config(auth_section=section, group=_base.AUTHTOKEN_GROUP)
self.cfg.config(auth_plugin='password',
@@ -2746,10 +2459,10 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
conf = {'username': username, 'auth_url': self.AUTH_URL}
body = uuid.uuid4().hex
- app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf)
+ app = self.create_simple_middleware(body=body, conf=conf)
resp = self.good_request(app)
- self.assertEqual(six.b(body), resp[0])
+ self.assertEqual(six.b(body), resp.body)
plugin = self.get_plugin(app)
@@ -2759,5 +2472,106 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
self.assertEqual(self.project_id, plugin._project_id)
+class TestAuthPluginUserAgentGeneration(BaseAuthTokenMiddlewareTest):
+
+ def setUp(self):
+ super(TestAuthPluginUserAgentGeneration, self).setUp()
+ self.auth_url = uuid.uuid4().hex
+ self.project_id = uuid.uuid4().hex
+ self.username = uuid.uuid4().hex
+ self.password = uuid.uuid4().hex
+ self.section = uuid.uuid4().hex
+ self.user_domain_id = uuid.uuid4().hex
+
+ auth.register_conf_options(self.cfg.conf, group=self.section)
+ opts = auth.get_plugin_options('password')
+ self.cfg.register_opts(opts, group=self.section)
+
+ # Register the authentication options
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
+
+ # configure the authentication options
+ self.cfg.config(auth_section=self.section, group=_base.AUTHTOKEN_GROUP)
+ self.cfg.config(auth_plugin='password',
+ password=self.password,
+ project_id=self.project_id,
+ user_domain_id=self.user_domain_id,
+ group=self.section)
+
+ def test_no_project_configured(self):
+ ksm_version = uuid.uuid4().hex
+ conf = {'username': self.username, 'auth_url': self.auth_url}
+
+ app = self._create_app(conf, ksm_version)
+ self._assert_user_agent(app, '', ksm_version)
+
+ def test_project_in_configuration(self):
+ project = uuid.uuid4().hex
+ project_version = uuid.uuid4().hex
+
+ conf = {'username': self.username,
+ 'auth_url': self.auth_url,
+ 'project': project}
+ app = self._create_app(conf, project_version)
+ project_with_version = '{0}/{1} '.format(project, project_version)
+ self._assert_user_agent(app, project_with_version, project_version)
+
+ def test_project_in_oslo_configuration(self):
+ project = uuid.uuid4().hex
+ project_version = uuid.uuid4().hex
+
+ conf = {'username': self.username, 'auth_url': self.auth_url}
+ with mock.patch.object(cfg.CONF, 'project', new=project, create=True):
+ app = self._create_app(conf, project_version)
+ project = '{0}/{1} '.format(project, project_version)
+ self._assert_user_agent(app, project, project_version)
+
+ def _create_app(self, conf, project_version):
+ fake_pkg_resources = mock.Mock()
+ fake_pkg_resources.get_distribution().version = project_version
+
+ body = uuid.uuid4().hex
+ with mock.patch('keystonemiddleware.auth_token.pkg_resources',
+ new=fake_pkg_resources):
+ return self.create_simple_middleware(body=body, conf=conf,
+ use_global_conf=True)
+
+ def _assert_user_agent(self, app, project, ksm_version):
+ sess = app._identity_server._adapter.session
+ expected_ua = ('{0}keystonemiddleware.auth_token/{1}'
+ .format(project, ksm_version))
+ self.assertEqual(expected_ua, sess.user_agent)
+
+
+class TestAuthPluginLocalOsloConfig(BaseAuthTokenMiddlewareTest):
+ def test_project_in_local_oslo_configuration(self):
+ options = {
+ 'auth_plugin': 'password',
+ 'auth_uri': uuid.uuid4().hex,
+ 'password': uuid.uuid4().hex,
+ }
+
+ content = ("[keystone_authtoken]\n"
+ "auth_plugin=%(auth_plugin)s\n"
+ "auth_uri=%(auth_uri)s\n"
+ "password=%(password)s\n" % options)
+ conf_file_fixture = self.useFixture(
+ createfile.CreateFileWithContent("my_app", content))
+ conf = {'oslo_config_project': 'my_app',
+ 'oslo_config_file': conf_file_fixture.path}
+ app = self._create_app(conf, uuid.uuid4().hex)
+ for option in options:
+ self.assertEqual(options[option], app._conf_get(option))
+
+ def _create_app(self, conf, project_version):
+ fake_pkg_resources = mock.Mock()
+ fake_pkg_resources.get_distribution().version = project_version
+
+ body = uuid.uuid4().hex
+ with mock.patch('keystonemiddleware.auth_token.pkg_resources',
+ new=fake_pkg_resources):
+ return self.create_simple_middleware(body=body, conf=conf)
+
+
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
new file mode 100644
index 00000000..b213f546
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
@@ -0,0 +1,202 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import datetime
+import uuid
+
+from keystoneclient import fixture
+import mock
+import six
+import testtools
+import webob
+
+from keystonemiddleware import auth_token
+from keystonemiddleware.auth_token import _request
+
+
+class FakeApp(object):
+
+ @webob.dec.wsgify
+ def __call__(self, req):
+ return webob.Response()
+
+
+class FetchingMiddleware(auth_token._BaseAuthProtocol):
+
+ def __init__(self, app, token_dict={}, **kwargs):
+ super(FetchingMiddleware, self).__init__(app, **kwargs)
+ self.token_dict = token_dict
+
+ def _fetch_token(self, token):
+ try:
+ return self.token_dict[token]
+ except KeyError:
+ raise auth_token.InvalidToken()
+
+
+class BaseAuthProtocolTests(testtools.TestCase):
+
+ @mock.patch.multiple(auth_token._BaseAuthProtocol,
+ process_request=mock.DEFAULT,
+ process_response=mock.DEFAULT)
+ def test_process_flow(self, process_request, process_response):
+ m = auth_token._BaseAuthProtocol(FakeApp())
+
+ process_request.return_value = None
+ process_response.side_effect = lambda x: x
+
+ req = webob.Request.blank('/', method='GET')
+ resp = req.get_response(m)
+
+ self.assertEqual(200, resp.status_code)
+
+ self.assertEqual(1, process_request.call_count)
+ self.assertIsInstance(process_request.call_args[0][0],
+ _request._AuthTokenRequest)
+
+ self.assertEqual(1, process_response.call_count)
+ self.assertIsInstance(process_response.call_args[0][0], webob.Response)
+
+ @classmethod
+ def call(cls, middleware, method='GET', path='/', headers=None):
+ req = webob.Request.blank(path)
+ req.method = method
+
+ for k, v in six.iteritems(headers or {}):
+ req.headers[k] = v
+
+ resp = req.get_response(middleware)
+ resp.request = req
+ return resp
+
+ def test_good_v3_user_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ role = t.add_role()
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+
+ self.assertEqual('Confirmed', req.headers['X-Identity-Status'])
+ self.assertNotIn('X-Service-Token', req.headers)
+
+ p = req.environ['keystone.token_auth']
+
+ self.assertTrue(p.has_user_token)
+ self.assertFalse(p.has_service_token)
+
+ self.assertEqual(t.project_id, p.user.project_id)
+ self.assertEqual(t.project_domain_id, p.user.project_domain_id)
+ self.assertEqual(t.user_id, p.user.user_id)
+ self.assertEqual(t.user_domain_id, p.user.user_domain_id)
+ self.assertIn(role['name'], p.user.role_names)
+
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_invalid_user_token(self):
+ token_id = uuid.uuid4().hex
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid', req.headers['X-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_expired_user_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid', req.headers['X-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Auth-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict=token_dict)
+ self.call(m, headers={'X-Auth-Token': token_id})
+
+ def test_good_v3_service_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ role = t.add_role()
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+
+ self.assertEqual('Confirmed',
+ req.headers['X-Service-Identity-Status'])
+ self.assertNotIn('X-Auth-Token', req.headers)
+
+ p = req.environ['keystone.token_auth']
+
+ self.assertFalse(p.has_user_token)
+ self.assertTrue(p.has_service_token)
+
+ self.assertEqual(t.project_id, p.service.project_id)
+ self.assertEqual(t.project_domain_id, p.service.project_domain_id)
+ self.assertEqual(t.user_id, p.service.user_id)
+ self.assertEqual(t.user_domain_id, p.service.user_domain_id)
+ self.assertIn(role['name'], p.service.role_names)
+
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict)
+ self.call(m, headers={'X-Service-Token': token_id})
+
+ def test_invalid_service_token(self):
+ token_id = uuid.uuid4().hex
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid',
+ req.headers['X-Service-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb)
+ self.call(m, headers={'X-Service-Token': token_id})
+
+ def test_expired_service_token(self):
+ t = fixture.V3Token()
+ t.set_project_scope()
+ t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
+
+ token_id = uuid.uuid4().hex
+ token_dict = {token_id: t}
+
+ @webob.dec.wsgify
+ def _do_cb(req):
+ self.assertEqual('Invalid',
+ req.headers['X-Service-Identity-Status'])
+ self.assertEqual(token_id, req.headers['X-Service-Token'])
+ return webob.Response()
+
+ m = FetchingMiddleware(_do_cb, token_dict=token_dict)
+ self.call(m, headers={'X-Service-Token': token_id})
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py
index 75c7f759..e9189831 100644
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py
@@ -11,12 +11,12 @@
# under the License.
import six
-import testtools
from keystonemiddleware.auth_token import _memcache_crypt as memcache_crypt
+from keystonemiddleware.tests.unit import utils
-class MemcacheCryptPositiveTests(testtools.TestCase):
+class MemcacheCryptPositiveTests(utils.BaseTestCase):
def _setup_keys(self, strategy):
return memcache_crypt.derive_keys(b'token', b'secret', strategy)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
new file mode 100644
index 00000000..223433f8
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
@@ -0,0 +1,253 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import itertools
+import uuid
+
+from keystoneclient import access
+from keystoneclient import fixture
+
+from keystonemiddleware.auth_token import _request
+from keystonemiddleware.tests.unit import utils
+
+
+class RequestObjectTests(utils.TestCase):
+
+ def setUp(self):
+ super(RequestObjectTests, self).setUp()
+ self.request = _request._AuthTokenRequest.blank('/')
+
+ def test_setting_user_token_valid(self):
+ self.assertNotIn('X-Identity-Status', self.request.headers)
+
+ self.request.user_token_valid = True
+ self.assertEqual('Confirmed',
+ self.request.headers['X-Identity-Status'])
+ self.assertTrue(self.request.user_token_valid)
+
+ self.request.user_token_valid = False
+ self.assertEqual('Invalid',
+ self.request.headers['X-Identity-Status'])
+ self.assertFalse(self.request.user_token_valid)
+
+ def test_setting_service_token_valid(self):
+ self.assertNotIn('X-Service-Identity-Status', self.request.headers)
+
+ self.request.service_token_valid = True
+ self.assertEqual('Confirmed',
+ self.request.headers['X-Service-Identity-Status'])
+ self.assertTrue(self.request.service_token_valid)
+
+ self.request.service_token_valid = False
+ self.assertEqual('Invalid',
+ self.request.headers['X-Service-Identity-Status'])
+ self.assertFalse(self.request.service_token_valid)
+
+ def test_removing_headers(self):
+ GOOD = ('X-Auth-Token',
+ 'unknownstring',
+ uuid.uuid4().hex)
+
+ BAD = ('X-Domain-Id',
+ 'X-Domain-Name',
+ 'X-Project-Id',
+ 'X-Project-Name',
+ 'X-Project-Domain-Id',
+ 'X-Project-Domain-Name',
+ 'X-User-Id',
+ 'X-User-Name',
+ 'X-User-Domain-Id',
+ 'X-User-Domain-Name',
+ 'X-Roles',
+ 'X-Identity-Status',
+
+ 'X-Service-Domain-Id',
+ 'X-Service-Domain-Name',
+ 'X-Service-Project-Id',
+ 'X-Service-Project-Name',
+ 'X-Service-Project-Domain-Id',
+ 'X-Service-Project-Domain-Name',
+ 'X-Service-User-Id',
+ 'X-Service-User-Name',
+ 'X-Service-User-Domain-Id',
+ 'X-Service-User-Domain-Name',
+ 'X-Service-Roles',
+ 'X-Service-Identity-Status',
+
+ 'X-Service-Catalog',
+
+ 'X-Role',
+ 'X-User',
+ 'X-Tenant-Id',
+ 'X-Tenant-Name',
+ 'X-Tenant',
+ )
+
+ header_vals = {}
+
+ for header in itertools.chain(GOOD, BAD):
+ v = uuid.uuid4().hex
+ header_vals[header] = v
+ self.request.headers[header] = v
+
+ self.request.remove_auth_headers()
+
+ for header in BAD:
+ self.assertNotIn(header, self.request.headers)
+
+ for header in GOOD:
+ self.assertEqual(header_vals[header], self.request.headers[header])
+
+ def _test_v3_headers(self, token, prefix):
+ self.assertEqual(token.domain_id,
+ self.request.headers['X%s-Domain-Id' % prefix])
+ self.assertEqual(token.domain_name,
+ self.request.headers['X%s-Domain-Name' % prefix])
+ self.assertEqual(token.project_id,
+ self.request.headers['X%s-Project-Id' % prefix])
+ self.assertEqual(token.project_name,
+ self.request.headers['X%s-Project-Name' % prefix])
+ self.assertEqual(
+ token.project_domain_id,
+ self.request.headers['X%s-Project-Domain-Id' % prefix])
+ self.assertEqual(
+ token.project_domain_name,
+ self.request.headers['X%s-Project-Domain-Name' % prefix])
+
+ self.assertEqual(token.user_id,
+ self.request.headers['X%s-User-Id' % prefix])
+ self.assertEqual(token.user_name,
+ self.request.headers['X%s-User-Name' % prefix])
+ self.assertEqual(
+ token.user_domain_id,
+ self.request.headers['X%s-User-Domain-Id' % prefix])
+ self.assertEqual(
+ token.user_domain_name,
+ self.request.headers['X%s-User-Domain-Name' % prefix])
+
+ def test_project_scoped_user_headers(self):
+ token = fixture.V3Token()
+ token.set_project_scope()
+ token_id = uuid.uuid4().hex
+
+ auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
+ self.request.set_user_headers(auth_ref, include_service_catalog=True)
+
+ self._test_v3_headers(token, '')
+
+ def test_project_scoped_service_headers(self):
+ token = fixture.V3Token()
+ token.set_project_scope()
+ token_id = uuid.uuid4().hex
+
+ auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
+ self.request.set_service_headers(auth_ref)
+
+ self._test_v3_headers(token, '-Service')
+
+ def test_auth_type(self):
+ self.assertIsNone(self.request.auth_type)
+ self.request.environ['AUTH_TYPE'] = 'NeGoTiatE'
+ self.assertEqual('negotiate', self.request.auth_type)
+
+ def test_user_token(self):
+ token = uuid.uuid4().hex
+ self.assertIsNone(self.request.user_token)
+ self.request.headers['X-Auth-Token'] = token
+ self.assertEqual(token, self.request.user_token)
+
+ def test_storage_token(self):
+ storage_token = uuid.uuid4().hex
+ user_token = uuid.uuid4().hex
+
+ self.assertIsNone(self.request.user_token)
+ self.request.headers['X-Storage-Token'] = storage_token
+ self.assertEqual(storage_token, self.request.user_token)
+ self.request.headers['X-Auth-Token'] = user_token
+ self.assertEqual(user_token, self.request.user_token)
+
+ def test_service_token(self):
+ token = uuid.uuid4().hex
+ self.assertIsNone(self.request.service_token)
+ self.request.headers['X-Service-Token'] = token
+ self.assertEqual(token, self.request.service_token)
+
+ def test_token_auth(self):
+ plugin = object()
+
+ self.assertNotIn('keystone.token_auth', self.request.environ)
+ self.request.token_auth = plugin
+ self.assertIs(plugin, self.request.environ['keystone.token_auth'])
+ self.assertIs(plugin, self.request.token_auth)
+
+
+class CatalogConversionTests(utils.TestCase):
+
+ PUBLIC_URL = 'http://server:5000/v2.0'
+ ADMIN_URL = 'http://admin:35357/v2.0'
+ INTERNAL_URL = 'http://internal:5000/v2.0'
+
+ REGION_ONE = 'RegionOne'
+ REGION_TWO = 'RegionTwo'
+ REGION_THREE = 'RegionThree'
+
+ def test_basic_convert(self):
+ token = fixture.V3Token()
+ s = token.add_service(type='identity')
+ s.add_standard_endpoints(public=self.PUBLIC_URL,
+ admin=self.ADMIN_URL,
+ internal=self.INTERNAL_URL,
+ region=self.REGION_ONE)
+
+ auth_ref = access.AccessInfo.factory(body=token)
+ catalog_data = auth_ref.service_catalog.get_data()
+ catalog = _request._v3_to_v2_catalog(catalog_data)
+
+ self.assertEqual(1, len(catalog))
+ service = catalog[0]
+ self.assertEqual(1, len(service['endpoints']))
+ endpoints = service['endpoints'][0]
+
+ self.assertEqual('identity', service['type'])
+ self.assertEqual(4, len(endpoints))
+ self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
+ self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
+ self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
+ self.assertEqual(self.REGION_ONE, endpoints['region'])
+
+ def test_multi_region(self):
+ token = fixture.V3Token()
+ s = token.add_service(type='identity')
+
+ s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
+ s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
+ s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
+
+ auth_ref = access.AccessInfo.factory(body=token)
+ catalog_data = auth_ref.service_catalog.get_data()
+ catalog = _request._v3_to_v2_catalog(catalog_data)
+
+ self.assertEqual(1, len(catalog))
+ service = catalog[0]
+
+ # the 3 regions will come through as 3 separate endpoints
+ expected = [{'internalURL': self.INTERNAL_URL,
+ 'region': self.REGION_ONE},
+ {'publicURL': self.PUBLIC_URL,
+ 'region': self.REGION_TWO},
+ {'adminURL': self.ADMIN_URL,
+ 'region': self.REGION_THREE}]
+
+ self.assertEqual('identity', service['type'])
+ self.assertEqual(3, len(service['endpoints']))
+ for e in expected:
+ self.assertIn(e, expected)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py
index d144bb6c..cef65b8e 100644
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py
@@ -18,14 +18,14 @@ import shutil
import uuid
import mock
-import testtools
from keystonemiddleware.auth_token import _exceptions as exc
from keystonemiddleware.auth_token import _revocations
from keystonemiddleware.auth_token import _signing_dir
+from keystonemiddleware.tests.unit import utils
-class RevocationsTests(testtools.TestCase):
+class RevocationsTests(utils.BaseTestCase):
def _check_with_list(self, revoked_list, token_ids):
directory_name = '/tmp/%s' % uuid.uuid4().hex
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py
index bef62747..b2ef95dd 100644
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py
@@ -15,12 +15,11 @@ import shutil
import stat
import uuid
-import testtools
-
from keystonemiddleware.auth_token import _signing_dir
+from keystonemiddleware.tests.unit import utils
-class SigningDirectoryTests(testtools.TestCase):
+class SigningDirectoryTests(utils.BaseTestCase):
def test_directory_created_when_doesnt_exist(self):
# When _SigningDirectory is created, if the directory doesn't exist
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py
new file mode 100644
index 00000000..52d29737
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py
@@ -0,0 +1,195 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from keystoneclient import auth
+from keystoneclient import fixture
+
+from keystonemiddleware.auth_token import _base
+from keystonemiddleware.tests.unit.auth_token import base
+
+# NOTE(jamielennox): just some sample values that we can use for testing
+BASE_URI = 'https://keystone.example.com:1234'
+AUTH_URL = 'https://keystone.auth.com:1234'
+
+
+class BaseUserPluginTests(object):
+
+ def configure_middleware(self,
+ auth_plugin,
+ group='keystone_authtoken',
+ **kwargs):
+ opts = auth.get_plugin_class(auth_plugin).get_options()
+ self.cfg.register_opts(opts, group=group)
+
+ # Since these tests cfg.config() themselves rather than waiting for
+ # auth_token to do it on __init__ we need to register the base auth
+ # options (e.g., auth_plugin)
+ auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
+
+ self.cfg.config(group=group,
+ auth_plugin=auth_plugin,
+ **kwargs)
+
+ def assertTokenDataEqual(self, token_id, token, token_data):
+ self.assertEqual(token_id, token_data.auth_token)
+ self.assertEqual(token.user_id, token_data.user_id)
+ try:
+ trust_id = token.trust_id
+ except KeyError:
+ trust_id = None
+ self.assertEqual(trust_id, token_data.trust_id)
+ self.assertEqual(self.get_role_names(token), token_data.role_names)
+
+ def get_plugin(self, token_id, service_token_id=None):
+ headers = {'X-Auth-Token': token_id}
+
+ if service_token_id:
+ headers['X-Service-Token'] = service_token_id
+
+ m = self.create_simple_middleware()
+
+ resp = self.call(m, headers=headers)
+ self.assertEqual(200, resp.status_int)
+ return resp.request.environ['keystone.token_auth']
+
+ def test_user_information(self):
+ token_id, token = self.get_token()
+ plugin = self.get_plugin(token_id)
+
+ self.assertTokenDataEqual(token_id, token, plugin.user)
+ self.assertFalse(plugin.has_service_token)
+ self.assertIsNone(plugin.service)
+
+ def test_with_service_information(self):
+ token_id, token = self.get_token()
+ service_id, service = self.get_token()
+
+ plugin = self.get_plugin(token_id, service_id)
+
+ self.assertTokenDataEqual(token_id, token, plugin.user)
+ self.assertTokenDataEqual(service_id, service, plugin.service)
+
+
+class V2UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
+
+ def setUp(self):
+ super(V2UserPluginTests, self).setUp()
+
+ self.service_token = fixture.V2Token()
+ self.service_token.set_scope()
+ s = self.service_token.add_service('identity', name='keystone')
+
+ s.add_endpoint(public=BASE_URI,
+ admin=BASE_URI,
+ internal=BASE_URI)
+
+ self.configure_middleware(auth_plugin='v2password',
+ auth_url='%s/v2.0/' % AUTH_URL,
+ user_id=self.service_token.user_id,
+ password=uuid.uuid4().hex,
+ tenant_id=self.service_token.tenant_id)
+
+ auth_discovery = fixture.DiscoveryList(href=AUTH_URL, v3=False)
+ self.requests_mock.get(AUTH_URL, json=auth_discovery)
+
+ base_discovery = fixture.DiscoveryList(href=BASE_URI, v3=False)
+ self.requests_mock.get(BASE_URI, json=base_discovery)
+
+ url = '%s/v2.0/tokens' % AUTH_URL
+ self.requests_mock.post(url, json=self.service_token)
+
+ def get_role_names(self, token):
+ return set(x['name'] for x in token['access']['user'].get('roles', []))
+
+ def get_token(self):
+ token = fixture.V2Token()
+ token.set_scope()
+ token.add_role()
+
+ request_headers = {'X-Auth-Token': self.service_token.token_id}
+
+ url = '%s/v2.0/tokens/%s' % (BASE_URI, token.token_id)
+ self.requests_mock.get(url,
+ request_headers=request_headers,
+ json=token)
+
+ return token.token_id, token
+
+ def assertTokenDataEqual(self, token_id, token, token_data):
+ super(V2UserPluginTests, self).assertTokenDataEqual(token_id,
+ token,
+ token_data)
+
+ self.assertEqual(token.tenant_id, token_data.project_id)
+ self.assertIsNone(token_data.user_domain_id)
+ self.assertIsNone(token_data.project_domain_id)
+
+
+class V3UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
+
+ def setUp(self):
+ super(V3UserPluginTests, self).setUp()
+
+ self.service_token_id = uuid.uuid4().hex
+ self.service_token = fixture.V3Token()
+ s = self.service_token.add_service('identity', name='keystone')
+ s.add_standard_endpoints(public=BASE_URI,
+ admin=BASE_URI,
+ internal=BASE_URI)
+
+ self.configure_middleware(auth_plugin='v3password',
+ auth_url='%s/v3/' % AUTH_URL,
+ user_id=self.service_token.user_id,
+ password=uuid.uuid4().hex,
+ project_id=self.service_token.project_id)
+
+ auth_discovery = fixture.DiscoveryList(href=AUTH_URL)
+ self.requests_mock.get(AUTH_URL, json=auth_discovery)
+
+ base_discovery = fixture.DiscoveryList(href=BASE_URI)
+ self.requests_mock.get(BASE_URI, json=base_discovery)
+
+ self.requests_mock.post(
+ '%s/v3/auth/tokens' % AUTH_URL,
+ headers={'X-Subject-Token': self.service_token_id},
+ json=self.service_token)
+
+ def get_role_names(self, token):
+ return set(x['name'] for x in token['token'].get('roles', []))
+
+ def get_token(self):
+ token_id = uuid.uuid4().hex
+ token = fixture.V3Token()
+ token.set_project_scope()
+ token.add_role()
+
+ request_headers = {'X-Auth-Token': self.service_token_id,
+ 'X-Subject-Token': token_id}
+ headers = {'X-Subject-Token': token_id}
+
+ self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
+ request_headers=request_headers,
+ headers=headers,
+ json=token)
+
+ return token_id, token
+
+ def assertTokenDataEqual(self, token_id, token, token_data):
+ super(V3UserPluginTests, self).assertTokenDataEqual(token_id,
+ token,
+ token_data)
+
+ self.assertEqual(token.user_domain_id, token_data.user_domain_id)
+ self.assertEqual(token.project_id, token_data.project_id)
+ self.assertEqual(token.project_domain_id, token_data.project_domain_id)