diff options
author | asteroide <thomas.duval@orange.com> | 2015-09-24 14:39:09 +0200 |
---|---|---|
committer | asteroide <thomas.duval@orange.com> | 2015-09-24 14:39:09 +0200 |
commit | 0be7a3d4e0647dc0d94a34e4fc2f8c364de46602 (patch) | |
tree | 14214bb0bbf2430b6ee0df387ddbdbf13c4c4d63 /keystonemiddleware-moon/keystonemiddleware/tests | |
parent | e35decd4e989773c96a9abb263257291bd51ae1e (diff) |
Update code from KeystoneMiddleware Github repository (Master).
Change-Id: Id28c5bf48b3dbb6c8a08e66411b5785029f6857d
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests')
13 files changed, 1444 insertions, 793 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py new file mode 100644 index 00000000..d76572a8 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py @@ -0,0 +1,73 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging + +import fixtures +from oslo_config import cfg +from oslo_config import fixture as cfg_fixture +from requests_mock.contrib import fixture as rm_fixture +import six +import webob.dec + +from keystonemiddleware import auth_token +from keystonemiddleware.tests.unit import utils + + +class BaseAuthTokenTestCase(utils.BaseTestCase): + + def setUp(self): + super(BaseAuthTokenTestCase, self).setUp() + self.requests_mock = self.useFixture(rm_fixture.Fixture()) + self.logger = fixtures.FakeLogger(level=logging.DEBUG) + self.cfg = self.useFixture(cfg_fixture.Config(conf=cfg.ConfigOpts())) + + def create_middleware(self, cb, conf=None, use_global_conf=False): + + @webob.dec.wsgify + def _do_cb(req): + return cb(req) + + if use_global_conf: + opts = conf or {} + else: + opts = { + 'oslo_config_project': 'keystonemiddleware', + 'oslo_config_config': self.cfg.conf, + } + opts.update(conf or {}) + + return auth_token.AuthProtocol(_do_cb, opts) + + def create_simple_middleware(self, + status='200 OK', + body='', + headers=None, + **kwargs): + def cb(req): + resp = webob.Response(body, status) + resp.headers.update(headers or {}) + return resp + + return self.create_middleware(cb, **kwargs) + + @classmethod + def call(cls, middleware, method='GET', path='/', headers=None): + req = webob.Request.blank(path) + req.method = method + + for k, v in six.iteritems(headers or {}): + req.headers[k] = v + + resp = req.get_response(middleware) + resp.request = req + return resp diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py index 517d597b..d6ebc9a0 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py @@ -18,12 +18,12 @@ from keystoneclient import fixture from keystoneclient import session from requests_mock.contrib import fixture as rm_fixture import six -import testtools from keystonemiddleware.auth_token import _auth +from keystonemiddleware.tests.unit import utils -class DefaultAuthPluginTests(testtools.TestCase): +class DefaultAuthPluginTests(utils.BaseTestCase): def new_plugin(self, auth_host=None, auth_port=None, auth_protocol=None, auth_admin_prefix=None, admin_user=None, @@ -50,7 +50,7 @@ class DefaultAuthPluginTests(testtools.TestCase): self.stream = six.StringIO() self.logger = logging.getLogger(__name__) self.session = session.Session() - self.requests = self.useFixture(rm_fixture.Fixture()) + self.requests_mock = self.useFixture(rm_fixture.Fixture()) def test_auth_uri_from_fragments(self): auth_protocol = 'http' @@ -91,8 +91,8 @@ class DefaultAuthPluginTests(testtools.TestCase): token = fixture.V2Token() admin_tenant_name = uuid.uuid4().hex - self.requests.post(base_uri + '/v2.0/tokens', - json=token) + self.requests_mock.post(base_uri + '/v2.0/tokens', + json=token) plugin = self.new_plugin(identity_uri=base_uri, admin_user=uuid.uuid4().hex, diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py index 97fcc557..bb572aa3 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import calendar import datetime import json import logging @@ -24,17 +23,16 @@ import time import uuid import fixtures -from keystoneclient import access from keystoneclient import auth from keystoneclient.common import cms from keystoneclient import exceptions from keystoneclient import fixture from keystoneclient import session import mock -from oslo_config import fixture as cfg_fixture +from oslo_config import cfg from oslo_serialization import jsonutils from oslo_utils import timeutils -from requests_mock.contrib import fixture as rm_fixture +from oslotest import createfile import six import testresources import testtools @@ -47,6 +45,7 @@ from keystonemiddleware.auth_token import _base from keystonemiddleware.auth_token import _exceptions as exc from keystonemiddleware.auth_token import _revocations from keystonemiddleware.openstack.common import memorycache +from keystonemiddleware.tests.unit.auth_token import base from keystonemiddleware.tests.unit import client_fixtures from keystonemiddleware.tests.unit import utils @@ -134,6 +133,11 @@ def cleanup_revoked_file(filename): pass +def strtime(at=None): + at = at or timeutils.utcnow() + return at.strftime(timeutils.PERFECT_TIME_FORMAT) + + class TimezoneFixture(fixtures.Fixture): @staticmethod def supported(): @@ -192,29 +196,30 @@ class FakeApp(object): self.need_service_token = need_service_token - def __call__(self, env, start_response): + @webob.dec.wsgify + def __call__(self, req): for k, v in self.expected_env.items(): - assert env[k] == v, '%s != %s' % (env[k], v) + assert req.environ[k] == v, '%s != %s' % (req.environ[k], v) resp = webob.Response() - if (env.get('HTTP_X_IDENTITY_STATUS') == 'Invalid' - and env['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'): + if (req.environ.get('HTTP_X_IDENTITY_STATUS') == 'Invalid' and + req.environ['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'): # Simulate delayed auth forbidding access with arbitrary status # code to differentiate checking this code path resp.status = 419 resp.body = FakeApp.FORBIDDEN - elif env.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid': + elif req.environ.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid': # Simulate delayed auth forbidding access with arbitrary status # code to differentiate checking this code path resp.status = 420 resp.body = FakeApp.FORBIDDEN - elif env['HTTP_X_IDENTITY_STATUS'] == 'Invalid': + elif req.environ['HTTP_X_IDENTITY_STATUS'] == 'Invalid': # Simulate delayed auth forbidding access resp.status = 403 resp.body = FakeApp.FORBIDDEN elif (self.need_service_token is True and - env.get('HTTP_X_SERVICE_TOKEN') is None): + req.environ.get('HTTP_X_SERVICE_TOKEN') is None): # Simulate requiring composite auth # Arbitrary value to allow checking this code path resp.status = 418 @@ -222,7 +227,7 @@ class FakeApp(object): else: resp.body = FakeApp.SUCCESS - return resp(env, start_response) + return resp class v3FakeApp(FakeApp): @@ -274,23 +279,7 @@ class v3CompositeFakeApp(CompositeBase, v3FakeApp): v3_default_service_env_additions) -def new_app(status, body, headers={}): - - class _App(object): - - def __init__(self, expected_env=None): - self.expected_env = expected_env - - @webob.dec.wsgify - def __call__(self, req): - resp = webob.Response(body, status) - resp.headers.update(headers) - return resp - - return _App - - -class BaseAuthTokenMiddlewareTest(testtools.TestCase): +class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase): """Base test class for auth_token middleware. All the tests allow for running with auth_token @@ -309,7 +298,6 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): self.expected_env = expected_env or dict() self.fake_app = fake_app or FakeApp self.middleware = None - self.requests = self.useFixture(rm_fixture.Fixture()) signing_dir = self._setup_signing_directory() @@ -325,6 +313,9 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): self.response_status = None self.response_headers = None + def call_middleware(self, **kwargs): + return self.call(self.middleware, **kwargs) + def _setup_signing_directory(self): directory_name = self.useFixture(fixtures.TempDir()).path @@ -366,15 +357,12 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase): for key in six.iterkeys(self.service_token_expected_env): del self.middleware._app.expected_env[key] - def start_fake_response(self, status, headers, exc_info=None): - self.response_status = int(status.split(' ', 1)[0]) - self.response_headers = dict(headers) - def assertLastPath(self, path): if path: - self.assertEqual(BASE_URI + path, self.requests.last_request.url) + self.assertEqual(BASE_URI + path, + self.requests_mock.last_request.url) else: - self.assertIsNone(self.requests.last_request) + self.assertIsNone(self.requests_mock.last_request) class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @@ -395,27 +383,25 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, super(DiabloAuthTokenMiddlewareTest, self).setUp( expected_env=expected_env) - self.requests.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) - self.requests.post("%s/v2.0/tokens" % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post("%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) self.token_id = self.examples.VALID_DIABLO_TOKEN token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id] url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id) - self.requests.get(url, text=token_response) + self.requests_mock.get(url, text=token_response) self.set_middleware() def test_valid_diablo_response(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.token_id - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertIn('keystone.token_info', req.environ) + resp = self.call_middleware(headers={'X-Auth-Token': self.token_id}) + self.assertEqual(200, resp.status_int) + self.assertIn('keystone.token_info', resp.request.environ) class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): @@ -521,6 +507,21 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, token_response = self.examples.TOKEN_RESPONSES[token] self.assertTrue(auth_token._token_is_v3(token_response)) + def test_fixed_cache_key_length(self): + self.set_middleware() + short_string = uuid.uuid4().hex + long_string = 8 * uuid.uuid4().hex + + token_cache = self.middleware._token_cache + hashed_short_string_key, context_ = token_cache._get_cache_key( + short_string) + hashed_long_string_key, context_ = token_cache._get_cache_key( + long_string) + + # The hash keys should always match in length + self.assertThat(hashed_short_string_key, + matchers.HasLength(len(hashed_long_string_key))) + @testtools.skipUnless(memcached_available(), 'memcached not available') def test_encrypt_cache_data(self): conf = { @@ -530,13 +531,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, } self.set_middleware(conf=conf) token = b'my_token' - some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) - expires = timeutils.strtime(some_time_later) - data = ('this_data', expires) + data = 'this_data' token_cache = self.middleware._token_cache token_cache.initialize({}) token_cache._cache_store(token, data) - self.assertEqual(token_cache._cache_get(token), data[0]) + self.assertEqual(token_cache.get(token), data) @testtools.skipUnless(memcached_available(), 'memcached not available') def test_sign_cache_data(self): @@ -547,13 +546,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, } self.set_middleware(conf=conf) token = b'my_token' - some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) - expires = timeutils.strtime(some_time_later) - data = ('this_data', expires) + data = 'this_data' token_cache = self.middleware._token_cache token_cache.initialize({}) token_cache._cache_store(token, data) - self.assertEqual(token_cache._cache_get(token), data[0]) + self.assertEqual(token_cache.get(token), data) @testtools.skipUnless(memcached_available(), 'memcached not available') def test_no_memcache_protection(self): @@ -563,13 +560,11 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, } self.set_middleware(conf=conf) token = 'my_token' - some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) - expires = timeutils.strtime(some_time_later) - data = ('this_data', expires) + data = 'this_data' token_cache = self.middleware._token_cache token_cache.initialize({}) token_cache._cache_store(token, data) - self.assertEqual(token_cache._cache_get(token), data[0]) + self.assertEqual(token_cache.get(token), data) def test_assert_valid_memcache_protection_config(self): # test missing memcache_secret_key @@ -648,6 +643,64 @@ class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertRaises(exc.ConfigurationError, auth_token.AuthProtocol, self.fake_app, conf) + def test_auth_region_name(self): + token = fixture.V3Token() + + auth_url = 'http://keystone-auth.example.com:5000' + east_url = 'http://keystone-east.example.com:5000' + west_url = 'http://keystone-west.example.com:5000' + + auth_versions = fixture.DiscoveryList(href=auth_url) + east_versions = fixture.DiscoveryList(href=east_url) + west_versions = fixture.DiscoveryList(href=west_url) + + s = token.add_service('identity') + s.add_endpoint(interface='admin', url=east_url, region='east') + s.add_endpoint(interface='admin', url=west_url, region='west') + + self.requests_mock.get(auth_url, json=auth_versions) + self.requests_mock.get(east_url, json=east_versions) + self.requests_mock.get(west_url, json=west_versions) + + self.requests_mock.post( + '%s/v3/auth/tokens' % auth_url, + headers={'X-Subject-Token': uuid.uuid4().hex}, + json=token) + + east_mock = self.requests_mock.get( + '%s/v3/auth/tokens' % east_url, + headers={'X-Subject-Token': uuid.uuid4().hex}, + json=fixture.V3Token()) + + west_mock = self.requests_mock.get( + '%s/v3/auth/tokens' % west_url, + headers={'X-Subject-Token': uuid.uuid4().hex}, + json=fixture.V3Token()) + + conf = {'auth_uri': auth_url, + 'auth_url': auth_url + '/v3', + 'auth_plugin': 'v3password', + 'username': 'user', + 'password': 'pass'} + + self.assertEqual(0, east_mock.call_count) + self.assertEqual(0, west_mock.call_count) + + east_app = self.create_simple_middleware(conf=dict(region_name='east', + **conf)) + self.call(east_app, headers={'X-Auth-Token': uuid.uuid4().hex}) + + self.assertEqual(1, east_mock.call_count) + self.assertEqual(0, west_mock.call_count) + + west_app = self.create_simple_middleware(conf=dict(region_name='west', + **conf)) + + self.call(west_app, headers={'X-Auth-Token': uuid.uuid4().hex}) + + self.assertEqual(1, east_mock.call_count) + self.assertEqual(1, west_mock.call_count) + class CommonAuthTokenMiddlewareTest(object): """These tests are run once using v2 tokens and again using v3 tokens.""" @@ -656,15 +709,14 @@ class CommonAuthTokenMiddlewareTest(object): conf = { 'revocation_cache_time': '1' } - self.set_middleware(conf=conf) + self.create_simple_middleware(conf=conf) self.assertLastPath(None) def test_auth_with_no_token_does_not_call_http(self): - self.set_middleware() - req = webob.Request.blank('/') - self.middleware(req.environ, self.start_fake_response) + middleware = self.create_simple_middleware() + resp = self.call(middleware) self.assertLastPath(None) - self.assertEqual(401, self.response_status) + self.assertEqual(401, resp.status_int) def test_init_by_ipv6Addr_auth_host(self): del self.conf['identity_uri'] @@ -675,23 +727,20 @@ class CommonAuthTokenMiddlewareTest(object): 'auth_uri': None, 'auth_version': 'v3.0', } - self.set_middleware(conf=conf) - expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234' - self.assertEqual(expected_auth_uri, - self.middleware._auth_uri) + middleware = self.create_simple_middleware(conf=conf) + self.assertEqual('http://[2001:2013:1:f101::1]:1234', + middleware._auth_uri) def assert_valid_request_200(self, token, with_catalog=True): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) if with_catalog: - self.assertTrue(req.headers.get('X-Service-Catalog')) + self.assertTrue(resp.request.headers.get('X-Service-Catalog')) else: - self.assertNotIn('X-Service-Catalog', req.headers) - self.assertEqual(body, [FakeApp.SUCCESS]) - self.assertIn('keystone.token_info', req.environ) - return req + self.assertNotIn('X-Service-Catalog', resp.request.headers) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self.assertIn('keystone.token_info', resp.request.environ) + return resp.request def test_valid_uuid_request(self): for _ in range(2): # Do it twice because first result was cached. @@ -713,18 +762,16 @@ class CommonAuthTokenMiddlewareTest(object): # When the token is cached and revoked, 401 is returned. self.middleware._check_revocations_for_cached = True - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - # Token should be cached as ok after this. - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) # Put it in revocation list. self.middleware._revocations._list = self.get_revocation_list_json( token_ids=[revoked_form or token]) - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) + + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) def test_cached_revoked_uuid(self): # When the UUID token is cached and revoked, 401 is returned. @@ -746,20 +793,21 @@ class CommonAuthTokenMiddlewareTest(object): def test_revoked_token_receives_401(self): self.middleware._revocations._list = ( self.get_revocation_list_json()) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) + + token = self.token_dict['revoked_token'] + resp = self.call_middleware(headers={'X-Auth-Token': token}) + + self.assertEqual(401, resp.status_int) def test_revoked_token_receives_401_sha256(self): self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) self.set_middleware() self.middleware._revocations._list = ( self.get_revocation_list_json(mode='sha256')) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) + + token = self.token_dict['revoked_token'] + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) def test_cached_revoked_pki(self): # When the PKI token is cached and revoked, 401 is returned. @@ -781,10 +829,10 @@ class CommonAuthTokenMiddlewareTest(object): self.set_middleware() self.middleware._revocations._list = ( self.get_revocation_list_json()) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) + + token = self.token_dict['revoked_token'] + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) def _test_revoked_hashed_token(self, token_name): # If hash_algorithms is set as ['sha256', 'md5'], @@ -806,17 +854,14 @@ class CommonAuthTokenMiddlewareTest(object): # First, request is using the hashed token, is valid so goes in # cache using the given hash. - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token_hashed - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) + resp = self.call_middleware(headers={'X-Auth-Token': token_hashed}) + self.assertEqual(200, resp.status_int) # This time use the PKI(Z) token - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) + resp = self.call_middleware(headers={'X-Auth-Token': token}) # Should find the token in the cache and revocation list. - self.assertEqual(401, self.response_status) + self.assertEqual(401, resp.status_int) def test_revoked_hashed_pki_token(self): self._test_revoked_hashed_token('signed_token_scoped') @@ -973,8 +1018,7 @@ class CommonAuthTokenMiddlewareTest(object): in_memory_list) def test_invalid_revocation_list_raises_error(self): - self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={}) - + self.requests_mock.get(self.revocation_url, json={}) self.assertRaises(exc.RevocationListError, self.middleware._revocations._fetch) @@ -988,127 +1032,66 @@ class CommonAuthTokenMiddlewareTest(object): # remember because we are testing the middleware we stub the connection # to the keystone server, but this is not what gets returned invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI - self.requests.get(invalid_uri, status_code=404) + self.requests_mock.get(invalid_uri, status_code=404) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = 'invalid-token' - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'") + resp = self.call_middleware(headers={'X-Auth-Token': 'invalid-token'}) + self.assertEqual(401, resp.status_int) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + resp.headers['WWW-Authenticate']) def test_request_invalid_signed_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) + token = self.examples.INVALID_SIGNED_TOKEN + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) self.assertEqual("Keystone uri='https://keystone.example.com:1234'", - self.response_headers['WWW-Authenticate']) + resp.headers['WWW-Authenticate']) def test_request_invalid_signed_pkiz_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_PKIZ_TOKEN - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) + token = self.examples.INVALID_SIGNED_PKIZ_TOKEN + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) self.assertEqual("Keystone uri='https://keystone.example.com:1234'", - self.response_headers['WWW-Authenticate']) + resp.headers['WWW-Authenticate']) def test_request_no_token(self): - req = webob.Request.blank('/') - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'") - - def test_request_no_token_log_message(self): - class FakeLog(object): - def __init__(self): - self.msg = None - self.debugmsg = None - - def warn(self, msg=None, *args, **kwargs): - self.msg = msg - - def debug(self, msg=None, *args, **kwargs): - self.debugmsg = msg - - self.middleware._LOG = FakeLog() - self.middleware._delay_auth_decision = False - self.assertRaises(exc.InvalidToken, - self.middleware._get_user_token_from_header, {}) - self.assertIsNotNone(self.middleware._LOG.msg) - self.assertIsNotNone(self.middleware._LOG.debugmsg) + resp = self.call_middleware() + self.assertEqual(401, resp.status_int) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + resp.headers['WWW-Authenticate']) def test_request_no_token_http(self): - req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'}) - self.set_middleware() - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'") - self.assertEqual(body, ['']) + resp = self.call_middleware(method='HEAD') + self.assertEqual(401, resp.status_int) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + resp.headers['WWW-Authenticate']) def test_request_blank_token(self): - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = '' - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'") + resp = self.call_middleware(headers={'X-Auth-Token': ''}) + self.assertEqual(401, resp.status_int) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + resp.headers['WWW-Authenticate']) def _get_cached_token(self, token, mode='md5'): token_id = cms.cms_hash_token(token, mode=mode) - return self.middleware._token_cache._cache_get(token_id) + return self.middleware._token_cache.get(token_id) def test_memcache(self): - req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped'] - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) + self.call_middleware(headers={'X-Auth-Token': token}) self.assertIsNotNone(self._get_cached_token(token)) def test_expired(self): - req = webob.Request.blank('/') token = self.token_dict['signed_token_scoped_expired'] - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) def test_memcache_set_invalid_uuid(self): invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI - self.requests.get(invalid_uri, status_code=404) + self.requests_mock.get(invalid_uri, status_code=404) - req = webob.Request.blank('/') token = 'invalid-token' - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertRaises(exc.InvalidToken, - self._get_cached_token, token) - - def _test_memcache_set_invalid_signed(self, hash_algorithms=None, - exp_mode='md5'): - req = webob.Request.blank('/') - token = self.token_dict['signed_token_scoped_expired'] - req.headers['X-Auth-Token'] = token - if hash_algorithms: - self.conf['hash_algorithms'] = ','.join(hash_algorithms) - self.set_middleware() - self.middleware(req.environ, self.start_fake_response) - self.assertRaises(exc.InvalidToken, - self._get_cached_token, token, mode=exp_mode) - - def test_memcache_set_invalid_signed(self): - self._test_memcache_set_invalid_signed() - - def test_memcache_set_invalid_signed_sha256_md5(self): - hash_algorithms = ['sha256', 'md5'] - self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms, - exp_mode='sha256') - - def test_memcache_set_invalid_signed_sha256(self): - hash_algorithms = ['sha256'] - self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms, - exp_mode='sha256') + self.call_middleware(headers={'X-Auth-Token': token}) + self.assertRaises(exc.InvalidToken, self._get_cached_token, token) def test_memcache_set_expired(self, extra_conf={}, extra_environ={}): token_cache_time = 10 @@ -1117,14 +1100,17 @@ class CommonAuthTokenMiddlewareTest(object): } conf.update(extra_conf) self.set_middleware(conf=conf) - req = webob.Request.blank('/') + token = self.token_dict['signed_token_scoped'] + self.call_middleware(headers={'X-Auth-Token': token}) + + req = webob.Request.blank('/') req.headers['X-Auth-Token'] = token req.environ.update(extra_environ) now = datetime.datetime.utcnow() self.useFixture(TimeFixture(now)) - self.middleware(req.environ, self.start_fake_response) + req.get_response(self.middleware) self.assertIsNotNone(self._get_cached_token(token)) timeutils.advance_time_seconds(token_cache_time) @@ -1141,24 +1127,19 @@ class CommonAuthTokenMiddlewareTest(object): We use UUID tokens since they are the easiest one to reach get_http_connection. """ - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = ERROR_TOKEN self.middleware._http_request_max_retries = 0 - self.middleware(req.environ, self.start_fake_response) + self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN}) self.assertIsNone(self._get_cached_token(ERROR_TOKEN)) self.assert_valid_last_url(ERROR_TOKEN) def test_http_request_max_retries(self): times_retry = 10 - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = ERROR_TOKEN - conf = {'http_request_max_retries': '%s' % times_retry} self.set_middleware(conf=conf) with mock.patch('time.sleep') as mock_obj: - self.middleware(req.environ, self.start_fake_response) + self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN}) self.assertEqual(mock_obj.call_count, times_retry) @@ -1189,18 +1170,17 @@ class CommonAuthTokenMiddlewareTest(object): req.environ['AUTH_TYPE'] = 'Negotiate' - body = self.middleware(req.environ, self.start_fake_response) + resp = req.get_response(self.middleware) if success: - self.assertEqual(self.response_status, 200) - self.assertEqual(body, [FakeApp.SUCCESS]) + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) self.assertIn('keystone.token_info', req.environ) self.assert_valid_last_url(token) else: - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'" - ) + self.assertEqual(401, resp.status_int) + msg = "Keystone uri='https://keystone.example.com:1234'" + self.assertEqual(msg, resp.headers['WWW-Authenticate']) def test_uuid_bind_token_disabled_with_kerb_user(self): for use_kerberos in [True, False]: @@ -1348,17 +1328,13 @@ class CommonAuthTokenMiddlewareTest(object): token = self.token_dict['signed_token_scoped'] - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) self.assertThat(1, matchers.Equals(cache.set.call_count)) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) # Assert that the token wasn't cached again. self.assertThat(1, matchers.Equals(cache.set.call_count)) @@ -1367,17 +1343,16 @@ class CommonAuthTokenMiddlewareTest(object): for service_url in (self.examples.UNVERSIONED_SERVICE_URL, self.examples.SERVICE_URL): - self.requests.get(service_url, - json=VERSION_LIST_v3, - status_code=300) + self.requests_mock.get(service_url, + json=VERSION_LIST_v3, + status_code=300) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default'] - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) - self.assertEqual([FakeApp.SUCCESS], body) + token = self.token_dict['uuid_token_default'] + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) - token_auth = req.environ['keystone.token_auth'] + token_auth = resp.request.environ['keystone.token_auth'] endpoint_filter = {'service_type': self.examples.SERVICE_TYPE, 'version': 3} @@ -1388,6 +1363,29 @@ class CommonAuthTokenMiddlewareTest(object): self.assertFalse(token_auth.has_service_token) self.assertIsNone(token_auth.service) + def test_doesnt_auto_set_content_type(self): + # webob will set content_type = 'text/html' by default if nothing is + # provided. We don't want our middleware messing with the content type + # of the underlying applications. + + text = uuid.uuid4().hex + + def _middleware(environ, start_response): + start_response(200, []) + return text + + def _start_response(status_code, headerlist, exc_info=None): + self.assertIn('200', status_code) # will be '200 OK' + self.assertEqual([], headerlist) + + m = auth_token.AuthProtocol(_middleware, self.conf) + + env = {'REQUEST_METHOD': 'GET', + 'HTTP_X_AUTH_TOKEN': self.token_dict['uuid_token_default']} + + r = m(env, _start_response) + self.assertEqual(text, r) + class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, testresources.ResourcedTestCase): @@ -1414,10 +1412,9 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, 'auth_version': self.auth_version, } - self.requests.register_uri('GET', - BASE_URI, - json=VERSION_LIST_v3, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) self.set_middleware(conf=conf) @@ -1427,10 +1424,10 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, def test_request_no_token_dummy(self): cms._ensure_subprocess() - self.requests.get('%s%s' % (BASE_URI, self.ca_path), - status_code=404) - self.requests.get('%s%s' % (BASE_URI, self.signing_path), - status_code=404) + self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path), + status_code=404) + self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path), + status_code=404) self.assertRaises(exceptions.CertificateConfigError, self.middleware._verify_signed_token, self.examples.SIGNED_TOKEN_SCOPED, @@ -1439,7 +1436,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, def test_fetch_signing_cert(self): data = 'FAKE CERT' url = "%s%s" % (BASE_URI, self.signing_path) - self.requests.get(url, text=data) + self.requests_mock.get(url, text=data) self.middleware._fetch_signing_cert() signing_cert_path = self.middleware._signing_directory.calc_path( @@ -1447,12 +1444,12 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, with open(signing_cert_path, 'r') as f: self.assertEqual(f.read(), data) - self.assertEqual(url, self.requests.last_request.url) + self.assertEqual(url, self.requests_mock.last_request.url) def test_fetch_signing_ca(self): data = 'FAKE CA' url = "%s%s" % (BASE_URI, self.ca_path) - self.requests.get(url, text=data) + self.requests_mock.get(url, text=data) self.middleware._fetch_ca_cert() ca_file_path = self.middleware._signing_directory.calc_path( @@ -1460,7 +1457,7 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, with open(ca_file_path, 'r') as f: self.assertEqual(f.read(), data) - self.assertEqual(url, self.requests.last_request.url) + self.assertEqual(url, self.requests_mock.last_request.url) def test_prefix_trailing_slash(self): del self.conf['identity_uri'] @@ -1473,19 +1470,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, ca_url = "%s%s" % (base_url, self.ca_path) signing_url = "%s%s" % (base_url, self.signing_path) - self.requests.get(base_url, - json=VERSION_LIST_v3, - status_code=300) - self.requests.get(ca_url, text='FAKECA') - self.requests.get(signing_url, text='FAKECERT') + self.requests_mock.get(base_url, + json=VERSION_LIST_v3, + status_code=300) + self.requests_mock.get(ca_url, text='FAKECA') + self.requests_mock.get(signing_url, text='FAKECERT') self.set_middleware(conf=self.conf) self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests.last_request.url) + self.assertEqual(ca_url, self.requests_mock.last_request.url) self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests.last_request.url) + self.assertEqual(signing_url, self.requests_mock.last_request.url) def test_without_prefix(self): del self.conf['identity_uri'] @@ -1497,19 +1494,19 @@ class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, ca_url = "%s%s" % (BASE_HOST, self.ca_path) signing_url = "%s%s" % (BASE_HOST, self.signing_path) - self.requests.get(BASE_HOST, - json=VERSION_LIST_v3, - status_code=300) - self.requests.get(ca_url, text='FAKECA') - self.requests.get(signing_url, text='FAKECERT') + self.requests_mock.get(BASE_HOST, + json=VERSION_LIST_v3, + status_code=300) + self.requests_mock.get(ca_url, text='FAKECA') + self.requests_mock.get(signing_url, text='FAKECERT') self.set_middleware(conf=self.conf) self.middleware._fetch_ca_cert() - self.assertEqual(ca_url, self.requests.last_request.url) + self.assertEqual(ca_url, self.requests_mock.last_request.url) self.middleware._fetch_signing_cert() - self.assertEqual(signing_url, self.requests.last_request.url) + self.assertEqual(signing_url, self.requests_mock.last_request.url) class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): @@ -1523,7 +1520,7 @@ class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): def network_error_response(request, context): - raise exceptions.ConnectionError("Network connection error.") + raise exceptions.ConnectionRefused("Network connection refused.") class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @@ -1571,15 +1568,16 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.REVOKED_TOKEN_HASH_SHA256, } - self.requests.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) - self.requests.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) - self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST) + self.revocation_url = '%s/v2.0/tokens/revoked' % BASE_URI + self.requests_mock.get(self.revocation_url, + text=self.examples.SIGNED_REVOCATION_LIST) for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_TOKEN_UNSCOPED, @@ -1590,10 +1588,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): url = "%s/v2.0/tokens/%s" % (BASE_URI, token) text = self.examples.JSON_TOKEN_RESPONSES[token] - self.requests.get(url, text=text) + self.requests_mock.get(url, text=text) url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN) - self.requests.get(url, text=network_error_response) + self.requests_mock.get(url, text=network_error_response) self.set_middleware() @@ -1603,12 +1601,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, The implied scope is the user's tenant ID. """ - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertEqual(body, [FakeApp.SUCCESS]) - self.assertIn('keystone.token_info', req.environ) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) + self.assertIn('keystone.token_info', resp.request.environ) def assert_valid_last_url(self, token_id): self.assertLastPath("/v2.0/tokens/%s" % token_id) @@ -1623,12 +1619,10 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, def assert_unscoped_token_receives_401(self, token): """Unscoped requests with no default tenant ID should be rejected.""" - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = token - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 401) - self.assertEqual(self.response_headers['WWW-Authenticate'], - "Keystone uri='https://keystone.example.com:1234'") + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(401, resp.status_int) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + resp.headers['WWW-Authenticate']) def test_unscoped_uuid_token_receives_401(self): self.assert_unscoped_token_receives_401( @@ -1639,28 +1633,26 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.SIGNED_TOKEN_UNSCOPED) def test_request_prevent_service_catalog_injection(self): - req = webob.Request.blank('/') - req.headers['X-Service-Catalog'] = '[]' - req.headers['X-Auth-Token'] = ( - self.examples.UUID_TOKEN_NO_SERVICE_CATALOG) - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertFalse(req.headers.get('X-Service-Catalog')) - self.assertEqual(body, [FakeApp.SUCCESS]) + token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG + resp = self.call_middleware(headers={'X-Service-Catalog': '[]', + 'X-Auth-Token': token}) + + self.assertEqual(200, resp.status_int) + self.assertFalse(resp.request.headers.get('X-Service-Catalog')) + self.assertEqual(FakeApp.SUCCESS, resp.body) def test_user_plugin_token_properties(self): - req = webob.Request.blank('/') - req.headers['X-Service-Catalog'] = '[]' token = self.examples.UUID_TOKEN_DEFAULT token_data = self.examples.TOKEN_RESPONSES[token] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertEqual([FakeApp.SUCCESS], body) + resp = self.call_middleware(headers={'X-Service-Catalog': '[]', + 'X-Auth-Token': token, + 'X-Service-Token': token}) + + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) - token_auth = req.environ['keystone.token_auth'] + token_auth = resp.request.environ['keystone.token_auth'] self.assertTrue(token_auth.has_user_token) self.assertTrue(token_auth.has_service_token) @@ -1697,27 +1689,25 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, 'auth_version': 'v2.0' } - self.requests.get(BASE_URI, - json=VERSION_LIST_v3, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) - self.requests.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) token = self.examples.UUID_TOKEN_DEFAULT url = "%s/v2.0/tokens/%s" % (BASE_URI, token) text = self.examples.JSON_TOKEN_RESPONSES[token] - self.requests.get(url, text=text) + self.requests_mock.get(url, text=text) self.set_middleware(conf=conf) # This tests will only work is auth_token has chosen to use the # lower, v2, api version - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertEqual(url, self.requests.last_request.url) + resp = self.call_middleware(headers={'X-Auth-Token': token}) + self.assertEqual(200, resp.status_int) + self.assertEqual(url, self.requests_mock.last_request.url) class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, @@ -1778,21 +1768,22 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.examples.REVOKED_v3_PKIZ_TOKEN_HASH, } - self.requests.get(BASE_URI, - json=VERSION_LIST_v3, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) # TODO(jamielennox): auth_token middleware uses a v2 admin token # regardless of the auth_version that is set. - self.requests.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) - # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 - self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST) + self.revocation_url = '%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI + self.requests_mock.get(self.revocation_url, + text=self.examples.SIGNED_REVOCATION_LIST) - self.requests.get('%s/v3/auth/tokens' % BASE_URI, - text=self.token_response) + self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, + text=self.token_response, + headers={'X-Subject-Token': uuid.uuid4().hex}) self.set_middleware() @@ -1802,7 +1793,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) if token_id == ERROR_TOKEN: - raise exceptions.ConnectionError("Network connection error.") + raise exceptions.ConnectionRefused("Network connection refused.") try: response = self.examples.JSON_TOKEN_RESPONSES[token_id] @@ -1866,43 +1857,37 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertIn('adminURL', endpoint) def test_fallback_to_online_validation_with_signing_error(self): - self.requests.register_uri( - 'GET', - '%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI, - status_code=404) + self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI, + status_code=404) self.assert_valid_request_200(self.token_dict['signed_token_scoped']) self.assert_valid_request_200( self.token_dict['signed_token_scoped_pkiz']) def test_fallback_to_online_validation_with_ca_error(self): - self.requests.register_uri('GET', - '%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI, - status_code=404) + self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI, + status_code=404) self.assert_valid_request_200(self.token_dict['signed_token_scoped']) self.assert_valid_request_200( self.token_dict['signed_token_scoped_pkiz']) def test_fallback_to_online_validation_with_revocation_list_error(self): - self.requests.register_uri('GET', - '%s/v2.0/tokens/revoked' % BASE_URI, - status_code=404) + self.requests_mock.get(self.revocation_url, status_code=404) self.assert_valid_request_200(self.token_dict['signed_token_scoped']) self.assert_valid_request_200( self.token_dict['signed_token_scoped_pkiz']) def test_user_plugin_token_properties(self): - req = webob.Request.blank('/') - req.headers['X-Service-Catalog'] = '[]' token = self.examples.v3_UUID_TOKEN_DEFAULT token_data = self.examples.TOKEN_RESPONSES[token] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - self.assertEqual([FakeApp.SUCCESS], body) + resp = self.call_middleware(headers={'X-Service-Catalog': '[]', + 'X-Auth-Token': token, + 'X-Service-Token': token}) + + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) - token_auth = req.environ['keystone.token_auth'] + token_auth = resp.request.environ['keystone.token_auth'] self.assertTrue(token_auth.has_user_token) self.assertTrue(token_auth.has_service_token) @@ -1919,280 +1904,18 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, self.assertIsNone(t.trust_id) - -class TokenExpirationTest(BaseAuthTokenMiddlewareTest): - def setUp(self): - super(TokenExpirationTest, self).setUp() - self.now = timeutils.utcnow() - self.delta = datetime.timedelta(hours=1) - self.one_hour_ago = timeutils.isotime(self.now - self.delta, - subsecond=True) - self.one_hour_earlier = timeutils.isotime(self.now + self.delta, - subsecond=True) - - def create_v2_token_fixture(self, expires=None): - v2_fixture = { - 'access': { - 'token': { - 'id': 'blah', - 'expires': expires or self.one_hour_earlier, - 'tenant': { - 'id': 'tenant_id1', - 'name': 'tenant_name1', - }, - }, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'roles': [ - {'name': 'role1'}, - {'name': 'role2'}, - ], - }, - 'serviceCatalog': {} - }, - } - - return v2_fixture - - def create_v3_token_fixture(self, expires=None): - - v3_fixture = { - 'token': { - 'expires_at': expires or self.one_hour_earlier, - 'user': { - 'id': 'user_id1', - 'name': 'user_name1', - 'domain': { - 'id': 'domain_id1', - 'name': 'domain_name1' - } - }, - 'project': { - 'id': 'tenant_id1', - 'name': 'tenant_name1', - 'domain': { - 'id': 'domain_id1', - 'name': 'domain_name1' - } - }, - 'roles': [ - {'name': 'role1', 'id': 'Role1'}, - {'name': 'role2', 'id': 'Role2'}, - ], - 'catalog': {} - } - } - - return v3_fixture - - def test_no_data(self): - data = {} - self.assertRaises(exc.InvalidToken, - auth_token._get_token_expiration, - data) - - def test_bad_data(self): - data = {'my_happy_token_dict': 'woo'} - self.assertRaises(exc.InvalidToken, - auth_token._get_token_expiration, - data) - - def test_v2_token_get_token_expiration_return_isotime(self): - data = self.create_v2_token_fixture() - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(self.one_hour_earlier, actual_expires) - - def test_v2_token_not_expired(self): - data = self.create_v2_token_fixture() - expected_expires = data['access']['token']['expires'] - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(actual_expires, expected_expires) - - def test_v2_token_expired(self): - data = self.create_v2_token_fixture(expires=self.one_hour_ago) - expires = auth_token._get_token_expiration(data) - self.assertRaises(exc.InvalidToken, - auth_token._confirm_token_not_expired, - expires) - - def test_v2_token_with_timezone_offset_not_expired(self): - self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) - data = self.create_v2_token_fixture( - expires='2000-01-01T05:05:10.000123Z') - expected_expires = '2000-01-01T05:05:10.000123Z' - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(actual_expires, expected_expires) - - def test_v2_token_with_timezone_offset_expired(self): - self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) - data = self.create_v2_token_fixture( - expires='1999-12-31T19:05:10Z') - expires = auth_token._get_token_expiration(data) - self.assertRaises(exc.InvalidToken, - auth_token._confirm_token_not_expired, - expires) - - def test_v3_token_get_token_expiration_return_isotime(self): - data = self.create_v3_token_fixture() - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(self.one_hour_earlier, actual_expires) - - def test_v3_token_not_expired(self): - data = self.create_v3_token_fixture() - expected_expires = data['token']['expires_at'] - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(actual_expires, expected_expires) - - def test_v3_token_expired(self): - data = self.create_v3_token_fixture(expires=self.one_hour_ago) - expires = auth_token._get_token_expiration(data) - self.assertRaises(exc.InvalidToken, - auth_token._confirm_token_not_expired, - expires) - - def test_v3_token_with_timezone_offset_not_expired(self): - self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) - data = self.create_v3_token_fixture( - expires='2000-01-01T05:05:10.000123Z') - expected_expires = '2000-01-01T05:05:10.000123Z' - - actual_expires = auth_token._get_token_expiration(data) - self.assertEqual(actual_expires, expected_expires) - - def test_v3_token_with_timezone_offset_expired(self): - self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) - data = self.create_v3_token_fixture( - expires='1999-12-31T19:05:10Z') - expires = auth_token._get_token_expiration(data) - self.assertRaises(exc.InvalidToken, - auth_token._confirm_token_not_expired, - expires) - - def test_cached_token_not_expired(self): + def test_expire_stored_in_cache(self): + # tests the upgrade path from storing a tuple vs just the data in the + # cache. Can be removed in the future. token = 'mytoken' data = 'this_data' self.set_middleware() self.middleware._token_cache.initialize({}) - some_time_later = timeutils.strtime(at=(self.now + self.delta)) - expires = some_time_later - self.middleware._token_cache.store(token, data, expires) - self.assertEqual(self.middleware._token_cache._cache_get(token), data) - - def test_cached_token_not_expired_with_old_style_nix_timestamp(self): - """Ensure we cannot retrieve a token from the cache. - - Getting a token from the cache should return None when the token data - in the cache stores the expires time as a \*nix style timestamp. - - """ - token = 'mytoken' - data = 'this_data' - self.set_middleware() - token_cache = self.middleware._token_cache - token_cache.initialize({}) - some_time_later = self.now + self.delta - # Store a unix timestamp in the cache. - expires = calendar.timegm(some_time_later.timetuple()) - token_cache.store(token, data, expires) - self.assertIsNone(token_cache._cache_get(token)) - - def test_cached_token_expired(self): - token = 'mytoken' - data = 'this_data' - self.set_middleware() - self.middleware._token_cache.initialize({}) - some_time_earlier = timeutils.strtime(at=(self.now - self.delta)) - expires = some_time_earlier - self.middleware._token_cache.store(token, data, expires) - self.assertThat(lambda: self.middleware._token_cache._cache_get(token), - matchers.raises(exc.InvalidToken)) - - def test_cached_token_with_timezone_offset_not_expired(self): - token = 'mytoken' - data = 'this_data' - self.set_middleware() - self.middleware._token_cache.initialize({}) - timezone_offset = datetime.timedelta(hours=2) - some_time_later = self.now - timezone_offset + self.delta - expires = timeutils.strtime(some_time_later) + '-02:00' - self.middleware._token_cache.store(token, data, expires) - self.assertEqual(self.middleware._token_cache._cache_get(token), data) - - def test_cached_token_with_timezone_offset_expired(self): - token = 'mytoken' - data = 'this_data' - self.set_middleware() - self.middleware._token_cache.initialize({}) - timezone_offset = datetime.timedelta(hours=2) - some_time_earlier = self.now - timezone_offset - self.delta - expires = timeutils.strtime(some_time_earlier) + '-02:00' - self.middleware._token_cache.store(token, data, expires) - self.assertThat(lambda: self.middleware._token_cache._cache_get(token), - matchers.raises(exc.InvalidToken)) - - -class CatalogConversionTests(BaseAuthTokenMiddlewareTest): - - PUBLIC_URL = 'http://server:5000/v2.0' - ADMIN_URL = 'http://admin:35357/v2.0' - INTERNAL_URL = 'http://internal:5000/v2.0' - - REGION_ONE = 'RegionOne' - REGION_TWO = 'RegionTwo' - REGION_THREE = 'RegionThree' - - def test_basic_convert(self): - token = fixture.V3Token() - s = token.add_service(type='identity') - s.add_standard_endpoints(public=self.PUBLIC_URL, - admin=self.ADMIN_URL, - internal=self.INTERNAL_URL, - region=self.REGION_ONE) - - auth_ref = access.AccessInfo.factory(body=token) - catalog_data = auth_ref.service_catalog.get_data() - catalog = auth_token._v3_to_v2_catalog(catalog_data) - - self.assertEqual(1, len(catalog)) - service = catalog[0] - self.assertEqual(1, len(service['endpoints'])) - endpoints = service['endpoints'][0] - - self.assertEqual('identity', service['type']) - self.assertEqual(4, len(endpoints)) - self.assertEqual(self.PUBLIC_URL, endpoints['publicURL']) - self.assertEqual(self.ADMIN_URL, endpoints['adminURL']) - self.assertEqual(self.INTERNAL_URL, endpoints['internalURL']) - self.assertEqual(self.REGION_ONE, endpoints['region']) - - def test_multi_region(self): - token = fixture.V3Token() - s = token.add_service(type='identity') - - s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE) - s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO) - s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE) - - auth_ref = access.AccessInfo.factory(body=token) - catalog_data = auth_ref.service_catalog.get_data() - catalog = auth_token._v3_to_v2_catalog(catalog_data) - - self.assertEqual(1, len(catalog)) - service = catalog[0] - - # the 3 regions will come through as 3 separate endpoints - expected = [{'internalURL': self.INTERNAL_URL, - 'region': self.REGION_ONE}, - {'publicURL': self.PUBLIC_URL, - 'region': self.REGION_TWO}, - {'adminURL': self.ADMIN_URL, - 'region': self.REGION_THREE}] - - self.assertEqual('identity', service['type']) - self.assertEqual(3, len(service['endpoints'])) - for e in expected: - self.assertIn(e, expected) + now = datetime.datetime.utcnow() + delta = datetime.timedelta(hours=1) + expires = strtime(at=(now + delta)) + self.middleware._token_cache.store(token, (data, expires)) + self.assertEqual(self.middleware._token_cache.get(token), data) class DelayedAuthTests(BaseAuthTokenMiddlewareTest): @@ -2204,51 +1927,49 @@ class DelayedAuthTests(BaseAuthTokenMiddlewareTest): 'auth_version': 'v3.0', 'auth_uri': auth_uri} - self.fake_app = new_app('401 Unauthorized', body) - self.set_middleware(conf=conf) + middleware = self.create_simple_middleware(status='401 Unauthorized', + body=body, + conf=conf) + resp = self.call(middleware) + self.assertEqual(six.b(body), resp.body) - req = webob.Request.blank('/') - resp = self.middleware(req.environ, self.start_fake_response) - - self.assertEqual([six.b(body)], resp) - - self.assertEqual(401, self.response_status) + self.assertEqual(401, resp.status_int) self.assertEqual("Keystone uri='%s'" % auth_uri, - self.response_headers['WWW-Authenticate']) + resp.headers['WWW-Authenticate']) def test_delayed_auth_values(self): - fake_app = new_app('401 Unauthorized', uuid.uuid4().hex) - middleware = auth_token.AuthProtocol(fake_app, - {'auth_uri': 'http://local.test'}) + conf = {'auth_uri': 'http://local.test'} + status = '401 Unauthorized' + + middleware = self.create_simple_middleware(status=status, conf=conf) self.assertFalse(middleware._delay_auth_decision) for v in ('True', '1', 'on', 'yes'): conf = {'delay_auth_decision': v, 'auth_uri': 'http://local.test'} - middleware = auth_token.AuthProtocol(fake_app, conf) + middleware = self.create_simple_middleware(status=status, + conf=conf) self.assertTrue(middleware._delay_auth_decision) for v in ('False', '0', 'no'): conf = {'delay_auth_decision': v, 'auth_uri': 'http://local.test'} - middleware = auth_token.AuthProtocol(fake_app, conf) + middleware = self.create_simple_middleware(status=status, + conf=conf) self.assertFalse(middleware._delay_auth_decision) def test_auth_plugin_with_no_tokens(self): body = uuid.uuid4().hex auth_uri = 'http://local.test' conf = {'delay_auth_decision': True, 'auth_uri': auth_uri} - self.fake_app = new_app('200 OK', body) - self.set_middleware(conf=conf) - - req = webob.Request.blank('/') - resp = self.middleware(req.environ, self.start_fake_response) - self.assertEqual([six.b(body)], resp) + middleware = self.create_simple_middleware(body=body, conf=conf) + resp = self.call(middleware) + self.assertEqual(six.b(body), resp.body) - token_auth = req.environ['keystone.token_auth'] + token_auth = resp.request.environ['keystone.token_auth'] self.assertFalse(token_auth.has_user_token) self.assertIsNone(token_auth.user) @@ -2263,42 +1984,44 @@ class CommonCompositeAuthTests(object): """ def test_composite_auth_ok(self): - req = webob.Request.blank('/') token = self.token_dict['uuid_token_default'] service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token fake_logger = fixtures.FakeLogger(level=logging.DEBUG) self.middleware.logger = self.useFixture(fake_logger) - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) - self.assertEqual([FakeApp.SUCCESS], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) expected_env.update(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) - self.assertIn('Received request from user: ' - 'user_id %(HTTP_X_USER_ID)s, ' + + # role list may get reordered, check for string pieces individually + self.assertIn('Received request from user: ', fake_logger.output) + self.assertIn('user_id %(HTTP_X_USER_ID)s, ' 'project_id %(HTTP_X_TENANT_ID)s, ' - 'roles %(HTTP_X_ROLES)s ' - 'service: user_id %(HTTP_X_SERVICE_USER_ID)s, ' + 'roles ' % expected_env, fake_logger.output) + self.assertIn('service: user_id %(HTTP_X_SERVICE_USER_ID)s, ' 'project_id %(HTTP_X_SERVICE_PROJECT_ID)s, ' - 'roles %(HTTP_X_SERVICE_ROLES)s' % expected_env, - fake_logger.output) + 'roles ' % expected_env, fake_logger.output) + + roles = ','.join([expected_env['HTTP_X_SERVICE_ROLES'], + expected_env['HTTP_X_ROLES']]) + + for r in roles.split(','): + self.assertIn(r, fake_logger.output) def test_composite_auth_invalid_service_token(self): - req = webob.Request.blank('/') token = self.token_dict['uuid_token_default'] service_token = 'invalid-service-token' - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) - self.assertEqual([b'Authentication required'], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(401, resp.status_int) + self.assertEqual(b'Authentication required', resp.body) def test_composite_auth_no_service_token(self): self.purge_service_token_expected_env() req = webob.Request.blank('/') - token = self.token_dict['uuid_token_default'] - req.headers['X-Auth-Token'] = token + req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default'] # Ensure injection of service headers is not possible for key, value in six.iteritems(self.service_token_expected_env): @@ -2306,42 +2029,36 @@ class CommonCompositeAuthTests(object): req.headers[header_key] = value # Check arbitrary headers not removed req.headers['X-Foo'] = 'Bar' - body = self.middleware(req.environ, self.start_fake_response) + resp = req.get_response(self.middleware) for key in six.iterkeys(self.service_token_expected_env): header_key = key[len('HTTP_'):].replace('_', '-') self.assertFalse(req.headers.get(header_key)) self.assertEqual('Bar', req.headers.get('X-Foo')) - self.assertEqual(418, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + self.assertEqual(418, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) def test_composite_auth_invalid_user_token(self): - req = webob.Request.blank('/') token = 'invalid-token' service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) - self.assertEqual([b'Authentication required'], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(401, resp.status_int) + self.assertEqual(b'Authentication required', resp.body) def test_composite_auth_no_user_token(self): - req = webob.Request.blank('/') service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(401, self.response_status) - self.assertEqual([b'Authentication required'], body) + resp = self.call_middleware(headers={'X-Service-Token': service_token}) + self.assertEqual(401, resp.status_int) + self.assertEqual(b'Authentication required', resp.body) def test_composite_auth_delay_ok(self): self.middleware._delay_auth_decision = True - req = webob.Request.blank('/') token = self.token_dict['uuid_token_default'] service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(200, self.response_status) - self.assertEqual([FakeApp.SUCCESS], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(200, resp.status_int) + self.assertEqual(FakeApp.SUCCESS, resp.body) def test_composite_auth_delay_invalid_service_token(self): self.middleware._delay_auth_decision = True @@ -2351,14 +2068,12 @@ class CommonCompositeAuthTests(object): } self.update_expected_env(expected_env) - req = webob.Request.blank('/') token = self.token_dict['uuid_token_default'] service_token = 'invalid-service-token' - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(420, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(420, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) def test_composite_auth_delay_invalid_service_and_user_tokens(self): self.middleware._delay_auth_decision = True @@ -2370,22 +2085,19 @@ class CommonCompositeAuthTests(object): } self.update_expected_env(expected_env) - req = webob.Request.blank('/') token = 'invalid-user-token' service_token = 'invalid-service-token' - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(419, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(419, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) def test_composite_auth_delay_no_service_token(self): self.middleware._delay_auth_decision = True self.purge_service_token_expected_env() req = webob.Request.blank('/') - token = self.token_dict['uuid_token_default'] - req.headers['X-Auth-Token'] = token + req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default'] # Ensure injection of service headers is not possible for key, value in six.iteritems(self.service_token_expected_env): @@ -2393,13 +2105,13 @@ class CommonCompositeAuthTests(object): req.headers[header_key] = value # Check arbitrary headers not removed req.headers['X-Foo'] = 'Bar' - body = self.middleware(req.environ, self.start_fake_response) + resp = req.get_response(self.middleware) for key in six.iterkeys(self.service_token_expected_env): header_key = key[len('HTTP_'):].replace('_', '-') self.assertFalse(req.headers.get(header_key)) self.assertEqual('Bar', req.headers.get('X-Foo')) - self.assertEqual(418, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + self.assertEqual(418, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) def test_composite_auth_delay_invalid_user_token(self): self.middleware._delay_auth_decision = True @@ -2409,14 +2121,12 @@ class CommonCompositeAuthTests(object): } self.update_expected_env(expected_env) - req = webob.Request.blank('/') token = 'invalid-token' service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Auth-Token'] = token - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(403, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + resp = self.call_middleware(headers={'X-Auth-Token': token, + 'X-Service-Token': service_token}) + self.assertEqual(403, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) def test_composite_auth_delay_no_user_token(self): self.middleware._delay_auth_decision = True @@ -2426,12 +2136,10 @@ class CommonCompositeAuthTests(object): } self.update_expected_env(expected_env) - req = webob.Request.blank('/') service_token = self.token_dict['uuid_service_token_default'] - req.headers['X-Service-Token'] = service_token - body = self.middleware(req.environ, self.start_fake_response) - self.assertEqual(403, self.response_status) - self.assertEqual([FakeApp.FORBIDDEN], body) + resp = self.call_middleware(headers={'X-Service-Token': service_token}) + self.assertEqual(403, resp.status_int) + self.assertEqual(FakeApp.FORBIDDEN, resp.body) class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, @@ -2458,25 +2166,26 @@ class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, 'uuid_service_token_default': uuid_service_token_default, } - self.requests.get(BASE_URI, - json=VERSION_LIST_v2, - status_code=300) + self.requests_mock.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) - self.requests.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) - self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST, - status_code=200) + self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST, + status_code=200) for token in (self.examples.UUID_TOKEN_DEFAULT, self.examples.UUID_SERVICE_TOKEN_DEFAULT,): - self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token), - text=self.examples.JSON_TOKEN_RESPONSES[token]) + text = self.examples.JSON_TOKEN_RESPONSES[token] + self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token), + text=text) for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI, "%s/v2.0/tokens/invalid-service-token" % BASE_URI): - self.requests.get(invalid_uri, text='', status_code=404) + self.requests_mock.get(invalid_uri, text='', status_code=404) self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) self.service_token_expected_env = dict( @@ -2508,19 +2217,19 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, 'uuid_service_token_default': uuid_serv_token_default, } - self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) + self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) # TODO(jamielennox): auth_token middleware uses a v2 admin token # regardless of the auth_version that is set. - self.requests.post('%s/v2.0/tokens' % BASE_URI, - text=FAKE_ADMIN_TOKEN) + self.requests_mock.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) - # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 - self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, - text=self.examples.SIGNED_REVOCATION_LIST) + self.requests_mock.get('%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) - self.requests.get('%s/v3/auth/tokens' % BASE_URI, - text=self.token_response) + self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, + text=self.token_response, + headers={'X-Subject-Token': uuid.uuid4().hex}) self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) @@ -2539,7 +2248,7 @@ class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, response = "" if token_id == ERROR_TOKEN: - raise exceptions.ConnectionError("Network connection error.") + raise exceptions.ConnectionRefused("Network connection refused.") try: response = self.examples.JSON_TOKEN_RESPONSES[token_id] @@ -2555,18 +2264,15 @@ class OtherTests(BaseAuthTokenMiddlewareTest): def setUp(self): super(OtherTests, self).setUp() self.logger = self.useFixture(fixtures.FakeLogger()) - self.cfg = self.useFixture(cfg_fixture.Config()) def test_unknown_server_versions(self): versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI) self.set_middleware() - self.requests.get(BASE_URI, json=versions, status_code=300) + self.requests_mock.get(BASE_URI, json=versions, status_code=300) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = uuid.uuid4().hex - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(503, self.response_status) + resp = self.call_middleware(headers={'X-Auth-Token': uuid.uuid4().hex}) + self.assertEqual(503, resp.status_int) self.assertIn('versions [v3.0, v2.0]', self.logger.output) @@ -2589,11 +2295,11 @@ class OtherTests(BaseAuthTokenMiddlewareTest): def test_default_auth_version(self): # VERSION_LIST_v3 contains both v2 and v3 version elements - self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) + self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) self._assert_auth_version(None, (3, 0)) # VERSION_LIST_v2 contains only v2 version elements - self.requests.get(BASE_URI, json=VERSION_LIST_v2, status_code=300) + self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300) self._assert_auth_version(None, (2, 0)) def test_unsupported_auth_version(self): @@ -2615,20 +2321,19 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): def setUp(self): super(AuthProtocolLoadingTests, self).setUp() - self.cfg = self.useFixture(cfg_fixture.Config()) self.project_id = uuid.uuid4().hex # first touch is to discover the available versions at the auth_url - self.requests.get(self.AUTH_URL, - json=fixture.DiscoveryList(href=self.DISC_URL), - status_code=300) + self.requests_mock.get(self.AUTH_URL, + json=fixture.DiscoveryList(href=self.DISC_URL), + status_code=300) # then we do discovery on the URL from the service catalog. In practice # this is mostly the same URL as before but test the full range. - self.requests.get(self.KEYSTONE_BASE_URL + '/', - json=fixture.DiscoveryList(href=self.CRUD_URL), - status_code=300) + self.requests_mock.get(self.KEYSTONE_BASE_URL + '/', + json=fixture.DiscoveryList(href=self.CRUD_URL), + status_code=300) def good_request(self, app): # admin_token is the token that the service will get back from auth @@ -2637,9 +2342,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): s = admin_token.add_service('identity', name='keystone') s.add_standard_endpoints(admin=self.KEYSTONE_URL) - self.requests.post(self.DISC_URL + '/v3/auth/tokens', - json=admin_token, - headers={'X-Subject-Token': admin_token_id}) + self.requests_mock.post(self.DISC_URL + '/v3/auth/tokens', + json=admin_token, + headers={'X-Subject-Token': admin_token_id}) # user_token is the data from the user's inputted token user_token_id = uuid.uuid4().hex @@ -2649,15 +2354,13 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): request_headers = {'X-Subject-Token': user_token_id, 'X-Auth-Token': admin_token_id} - self.requests.get(self.CRUD_URL + '/v3/auth/tokens', - request_headers=request_headers, - json=user_token) + self.requests_mock.get(self.CRUD_URL + '/v3/auth/tokens', + request_headers=request_headers, + json=user_token, + headers={'X-Subject-Token': uuid.uuid4().hex}) - req = webob.Request.blank('/') - req.headers['X-Auth-Token'] = user_token_id - resp = app(req.environ, self.start_fake_response) - - self.assertEqual(200, self.response_status) + resp = self.call(app, headers={'X-Auth-Token': user_token_id}) + self.assertEqual(200, resp.status_int) return resp def test_loading_password_plugin(self): @@ -2668,6 +2371,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): project_id = uuid.uuid4().hex + # Register the authentication options + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) + # configure the authentication options self.cfg.config(auth_plugin='password', username='testuser', @@ -2678,22 +2384,23 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): group=_base.AUTHTOKEN_GROUP) body = uuid.uuid4().hex - app = auth_token.AuthProtocol(new_app('200 OK', body)(), {}) + app = self.create_simple_middleware(body=body) resp = self.good_request(app) - self.assertEqual(six.b(body), resp[0]) + self.assertEqual(six.b(body), resp.body) @staticmethod def get_plugin(app): return app._identity_server._adapter.auth - def test_invalid_plugin_fails_to_intialize(self): + def test_invalid_plugin_fails_to_initialize(self): + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) self.cfg.config(auth_plugin=uuid.uuid4().hex, group=_base.AUTHTOKEN_GROUP) self.assertRaises( exceptions.NoMatchingPlugin, - lambda: auth_token.AuthProtocol(new_app('200 OK', '')(), {})) + self.create_simple_middleware) def test_plugin_loading_mixed_opts(self): # some options via override and some via conf @@ -2703,6 +2410,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): username = 'testuser' password = 'testpass' + # Register the authentication options + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) + # configure the authentication options self.cfg.config(auth_plugin='password', password=password, @@ -2713,10 +2423,10 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): conf = {'username': username, 'auth_url': self.AUTH_URL} body = uuid.uuid4().hex - app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf) + app = self.create_simple_middleware(body=body, conf=conf) resp = self.good_request(app) - self.assertEqual(six.b(body), resp[0]) + self.assertEqual(six.b(body), resp.body) plugin = self.get_plugin(app) @@ -2735,6 +2445,9 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): opts = auth.get_plugin_options('password') self.cfg.register_opts(opts, group=section) + # Register the authentication options + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) + # configure the authentication options self.cfg.config(auth_section=section, group=_base.AUTHTOKEN_GROUP) self.cfg.config(auth_plugin='password', @@ -2746,10 +2459,10 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): conf = {'username': username, 'auth_url': self.AUTH_URL} body = uuid.uuid4().hex - app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf) + app = self.create_simple_middleware(body=body, conf=conf) resp = self.good_request(app) - self.assertEqual(six.b(body), resp[0]) + self.assertEqual(six.b(body), resp.body) plugin = self.get_plugin(app) @@ -2759,5 +2472,106 @@ class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): self.assertEqual(self.project_id, plugin._project_id) +class TestAuthPluginUserAgentGeneration(BaseAuthTokenMiddlewareTest): + + def setUp(self): + super(TestAuthPluginUserAgentGeneration, self).setUp() + self.auth_url = uuid.uuid4().hex + self.project_id = uuid.uuid4().hex + self.username = uuid.uuid4().hex + self.password = uuid.uuid4().hex + self.section = uuid.uuid4().hex + self.user_domain_id = uuid.uuid4().hex + + auth.register_conf_options(self.cfg.conf, group=self.section) + opts = auth.get_plugin_options('password') + self.cfg.register_opts(opts, group=self.section) + + # Register the authentication options + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) + + # configure the authentication options + self.cfg.config(auth_section=self.section, group=_base.AUTHTOKEN_GROUP) + self.cfg.config(auth_plugin='password', + password=self.password, + project_id=self.project_id, + user_domain_id=self.user_domain_id, + group=self.section) + + def test_no_project_configured(self): + ksm_version = uuid.uuid4().hex + conf = {'username': self.username, 'auth_url': self.auth_url} + + app = self._create_app(conf, ksm_version) + self._assert_user_agent(app, '', ksm_version) + + def test_project_in_configuration(self): + project = uuid.uuid4().hex + project_version = uuid.uuid4().hex + + conf = {'username': self.username, + 'auth_url': self.auth_url, + 'project': project} + app = self._create_app(conf, project_version) + project_with_version = '{0}/{1} '.format(project, project_version) + self._assert_user_agent(app, project_with_version, project_version) + + def test_project_in_oslo_configuration(self): + project = uuid.uuid4().hex + project_version = uuid.uuid4().hex + + conf = {'username': self.username, 'auth_url': self.auth_url} + with mock.patch.object(cfg.CONF, 'project', new=project, create=True): + app = self._create_app(conf, project_version) + project = '{0}/{1} '.format(project, project_version) + self._assert_user_agent(app, project, project_version) + + def _create_app(self, conf, project_version): + fake_pkg_resources = mock.Mock() + fake_pkg_resources.get_distribution().version = project_version + + body = uuid.uuid4().hex + with mock.patch('keystonemiddleware.auth_token.pkg_resources', + new=fake_pkg_resources): + return self.create_simple_middleware(body=body, conf=conf, + use_global_conf=True) + + def _assert_user_agent(self, app, project, ksm_version): + sess = app._identity_server._adapter.session + expected_ua = ('{0}keystonemiddleware.auth_token/{1}' + .format(project, ksm_version)) + self.assertEqual(expected_ua, sess.user_agent) + + +class TestAuthPluginLocalOsloConfig(BaseAuthTokenMiddlewareTest): + def test_project_in_local_oslo_configuration(self): + options = { + 'auth_plugin': 'password', + 'auth_uri': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, + } + + content = ("[keystone_authtoken]\n" + "auth_plugin=%(auth_plugin)s\n" + "auth_uri=%(auth_uri)s\n" + "password=%(password)s\n" % options) + conf_file_fixture = self.useFixture( + createfile.CreateFileWithContent("my_app", content)) + conf = {'oslo_config_project': 'my_app', + 'oslo_config_file': conf_file_fixture.path} + app = self._create_app(conf, uuid.uuid4().hex) + for option in options: + self.assertEqual(options[option], app._conf_get(option)) + + def _create_app(self, conf, project_version): + fake_pkg_resources = mock.Mock() + fake_pkg_resources.get_distribution().version = project_version + + body = uuid.uuid4().hex + with mock.patch('keystonemiddleware.auth_token.pkg_resources', + new=fake_pkg_resources): + return self.create_simple_middleware(body=body, conf=conf) + + def load_tests(loader, tests, pattern): return testresources.OptimisingTestSuite(tests) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py new file mode 100644 index 00000000..b213f546 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py @@ -0,0 +1,202 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from keystoneclient import fixture +import mock +import six +import testtools +import webob + +from keystonemiddleware import auth_token +from keystonemiddleware.auth_token import _request + + +class FakeApp(object): + + @webob.dec.wsgify + def __call__(self, req): + return webob.Response() + + +class FetchingMiddleware(auth_token._BaseAuthProtocol): + + def __init__(self, app, token_dict={}, **kwargs): + super(FetchingMiddleware, self).__init__(app, **kwargs) + self.token_dict = token_dict + + def _fetch_token(self, token): + try: + return self.token_dict[token] + except KeyError: + raise auth_token.InvalidToken() + + +class BaseAuthProtocolTests(testtools.TestCase): + + @mock.patch.multiple(auth_token._BaseAuthProtocol, + process_request=mock.DEFAULT, + process_response=mock.DEFAULT) + def test_process_flow(self, process_request, process_response): + m = auth_token._BaseAuthProtocol(FakeApp()) + + process_request.return_value = None + process_response.side_effect = lambda x: x + + req = webob.Request.blank('/', method='GET') + resp = req.get_response(m) + + self.assertEqual(200, resp.status_code) + + self.assertEqual(1, process_request.call_count) + self.assertIsInstance(process_request.call_args[0][0], + _request._AuthTokenRequest) + + self.assertEqual(1, process_response.call_count) + self.assertIsInstance(process_response.call_args[0][0], webob.Response) + + @classmethod + def call(cls, middleware, method='GET', path='/', headers=None): + req = webob.Request.blank(path) + req.method = method + + for k, v in six.iteritems(headers or {}): + req.headers[k] = v + + resp = req.get_response(middleware) + resp.request = req + return resp + + def test_good_v3_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Auth-Token']) + + self.assertEqual('Confirmed', req.headers['X-Identity-Status']) + self.assertNotIn('X-Service-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertTrue(p.has_user_token) + self.assertFalse(p.has_service_token) + + self.assertEqual(t.project_id, p.user.project_id) + self.assertEqual(t.project_domain_id, p.user.project_domain_id) + self.assertEqual(t.user_id, p.user.user_id) + self.assertEqual(t.user_domain_id, p.user.user_domain_id) + self.assertIn(role['name'], p.user.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_invalid_user_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_expired_user_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', req.headers['X-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Auth-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Auth-Token': token_id}) + + def test_good_v3_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + role = t.add_role() + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual(token_id, req.headers['X-Service-Token']) + + self.assertEqual('Confirmed', + req.headers['X-Service-Identity-Status']) + self.assertNotIn('X-Auth-Token', req.headers) + + p = req.environ['keystone.token_auth'] + + self.assertFalse(p.has_user_token) + self.assertTrue(p.has_service_token) + + self.assertEqual(t.project_id, p.service.project_id) + self.assertEqual(t.project_domain_id, p.service.project_domain_id) + self.assertEqual(t.user_id, p.service.user_id) + self.assertEqual(t.user_domain_id, p.service.user_domain_id) + self.assertIn(role['name'], p.service.role_names) + + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_invalid_service_token(self): + token_id = uuid.uuid4().hex + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb) + self.call(m, headers={'X-Service-Token': token_id}) + + def test_expired_service_token(self): + t = fixture.V3Token() + t.set_project_scope() + t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10) + + token_id = uuid.uuid4().hex + token_dict = {token_id: t} + + @webob.dec.wsgify + def _do_cb(req): + self.assertEqual('Invalid', + req.headers['X-Service-Identity-Status']) + self.assertEqual(token_id, req.headers['X-Service-Token']) + return webob.Response() + + m = FetchingMiddleware(_do_cb, token_dict=token_dict) + self.call(m, headers={'X-Service-Token': token_id}) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py index 75c7f759..e9189831 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py @@ -11,12 +11,12 @@ # under the License. import six -import testtools from keystonemiddleware.auth_token import _memcache_crypt as memcache_crypt +from keystonemiddleware.tests.unit import utils -class MemcacheCryptPositiveTests(testtools.TestCase): +class MemcacheCryptPositiveTests(utils.BaseTestCase): def _setup_keys(self, strategy): return memcache_crypt.derive_keys(b'token', b'secret', strategy) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py new file mode 100644 index 00000000..223433f8 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py @@ -0,0 +1,253 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import itertools +import uuid + +from keystoneclient import access +from keystoneclient import fixture + +from keystonemiddleware.auth_token import _request +from keystonemiddleware.tests.unit import utils + + +class RequestObjectTests(utils.TestCase): + + def setUp(self): + super(RequestObjectTests, self).setUp() + self.request = _request._AuthTokenRequest.blank('/') + + def test_setting_user_token_valid(self): + self.assertNotIn('X-Identity-Status', self.request.headers) + + self.request.user_token_valid = True + self.assertEqual('Confirmed', + self.request.headers['X-Identity-Status']) + self.assertTrue(self.request.user_token_valid) + + self.request.user_token_valid = False + self.assertEqual('Invalid', + self.request.headers['X-Identity-Status']) + self.assertFalse(self.request.user_token_valid) + + def test_setting_service_token_valid(self): + self.assertNotIn('X-Service-Identity-Status', self.request.headers) + + self.request.service_token_valid = True + self.assertEqual('Confirmed', + self.request.headers['X-Service-Identity-Status']) + self.assertTrue(self.request.service_token_valid) + + self.request.service_token_valid = False + self.assertEqual('Invalid', + self.request.headers['X-Service-Identity-Status']) + self.assertFalse(self.request.service_token_valid) + + def test_removing_headers(self): + GOOD = ('X-Auth-Token', + 'unknownstring', + uuid.uuid4().hex) + + BAD = ('X-Domain-Id', + 'X-Domain-Name', + 'X-Project-Id', + 'X-Project-Name', + 'X-Project-Domain-Id', + 'X-Project-Domain-Name', + 'X-User-Id', + 'X-User-Name', + 'X-User-Domain-Id', + 'X-User-Domain-Name', + 'X-Roles', + 'X-Identity-Status', + + 'X-Service-Domain-Id', + 'X-Service-Domain-Name', + 'X-Service-Project-Id', + 'X-Service-Project-Name', + 'X-Service-Project-Domain-Id', + 'X-Service-Project-Domain-Name', + 'X-Service-User-Id', + 'X-Service-User-Name', + 'X-Service-User-Domain-Id', + 'X-Service-User-Domain-Name', + 'X-Service-Roles', + 'X-Service-Identity-Status', + + 'X-Service-Catalog', + + 'X-Role', + 'X-User', + 'X-Tenant-Id', + 'X-Tenant-Name', + 'X-Tenant', + ) + + header_vals = {} + + for header in itertools.chain(GOOD, BAD): + v = uuid.uuid4().hex + header_vals[header] = v + self.request.headers[header] = v + + self.request.remove_auth_headers() + + for header in BAD: + self.assertNotIn(header, self.request.headers) + + for header in GOOD: + self.assertEqual(header_vals[header], self.request.headers[header]) + + def _test_v3_headers(self, token, prefix): + self.assertEqual(token.domain_id, + self.request.headers['X%s-Domain-Id' % prefix]) + self.assertEqual(token.domain_name, + self.request.headers['X%s-Domain-Name' % prefix]) + self.assertEqual(token.project_id, + self.request.headers['X%s-Project-Id' % prefix]) + self.assertEqual(token.project_name, + self.request.headers['X%s-Project-Name' % prefix]) + self.assertEqual( + token.project_domain_id, + self.request.headers['X%s-Project-Domain-Id' % prefix]) + self.assertEqual( + token.project_domain_name, + self.request.headers['X%s-Project-Domain-Name' % prefix]) + + self.assertEqual(token.user_id, + self.request.headers['X%s-User-Id' % prefix]) + self.assertEqual(token.user_name, + self.request.headers['X%s-User-Name' % prefix]) + self.assertEqual( + token.user_domain_id, + self.request.headers['X%s-User-Domain-Id' % prefix]) + self.assertEqual( + token.user_domain_name, + self.request.headers['X%s-User-Domain-Name' % prefix]) + + def test_project_scoped_user_headers(self): + token = fixture.V3Token() + token.set_project_scope() + token_id = uuid.uuid4().hex + + auth_ref = access.AccessInfo.factory(token_id=token_id, body=token) + self.request.set_user_headers(auth_ref, include_service_catalog=True) + + self._test_v3_headers(token, '') + + def test_project_scoped_service_headers(self): + token = fixture.V3Token() + token.set_project_scope() + token_id = uuid.uuid4().hex + + auth_ref = access.AccessInfo.factory(token_id=token_id, body=token) + self.request.set_service_headers(auth_ref) + + self._test_v3_headers(token, '-Service') + + def test_auth_type(self): + self.assertIsNone(self.request.auth_type) + self.request.environ['AUTH_TYPE'] = 'NeGoTiatE' + self.assertEqual('negotiate', self.request.auth_type) + + def test_user_token(self): + token = uuid.uuid4().hex + self.assertIsNone(self.request.user_token) + self.request.headers['X-Auth-Token'] = token + self.assertEqual(token, self.request.user_token) + + def test_storage_token(self): + storage_token = uuid.uuid4().hex + user_token = uuid.uuid4().hex + + self.assertIsNone(self.request.user_token) + self.request.headers['X-Storage-Token'] = storage_token + self.assertEqual(storage_token, self.request.user_token) + self.request.headers['X-Auth-Token'] = user_token + self.assertEqual(user_token, self.request.user_token) + + def test_service_token(self): + token = uuid.uuid4().hex + self.assertIsNone(self.request.service_token) + self.request.headers['X-Service-Token'] = token + self.assertEqual(token, self.request.service_token) + + def test_token_auth(self): + plugin = object() + + self.assertNotIn('keystone.token_auth', self.request.environ) + self.request.token_auth = plugin + self.assertIs(plugin, self.request.environ['keystone.token_auth']) + self.assertIs(plugin, self.request.token_auth) + + +class CatalogConversionTests(utils.TestCase): + + PUBLIC_URL = 'http://server:5000/v2.0' + ADMIN_URL = 'http://admin:35357/v2.0' + INTERNAL_URL = 'http://internal:5000/v2.0' + + REGION_ONE = 'RegionOne' + REGION_TWO = 'RegionTwo' + REGION_THREE = 'RegionThree' + + def test_basic_convert(self): + token = fixture.V3Token() + s = token.add_service(type='identity') + s.add_standard_endpoints(public=self.PUBLIC_URL, + admin=self.ADMIN_URL, + internal=self.INTERNAL_URL, + region=self.REGION_ONE) + + auth_ref = access.AccessInfo.factory(body=token) + catalog_data = auth_ref.service_catalog.get_data() + catalog = _request._v3_to_v2_catalog(catalog_data) + + self.assertEqual(1, len(catalog)) + service = catalog[0] + self.assertEqual(1, len(service['endpoints'])) + endpoints = service['endpoints'][0] + + self.assertEqual('identity', service['type']) + self.assertEqual(4, len(endpoints)) + self.assertEqual(self.PUBLIC_URL, endpoints['publicURL']) + self.assertEqual(self.ADMIN_URL, endpoints['adminURL']) + self.assertEqual(self.INTERNAL_URL, endpoints['internalURL']) + self.assertEqual(self.REGION_ONE, endpoints['region']) + + def test_multi_region(self): + token = fixture.V3Token() + s = token.add_service(type='identity') + + s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE) + s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO) + s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE) + + auth_ref = access.AccessInfo.factory(body=token) + catalog_data = auth_ref.service_catalog.get_data() + catalog = _request._v3_to_v2_catalog(catalog_data) + + self.assertEqual(1, len(catalog)) + service = catalog[0] + + # the 3 regions will come through as 3 separate endpoints + expected = [{'internalURL': self.INTERNAL_URL, + 'region': self.REGION_ONE}, + {'publicURL': self.PUBLIC_URL, + 'region': self.REGION_TWO}, + {'adminURL': self.ADMIN_URL, + 'region': self.REGION_THREE}] + + self.assertEqual('identity', service['type']) + self.assertEqual(3, len(service['endpoints'])) + for e in expected: + self.assertIn(e, expected) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py index d144bb6c..cef65b8e 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py @@ -18,14 +18,14 @@ import shutil import uuid import mock -import testtools from keystonemiddleware.auth_token import _exceptions as exc from keystonemiddleware.auth_token import _revocations from keystonemiddleware.auth_token import _signing_dir +from keystonemiddleware.tests.unit import utils -class RevocationsTests(testtools.TestCase): +class RevocationsTests(utils.BaseTestCase): def _check_with_list(self, revoked_list, token_ids): directory_name = '/tmp/%s' % uuid.uuid4().hex diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py index bef62747..b2ef95dd 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py @@ -15,12 +15,11 @@ import shutil import stat import uuid -import testtools - from keystonemiddleware.auth_token import _signing_dir +from keystonemiddleware.tests.unit import utils -class SigningDirectoryTests(testtools.TestCase): +class SigningDirectoryTests(utils.BaseTestCase): def test_directory_created_when_doesnt_exist(self): # When _SigningDirectory is created, if the directory doesn't exist diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py new file mode 100644 index 00000000..52d29737 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py @@ -0,0 +1,195 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from keystoneclient import auth +from keystoneclient import fixture + +from keystonemiddleware.auth_token import _base +from keystonemiddleware.tests.unit.auth_token import base + +# NOTE(jamielennox): just some sample values that we can use for testing +BASE_URI = 'https://keystone.example.com:1234' +AUTH_URL = 'https://keystone.auth.com:1234' + + +class BaseUserPluginTests(object): + + def configure_middleware(self, + auth_plugin, + group='keystone_authtoken', + **kwargs): + opts = auth.get_plugin_class(auth_plugin).get_options() + self.cfg.register_opts(opts, group=group) + + # Since these tests cfg.config() themselves rather than waiting for + # auth_token to do it on __init__ we need to register the base auth + # options (e.g., auth_plugin) + auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP) + + self.cfg.config(group=group, + auth_plugin=auth_plugin, + **kwargs) + + def assertTokenDataEqual(self, token_id, token, token_data): + self.assertEqual(token_id, token_data.auth_token) + self.assertEqual(token.user_id, token_data.user_id) + try: + trust_id = token.trust_id + except KeyError: + trust_id = None + self.assertEqual(trust_id, token_data.trust_id) + self.assertEqual(self.get_role_names(token), token_data.role_names) + + def get_plugin(self, token_id, service_token_id=None): + headers = {'X-Auth-Token': token_id} + + if service_token_id: + headers['X-Service-Token'] = service_token_id + + m = self.create_simple_middleware() + + resp = self.call(m, headers=headers) + self.assertEqual(200, resp.status_int) + return resp.request.environ['keystone.token_auth'] + + def test_user_information(self): + token_id, token = self.get_token() + plugin = self.get_plugin(token_id) + + self.assertTokenDataEqual(token_id, token, plugin.user) + self.assertFalse(plugin.has_service_token) + self.assertIsNone(plugin.service) + + def test_with_service_information(self): + token_id, token = self.get_token() + service_id, service = self.get_token() + + plugin = self.get_plugin(token_id, service_id) + + self.assertTokenDataEqual(token_id, token, plugin.user) + self.assertTokenDataEqual(service_id, service, plugin.service) + + +class V2UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase): + + def setUp(self): + super(V2UserPluginTests, self).setUp() + + self.service_token = fixture.V2Token() + self.service_token.set_scope() + s = self.service_token.add_service('identity', name='keystone') + + s.add_endpoint(public=BASE_URI, + admin=BASE_URI, + internal=BASE_URI) + + self.configure_middleware(auth_plugin='v2password', + auth_url='%s/v2.0/' % AUTH_URL, + user_id=self.service_token.user_id, + password=uuid.uuid4().hex, + tenant_id=self.service_token.tenant_id) + + auth_discovery = fixture.DiscoveryList(href=AUTH_URL, v3=False) + self.requests_mock.get(AUTH_URL, json=auth_discovery) + + base_discovery = fixture.DiscoveryList(href=BASE_URI, v3=False) + self.requests_mock.get(BASE_URI, json=base_discovery) + + url = '%s/v2.0/tokens' % AUTH_URL + self.requests_mock.post(url, json=self.service_token) + + def get_role_names(self, token): + return set(x['name'] for x in token['access']['user'].get('roles', [])) + + def get_token(self): + token = fixture.V2Token() + token.set_scope() + token.add_role() + + request_headers = {'X-Auth-Token': self.service_token.token_id} + + url = '%s/v2.0/tokens/%s' % (BASE_URI, token.token_id) + self.requests_mock.get(url, + request_headers=request_headers, + json=token) + + return token.token_id, token + + def assertTokenDataEqual(self, token_id, token, token_data): + super(V2UserPluginTests, self).assertTokenDataEqual(token_id, + token, + token_data) + + self.assertEqual(token.tenant_id, token_data.project_id) + self.assertIsNone(token_data.user_domain_id) + self.assertIsNone(token_data.project_domain_id) + + +class V3UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase): + + def setUp(self): + super(V3UserPluginTests, self).setUp() + + self.service_token_id = uuid.uuid4().hex + self.service_token = fixture.V3Token() + s = self.service_token.add_service('identity', name='keystone') + s.add_standard_endpoints(public=BASE_URI, + admin=BASE_URI, + internal=BASE_URI) + + self.configure_middleware(auth_plugin='v3password', + auth_url='%s/v3/' % AUTH_URL, + user_id=self.service_token.user_id, + password=uuid.uuid4().hex, + project_id=self.service_token.project_id) + + auth_discovery = fixture.DiscoveryList(href=AUTH_URL) + self.requests_mock.get(AUTH_URL, json=auth_discovery) + + base_discovery = fixture.DiscoveryList(href=BASE_URI) + self.requests_mock.get(BASE_URI, json=base_discovery) + + self.requests_mock.post( + '%s/v3/auth/tokens' % AUTH_URL, + headers={'X-Subject-Token': self.service_token_id}, + json=self.service_token) + + def get_role_names(self, token): + return set(x['name'] for x in token['token'].get('roles', [])) + + def get_token(self): + token_id = uuid.uuid4().hex + token = fixture.V3Token() + token.set_project_scope() + token.add_role() + + request_headers = {'X-Auth-Token': self.service_token_id, + 'X-Subject-Token': token_id} + headers = {'X-Subject-Token': token_id} + + self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI, + request_headers=request_headers, + headers=headers, + json=token) + + return token_id, token + + def assertTokenDataEqual(self, token_id, token, token_data): + super(V3UserPluginTests, self).assertTokenDataEqual(token_id, + token, + token_data) + + self.assertEqual(token.user_domain_id, token_data.user_domain_id) + self.assertEqual(token.project_id, token_data.project_id) + self.assertEqual(token.project_domain_id, token_data.project_domain_id) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py index 89e5aa44..48ff9a4f 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py @@ -18,11 +18,11 @@ import uuid import mock from oslo_config import cfg from pycadf import identifier -import testtools from testtools import matchers import webob from keystonemiddleware import audit +from keystonemiddleware.tests.unit import utils class FakeApp(object): @@ -40,7 +40,7 @@ class FakeFailingApp(object): raise Exception('It happens!') -class BaseAuditMiddlewareTest(testtools.TestCase): +class BaseAuditMiddlewareTest(utils.BaseTestCase): def setUp(self): super(BaseAuditMiddlewareTest, self).setUp() self.fd, self.audit_map = tempfile.mkstemp() @@ -90,13 +90,13 @@ class BaseAuditMiddlewareTest(testtools.TestCase): return env_headers -@mock.patch('oslo.messaging.get_transport', mock.MagicMock()) +@mock.patch('oslo_messaging.get_transport', mock.MagicMock()) class AuditMiddlewareTest(BaseAuditMiddlewareTest): def test_api_request(self): req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info') as notify: + with mock.patch('oslo_messaging.Notifier.info') as notify: self.middleware(req) # Check first notification with only 'request' call_args = notify.call_args_list[0][0] @@ -121,7 +121,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): service_name='pycadf') req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info') as notify: + with mock.patch('oslo_messaging.Notifier.info') as notify: try: self.middleware(req) self.fail('Application exception has not been re-raised') @@ -144,7 +144,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): def test_process_request_fail(self): req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info', + with mock.patch('oslo_messaging.Notifier.info', side_effect=Exception('error')) as notify: self.middleware._process_request(req) self.assertTrue(notify.called) @@ -152,7 +152,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): def test_process_response_fail(self): req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info', + with mock.patch('oslo_messaging.Notifier.info', side_effect=Exception('error')) as notify: self.middleware._process_response(req, webob.response.Response()) self.assertTrue(notify.called) @@ -167,7 +167,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): environ=self.get_environ_header('PUT')) req2 = webob.Request.blank('/accept/foo', environ=self.get_environ_header('POST')) - with mock.patch('oslo.messaging.Notifier.info') as notify: + with mock.patch('oslo_messaging.Notifier.info') as notify: # Check GET/PUT request does not send notification self.middleware(req) self.middleware(req1) @@ -207,7 +207,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): service_name='pycadf') req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info') as notify: + with mock.patch('oslo_messaging.Notifier.info') as notify: middleware(req) self.assertIsNotNone(req.environ.get('cadf_event')) @@ -222,13 +222,13 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): service_name='pycadf') req = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info', + with mock.patch('oslo_messaging.Notifier.info', side_effect=Exception('error')) as notify: middleware._process_request(req) self.assertTrue(notify.called) req2 = webob.Request.blank('/foo/bar', environ=self.get_environ_header('GET')) - with mock.patch('oslo.messaging.Notifier.info') as notify: + with mock.patch('oslo_messaging.Notifier.info') as notify: middleware._process_response(req2, webob.response.Response()) self.assertTrue(notify.called) # ensure event is not the same across requests @@ -236,7 +236,7 @@ class AuditMiddlewareTest(BaseAuditMiddlewareTest): notify.call_args_list[0][0][2]['id']) -@mock.patch('oslo.messaging', mock.MagicMock()) +@mock.patch('oslo_messaging.rpc', mock.MagicMock()) class AuditApiLogicTest(BaseAuditMiddlewareTest): def api_request(self, method, url): @@ -483,3 +483,72 @@ class AuditApiLogicTest(BaseAuditMiddlewareTest): self.middleware._process_request(req) payload = req.environ['cadf_event'].as_dict() self.assertEqual(payload['target']['id'], identifier.norm_ns('nova')) + + def test_endpoint_missing_internal_url(self): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "publicURL": + "http://public_host:8774"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'REQUEST_METHOD': 'GET'} + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=env_headers) + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual((payload['target']['addresses'][1]['url']), "unknown") + + def test_endpoint_missing_public_url(self): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "internalURL": + "http://internal_host:8774"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'REQUEST_METHOD': 'GET'} + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=env_headers) + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual((payload['target']['addresses'][2]['url']), "unknown") + + def test_endpoint_missing_admin_url(self): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"region": "RegionOne", + "publicURL": + "http://public_host:8774", + "internalURL": + "http://internal_host:8774"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'REQUEST_METHOD': 'GET'} + req = webob.Request.blank('http://public_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=env_headers) + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual((payload['target']['addresses'][0]['url']), "unknown") diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py index 93e1b06e..9ddb8005 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. -import pkg_resources +import stevedore from testtools import matchers from keystonemiddleware import opts @@ -46,6 +46,7 @@ class OptsTestCase(utils.TestCase): 'certfile', 'keyfile', 'cafile', + 'region_name', 'insecure', 'signing_dir', 'memcached_servers', @@ -74,12 +75,12 @@ class OptsTestCase(utils.TestCase): self._test_list_auth_token_opts(opts.list_auth_token_opts()) def test_entry_point(self): - result = None - for ep in pkg_resources.iter_entry_points('oslo.config.opts'): - if ep.name == 'keystonemiddleware.auth_token': - list_fn = ep.load() - result = list_fn() + em = stevedore.ExtensionManager('oslo.config.opts', + invoke_on_load=True) + for extension in em: + if extension.name == 'keystonemiddleware.auth_token': break + else: + self.fail('keystonemiddleware.auth_token not found') - self.assertIsNotNone(result) - self._test_list_auth_token_opts(result) + self._test_list_auth_token_opts(extension.obj) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py index 2bcdf894..b0993886 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py @@ -17,7 +17,7 @@ from oslo_serialization import jsonutils import requests from requests_mock.contrib import fixture as rm_fixture import six -import testtools +from six.moves import urllib import webob from keystonemiddleware import s3_token @@ -54,7 +54,7 @@ class S3TokenMiddlewareTestBase(utils.TestCase): 'auth_protocol': self.TEST_PROTOCOL, } - self.requests = self.useFixture(rm_fixture.Fixture()) + self.requests_mock = self.useFixture(rm_fixture.Fixture()) def start_fake_response(self, status, headers): self.response_status = int(status.split(' ', 1)[0]) @@ -67,7 +67,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): super(S3TokenMiddlewareTestGood, self).setUp() self.middleware = s3_token.S3Token(FakeApp(), self.conf) - self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE) + self.requests_mock.post(self.TEST_URL, + status_code=201, + json=GOOD_RESPONSE) # Ignore the request and pass to the next middleware in the # pipeline if no path has been specified. @@ -98,9 +100,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') def test_authorized_http(self): - self.requests.post(self.TEST_URL.replace('https', 'http'), - status_code=201, - json=GOOD_RESPONSE) + self.requests_mock.post(self.TEST_URL.replace('https', 'http'), + status_code=201, + json=GOOD_RESPONSE) self.middleware = ( s3_token.filter_factory({'auth_protocol': 'http', @@ -124,7 +126,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): @mock.patch.object(requests, 'post') def test_insecure(self, MOCK_REQUEST): self.middleware = ( - s3_token.filter_factory({'insecure': True})(FakeApp())) + s3_token.filter_factory({'insecure': 'True'})(FakeApp())) text_return_value = jsonutils.dumps(GOOD_RESPONSE) if six.PY3: @@ -142,6 +144,35 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): mock_args, mock_kwargs = MOCK_REQUEST.call_args self.assertIs(mock_kwargs['verify'], False) + def test_insecure_option(self): + # insecure is passed as a string. + + # Some non-secure values. + true_values = ['true', 'True', '1', 'yes'] + for val in true_values: + config = {'insecure': val, 'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertIs(False, middleware._verify) + + # Some "secure" values, including unexpected value. + false_values = ['false', 'False', '0', 'no', 'someweirdvalue'] + for val in false_values: + config = {'insecure': val, 'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertEqual('false_ind', middleware._verify) + + # Default is secure. + config = {'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertIs('false_ind', middleware._verify) + + def test_unicode_path(self): + url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8') + req = webob.Request.blank(urllib.parse.quote(url)) + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): def setUp(self): @@ -153,7 +184,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): {"message": "EC2 access key not found.", "code": 401, "title": "Unauthorized"}} - self.requests.post(self.TEST_URL, status_code=403, json=ret) + self.requests_mock.post(self.TEST_URL, status_code=403, json=ret) req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' req.headers['X-Storage-Token'] = 'token' @@ -185,7 +216,9 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): self.assertEqual(resp.status_int, s3_invalid_req.status_int) def test_bad_reply(self): - self.requests.post(self.TEST_URL, status_code=201, text="<badreply>") + self.requests_mock.post(self.TEST_URL, + status_code=201, + text="<badreply>") req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' @@ -196,7 +229,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): self.assertEqual(resp.status_int, s3_invalid_req.status_int) -class S3TokenMiddlewareTestUtil(testtools.TestCase): +class S3TokenMiddlewareTestUtil(utils.BaseTestCase): def test_split_path_failed(self): self.assertRaises(ValueError, s3_token._split_path, '') self.assertRaises(ValueError, s3_token._split_path, '/') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py index da6f347a..8c6c0e9a 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py @@ -13,15 +13,27 @@ import logging import sys import time +import warnings import fixtures import mock +import oslotest.base as oslotest import requests -import testtools import uuid -class TestCase(testtools.TestCase): +class BaseTestCase(oslotest.BaseTestCase): + def setUp(self): + super(BaseTestCase, self).setUp() + + # If keystonemiddleware calls any deprecated function this will raise + # an exception. + warnings.filterwarnings('error', category=DeprecationWarning, + module='^keystonemiddleware\\.') + self.addCleanup(warnings.resetwarnings) + + +class TestCase(BaseTestCase): TEST_DOMAIN_ID = '1' TEST_DOMAIN_NAME = 'aDomain' TEST_GROUP_ID = uuid.uuid4().hex @@ -108,7 +120,7 @@ class DisableModuleFixture(fixtures.Fixture): def clear_module(self): cleared_modules = {} - for fullname in sys.modules.keys(): + for fullname in list(sys.modules.keys()): if (fullname == self.module or fullname.startswith(self.module + '.')): cleared_modules[fullname] = sys.modules.pop(fullname) |