From 03bf0c32a0c656d4b91bebedc87a005e6d7563bb Mon Sep 17 00:00:00 2001 From: WuKong Date: Wed, 1 Jul 2015 08:54:55 +0200 Subject: migrate openstack hook to opnfv Change-Id: I1e828dae38820fdff93966e57691b344af01140f Signed-off-by: WuKong --- .../keystonemiddleware/tests/unit/__init__.py | 0 .../tests/unit/auth_token/__init__.py | 0 .../tests/unit/auth_token/test_auth.py | 102 + .../unit/auth_token/test_auth_token_middleware.py | 2763 ++++++++++++++++++++ .../tests/unit/auth_token/test_connection_pool.py | 118 + .../tests/unit/auth_token/test_memcache_crypt.py | 97 + .../tests/unit/auth_token/test_revocations.py | 65 + .../tests/unit/auth_token/test_signing_dir.py | 138 + .../tests/unit/auth_token/test_utils.py | 37 + .../tests/unit/client_fixtures.py | 452 ++++ .../tests/unit/test_audit_middleware.py | 485 ++++ .../keystonemiddleware/tests/unit/test_opts.py | 85 + .../tests/unit/test_s3_token_middleware.py | 235 ++ .../keystonemiddleware/tests/unit/utils.py | 138 + 14 files changed, 4715 insertions(+) create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py create mode 100644 keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py new file mode 100644 index 00000000..517d597b --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py @@ -0,0 +1,102 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import uuid + +from keystoneclient import auth +from keystoneclient import fixture +from keystoneclient import session +from requests_mock.contrib import fixture as rm_fixture +import six +import testtools + +from keystonemiddleware.auth_token import _auth + + +class DefaultAuthPluginTests(testtools.TestCase): + + def new_plugin(self, auth_host=None, auth_port=None, auth_protocol=None, + auth_admin_prefix=None, admin_user=None, + admin_password=None, admin_tenant_name=None, + admin_token=None, identity_uri=None, log=None): + if not log: + log = self.logger + + return _auth.AuthTokenPlugin.load_from_options( + auth_host=auth_host, + auth_port=auth_port, + auth_protocol=auth_protocol, + auth_admin_prefix=auth_admin_prefix, + admin_user=admin_user, + admin_password=admin_password, + admin_tenant_name=admin_tenant_name, + admin_token=admin_token, + identity_uri=identity_uri, + log=log) + + def setUp(self): + super(DefaultAuthPluginTests, self).setUp() + + self.stream = six.StringIO() + self.logger = logging.getLogger(__name__) + self.session = session.Session() + self.requests = self.useFixture(rm_fixture.Fixture()) + + def test_auth_uri_from_fragments(self): + auth_protocol = 'http' + auth_host = 'testhost' + auth_port = 8888 + auth_admin_prefix = 'admin' + + expected = '%s://%s:%d/admin' % (auth_protocol, auth_host, auth_port) + + plugin = self.new_plugin(auth_host=auth_host, + auth_protocol=auth_protocol, + auth_port=auth_port, + auth_admin_prefix=auth_admin_prefix) + + self.assertEqual(expected, + plugin.get_endpoint(self.session, + interface=auth.AUTH_INTERFACE)) + + def test_identity_uri_overrides_fragments(self): + identity_uri = 'http://testhost:8888/admin' + plugin = self.new_plugin(identity_uri=identity_uri, + auth_host='anotherhost', + auth_port=9999, + auth_protocol='ftp') + + self.assertEqual(identity_uri, + plugin.get_endpoint(self.session, + interface=auth.AUTH_INTERFACE)) + + def test_with_admin_token(self): + token = uuid.uuid4().hex + plugin = self.new_plugin(identity_uri='http://testhost:8888/admin', + admin_token=token) + self.assertEqual(token, plugin.get_token(self.session)) + + def test_with_user_pass(self): + base_uri = 'http://testhost:8888/admin' + token = fixture.V2Token() + admin_tenant_name = uuid.uuid4().hex + + self.requests.post(base_uri + '/v2.0/tokens', + json=token) + + plugin = self.new_plugin(identity_uri=base_uri, + admin_user=uuid.uuid4().hex, + admin_password=uuid.uuid4().hex, + admin_tenant_name=admin_tenant_name) + + self.assertEqual(token.token_id, plugin.get_token(self.session)) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py new file mode 100644 index 00000000..97fcc557 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py @@ -0,0 +1,2763 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import calendar +import datetime +import json +import logging +import os +import shutil +import stat +import tempfile +import time +import uuid + +import fixtures +from keystoneclient import access +from keystoneclient import auth +from keystoneclient.common import cms +from keystoneclient import exceptions +from keystoneclient import fixture +from keystoneclient import session +import mock +from oslo_config import fixture as cfg_fixture +from oslo_serialization import jsonutils +from oslo_utils import timeutils +from requests_mock.contrib import fixture as rm_fixture +import six +import testresources +import testtools +from testtools import matchers +import webob +import webob.dec + +from keystonemiddleware import auth_token +from keystonemiddleware.auth_token import _base +from keystonemiddleware.auth_token import _exceptions as exc +from keystonemiddleware.auth_token import _revocations +from keystonemiddleware.openstack.common import memorycache +from keystonemiddleware.tests.unit import client_fixtures +from keystonemiddleware.tests.unit import utils + + +EXPECTED_V2_DEFAULT_ENV_RESPONSE = { + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'HTTP_X_TENANT_ID': 'tenant_id1', + 'HTTP_X_TENANT_NAME': 'tenant_name1', + 'HTTP_X_USER_ID': 'user_id1', + 'HTTP_X_USER_NAME': 'user_name1', + 'HTTP_X_ROLES': 'role1,role2', + 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat) + 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat) + 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat) +} + +EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = { + 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Confirmed', + 'HTTP_X_SERVICE_PROJECT_ID': 'service_project_id1', + 'HTTP_X_SERVICE_PROJECT_NAME': 'service_project_name1', + 'HTTP_X_SERVICE_USER_ID': 'service_user_id1', + 'HTTP_X_SERVICE_USER_NAME': 'service_user_name1', + 'HTTP_X_SERVICE_ROLES': 'service_role1,service_role2', +} + +EXPECTED_V3_DEFAULT_ENV_ADDITIONS = { + 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1', + 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1', + 'HTTP_X_USER_DOMAIN_ID': 'domain_id1', + 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1', +} + +EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS = { + 'HTTP_X_SERVICE_PROJECT_DOMAIN_ID': 'service_domain_id1', + 'HTTP_X_SERVICE_PROJECT_DOMAIN_NAME': 'service_domain_name1', + 'HTTP_X_SERVICE_USER_DOMAIN_ID': 'service_domain_id1', + 'HTTP_X_SERVICE_USER_DOMAIN_NAME': 'service_domain_name1' +} + + +BASE_HOST = 'https://keystone.example.com:1234' +BASE_URI = '%s/testadmin' % BASE_HOST +FAKE_ADMIN_TOKEN_ID = 'admin_token2' +FAKE_ADMIN_TOKEN = jsonutils.dumps( + {'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID, + 'expires': '2022-10-03T16:58:01Z'}}}) + +VERSION_LIST_v3 = fixture.DiscoveryList(href=BASE_URI) +VERSION_LIST_v2 = fixture.DiscoveryList(v3=False, href=BASE_URI) + +ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2' +MEMCACHED_SERVERS = ['localhost:11211'] +MEMCACHED_AVAILABLE = None + + +def memcached_available(): + """Do a sanity check against memcached. + + Returns ``True`` if the following conditions are met (otherwise, returns + ``False``): + + - ``python-memcached`` is installed + - a usable ``memcached`` instance is available via ``MEMCACHED_SERVERS`` + - the client is able to set and get a key/value pair + + """ + global MEMCACHED_AVAILABLE + + if MEMCACHED_AVAILABLE is None: + try: + import memcache + c = memcache.Client(MEMCACHED_SERVERS) + c.set('ping', 'pong', time=1) + MEMCACHED_AVAILABLE = c.get('ping') == 'pong' + except ImportError: + MEMCACHED_AVAILABLE = False + + return MEMCACHED_AVAILABLE + + +def cleanup_revoked_file(filename): + try: + os.remove(filename) + except OSError: + pass + + +class TimezoneFixture(fixtures.Fixture): + @staticmethod + def supported(): + # tzset is only supported on Unix. + return hasattr(time, 'tzset') + + def __init__(self, new_tz): + super(TimezoneFixture, self).__init__() + self.tz = new_tz + self.old_tz = os.environ.get('TZ') + + def setUp(self): + super(TimezoneFixture, self).setUp() + if not self.supported(): + raise NotImplementedError('timezone override is not supported.') + os.environ['TZ'] = self.tz + time.tzset() + self.addCleanup(self.cleanup) + + def cleanup(self): + if self.old_tz is not None: + os.environ['TZ'] = self.old_tz + elif 'TZ' in os.environ: + del os.environ['TZ'] + time.tzset() + + +class TimeFixture(fixtures.Fixture): + + def __init__(self, new_time, normalize=True): + super(TimeFixture, self).__init__() + if isinstance(new_time, six.string_types): + new_time = timeutils.parse_isotime(new_time) + if normalize: + new_time = timeutils.normalize_time(new_time) + self.new_time = new_time + + def setUp(self): + super(TimeFixture, self).setUp() + timeutils.set_time_override(self.new_time) + self.addCleanup(timeutils.clear_time_override) + + +class FakeApp(object): + """This represents a WSGI app protected by the auth_token middleware.""" + + SUCCESS = b'SUCCESS' + FORBIDDEN = b'FORBIDDEN' + expected_env = {} + + def __init__(self, expected_env=None, need_service_token=False): + self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + + if expected_env: + self.expected_env.update(expected_env) + + self.need_service_token = need_service_token + + def __call__(self, env, start_response): + for k, v in self.expected_env.items(): + assert env[k] == v, '%s != %s' % (env[k], v) + + resp = webob.Response() + + if (env.get('HTTP_X_IDENTITY_STATUS') == 'Invalid' + and env['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'): + # Simulate delayed auth forbidding access with arbitrary status + # code to differentiate checking this code path + resp.status = 419 + resp.body = FakeApp.FORBIDDEN + elif env.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid': + # Simulate delayed auth forbidding access with arbitrary status + # code to differentiate checking this code path + resp.status = 420 + resp.body = FakeApp.FORBIDDEN + elif env['HTTP_X_IDENTITY_STATUS'] == 'Invalid': + # Simulate delayed auth forbidding access + resp.status = 403 + resp.body = FakeApp.FORBIDDEN + elif (self.need_service_token is True and + env.get('HTTP_X_SERVICE_TOKEN') is None): + # Simulate requiring composite auth + # Arbitrary value to allow checking this code path + resp.status = 418 + resp.body = FakeApp.FORBIDDEN + else: + resp.body = FakeApp.SUCCESS + + return resp(env, start_response) + + +class v3FakeApp(FakeApp): + """This represents a v3 WSGI app protected by the auth_token middleware.""" + + def __init__(self, expected_env=None, need_service_token=False): + + # with v3 additions, these are for the DEFAULT TOKEN + v3_default_env_additions = dict(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) + if expected_env: + v3_default_env_additions.update(expected_env) + super(v3FakeApp, self).__init__(expected_env=v3_default_env_additions, + need_service_token=need_service_token) + + +class CompositeBase(object): + """Base composite auth object with common service token environment.""" + + def __init__(self, expected_env=None): + comp_expected_env = dict(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + + if expected_env: + comp_expected_env.update(expected_env) + + super(CompositeBase, self).__init__( + expected_env=comp_expected_env, need_service_token=True) + + +class CompositeFakeApp(CompositeBase, FakeApp): + """A fake v2 WSGI app protected by composite auth_token middleware.""" + + def __init__(self, expected_env): + super(CompositeFakeApp, self).__init__(expected_env=expected_env) + + +class v3CompositeFakeApp(CompositeBase, v3FakeApp): + """A fake v3 WSGI app protected by composite auth_token middleware.""" + + def __init__(self, expected_env=None): + + # with v3 additions, these are for the DEFAULT SERVICE TOKEN + v3_default_service_env_additions = dict( + EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS) + + if expected_env: + v3_default_service_env_additions.update(expected_env) + + super(v3CompositeFakeApp, self).__init__( + v3_default_service_env_additions) + + +def new_app(status, body, headers={}): + + class _App(object): + + def __init__(self, expected_env=None): + self.expected_env = expected_env + + @webob.dec.wsgify + def __call__(self, req): + resp = webob.Response(body, status) + resp.headers.update(headers) + return resp + + return _App + + +class BaseAuthTokenMiddlewareTest(testtools.TestCase): + """Base test class for auth_token middleware. + + All the tests allow for running with auth_token + configured for receiving v2 or v3 tokens, with the + choice being made by passing configuration data into + setUp(). + + The base class will, by default, run all the tests + expecting v2 token formats. Child classes can override + this to specify, for instance, v3 format. + + """ + def setUp(self, expected_env=None, auth_version=None, fake_app=None): + super(BaseAuthTokenMiddlewareTest, self).setUp() + + self.expected_env = expected_env or dict() + self.fake_app = fake_app or FakeApp + self.middleware = None + self.requests = self.useFixture(rm_fixture.Fixture()) + + signing_dir = self._setup_signing_directory() + + self.conf = { + 'identity_uri': 'https://keystone.example.com:1234/testadmin/', + 'signing_dir': signing_dir, + 'auth_version': auth_version, + 'auth_uri': 'https://keystone.example.com:1234', + 'admin_user': uuid.uuid4().hex, + } + + self.auth_version = auth_version + self.response_status = None + self.response_headers = None + + def _setup_signing_directory(self): + directory_name = self.useFixture(fixtures.TempDir()).path + + # Copy the sample certificate files into the temporary directory. + for filename in ['cacert.pem', 'signing_cert.pem', ]: + shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename), + os.path.join(directory_name, filename)) + + return directory_name + + def set_middleware(self, expected_env=None, conf=None): + """Configure the class ready to call the auth_token middleware. + + Set up the various fake items needed to run the middleware. + Individual tests that need to further refine these can call this + function to override the class defaults. + + """ + if conf: + self.conf.update(conf) + + if expected_env: + self.expected_env.update(expected_env) + + self.middleware = auth_token.AuthProtocol( + self.fake_app(self.expected_env), self.conf) + + self.middleware._revocations._list = jsonutils.dumps( + {"revoked": [], "extra": "success"}) + + def update_expected_env(self, expected_env={}): + self.middleware._app.expected_env.update(expected_env) + + def purge_token_expected_env(self): + for key in six.iterkeys(self.token_expected_env): + del self.middleware._app.expected_env[key] + + def purge_service_token_expected_env(self): + for key in six.iterkeys(self.service_token_expected_env): + del self.middleware._app.expected_env[key] + + def start_fake_response(self, status, headers, exc_info=None): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + + def assertLastPath(self, path): + if path: + self.assertEqual(BASE_URI + path, self.requests.last_request.url) + else: + self.assertIsNone(self.requests.last_request) + + +class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + """Auth Token middleware should understand Diablo keystone responses.""" + def setUp(self): + # pre-diablo only had Tenant ID, which was also the Name + expected_env = { + 'HTTP_X_TENANT_ID': 'tenant_id1', + 'HTTP_X_TENANT_NAME': 'tenant_id1', + # now deprecated (diablo-compat) + 'HTTP_X_TENANT': 'tenant_id1', + } + + super(DiabloAuthTokenMiddlewareTest, self).setUp( + expected_env=expected_env) + + self.requests.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) + + self.requests.post("%s/v2.0/tokens" % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + self.token_id = self.examples.VALID_DIABLO_TOKEN + token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id] + + url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id) + self.requests.get(url, text=token_response) + + self.set_middleware() + + def test_valid_diablo_response(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.token_id + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertIn('keystone.token_info', req.environ) + + +class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest): + """These tests will not have the memcache module available.""" + + def setUp(self): + super(NoMemcacheAuthToken, self).setUp() + self.useFixture(utils.DisableModuleFixture('memcache')) + + def test_nomemcache(self): + conf = { + 'admin_token': 'admin_token1', + 'auth_host': 'keystone.example.com', + 'auth_port': '1234', + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'auth_uri': 'https://keystone.example.com:1234', + } + + auth_token.AuthProtocol(FakeApp(), conf) + + +class CachePoolTest(BaseAuthTokenMiddlewareTest): + def test_use_cache_from_env(self): + """If `swift.cache` is set in the environment and `cache` is set in the + config then the env cache is used. + """ + env = {'swift.cache': 'CACHE_TEST'} + conf = { + 'cache': 'swift.cache' + } + self.set_middleware(conf=conf) + self.middleware._token_cache.initialize(env) + with self.middleware._token_cache._cache_pool.reserve() as cache: + self.assertEqual(cache, 'CACHE_TEST') + + def test_not_use_cache_from_env(self): + """If `swift.cache` is set in the environment but `cache` isn't set in + the config then the env cache isn't used. + """ + self.set_middleware() + env = {'swift.cache': 'CACHE_TEST'} + self.middleware._token_cache.initialize(env) + with self.middleware._token_cache._cache_pool.reserve() as cache: + self.assertNotEqual(cache, 'CACHE_TEST') + + def test_multiple_context_managers_share_single_client(self): + self.set_middleware() + token_cache = self.middleware._token_cache + env = {} + token_cache.initialize(env) + + caches = [] + + with token_cache._cache_pool.reserve() as cache: + caches.append(cache) + + with token_cache._cache_pool.reserve() as cache: + caches.append(cache) + + self.assertIs(caches[0], caches[1]) + self.assertEqual(set(caches), set(token_cache._cache_pool)) + + def test_nested_context_managers_create_multiple_clients(self): + self.set_middleware() + env = {} + self.middleware._token_cache.initialize(env) + token_cache = self.middleware._token_cache + + with token_cache._cache_pool.reserve() as outer_cache: + with token_cache._cache_pool.reserve() as inner_cache: + self.assertNotEqual(outer_cache, inner_cache) + + self.assertEqual( + set([inner_cache, outer_cache]), + set(token_cache._cache_pool)) + + +class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + """These tests are not affected by the token format + (see CommonAuthTokenMiddlewareTest). + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def test_token_is_v2_accepts_v2(self): + token = self.examples.UUID_TOKEN_DEFAULT + token_response = self.examples.TOKEN_RESPONSES[token] + self.assertTrue(auth_token._token_is_v2(token_response)) + + def test_token_is_v2_rejects_v3(self): + token = self.examples.v3_UUID_TOKEN_DEFAULT + token_response = self.examples.TOKEN_RESPONSES[token] + self.assertFalse(auth_token._token_is_v2(token_response)) + + def test_token_is_v3_rejects_v2(self): + token = self.examples.UUID_TOKEN_DEFAULT + token_response = self.examples.TOKEN_RESPONSES[token] + self.assertFalse(auth_token._token_is_v3(token_response)) + + def test_token_is_v3_accepts_v3(self): + token = self.examples.v3_UUID_TOKEN_DEFAULT + token_response = self.examples.TOKEN_RESPONSES[token] + self.assertTrue(auth_token._token_is_v3(token_response)) + + @testtools.skipUnless(memcached_available(), 'memcached not available') + def test_encrypt_cache_data(self): + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'encrypt', + 'memcache_secret_key': 'mysecret' + } + self.set_middleware(conf=conf) + token = b'my_token' + some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) + expires = timeutils.strtime(some_time_later) + data = ('this_data', expires) + token_cache = self.middleware._token_cache + token_cache.initialize({}) + token_cache._cache_store(token, data) + self.assertEqual(token_cache._cache_get(token), data[0]) + + @testtools.skipUnless(memcached_available(), 'memcached not available') + def test_sign_cache_data(self): + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'mac', + 'memcache_secret_key': 'mysecret' + } + self.set_middleware(conf=conf) + token = b'my_token' + some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) + expires = timeutils.strtime(some_time_later) + data = ('this_data', expires) + token_cache = self.middleware._token_cache + token_cache.initialize({}) + token_cache._cache_store(token, data) + self.assertEqual(token_cache._cache_get(token), data[0]) + + @testtools.skipUnless(memcached_available(), 'memcached not available') + def test_no_memcache_protection(self): + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_secret_key': 'mysecret' + } + self.set_middleware(conf=conf) + token = 'my_token' + some_time_later = timeutils.utcnow() + datetime.timedelta(hours=4) + expires = timeutils.strtime(some_time_later) + data = ('this_data', expires) + token_cache = self.middleware._token_cache + token_cache.initialize({}) + token_cache._cache_store(token, data) + self.assertEqual(token_cache._cache_get(token), data[0]) + + def test_assert_valid_memcache_protection_config(self): + # test missing memcache_secret_key + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'Encrypt' + } + self.assertRaises(exc.ConfigurationError, self.set_middleware, + conf=conf) + # test invalue memcache_security_strategy + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'whatever' + } + self.assertRaises(exc.ConfigurationError, self.set_middleware, + conf=conf) + # test missing memcache_secret_key + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'mac' + } + self.assertRaises(exc.ConfigurationError, self.set_middleware, + conf=conf) + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'Encrypt', + 'memcache_secret_key': '' + } + self.assertRaises(exc.ConfigurationError, self.set_middleware, + conf=conf) + conf = { + 'memcached_servers': ','.join(MEMCACHED_SERVERS), + 'memcache_security_strategy': 'mAc', + 'memcache_secret_key': '' + } + self.assertRaises(exc.ConfigurationError, self.set_middleware, + conf=conf) + + def test_config_revocation_cache_timeout(self): + conf = { + 'revocation_cache_time': '24', + 'auth_uri': 'https://keystone.example.com:1234', + 'admin_user': uuid.uuid4().hex + } + middleware = auth_token.AuthProtocol(self.fake_app, conf) + self.assertEqual(middleware._revocations._cache_timeout, + datetime.timedelta(seconds=24)) + + def test_conf_values_type_convert(self): + conf = { + 'revocation_cache_time': '24', + 'identity_uri': 'https://keystone.example.com:1234', + 'include_service_catalog': '0', + 'nonexsit_option': '0', + } + + middleware = auth_token.AuthProtocol(self.fake_app, conf) + self.assertEqual(datetime.timedelta(seconds=24), + middleware._revocations._cache_timeout) + self.assertEqual(False, middleware._include_service_catalog) + self.assertEqual('0', middleware._conf['nonexsit_option']) + + def test_deprecated_conf_values(self): + conf = { + 'memcache_servers': ','.join(MEMCACHED_SERVERS), + } + + middleware = auth_token.AuthProtocol(self.fake_app, conf) + self.assertEqual(MEMCACHED_SERVERS, + middleware._conf_get('memcached_servers')) + + def test_conf_values_type_convert_with_wrong_value(self): + conf = { + 'include_service_catalog': '123', + } + self.assertRaises(exc.ConfigurationError, + auth_token.AuthProtocol, self.fake_app, conf) + + +class CommonAuthTokenMiddlewareTest(object): + """These tests are run once using v2 tokens and again using v3 tokens.""" + + def test_init_does_not_call_http(self): + conf = { + 'revocation_cache_time': '1' + } + self.set_middleware(conf=conf) + self.assertLastPath(None) + + def test_auth_with_no_token_does_not_call_http(self): + self.set_middleware() + req = webob.Request.blank('/') + self.middleware(req.environ, self.start_fake_response) + self.assertLastPath(None) + self.assertEqual(401, self.response_status) + + def test_init_by_ipv6Addr_auth_host(self): + del self.conf['identity_uri'] + conf = { + 'auth_host': '2001:2013:1:f101::1', + 'auth_port': '1234', + 'auth_protocol': 'http', + 'auth_uri': None, + 'auth_version': 'v3.0', + } + self.set_middleware(conf=conf) + expected_auth_uri = 'http://[2001:2013:1:f101::1]:1234' + self.assertEqual(expected_auth_uri, + self.middleware._auth_uri) + + def assert_valid_request_200(self, token, with_catalog=True): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + if with_catalog: + self.assertTrue(req.headers.get('X-Service-Catalog')) + else: + self.assertNotIn('X-Service-Catalog', req.headers) + self.assertEqual(body, [FakeApp.SUCCESS]) + self.assertIn('keystone.token_info', req.environ) + return req + + def test_valid_uuid_request(self): + for _ in range(2): # Do it twice because first result was cached. + token = self.token_dict['uuid_token_default'] + self.assert_valid_request_200(token) + self.assert_valid_last_url(token) + + def test_valid_uuid_request_with_auth_fragments(self): + del self.conf['identity_uri'] + self.conf['auth_protocol'] = 'https' + self.conf['auth_host'] = 'keystone.example.com' + self.conf['auth_port'] = '1234' + self.conf['auth_admin_prefix'] = '/testadmin' + self.set_middleware() + self.assert_valid_request_200(self.token_dict['uuid_token_default']) + self.assert_valid_last_url(self.token_dict['uuid_token_default']) + + def _test_cache_revoked(self, token, revoked_form=None): + # When the token is cached and revoked, 401 is returned. + self.middleware._check_revocations_for_cached = True + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + + # Token should be cached as ok after this. + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + + # Put it in revocation list. + self.middleware._revocations._list = self.get_revocation_list_json( + token_ids=[revoked_form or token]) + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + + def test_cached_revoked_uuid(self): + # When the UUID token is cached and revoked, 401 is returned. + self._test_cache_revoked(self.token_dict['uuid_token_default']) + + def test_valid_signed_request(self): + for _ in range(2): # Do it twice because first result was cached. + self.assert_valid_request_200( + self.token_dict['signed_token_scoped']) + # ensure that signed requests do not generate HTTP traffic + self.assertLastPath(None) + + def test_valid_signed_compressed_request(self): + self.assert_valid_request_200( + self.token_dict['signed_token_scoped_pkiz']) + # ensure that signed requests do not generate HTTP traffic + self.assertLastPath(None) + + def test_revoked_token_receives_401(self): + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + + def test_revoked_token_receives_401_sha256(self): + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.set_middleware() + self.middleware._revocations._list = ( + self.get_revocation_list_json(mode='sha256')) + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + + def test_cached_revoked_pki(self): + # When the PKI token is cached and revoked, 401 is returned. + token = self.token_dict['signed_token_scoped'] + revoked_form = cms.cms_hash_token(token) + self._test_cache_revoked(token, revoked_form) + + def test_cached_revoked_pkiz(self): + # When the PKIZ token is cached and revoked, 401 is returned. + token = self.token_dict['signed_token_scoped_pkiz'] + revoked_form = cms.cms_hash_token(token) + self._test_cache_revoked(token, revoked_form) + + def test_revoked_token_receives_401_md5_secondary(self): + # When hash_algorithms has 'md5' as the secondary hash and the + # revocation list contains the md5 hash for a token, that token is + # considered revoked so returns 401. + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.set_middleware() + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.token_dict['revoked_token'] + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + + def _test_revoked_hashed_token(self, token_name): + # If hash_algorithms is set as ['sha256', 'md5'], + # and check_revocations_for_cached is True, + # and a token is in the cache because it was successfully validated + # using the md5 hash, then + # if the token is in the revocation list by md5 hash, it'll be + # rejected and auth_token returns 401. + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.conf['check_revocations_for_cached'] = 'true' + self.set_middleware() + + token = self.token_dict[token_name] + + # Put the token in the revocation list. + token_hashed = cms.cms_hash_token(token) + self.middleware._revocations._list = self.get_revocation_list_json( + token_ids=[token_hashed]) + + # First, request is using the hashed token, is valid so goes in + # cache using the given hash. + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token_hashed + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + + # This time use the PKI(Z) token + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + + # Should find the token in the cache and revocation list. + self.assertEqual(401, self.response_status) + + def test_revoked_hashed_pki_token(self): + self._test_revoked_hashed_token('signed_token_scoped') + + def test_revoked_hashed_pkiz_token(self): + self._test_revoked_hashed_token('signed_token_scoped_pkiz') + + def get_revocation_list_json(self, token_ids=None, mode=None): + if token_ids is None: + key = 'revoked_token_hash' + (('_' + mode) if mode else '') + token_ids = [self.token_dict[key]] + revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()} + for x in token_ids]} + return jsonutils.dumps(revocation_list) + + def test_is_signed_token_revoked_returns_false(self): + # explicitly setting an empty revocation list here to document intent + self.middleware._revocations._list = jsonutils.dumps( + {"revoked": [], "extra": "success"}) + result = self.middleware._revocations._any_revoked( + [self.token_dict['revoked_token_hash']]) + self.assertFalse(result) + + def test_is_signed_token_revoked_returns_true(self): + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + result = self.middleware._revocations._any_revoked( + [self.token_dict['revoked_token_hash']]) + self.assertTrue(result) + + def test_is_signed_token_revoked_returns_true_sha256(self): + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.set_middleware() + self.middleware._revocations._list = ( + self.get_revocation_list_json(mode='sha256')) + result = self.middleware._revocations._any_revoked( + [self.token_dict['revoked_token_hash_sha256']]) + self.assertTrue(result) + + def test_verify_signed_token_raises_exception_for_revoked_token(self): + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + self.assertRaises(exc.InvalidToken, + self.middleware._verify_signed_token, + self.token_dict['revoked_token'], + [self.token_dict['revoked_token_hash']]) + + def test_verify_signed_token_raises_exception_for_revoked_token_s256(self): + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.set_middleware() + self.middleware._revocations._list = ( + self.get_revocation_list_json(mode='sha256')) + self.assertRaises(exc.InvalidToken, + self.middleware._verify_signed_token, + self.token_dict['revoked_token'], + [self.token_dict['revoked_token_hash_sha256'], + self.token_dict['revoked_token_hash']]) + + def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self): + self.middleware._revocations._list = ( + self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON) + self.assertRaises(exc.InvalidToken, + self.middleware._verify_pkiz_token, + self.token_dict['revoked_token_pkiz'], + [self.token_dict['revoked_token_pkiz_hash']]) + + def assertIsValidJSON(self, text): + json.loads(text) + + def test_verify_signed_token_succeeds_for_unrevoked_token(self): + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + text = self.middleware._verify_signed_token( + self.token_dict['signed_token_scoped'], + [self.token_dict['signed_token_scoped_hash']]) + self.assertIsValidJSON(text) + + def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self): + self.middleware._revocations._list = ( + self.get_revocation_list_json()) + text = self.middleware._verify_pkiz_token( + self.token_dict['signed_token_scoped_pkiz'], + [self.token_dict['signed_token_scoped_hash']]) + self.assertIsValidJSON(text) + + def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self): + self.conf['hash_algorithms'] = ','.join(['sha256', 'md5']) + self.set_middleware() + self.middleware._revocations._list = ( + self.get_revocation_list_json(mode='sha256')) + text = self.middleware._verify_signed_token( + self.token_dict['signed_token_scoped'], + [self.token_dict['signed_token_scoped_hash_sha256'], + self.token_dict['signed_token_scoped_hash']]) + self.assertIsValidJSON(text) + + def test_get_token_revocation_list_fetched_time_returns_min(self): + self.middleware._revocations._fetched_time = None + + # Get rid of the revoked file + revoked_path = self.middleware._signing_directory.calc_path( + _revocations.Revocations._FILE_NAME) + os.remove(revoked_path) + + self.assertEqual(self.middleware._revocations._fetched_time, + datetime.datetime.min) + + # FIXME(blk-u): move the unit tests into unit/test_auth_token.py + def test_get_token_revocation_list_fetched_time_returns_mtime(self): + self.middleware._revocations._fetched_time = None + revoked_path = self.middleware._signing_directory.calc_path( + _revocations.Revocations._FILE_NAME) + mtime = os.path.getmtime(revoked_path) + fetched_time = datetime.datetime.utcfromtimestamp(mtime) + self.assertEqual(fetched_time, + self.middleware._revocations._fetched_time) + + @testtools.skipUnless(TimezoneFixture.supported(), + 'TimezoneFixture not supported') + def test_get_token_revocation_list_fetched_time_returns_utc(self): + with TimezoneFixture('UTC-1'): + self.middleware._revocations._list = jsonutils.dumps( + self.examples.REVOCATION_LIST) + self.middleware._revocations._fetched_time = None + fetched_time = self.middleware._revocations._fetched_time + self.assertTrue(timeutils.is_soon(fetched_time, 1)) + + def test_get_token_revocation_list_fetched_time_returns_value(self): + expected = self.middleware._revocations._fetched_time + self.assertEqual(self.middleware._revocations._fetched_time, + expected) + + def test_get_revocation_list_returns_fetched_list(self): + # auth_token uses v2 to fetch this, so don't allow the v3 + # tests to override the fake http connection + self.middleware._revocations._fetched_time = None + + # Get rid of the revoked file + revoked_path = self.middleware._signing_directory.calc_path( + _revocations.Revocations._FILE_NAME) + os.remove(revoked_path) + + self.assertEqual(self.middleware._revocations._list, + self.examples.REVOCATION_LIST) + + def test_get_revocation_list_returns_current_list_from_memory(self): + self.assertEqual(self.middleware._revocations._list, + self.middleware._revocations._list_prop) + + def test_get_revocation_list_returns_current_list_from_disk(self): + in_memory_list = self.middleware._revocations._list + self.middleware._revocations._list_prop = None + self.assertEqual(self.middleware._revocations._list, + in_memory_list) + + def test_invalid_revocation_list_raises_error(self): + self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, json={}) + + self.assertRaises(exc.RevocationListError, + self.middleware._revocations._fetch) + + def test_fetch_revocation_list(self): + # auth_token uses v2 to fetch this, so don't allow the v3 + # tests to override the fake http connection + fetched = jsonutils.loads(self.middleware._revocations._fetch()) + self.assertEqual(fetched, self.examples.REVOCATION_LIST) + + def test_request_invalid_uuid_token(self): + # remember because we are testing the middleware we stub the connection + # to the keystone server, but this is not what gets returned + invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI + self.requests.get(invalid_uri, status_code=404) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = 'invalid-token' + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'") + + def test_request_invalid_signed_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + self.response_headers['WWW-Authenticate']) + + def test_request_invalid_signed_pkiz_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_PKIZ_TOKEN + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual("Keystone uri='https://keystone.example.com:1234'", + self.response_headers['WWW-Authenticate']) + + def test_request_no_token(self): + req = webob.Request.blank('/') + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'") + + def test_request_no_token_log_message(self): + class FakeLog(object): + def __init__(self): + self.msg = None + self.debugmsg = None + + def warn(self, msg=None, *args, **kwargs): + self.msg = msg + + def debug(self, msg=None, *args, **kwargs): + self.debugmsg = msg + + self.middleware._LOG = FakeLog() + self.middleware._delay_auth_decision = False + self.assertRaises(exc.InvalidToken, + self.middleware._get_user_token_from_header, {}) + self.assertIsNotNone(self.middleware._LOG.msg) + self.assertIsNotNone(self.middleware._LOG.debugmsg) + + def test_request_no_token_http(self): + req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'}) + self.set_middleware() + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'") + self.assertEqual(body, ['']) + + def test_request_blank_token(self): + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = '' + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'") + + def _get_cached_token(self, token, mode='md5'): + token_id = cms.cms_hash_token(token, mode=mode) + return self.middleware._token_cache._cache_get(token_id) + + def test_memcache(self): + req = webob.Request.blank('/') + token = self.token_dict['signed_token_scoped'] + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertIsNotNone(self._get_cached_token(token)) + + def test_expired(self): + req = webob.Request.blank('/') + token = self.token_dict['signed_token_scoped_expired'] + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + + def test_memcache_set_invalid_uuid(self): + invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI + self.requests.get(invalid_uri, status_code=404) + + req = webob.Request.blank('/') + token = 'invalid-token' + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertRaises(exc.InvalidToken, + self._get_cached_token, token) + + def _test_memcache_set_invalid_signed(self, hash_algorithms=None, + exp_mode='md5'): + req = webob.Request.blank('/') + token = self.token_dict['signed_token_scoped_expired'] + req.headers['X-Auth-Token'] = token + if hash_algorithms: + self.conf['hash_algorithms'] = ','.join(hash_algorithms) + self.set_middleware() + self.middleware(req.environ, self.start_fake_response) + self.assertRaises(exc.InvalidToken, + self._get_cached_token, token, mode=exp_mode) + + def test_memcache_set_invalid_signed(self): + self._test_memcache_set_invalid_signed() + + def test_memcache_set_invalid_signed_sha256_md5(self): + hash_algorithms = ['sha256', 'md5'] + self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms, + exp_mode='sha256') + + def test_memcache_set_invalid_signed_sha256(self): + hash_algorithms = ['sha256'] + self._test_memcache_set_invalid_signed(hash_algorithms=hash_algorithms, + exp_mode='sha256') + + def test_memcache_set_expired(self, extra_conf={}, extra_environ={}): + token_cache_time = 10 + conf = { + 'token_cache_time': '%s' % token_cache_time, + } + conf.update(extra_conf) + self.set_middleware(conf=conf) + req = webob.Request.blank('/') + token = self.token_dict['signed_token_scoped'] + req.headers['X-Auth-Token'] = token + req.environ.update(extra_environ) + + now = datetime.datetime.utcnow() + self.useFixture(TimeFixture(now)) + self.middleware(req.environ, self.start_fake_response) + self.assertIsNotNone(self._get_cached_token(token)) + + timeutils.advance_time_seconds(token_cache_time) + self.assertIsNone(self._get_cached_token(token)) + + def test_swift_memcache_set_expired(self): + extra_conf = {'cache': 'swift.cache'} + extra_environ = {'swift.cache': memorycache.Client()} + self.test_memcache_set_expired(extra_conf, extra_environ) + + def test_http_error_not_cached_token(self): + """Test to don't cache token as invalid on network errors. + + We use UUID tokens since they are the easiest one to reach + get_http_connection. + """ + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = ERROR_TOKEN + self.middleware._http_request_max_retries = 0 + self.middleware(req.environ, self.start_fake_response) + self.assertIsNone(self._get_cached_token(ERROR_TOKEN)) + self.assert_valid_last_url(ERROR_TOKEN) + + def test_http_request_max_retries(self): + times_retry = 10 + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = ERROR_TOKEN + + conf = {'http_request_max_retries': '%s' % times_retry} + self.set_middleware(conf=conf) + + with mock.patch('time.sleep') as mock_obj: + self.middleware(req.environ, self.start_fake_response) + + self.assertEqual(mock_obj.call_count, times_retry) + + def test_nocatalog(self): + conf = { + 'include_service_catalog': 'False' + } + self.set_middleware(conf=conf) + self.assert_valid_request_200(self.token_dict['uuid_token_default'], + with_catalog=False) + + def assert_kerberos_bind(self, token, bind_level, + use_kerberos=True, success=True): + conf = { + 'enforce_token_bind': bind_level, + 'auth_version': self.auth_version, + } + self.set_middleware(conf=conf) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + + if use_kerberos: + if use_kerberos is True: + req.environ['REMOTE_USER'] = self.examples.KERBEROS_BIND + else: + req.environ['REMOTE_USER'] = use_kerberos + + req.environ['AUTH_TYPE'] = 'Negotiate' + + body = self.middleware(req.environ, self.start_fake_response) + + if success: + self.assertEqual(self.response_status, 200) + self.assertEqual(body, [FakeApp.SUCCESS]) + self.assertIn('keystone.token_info', req.environ) + self.assert_valid_last_url(token) + else: + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'" + ) + + def test_uuid_bind_token_disabled_with_kerb_user(self): + for use_kerberos in [True, False]: + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='disabled', + use_kerberos=use_kerberos, + success=True) + + def test_uuid_bind_token_disabled_with_incorrect_ticket(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='kerberos', + use_kerberos='ronald@MCDONALDS.COM', + success=False) + + def test_uuid_bind_token_permissive_with_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='permissive', + use_kerberos=True, + success=True) + + def test_uuid_bind_token_permissive_without_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='permissive', + use_kerberos=False, + success=False) + + def test_uuid_bind_token_permissive_with_unknown_bind(self): + token = self.token_dict['uuid_token_unknown_bind'] + + for use_kerberos in [True, False]: + self.assert_kerberos_bind(token, + bind_level='permissive', + use_kerberos=use_kerberos, + success=True) + + def test_uuid_bind_token_permissive_with_incorrect_ticket(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='kerberos', + use_kerberos='ronald@MCDONALDS.COM', + success=False) + + def test_uuid_bind_token_strict_with_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='strict', + use_kerberos=True, + success=True) + + def test_uuid_bind_token_strict_with_kerbout_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='strict', + use_kerberos=False, + success=False) + + def test_uuid_bind_token_strict_with_unknown_bind(self): + token = self.token_dict['uuid_token_unknown_bind'] + + for use_kerberos in [True, False]: + self.assert_kerberos_bind(token, + bind_level='strict', + use_kerberos=use_kerberos, + success=False) + + def test_uuid_bind_token_required_with_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='required', + use_kerberos=True, + success=True) + + def test_uuid_bind_token_required_without_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='required', + use_kerberos=False, + success=False) + + def test_uuid_bind_token_required_with_unknown_bind(self): + token = self.token_dict['uuid_token_unknown_bind'] + + for use_kerberos in [True, False]: + self.assert_kerberos_bind(token, + bind_level='required', + use_kerberos=use_kerberos, + success=False) + + def test_uuid_bind_token_required_without_bind(self): + for use_kerberos in [True, False]: + self.assert_kerberos_bind(self.token_dict['uuid_token_default'], + bind_level='required', + use_kerberos=use_kerberos, + success=False) + + def test_uuid_bind_token_named_kerberos_with_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='kerberos', + use_kerberos=True, + success=True) + + def test_uuid_bind_token_named_kerberos_without_kerb_user(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='kerberos', + use_kerberos=False, + success=False) + + def test_uuid_bind_token_named_kerberos_with_unknown_bind(self): + token = self.token_dict['uuid_token_unknown_bind'] + + for use_kerberos in [True, False]: + self.assert_kerberos_bind(token, + bind_level='kerberos', + use_kerberos=use_kerberos, + success=False) + + def test_uuid_bind_token_named_kerberos_without_bind(self): + for use_kerberos in [True, False]: + self.assert_kerberos_bind(self.token_dict['uuid_token_default'], + bind_level='kerberos', + use_kerberos=use_kerberos, + success=False) + + def test_uuid_bind_token_named_kerberos_with_incorrect_ticket(self): + self.assert_kerberos_bind(self.token_dict['uuid_token_bind'], + bind_level='kerberos', + use_kerberos='ronald@MCDONALDS.COM', + success=False) + + def test_uuid_bind_token_with_unknown_named_FOO(self): + token = self.token_dict['uuid_token_bind'] + + for use_kerberos in [True, False]: + self.assert_kerberos_bind(token, + bind_level='FOO', + use_kerberos=use_kerberos, + success=False) + + def test_caching_token_on_verify(self): + # When the token is cached it isn't cached again when it's verified. + + # The token cache has to be initialized with our cache instance. + self.middleware._token_cache._env_cache_name = 'cache' + cache = memorycache.Client() + self.middleware._token_cache.initialize(env={'cache': cache}) + + # Mock cache.set since then the test can verify call_count. + orig_cache_set = cache.set + cache.set = mock.Mock(side_effect=orig_cache_set) + + token = self.token_dict['signed_token_scoped'] + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + + self.assertThat(1, matchers.Equals(cache.set.call_count)) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + + # Assert that the token wasn't cached again. + self.assertThat(1, matchers.Equals(cache.set.call_count)) + + def test_auth_plugin(self): + + for service_url in (self.examples.UNVERSIONED_SERVICE_URL, + self.examples.SERVICE_URL): + self.requests.get(service_url, + json=VERSION_LIST_v3, + status_code=300) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default'] + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + self.assertEqual([FakeApp.SUCCESS], body) + + token_auth = req.environ['keystone.token_auth'] + endpoint_filter = {'service_type': self.examples.SERVICE_TYPE, + 'version': 3} + + url = token_auth.get_endpoint(session.Session(), **endpoint_filter) + self.assertEqual('%s/v3' % BASE_URI, url) + + self.assertTrue(token_auth.has_user_token) + self.assertFalse(token_auth.has_service_token) + self.assertIsNone(token_auth.service) + + +class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def __init__(self, *args, **kwargs): + super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) + self.auth_version = 'v2.0' + self.fake_app = None + self.ca_path = '/v2.0/certificates/ca' + self.signing_path = '/v2.0/certificates/signing' + + def setUp(self): + super(V2CertDownloadMiddlewareTest, self).setUp( + auth_version=self.auth_version, + fake_app=self.fake_app) + self.base_dir = tempfile.mkdtemp() + self.addCleanup(shutil.rmtree, self.base_dir) + self.cert_dir = os.path.join(self.base_dir, 'certs') + os.makedirs(self.cert_dir, stat.S_IRWXU) + conf = { + 'signing_dir': self.cert_dir, + 'auth_version': self.auth_version, + } + + self.requests.register_uri('GET', + BASE_URI, + json=VERSION_LIST_v3, + status_code=300) + + self.set_middleware(conf=conf) + + # Usually we supply a signed_dir with pre-installed certificates, + # so invocation of /usr/bin/openssl succeeds. This time we give it + # an empty directory, so it fails. + def test_request_no_token_dummy(self): + cms._ensure_subprocess() + + self.requests.get('%s%s' % (BASE_URI, self.ca_path), + status_code=404) + self.requests.get('%s%s' % (BASE_URI, self.signing_path), + status_code=404) + self.assertRaises(exceptions.CertificateConfigError, + self.middleware._verify_signed_token, + self.examples.SIGNED_TOKEN_SCOPED, + [self.examples.SIGNED_TOKEN_SCOPED_HASH]) + + def test_fetch_signing_cert(self): + data = 'FAKE CERT' + url = "%s%s" % (BASE_URI, self.signing_path) + self.requests.get(url, text=data) + self.middleware._fetch_signing_cert() + + signing_cert_path = self.middleware._signing_directory.calc_path( + self.middleware._SIGNING_CERT_FILE_NAME) + with open(signing_cert_path, 'r') as f: + self.assertEqual(f.read(), data) + + self.assertEqual(url, self.requests.last_request.url) + + def test_fetch_signing_ca(self): + data = 'FAKE CA' + url = "%s%s" % (BASE_URI, self.ca_path) + self.requests.get(url, text=data) + self.middleware._fetch_ca_cert() + + ca_file_path = self.middleware._signing_directory.calc_path( + self.middleware._SIGNING_CA_FILE_NAME) + with open(ca_file_path, 'r') as f: + self.assertEqual(f.read(), data) + + self.assertEqual(url, self.requests.last_request.url) + + def test_prefix_trailing_slash(self): + del self.conf['identity_uri'] + self.conf['auth_protocol'] = 'https' + self.conf['auth_host'] = 'keystone.example.com' + self.conf['auth_port'] = '1234' + self.conf['auth_admin_prefix'] = '/newadmin/' + + base_url = '%s/newadmin' % BASE_HOST + ca_url = "%s%s" % (base_url, self.ca_path) + signing_url = "%s%s" % (base_url, self.signing_path) + + self.requests.get(base_url, + json=VERSION_LIST_v3, + status_code=300) + self.requests.get(ca_url, text='FAKECA') + self.requests.get(signing_url, text='FAKECERT') + + self.set_middleware(conf=self.conf) + + self.middleware._fetch_ca_cert() + self.assertEqual(ca_url, self.requests.last_request.url) + + self.middleware._fetch_signing_cert() + self.assertEqual(signing_url, self.requests.last_request.url) + + def test_without_prefix(self): + del self.conf['identity_uri'] + self.conf['auth_protocol'] = 'https' + self.conf['auth_host'] = 'keystone.example.com' + self.conf['auth_port'] = '1234' + self.conf['auth_admin_prefix'] = '' + + ca_url = "%s%s" % (BASE_HOST, self.ca_path) + signing_url = "%s%s" % (BASE_HOST, self.signing_path) + + self.requests.get(BASE_HOST, + json=VERSION_LIST_v3, + status_code=300) + self.requests.get(ca_url, text='FAKECA') + self.requests.get(signing_url, text='FAKECERT') + + self.set_middleware(conf=self.conf) + + self.middleware._fetch_ca_cert() + self.assertEqual(ca_url, self.requests.last_request.url) + + self.middleware._fetch_signing_cert() + self.assertEqual(signing_url, self.requests.last_request.url) + + +class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest): + + def __init__(self, *args, **kwargs): + super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs) + self.auth_version = 'v3.0' + self.fake_app = v3FakeApp + self.ca_path = '/v3/OS-SIMPLE-CERT/ca' + self.signing_path = '/v3/OS-SIMPLE-CERT/certificates' + + +def network_error_response(request, context): + raise exceptions.ConnectionError("Network connection error.") + + +class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + CommonAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + """v2 token specific tests. + + There are some differences between how the auth-token middleware handles + v2 and v3 tokens over and above the token formats, namely: + + - A v3 keystone server will auto scope a token to a user's default project + if no scope is specified. A v2 server assumes that the auth-token + middleware will do that. + - A v2 keystone server may issue a token without a catalog, even with a + tenant + + The tests below were originally part of the generic AuthTokenMiddlewareTest + class, but now, since they really are v2 specific, they are included here. + + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v2AuthTokenMiddlewareTest, self).setUp() + + self.token_dict = { + 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT, + 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED, + 'uuid_token_bind': self.examples.UUID_TOKEN_BIND, + 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND, + 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED, + 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ, + 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH, + 'signed_token_scoped_hash_sha256': + self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256, + 'signed_token_scoped_expired': + self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, + 'revoked_token': self.examples.REVOKED_TOKEN, + 'revoked_token_pkiz': self.examples.REVOKED_TOKEN_PKIZ, + 'revoked_token_pkiz_hash': + self.examples.REVOKED_TOKEN_PKIZ_HASH, + 'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH, + 'revoked_token_hash_sha256': + self.examples.REVOKED_TOKEN_HASH_SHA256, + } + + self.requests.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) + + self.requests.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) + + for token in (self.examples.UUID_TOKEN_DEFAULT, + self.examples.UUID_TOKEN_UNSCOPED, + self.examples.UUID_TOKEN_BIND, + self.examples.UUID_TOKEN_UNKNOWN_BIND, + self.examples.UUID_TOKEN_NO_SERVICE_CATALOG, + self.examples.SIGNED_TOKEN_SCOPED_KEY, + self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,): + url = "%s/v2.0/tokens/%s" % (BASE_URI, token) + text = self.examples.JSON_TOKEN_RESPONSES[token] + self.requests.get(url, text=text) + + url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN) + self.requests.get(url, text=network_error_response) + + self.set_middleware() + + def assert_unscoped_default_tenant_auto_scopes(self, token): + """Unscoped v2 requests with a default tenant should "auto-scope." + + The implied scope is the user's tenant ID. + + """ + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertEqual(body, [FakeApp.SUCCESS]) + self.assertIn('keystone.token_info', req.environ) + + def assert_valid_last_url(self, token_id): + self.assertLastPath("/v2.0/tokens/%s" % token_id) + + def test_default_tenant_uuid_token(self): + self.assert_unscoped_default_tenant_auto_scopes( + self.examples.UUID_TOKEN_DEFAULT) + + def test_default_tenant_signed_token(self): + self.assert_unscoped_default_tenant_auto_scopes( + self.examples.SIGNED_TOKEN_SCOPED) + + def assert_unscoped_token_receives_401(self, token): + """Unscoped requests with no default tenant ID should be rejected.""" + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = token + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 401) + self.assertEqual(self.response_headers['WWW-Authenticate'], + "Keystone uri='https://keystone.example.com:1234'") + + def test_unscoped_uuid_token_receives_401(self): + self.assert_unscoped_token_receives_401( + self.examples.UUID_TOKEN_UNSCOPED) + + def test_unscoped_pki_token_receives_401(self): + self.assert_unscoped_token_receives_401( + self.examples.SIGNED_TOKEN_UNSCOPED) + + def test_request_prevent_service_catalog_injection(self): + req = webob.Request.blank('/') + req.headers['X-Service-Catalog'] = '[]' + req.headers['X-Auth-Token'] = ( + self.examples.UUID_TOKEN_NO_SERVICE_CATALOG) + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertFalse(req.headers.get('X-Service-Catalog')) + self.assertEqual(body, [FakeApp.SUCCESS]) + + def test_user_plugin_token_properties(self): + req = webob.Request.blank('/') + req.headers['X-Service-Catalog'] = '[]' + token = self.examples.UUID_TOKEN_DEFAULT + token_data = self.examples.TOKEN_RESPONSES[token] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = token + + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertEqual([FakeApp.SUCCESS], body) + + token_auth = req.environ['keystone.token_auth'] + + self.assertTrue(token_auth.has_user_token) + self.assertTrue(token_auth.has_service_token) + + for t in [token_auth.user, token_auth.service]: + self.assertEqual(token_data.user_id, t.user_id) + self.assertEqual(token_data.tenant_id, t.project_id) + + self.assertThat(t.role_names, matchers.HasLength(2)) + self.assertIn('role1', t.role_names) + self.assertIn('role2', t.role_names) + + self.assertIsNone(t.trust_id) + self.assertIsNone(t.user_domain_id) + self.assertIsNone(t.project_domain_id) + + +class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def test_valid_uuid_request_forced_to_2_0(self): + """Test forcing auth_token to use lower api version. + + By installing the v3 http hander, auth_token will be get + a version list that looks like a v3 server - from which it + would normally chose v3.0 as the auth version. However, here + we specify v2.0 in the configuration - which should force + auth_token to use that version instead. + + """ + conf = { + 'auth_version': 'v2.0' + } + + self.requests.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) + + self.requests.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + token = self.examples.UUID_TOKEN_DEFAULT + url = "%s/v2.0/tokens/%s" % (BASE_URI, token) + text = self.examples.JSON_TOKEN_RESPONSES[token] + self.requests.get(url, text=text) + + self.set_middleware(conf=conf) + + # This tests will only work is auth_token has chosen to use the + # lower, v2, api version + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertEqual(url, self.requests.last_request.url) + + +class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, + CommonAuthTokenMiddlewareTest, + testresources.ResourcedTestCase): + """Test auth_token middleware with v3 tokens. + + Re-execute the AuthTokenMiddlewareTest class tests, but with the + auth_token middleware configured to expect v3 tokens back from + a keystone server. + + This is done by configuring the AuthTokenMiddlewareTest class via + its Setup(), passing in v3 style data that will then be used by + the tests themselves. This approach has been used to ensure we + really are running the same tests for both v2 and v3 tokens. + + There a few additional specific test for v3 only: + + - We allow an unscoped token to be validated (as unscoped), where + as for v2 tokens, the auth_token middleware is expected to try and + auto-scope it (and fail if there is no default tenant) + - Domain scoped tokens + + Since we don't specify an auth version for auth_token to use, by + definition we are thefore implicitely testing that it will use + the highest available auth version, i.e. v3.0 + + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v3AuthTokenMiddlewareTest, self).setUp( + auth_version='v3.0', + fake_app=v3FakeApp) + + self.token_dict = { + 'uuid_token_default': self.examples.v3_UUID_TOKEN_DEFAULT, + 'uuid_token_unscoped': self.examples.v3_UUID_TOKEN_UNSCOPED, + 'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND, + 'uuid_token_unknown_bind': + self.examples.v3_UUID_TOKEN_UNKNOWN_BIND, + 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED, + 'signed_token_scoped_pkiz': + self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ, + 'signed_token_scoped_hash': + self.examples.SIGNED_v3_TOKEN_SCOPED_HASH, + 'signed_token_scoped_hash_sha256': + self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256, + 'signed_token_scoped_expired': + self.examples.SIGNED_TOKEN_SCOPED_EXPIRED, + 'revoked_token': self.examples.REVOKED_v3_TOKEN, + 'revoked_token_pkiz': self.examples.REVOKED_v3_TOKEN_PKIZ, + 'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH, + 'revoked_token_hash_sha256': + self.examples.REVOKED_v3_TOKEN_HASH_SHA256, + 'revoked_token_pkiz_hash': + self.examples.REVOKED_v3_PKIZ_TOKEN_HASH, + } + + self.requests.get(BASE_URI, + json=VERSION_LIST_v3, + status_code=300) + + # TODO(jamielennox): auth_token middleware uses a v2 admin token + # regardless of the auth_version that is set. + self.requests.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 + self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) + + self.requests.get('%s/v3/auth/tokens' % BASE_URI, + text=self.token_response) + + self.set_middleware() + + def token_response(self, request, context): + auth_id = request.headers.get('X-Auth-Token') + token_id = request.headers.get('X-Subject-Token') + self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) + + if token_id == ERROR_TOKEN: + raise exceptions.ConnectionError("Network connection error.") + + try: + response = self.examples.JSON_TOKEN_RESPONSES[token_id] + except KeyError: + response = "" + context.status_code = 404 + + return response + + def assert_valid_last_url(self, token_id): + self.assertLastPath('/v3/auth/tokens') + + def test_valid_unscoped_uuid_request(self): + # Remove items that won't be in an unscoped token + delta_expected_env = { + 'HTTP_X_PROJECT_ID': None, + 'HTTP_X_PROJECT_NAME': None, + 'HTTP_X_PROJECT_DOMAIN_ID': None, + 'HTTP_X_PROJECT_DOMAIN_NAME': None, + 'HTTP_X_TENANT_ID': None, + 'HTTP_X_TENANT_NAME': None, + 'HTTP_X_ROLES': '', + 'HTTP_X_TENANT': None, + 'HTTP_X_ROLE': '', + } + self.set_middleware(expected_env=delta_expected_env) + self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED, + with_catalog=False) + self.assertLastPath('/v3/auth/tokens') + + def test_domain_scoped_uuid_request(self): + # Modify items compared to default token for a domain scope + delta_expected_env = { + 'HTTP_X_DOMAIN_ID': 'domain_id1', + 'HTTP_X_DOMAIN_NAME': 'domain_name1', + 'HTTP_X_PROJECT_ID': None, + 'HTTP_X_PROJECT_NAME': None, + 'HTTP_X_PROJECT_DOMAIN_ID': None, + 'HTTP_X_PROJECT_DOMAIN_NAME': None, + 'HTTP_X_TENANT_ID': None, + 'HTTP_X_TENANT_NAME': None, + 'HTTP_X_TENANT': None + } + self.set_middleware(expected_env=delta_expected_env) + self.assert_valid_request_200( + self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED) + self.assertLastPath('/v3/auth/tokens') + + def test_gives_v2_catalog(self): + self.set_middleware() + req = self.assert_valid_request_200( + self.examples.SIGNED_v3_TOKEN_SCOPED) + + catalog = jsonutils.loads(req.headers['X-Service-Catalog']) + + for service in catalog: + for endpoint in service['endpoints']: + # no point checking everything, just that it's in v2 format + self.assertIn('adminURL', endpoint) + self.assertIn('publicURL', endpoint) + self.assertIn('adminURL', endpoint) + + def test_fallback_to_online_validation_with_signing_error(self): + self.requests.register_uri( + 'GET', + '%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI, + status_code=404) + self.assert_valid_request_200(self.token_dict['signed_token_scoped']) + self.assert_valid_request_200( + self.token_dict['signed_token_scoped_pkiz']) + + def test_fallback_to_online_validation_with_ca_error(self): + self.requests.register_uri('GET', + '%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI, + status_code=404) + self.assert_valid_request_200(self.token_dict['signed_token_scoped']) + self.assert_valid_request_200( + self.token_dict['signed_token_scoped_pkiz']) + + def test_fallback_to_online_validation_with_revocation_list_error(self): + self.requests.register_uri('GET', + '%s/v2.0/tokens/revoked' % BASE_URI, + status_code=404) + self.assert_valid_request_200(self.token_dict['signed_token_scoped']) + self.assert_valid_request_200( + self.token_dict['signed_token_scoped_pkiz']) + + def test_user_plugin_token_properties(self): + req = webob.Request.blank('/') + req.headers['X-Service-Catalog'] = '[]' + token = self.examples.v3_UUID_TOKEN_DEFAULT + token_data = self.examples.TOKEN_RESPONSES[token] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = token + + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + self.assertEqual([FakeApp.SUCCESS], body) + + token_auth = req.environ['keystone.token_auth'] + + self.assertTrue(token_auth.has_user_token) + self.assertTrue(token_auth.has_service_token) + + for t in [token_auth.user, token_auth.service]: + self.assertEqual(token_data.user_id, t.user_id) + self.assertEqual(token_data.project_id, t.project_id) + self.assertEqual(token_data.user_domain_id, t.user_domain_id) + self.assertEqual(token_data.project_domain_id, t.project_domain_id) + + self.assertThat(t.role_names, matchers.HasLength(2)) + self.assertIn('role1', t.role_names) + self.assertIn('role2', t.role_names) + + self.assertIsNone(t.trust_id) + + +class TokenExpirationTest(BaseAuthTokenMiddlewareTest): + def setUp(self): + super(TokenExpirationTest, self).setUp() + self.now = timeutils.utcnow() + self.delta = datetime.timedelta(hours=1) + self.one_hour_ago = timeutils.isotime(self.now - self.delta, + subsecond=True) + self.one_hour_earlier = timeutils.isotime(self.now + self.delta, + subsecond=True) + + def create_v2_token_fixture(self, expires=None): + v2_fixture = { + 'access': { + 'token': { + 'id': 'blah', + 'expires': expires or self.one_hour_earlier, + 'tenant': { + 'id': 'tenant_id1', + 'name': 'tenant_name1', + }, + }, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'roles': [ + {'name': 'role1'}, + {'name': 'role2'}, + ], + }, + 'serviceCatalog': {} + }, + } + + return v2_fixture + + def create_v3_token_fixture(self, expires=None): + + v3_fixture = { + 'token': { + 'expires_at': expires or self.one_hour_earlier, + 'user': { + 'id': 'user_id1', + 'name': 'user_name1', + 'domain': { + 'id': 'domain_id1', + 'name': 'domain_name1' + } + }, + 'project': { + 'id': 'tenant_id1', + 'name': 'tenant_name1', + 'domain': { + 'id': 'domain_id1', + 'name': 'domain_name1' + } + }, + 'roles': [ + {'name': 'role1', 'id': 'Role1'}, + {'name': 'role2', 'id': 'Role2'}, + ], + 'catalog': {} + } + } + + return v3_fixture + + def test_no_data(self): + data = {} + self.assertRaises(exc.InvalidToken, + auth_token._get_token_expiration, + data) + + def test_bad_data(self): + data = {'my_happy_token_dict': 'woo'} + self.assertRaises(exc.InvalidToken, + auth_token._get_token_expiration, + data) + + def test_v2_token_get_token_expiration_return_isotime(self): + data = self.create_v2_token_fixture() + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(self.one_hour_earlier, actual_expires) + + def test_v2_token_not_expired(self): + data = self.create_v2_token_fixture() + expected_expires = data['access']['token']['expires'] + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(actual_expires, expected_expires) + + def test_v2_token_expired(self): + data = self.create_v2_token_fixture(expires=self.one_hour_ago) + expires = auth_token._get_token_expiration(data) + self.assertRaises(exc.InvalidToken, + auth_token._confirm_token_not_expired, + expires) + + def test_v2_token_with_timezone_offset_not_expired(self): + self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) + data = self.create_v2_token_fixture( + expires='2000-01-01T05:05:10.000123Z') + expected_expires = '2000-01-01T05:05:10.000123Z' + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(actual_expires, expected_expires) + + def test_v2_token_with_timezone_offset_expired(self): + self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) + data = self.create_v2_token_fixture( + expires='1999-12-31T19:05:10Z') + expires = auth_token._get_token_expiration(data) + self.assertRaises(exc.InvalidToken, + auth_token._confirm_token_not_expired, + expires) + + def test_v3_token_get_token_expiration_return_isotime(self): + data = self.create_v3_token_fixture() + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(self.one_hour_earlier, actual_expires) + + def test_v3_token_not_expired(self): + data = self.create_v3_token_fixture() + expected_expires = data['token']['expires_at'] + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(actual_expires, expected_expires) + + def test_v3_token_expired(self): + data = self.create_v3_token_fixture(expires=self.one_hour_ago) + expires = auth_token._get_token_expiration(data) + self.assertRaises(exc.InvalidToken, + auth_token._confirm_token_not_expired, + expires) + + def test_v3_token_with_timezone_offset_not_expired(self): + self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) + data = self.create_v3_token_fixture( + expires='2000-01-01T05:05:10.000123Z') + expected_expires = '2000-01-01T05:05:10.000123Z' + + actual_expires = auth_token._get_token_expiration(data) + self.assertEqual(actual_expires, expected_expires) + + def test_v3_token_with_timezone_offset_expired(self): + self.useFixture(TimeFixture('2000-01-01T00:01:10.000123Z')) + data = self.create_v3_token_fixture( + expires='1999-12-31T19:05:10Z') + expires = auth_token._get_token_expiration(data) + self.assertRaises(exc.InvalidToken, + auth_token._confirm_token_not_expired, + expires) + + def test_cached_token_not_expired(self): + token = 'mytoken' + data = 'this_data' + self.set_middleware() + self.middleware._token_cache.initialize({}) + some_time_later = timeutils.strtime(at=(self.now + self.delta)) + expires = some_time_later + self.middleware._token_cache.store(token, data, expires) + self.assertEqual(self.middleware._token_cache._cache_get(token), data) + + def test_cached_token_not_expired_with_old_style_nix_timestamp(self): + """Ensure we cannot retrieve a token from the cache. + + Getting a token from the cache should return None when the token data + in the cache stores the expires time as a \*nix style timestamp. + + """ + token = 'mytoken' + data = 'this_data' + self.set_middleware() + token_cache = self.middleware._token_cache + token_cache.initialize({}) + some_time_later = self.now + self.delta + # Store a unix timestamp in the cache. + expires = calendar.timegm(some_time_later.timetuple()) + token_cache.store(token, data, expires) + self.assertIsNone(token_cache._cache_get(token)) + + def test_cached_token_expired(self): + token = 'mytoken' + data = 'this_data' + self.set_middleware() + self.middleware._token_cache.initialize({}) + some_time_earlier = timeutils.strtime(at=(self.now - self.delta)) + expires = some_time_earlier + self.middleware._token_cache.store(token, data, expires) + self.assertThat(lambda: self.middleware._token_cache._cache_get(token), + matchers.raises(exc.InvalidToken)) + + def test_cached_token_with_timezone_offset_not_expired(self): + token = 'mytoken' + data = 'this_data' + self.set_middleware() + self.middleware._token_cache.initialize({}) + timezone_offset = datetime.timedelta(hours=2) + some_time_later = self.now - timezone_offset + self.delta + expires = timeutils.strtime(some_time_later) + '-02:00' + self.middleware._token_cache.store(token, data, expires) + self.assertEqual(self.middleware._token_cache._cache_get(token), data) + + def test_cached_token_with_timezone_offset_expired(self): + token = 'mytoken' + data = 'this_data' + self.set_middleware() + self.middleware._token_cache.initialize({}) + timezone_offset = datetime.timedelta(hours=2) + some_time_earlier = self.now - timezone_offset - self.delta + expires = timeutils.strtime(some_time_earlier) + '-02:00' + self.middleware._token_cache.store(token, data, expires) + self.assertThat(lambda: self.middleware._token_cache._cache_get(token), + matchers.raises(exc.InvalidToken)) + + +class CatalogConversionTests(BaseAuthTokenMiddlewareTest): + + PUBLIC_URL = 'http://server:5000/v2.0' + ADMIN_URL = 'http://admin:35357/v2.0' + INTERNAL_URL = 'http://internal:5000/v2.0' + + REGION_ONE = 'RegionOne' + REGION_TWO = 'RegionTwo' + REGION_THREE = 'RegionThree' + + def test_basic_convert(self): + token = fixture.V3Token() + s = token.add_service(type='identity') + s.add_standard_endpoints(public=self.PUBLIC_URL, + admin=self.ADMIN_URL, + internal=self.INTERNAL_URL, + region=self.REGION_ONE) + + auth_ref = access.AccessInfo.factory(body=token) + catalog_data = auth_ref.service_catalog.get_data() + catalog = auth_token._v3_to_v2_catalog(catalog_data) + + self.assertEqual(1, len(catalog)) + service = catalog[0] + self.assertEqual(1, len(service['endpoints'])) + endpoints = service['endpoints'][0] + + self.assertEqual('identity', service['type']) + self.assertEqual(4, len(endpoints)) + self.assertEqual(self.PUBLIC_URL, endpoints['publicURL']) + self.assertEqual(self.ADMIN_URL, endpoints['adminURL']) + self.assertEqual(self.INTERNAL_URL, endpoints['internalURL']) + self.assertEqual(self.REGION_ONE, endpoints['region']) + + def test_multi_region(self): + token = fixture.V3Token() + s = token.add_service(type='identity') + + s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE) + s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO) + s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE) + + auth_ref = access.AccessInfo.factory(body=token) + catalog_data = auth_ref.service_catalog.get_data() + catalog = auth_token._v3_to_v2_catalog(catalog_data) + + self.assertEqual(1, len(catalog)) + service = catalog[0] + + # the 3 regions will come through as 3 separate endpoints + expected = [{'internalURL': self.INTERNAL_URL, + 'region': self.REGION_ONE}, + {'publicURL': self.PUBLIC_URL, + 'region': self.REGION_TWO}, + {'adminURL': self.ADMIN_URL, + 'region': self.REGION_THREE}] + + self.assertEqual('identity', service['type']) + self.assertEqual(3, len(service['endpoints'])) + for e in expected: + self.assertIn(e, expected) + + +class DelayedAuthTests(BaseAuthTokenMiddlewareTest): + + def test_header_in_401(self): + body = uuid.uuid4().hex + auth_uri = 'http://local.test' + conf = {'delay_auth_decision': 'True', + 'auth_version': 'v3.0', + 'auth_uri': auth_uri} + + self.fake_app = new_app('401 Unauthorized', body) + self.set_middleware(conf=conf) + + req = webob.Request.blank('/') + resp = self.middleware(req.environ, self.start_fake_response) + + self.assertEqual([six.b(body)], resp) + + self.assertEqual(401, self.response_status) + self.assertEqual("Keystone uri='%s'" % auth_uri, + self.response_headers['WWW-Authenticate']) + + def test_delayed_auth_values(self): + fake_app = new_app('401 Unauthorized', uuid.uuid4().hex) + middleware = auth_token.AuthProtocol(fake_app, + {'auth_uri': 'http://local.test'}) + self.assertFalse(middleware._delay_auth_decision) + + for v in ('True', '1', 'on', 'yes'): + conf = {'delay_auth_decision': v, + 'auth_uri': 'http://local.test'} + + middleware = auth_token.AuthProtocol(fake_app, conf) + self.assertTrue(middleware._delay_auth_decision) + + for v in ('False', '0', 'no'): + conf = {'delay_auth_decision': v, + 'auth_uri': 'http://local.test'} + + middleware = auth_token.AuthProtocol(fake_app, conf) + self.assertFalse(middleware._delay_auth_decision) + + def test_auth_plugin_with_no_tokens(self): + body = uuid.uuid4().hex + auth_uri = 'http://local.test' + conf = {'delay_auth_decision': True, 'auth_uri': auth_uri} + self.fake_app = new_app('200 OK', body) + self.set_middleware(conf=conf) + + req = webob.Request.blank('/') + resp = self.middleware(req.environ, self.start_fake_response) + + self.assertEqual([six.b(body)], resp) + + token_auth = req.environ['keystone.token_auth'] + + self.assertFalse(token_auth.has_user_token) + self.assertIsNone(token_auth.user) + self.assertFalse(token_auth.has_service_token) + self.assertIsNone(token_auth.service) + + +class CommonCompositeAuthTests(object): + """Test Composite authentication. + + Test the behaviour of adding a service-token. + """ + + def test_composite_auth_ok(self): + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + fake_logger = fixtures.FakeLogger(level=logging.DEBUG) + self.middleware.logger = self.useFixture(fake_logger) + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + self.assertEqual([FakeApp.SUCCESS], body) + expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + expected_env.update(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + self.assertIn('Received request from user: ' + 'user_id %(HTTP_X_USER_ID)s, ' + 'project_id %(HTTP_X_TENANT_ID)s, ' + 'roles %(HTTP_X_ROLES)s ' + 'service: user_id %(HTTP_X_SERVICE_USER_ID)s, ' + 'project_id %(HTTP_X_SERVICE_PROJECT_ID)s, ' + 'roles %(HTTP_X_SERVICE_ROLES)s' % expected_env, + fake_logger.output) + + def test_composite_auth_invalid_service_token(self): + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = 'invalid-service-token' + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual([b'Authentication required'], body) + + def test_composite_auth_no_service_token(self): + self.purge_service_token_expected_env() + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + req.headers['X-Auth-Token'] = token + + # Ensure injection of service headers is not possible + for key, value in six.iteritems(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + req.headers[header_key] = value + # Check arbitrary headers not removed + req.headers['X-Foo'] = 'Bar' + body = self.middleware(req.environ, self.start_fake_response) + for key in six.iterkeys(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + self.assertFalse(req.headers.get(header_key)) + self.assertEqual('Bar', req.headers.get('X-Foo')) + self.assertEqual(418, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_invalid_user_token(self): + req = webob.Request.blank('/') + token = 'invalid-token' + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual([b'Authentication required'], body) + + def test_composite_auth_no_user_token(self): + req = webob.Request.blank('/') + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(401, self.response_status) + self.assertEqual([b'Authentication required'], body) + + def test_composite_auth_delay_ok(self): + self.middleware._delay_auth_decision = True + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(200, self.response_status) + self.assertEqual([FakeApp.SUCCESS], body) + + def test_composite_auth_delay_invalid_service_token(self): + self.middleware._delay_auth_decision = True + self.purge_service_token_expected_env() + expected_env = { + 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + service_token = 'invalid-service-token' + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(420, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_invalid_service_and_user_tokens(self): + self.middleware._delay_auth_decision = True + self.purge_service_token_expected_env() + self.purge_token_expected_env() + expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Invalid', + 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + token = 'invalid-user-token' + service_token = 'invalid-service-token' + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(419, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_no_service_token(self): + self.middleware._delay_auth_decision = True + self.purge_service_token_expected_env() + + req = webob.Request.blank('/') + token = self.token_dict['uuid_token_default'] + req.headers['X-Auth-Token'] = token + + # Ensure injection of service headers is not possible + for key, value in six.iteritems(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + req.headers[header_key] = value + # Check arbitrary headers not removed + req.headers['X-Foo'] = 'Bar' + body = self.middleware(req.environ, self.start_fake_response) + for key in six.iterkeys(self.service_token_expected_env): + header_key = key[len('HTTP_'):].replace('_', '-') + self.assertFalse(req.headers.get(header_key)) + self.assertEqual('Bar', req.headers.get('X-Foo')) + self.assertEqual(418, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_invalid_user_token(self): + self.middleware._delay_auth_decision = True + self.purge_token_expected_env() + expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + token = 'invalid-token' + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Auth-Token'] = token + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(403, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + def test_composite_auth_delay_no_user_token(self): + self.middleware._delay_auth_decision = True + self.purge_token_expected_env() + expected_env = { + 'HTTP_X_IDENTITY_STATUS': 'Invalid', + } + self.update_expected_env(expected_env) + + req = webob.Request.blank('/') + service_token = self.token_dict['uuid_service_token_default'] + req.headers['X-Service-Token'] = service_token + body = self.middleware(req.environ, self.start_fake_response) + self.assertEqual(403, self.response_status) + self.assertEqual([FakeApp.FORBIDDEN], body) + + +class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest, + CommonCompositeAuthTests, + testresources.ResourcedTestCase): + """Test auth_token middleware with v2 token based composite auth. + + Execute the Composite auth class tests, but with the + auth_token middleware configured to expect v2 tokens back from + a keystone server. + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v2CompositeAuthTests, self).setUp( + expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE, + fake_app=CompositeFakeApp) + + uuid_token_default = self.examples.UUID_TOKEN_DEFAULT + uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT + self.token_dict = { + 'uuid_token_default': uuid_token_default, + 'uuid_service_token_default': uuid_service_token_default, + } + + self.requests.get(BASE_URI, + json=VERSION_LIST_v2, + status_code=300) + + self.requests.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST, + status_code=200) + + for token in (self.examples.UUID_TOKEN_DEFAULT, + self.examples.UUID_SERVICE_TOKEN_DEFAULT,): + self.requests.get('%s/v2.0/tokens/%s' % (BASE_URI, token), + text=self.examples.JSON_TOKEN_RESPONSES[token]) + + for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI, + "%s/v2.0/tokens/invalid-service-token" % BASE_URI): + self.requests.get(invalid_uri, text='', status_code=404) + + self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + self.service_token_expected_env = dict( + EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + self.set_middleware() + + +class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest, + CommonCompositeAuthTests, + testresources.ResourcedTestCase): + """Test auth_token middleware with v3 token based composite auth. + + Execute the Composite auth class tests, but with the + auth_token middleware configured to expect v3 tokens back from + a keystone server. + """ + + resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)] + + def setUp(self): + super(v3CompositeAuthTests, self).setUp( + auth_version='v3.0', + fake_app=v3CompositeFakeApp) + + uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT + uuid_serv_token_default = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT + self.token_dict = { + 'uuid_token_default': uuid_token_default, + 'uuid_service_token_default': uuid_serv_token_default, + } + + self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) + + # TODO(jamielennox): auth_token middleware uses a v2 admin token + # regardless of the auth_version that is set. + self.requests.post('%s/v2.0/tokens' % BASE_URI, + text=FAKE_ADMIN_TOKEN) + + # TODO(jamielennox): there is no v3 revocation url yet, it uses v2 + self.requests.get('%s/v2.0/tokens/revoked' % BASE_URI, + text=self.examples.SIGNED_REVOCATION_LIST) + + self.requests.get('%s/v3/auth/tokens' % BASE_URI, + text=self.token_response) + + self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE) + self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS) + self.service_token_expected_env = dict( + EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE) + self.service_token_expected_env.update( + EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS) + self.set_middleware() + + def token_response(self, request, context): + auth_id = request.headers.get('X-Auth-Token') + token_id = request.headers.get('X-Subject-Token') + self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID) + + status = 200 + response = "" + + if token_id == ERROR_TOKEN: + raise exceptions.ConnectionError("Network connection error.") + + try: + response = self.examples.JSON_TOKEN_RESPONSES[token_id] + except KeyError: + status = 404 + + context.status_code = status + return response + + +class OtherTests(BaseAuthTokenMiddlewareTest): + + def setUp(self): + super(OtherTests, self).setUp() + self.logger = self.useFixture(fixtures.FakeLogger()) + self.cfg = self.useFixture(cfg_fixture.Config()) + + def test_unknown_server_versions(self): + versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI) + self.set_middleware() + + self.requests.get(BASE_URI, json=versions, status_code=300) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = uuid.uuid4().hex + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(503, self.response_status) + + self.assertIn('versions [v3.0, v2.0]', self.logger.output) + + def _assert_auth_version(self, conf_version, identity_server_version): + self.set_middleware(conf={'auth_version': conf_version}) + identity_server = self.middleware._create_identity_server() + self.assertEqual(identity_server_version, + identity_server.auth_version) + + def test_micro_version(self): + self._assert_auth_version('v2', (2, 0)) + self._assert_auth_version('v2.0', (2, 0)) + self._assert_auth_version('v3', (3, 0)) + self._assert_auth_version('v3.0', (3, 0)) + self._assert_auth_version('v3.1', (3, 0)) + self._assert_auth_version('v3.2', (3, 0)) + self._assert_auth_version('v3.9', (3, 0)) + self._assert_auth_version('v3.3.1', (3, 0)) + self._assert_auth_version('v3.3.5', (3, 0)) + + def test_default_auth_version(self): + # VERSION_LIST_v3 contains both v2 and v3 version elements + self.requests.get(BASE_URI, json=VERSION_LIST_v3, status_code=300) + self._assert_auth_version(None, (3, 0)) + + # VERSION_LIST_v2 contains only v2 version elements + self.requests.get(BASE_URI, json=VERSION_LIST_v2, status_code=300) + self._assert_auth_version(None, (2, 0)) + + def test_unsupported_auth_version(self): + # If the requested version isn't supported we will use v2 + self._assert_auth_version('v1', (2, 0)) + self._assert_auth_version('v10', (2, 0)) + + +class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest): + + AUTH_URL = 'http://auth.url/prefix' + DISC_URL = 'http://disc.url/prefix' + KEYSTONE_BASE_URL = 'http://keystone.url/prefix' + CRUD_URL = 'http://crud.url/prefix' + + # NOTE(jamielennox): use the /v2.0 prefix here because this is what's most + # likely to be in the service catalog and we should be able to ignore it. + KEYSTONE_URL = KEYSTONE_BASE_URL + '/v2.0' + + def setUp(self): + super(AuthProtocolLoadingTests, self).setUp() + self.cfg = self.useFixture(cfg_fixture.Config()) + + self.project_id = uuid.uuid4().hex + + # first touch is to discover the available versions at the auth_url + self.requests.get(self.AUTH_URL, + json=fixture.DiscoveryList(href=self.DISC_URL), + status_code=300) + + # then we do discovery on the URL from the service catalog. In practice + # this is mostly the same URL as before but test the full range. + self.requests.get(self.KEYSTONE_BASE_URL + '/', + json=fixture.DiscoveryList(href=self.CRUD_URL), + status_code=300) + + def good_request(self, app): + # admin_token is the token that the service will get back from auth + admin_token_id = uuid.uuid4().hex + admin_token = fixture.V3Token(project_id=self.project_id) + s = admin_token.add_service('identity', name='keystone') + s.add_standard_endpoints(admin=self.KEYSTONE_URL) + + self.requests.post(self.DISC_URL + '/v3/auth/tokens', + json=admin_token, + headers={'X-Subject-Token': admin_token_id}) + + # user_token is the data from the user's inputted token + user_token_id = uuid.uuid4().hex + user_token = fixture.V3Token() + user_token.set_project_scope() + + request_headers = {'X-Subject-Token': user_token_id, + 'X-Auth-Token': admin_token_id} + + self.requests.get(self.CRUD_URL + '/v3/auth/tokens', + request_headers=request_headers, + json=user_token) + + req = webob.Request.blank('/') + req.headers['X-Auth-Token'] = user_token_id + resp = app(req.environ, self.start_fake_response) + + self.assertEqual(200, self.response_status) + return resp + + def test_loading_password_plugin(self): + # the password options aren't set on config until loading time, but we + # need them set so we can override the values for testing, so force it + opts = auth.get_plugin_options('password') + self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP) + + project_id = uuid.uuid4().hex + + # configure the authentication options + self.cfg.config(auth_plugin='password', + username='testuser', + password='testpass', + auth_url=self.AUTH_URL, + project_id=project_id, + user_domain_id='userdomainid', + group=_base.AUTHTOKEN_GROUP) + + body = uuid.uuid4().hex + app = auth_token.AuthProtocol(new_app('200 OK', body)(), {}) + + resp = self.good_request(app) + self.assertEqual(six.b(body), resp[0]) + + @staticmethod + def get_plugin(app): + return app._identity_server._adapter.auth + + def test_invalid_plugin_fails_to_intialize(self): + self.cfg.config(auth_plugin=uuid.uuid4().hex, + group=_base.AUTHTOKEN_GROUP) + + self.assertRaises( + exceptions.NoMatchingPlugin, + lambda: auth_token.AuthProtocol(new_app('200 OK', '')(), {})) + + def test_plugin_loading_mixed_opts(self): + # some options via override and some via conf + opts = auth.get_plugin_options('password') + self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP) + + username = 'testuser' + password = 'testpass' + + # configure the authentication options + self.cfg.config(auth_plugin='password', + password=password, + project_id=self.project_id, + user_domain_id='userdomainid', + group=_base.AUTHTOKEN_GROUP) + + conf = {'username': username, 'auth_url': self.AUTH_URL} + + body = uuid.uuid4().hex + app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf) + + resp = self.good_request(app) + self.assertEqual(six.b(body), resp[0]) + + plugin = self.get_plugin(app) + + self.assertEqual(self.AUTH_URL, plugin.auth_url) + self.assertEqual(username, plugin._username) + self.assertEqual(password, plugin._password) + self.assertEqual(self.project_id, plugin._project_id) + + def test_plugin_loading_with_auth_section(self): + # some options via override and some via conf + section = 'testsection' + username = 'testuser' + password = 'testpass' + + auth.register_conf_options(self.cfg.conf, group=section) + opts = auth.get_plugin_options('password') + self.cfg.register_opts(opts, group=section) + + # configure the authentication options + self.cfg.config(auth_section=section, group=_base.AUTHTOKEN_GROUP) + self.cfg.config(auth_plugin='password', + password=password, + project_id=self.project_id, + user_domain_id='userdomainid', + group=section) + + conf = {'username': username, 'auth_url': self.AUTH_URL} + + body = uuid.uuid4().hex + app = auth_token.AuthProtocol(new_app('200 OK', body)(), conf) + + resp = self.good_request(app) + self.assertEqual(six.b(body), resp[0]) + + plugin = self.get_plugin(app) + + self.assertEqual(self.AUTH_URL, plugin.auth_url) + self.assertEqual(username, plugin._username) + self.assertEqual(password, plugin._password) + self.assertEqual(self.project_id, plugin._project_id) + + +def load_tests(loader, tests, pattern): + return testresources.OptimisingTestSuite(tests) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py new file mode 100644 index 00000000..074d1e5d --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py @@ -0,0 +1,118 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import time + +import mock +from six.moves import queue +import testtools +from testtools import matchers + +from keystonemiddleware.auth_token import _memcache_pool +from keystonemiddleware.tests.unit import utils + + +class _TestConnectionPool(_memcache_pool.ConnectionPool): + destroyed_value = 'destroyed' + + def _create_connection(self): + return mock.MagicMock() + + def _destroy_connection(self, conn): + conn(self.destroyed_value) + + +class TestConnectionPool(utils.TestCase): + def setUp(self): + super(TestConnectionPool, self).setUp() + self.unused_timeout = 10 + self.maxsize = 2 + self.connection_pool = _TestConnectionPool( + maxsize=self.maxsize, + unused_timeout=self.unused_timeout) + + def test_get_context_manager(self): + self.assertThat(self.connection_pool.queue, matchers.HasLength(0)) + with self.connection_pool.acquire() as conn: + self.assertEqual(1, self.connection_pool._acquired) + self.assertEqual(0, self.connection_pool._acquired) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(conn, self.connection_pool.queue[0].connection) + + def test_cleanup_pool(self): + self.test_get_context_manager() + newtime = time.time() + self.unused_timeout * 2 + non_expired_connection = _memcache_pool._PoolItem( + ttl=(newtime * 2), + connection=mock.MagicMock()) + self.connection_pool.queue.append(non_expired_connection) + self.assertThat(self.connection_pool.queue, matchers.HasLength(2)) + with mock.patch.object(time, 'time', return_value=newtime): + conn = self.connection_pool.queue[0].connection + with self.connection_pool.acquire(): + pass + conn.assert_has_calls( + [mock.call(self.connection_pool.destroyed_value)]) + self.assertThat(self.connection_pool.queue, matchers.HasLength(1)) + self.assertEqual(0, non_expired_connection.connection.call_count) + + def test_acquire_conn_exception_returns_acquired_count(self): + class TestException(Exception): + pass + + with mock.patch.object(_TestConnectionPool, '_create_connection', + side_effect=TestException): + with testtools.ExpectedException(TestException): + with self.connection_pool.acquire(): + pass + self.assertThat(self.connection_pool.queue, + matchers.HasLength(0)) + self.assertEqual(0, self.connection_pool._acquired) + + def test_connection_pool_limits_maximum_connections(self): + # NOTE(morganfainberg): To ensure we don't lockup tests until the + # job limit, explicitly call .get_nowait() and .put_nowait() in this + # case. + conn1 = self.connection_pool.get_nowait() + conn2 = self.connection_pool.get_nowait() + + # Use a nowait version to raise an Empty exception indicating we would + # not get another connection until one is placed back into the queue. + self.assertRaises(queue.Empty, self.connection_pool.get_nowait) + + # Place the connections back into the pool. + self.connection_pool.put_nowait(conn1) + self.connection_pool.put_nowait(conn2) + + # Make sure we can get a connection out of the pool again. + self.connection_pool.get_nowait() + + def test_connection_pool_maximum_connection_get_timeout(self): + connection_pool = _TestConnectionPool( + maxsize=1, + unused_timeout=self.unused_timeout, + conn_get_timeout=0) + + def _acquire_connection(): + with connection_pool.acquire(): + pass + + # Make sure we've consumed the only available connection from the pool + conn = connection_pool.get_nowait() + + self.assertRaises(_memcache_pool.ConnectionGetTimeoutException, + _acquire_connection) + + # Put the connection back and ensure we can acquire the connection + # after it is available. + connection_pool.put_nowait(conn) + _acquire_connection() diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py new file mode 100644 index 00000000..75c7f759 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py @@ -0,0 +1,97 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six +import testtools + +from keystonemiddleware.auth_token import _memcache_crypt as memcache_crypt + + +class MemcacheCryptPositiveTests(testtools.TestCase): + def _setup_keys(self, strategy): + return memcache_crypt.derive_keys(b'token', b'secret', strategy) + + def test_constant_time_compare(self): + # make sure it works as a compare, the "constant time" aspect + # isn't appropriate to test in unittests + ctc = memcache_crypt.constant_time_compare + self.assertTrue(ctc('abcd', 'abcd')) + self.assertTrue(ctc('', '')) + self.assertFalse(ctc('abcd', 'efgh')) + self.assertFalse(ctc('abc', 'abcd')) + self.assertFalse(ctc('abc', 'abc\x00')) + self.assertFalse(ctc('', 'abc')) + + # For Python 3, we want to test these functions with both str and bytes + # as input. + if six.PY3: + self.assertTrue(ctc(b'abcd', b'abcd')) + self.assertTrue(ctc(b'', b'')) + self.assertFalse(ctc(b'abcd', b'efgh')) + self.assertFalse(ctc(b'abc', b'abcd')) + self.assertFalse(ctc(b'abc', b'abc\x00')) + self.assertFalse(ctc(b'', b'abc')) + + def test_derive_keys(self): + keys = self._setup_keys(b'strategy') + self.assertEqual(len(keys['ENCRYPTION']), + len(keys['CACHE_KEY'])) + self.assertEqual(len(keys['CACHE_KEY']), + len(keys['MAC'])) + self.assertNotEqual(keys['ENCRYPTION'], + keys['MAC']) + self.assertIn('strategy', keys.keys()) + + def test_key_strategy_diff(self): + k1 = self._setup_keys(b'MAC') + k2 = self._setup_keys(b'ENCRYPT') + self.assertNotEqual(k1, k2) + + def test_sign_data(self): + keys = self._setup_keys(b'MAC') + sig = memcache_crypt.sign_data(keys['MAC'], b'data') + self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64) + + def test_encryption(self): + keys = self._setup_keys(b'ENCRYPT') + # what you put in is what you get out + for data in [b'data', b'1234567890123456', b'\x00\xFF' * 13 + ] + [six.int2byte(x % 256) * x for x in range(768)]: + crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data) + decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt) + self.assertEqual(data, decrypt) + self.assertRaises(memcache_crypt.DecryptError, + memcache_crypt.decrypt_data, + keys['ENCRYPTION'], crypt[:-1]) + + def test_protect_wrappers(self): + data = b'My Pretty Little Data' + for strategy in [b'MAC', b'ENCRYPT']: + keys = self._setup_keys(strategy) + protected = memcache_crypt.protect_data(keys, data) + self.assertNotEqual(protected, data) + if strategy == b'ENCRYPT': + self.assertNotIn(data, protected) + unprotected = memcache_crypt.unprotect_data(keys, protected) + self.assertEqual(data, unprotected) + self.assertRaises(memcache_crypt.InvalidMacError, + memcache_crypt.unprotect_data, + keys, protected[:-1]) + self.assertIsNone(memcache_crypt.unprotect_data(keys, None)) + + def test_no_pycrypt(self): + aes = memcache_crypt.AES + memcache_crypt.AES = None + self.assertRaises(memcache_crypt.CryptoUnavailableError, + memcache_crypt.encrypt_data, 'token', 'secret', + 'data') + memcache_crypt.AES = aes diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py new file mode 100644 index 00000000..d144bb6c --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py @@ -0,0 +1,65 @@ +# Copyright 2014 IBM Corp. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import json +import shutil +import uuid + +import mock +import testtools + +from keystonemiddleware.auth_token import _exceptions as exc +from keystonemiddleware.auth_token import _revocations +from keystonemiddleware.auth_token import _signing_dir + + +class RevocationsTests(testtools.TestCase): + + def _check_with_list(self, revoked_list, token_ids): + directory_name = '/tmp/%s' % uuid.uuid4().hex + signing_directory = _signing_dir.SigningDirectory(directory_name) + self.addCleanup(shutil.rmtree, directory_name) + + identity_server = mock.Mock() + + verify_result_obj = { + 'revoked': list({'id': r} for r in revoked_list) + } + cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj)) + + revocations = _revocations.Revocations( + timeout=datetime.timedelta(1), signing_directory=signing_directory, + identity_server=identity_server, cms_verify=cms_verify) + + revocations.check(token_ids) + + def test_check_empty_list(self): + # When the identity server returns an empty list, a token isn't + # revoked. + + revoked_tokens = [] + token_ids = [uuid.uuid4().hex] + # No assert because this would raise + self._check_with_list(revoked_tokens, token_ids) + + def test_check_revoked(self): + # When the identity server returns a list with a token in it, that + # token is revoked. + + token_id = uuid.uuid4().hex + revoked_tokens = [token_id] + token_ids = [token_id] + self.assertRaises(exc.InvalidToken, + self._check_with_list, revoked_tokens, token_ids) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py new file mode 100644 index 00000000..bef62747 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py @@ -0,0 +1,138 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import shutil +import stat +import uuid + +import testtools + +from keystonemiddleware.auth_token import _signing_dir + + +class SigningDirectoryTests(testtools.TestCase): + + def test_directory_created_when_doesnt_exist(self): + # When _SigningDirectory is created, if the directory doesn't exist + # it's created with the expected permissions. + tmp_name = uuid.uuid4().hex + parent_directory = '/tmp/%s' % tmp_name + directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2) + + # Directories are created by __init__. + _signing_dir.SigningDirectory(directory_name) + self.addCleanup(shutil.rmtree, parent_directory) + + self.assertTrue(os.path.isdir(directory_name)) + self.assertTrue(os.access(directory_name, os.W_OK)) + self.assertEqual(os.stat(directory_name).st_uid, os.getuid()) + self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode), + stat.S_IRWXU) + + def test_use_directory_already_exists(self): + # The directory can already exist. + + tmp_name = uuid.uuid4().hex + parent_directory = '/tmp/%s' % tmp_name + directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2) + os.makedirs(directory_name, stat.S_IRWXU) + self.addCleanup(shutil.rmtree, parent_directory) + + _signing_dir.SigningDirectory(directory_name) + + def test_write_file(self): + # write_file when the file doesn't exist creates the file. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + file_name = self.getUniqueString() + contents = self.getUniqueString() + signing_directory.write_file(file_name, contents) + + file_path = signing_directory.calc_path(file_name) + with open(file_path) as f: + actual_contents = f.read() + + self.assertEqual(contents, actual_contents) + + def test_replace_file(self): + # write_file when the file already exists overwrites it. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + file_name = self.getUniqueString() + orig_contents = self.getUniqueString() + signing_directory.write_file(file_name, orig_contents) + + new_contents = self.getUniqueString() + signing_directory.write_file(file_name, new_contents) + + file_path = signing_directory.calc_path(file_name) + with open(file_path) as f: + actual_contents = f.read() + + self.assertEqual(new_contents, actual_contents) + + def test_recreate_directory(self): + # If the original directory is lost, it gets recreated when a file + # is written. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + # Delete the directory. + shutil.rmtree(signing_directory._directory_name) + + file_name = self.getUniqueString() + contents = self.getUniqueString() + signing_directory.write_file(file_name, contents) + + actual_contents = signing_directory.read_file(file_name) + self.assertEqual(contents, actual_contents) + + def test_read_file(self): + # Can read a file that was written. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + file_name = self.getUniqueString() + contents = self.getUniqueString() + signing_directory.write_file(file_name, contents) + + actual_contents = signing_directory.read_file(file_name) + + self.assertEqual(contents, actual_contents) + + def test_read_file_doesnt_exist(self): + # Show what happens when try to read a file that wasn't written. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + file_name = self.getUniqueString() + self.assertRaises(IOError, signing_directory.read_file, file_name) + + def test_calc_path(self): + # calc_path returns the actual filename built from the directory name. + + signing_directory = _signing_dir.SigningDirectory() + self.addCleanup(shutil.rmtree, signing_directory._directory_name) + + file_name = self.getUniqueString() + actual_path = signing_directory.calc_path(file_name) + expected_path = os.path.join(signing_directory._directory_name, + file_name) + self.assertEqual(expected_path, actual_path) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py new file mode 100644 index 00000000..fcd1e628 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py @@ -0,0 +1,37 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +from keystonemiddleware.auth_token import _utils + + +class TokenEncodingTest(testtools.TestCase): + + def test_unquoted_token(self): + self.assertEqual('foo%20bar', _utils.safe_quote('foo bar')) + + def test_quoted_token(self): + self.assertEqual('foo%20bar', _utils.safe_quote('foo%20bar')) + + def test_messages_encoded_as_bytes(self): + """Test that string are passed around as bytes for PY3.""" + msg = "This is an error" + + class FakeResp(_utils.MiniResp): + def __init__(self, error, env): + super(FakeResp, self).__init__(error, env) + + fake_resp = FakeResp(msg, dict(REQUEST_METHOD='GET')) + # On Py2 .encode() don't do much but that's better than to + # have a ifdef with six.PY3 + self.assertEqual(msg.encode(), fake_resp.body[0]) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py new file mode 100644 index 00000000..ee4111ec --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py @@ -0,0 +1,452 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os + +import fixtures +from keystoneclient.common import cms +from keystoneclient import fixture +from keystoneclient import utils +from oslo_serialization import jsonutils +from oslo_utils import timeutils +import six +import testresources + + +TESTDIR = os.path.dirname(os.path.abspath(__file__)) +ROOTDIR = os.path.normpath(os.path.join(TESTDIR, '..', '..', '..')) +CERTDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'certs') +CMSDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'cms') +KEYDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'private') + + +def _hash_signed_token_safe(signed_text, **kwargs): + if isinstance(signed_text, six.text_type): + signed_text = signed_text.encode('utf-8') + return utils.hash_signed_token(signed_text, **kwargs) + + +class Examples(fixtures.Fixture): + """Example tokens and certs loaded from the examples directory. + + To use this class correctly, the module needs to override the test suite + class to use testresources.OptimisingTestSuite (otherwise the files will + be read on every test). This is done by defining a load_tests function + in the module, like this: + + def load_tests(loader, tests, pattern): + return testresources.OptimisingTestSuite(tests) + + (see http://docs.python.org/2/library/unittest.html#load-tests-protocol ) + + """ + + def setUp(self): + super(Examples, self).setUp() + + # The data for several tests are signed using openssl and are stored in + # files in the signing subdirectory. In order to keep the values + # consistent between the tests and the signed documents, we read them + # in for use in the tests. + with open(os.path.join(CMSDIR, 'auth_token_scoped.json')) as f: + self.TOKEN_SCOPED_DATA = cms.cms_to_token(f.read()) + + with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f: + self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read()) + self.SIGNED_TOKEN_SCOPED_HASH = _hash_signed_token_safe( + self.SIGNED_TOKEN_SCOPED) + self.SIGNED_TOKEN_SCOPED_HASH_SHA256 = _hash_signed_token_safe( + self.SIGNED_TOKEN_SCOPED, mode='sha256') + with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f: + self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f: + self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read()) + self.SIGNED_v3_TOKEN_SCOPED_HASH = _hash_signed_token_safe( + self.SIGNED_v3_TOKEN_SCOPED) + self.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256 = _hash_signed_token_safe( + self.SIGNED_v3_TOKEN_SCOPED, mode='sha256') + with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f: + self.REVOKED_TOKEN = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f: + self.SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f: + self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_token_scoped.pkiz')) as f: + self.SIGNED_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_token_unscoped.pkiz')) as f: + self.SIGNED_TOKEN_UNSCOPED_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pkiz')) as f: + self.SIGNED_v3_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_token_revoked.pkiz')) as f: + self.REVOKED_TOKEN_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, + 'auth_token_scoped_expired.pkiz')) as f: + self.SIGNED_TOKEN_SCOPED_EXPIRED_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pkiz')) as f: + self.REVOKED_v3_TOKEN_PKIZ = cms.cms_to_token(f.read()) + with open(os.path.join(CMSDIR, 'revocation_list.json')) as f: + self.REVOCATION_LIST = jsonutils.loads(f.read()) + with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f: + self.SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()}) + + self.SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem') + with open(self.SIGNING_CERT_FILE) as f: + self.SIGNING_CERT = f.read() + + self.KERBEROS_BIND = 'USER@REALM' + + self.SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem') + with open(self.SIGNING_KEY_FILE) as f: + self.SIGNING_KEY = f.read() + + self.SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem') + with open(self.SIGNING_CA_FILE) as f: + self.SIGNING_CA = f.read() + + self.UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d" + self.UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df' + self.UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776' + self.UUID_TOKEN_BIND = '3fc54048ad64405c98225ce0897af7c5' + self.UUID_TOKEN_UNKNOWN_BIND = '8885fdf4d42e4fb9879e6379fa1eaf48' + self.VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726' + self.v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1' + self.v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79' + self.v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792' + self.v3_UUID_TOKEN_BIND = '2f61f73e1c854cbb9534c487f9bd63c2' + self.v3_UUID_TOKEN_UNKNOWN_BIND = '7ed9781b62cd4880b8d8c6788ab1d1e2' + + self.UUID_SERVICE_TOKEN_DEFAULT = 'fe4c0710ec2f492748596c1b53ab124' + self.v3_UUID_SERVICE_TOKEN_DEFAULT = 'g431071bbc2f492748596c1b53cb229' + + revoked_token = self.REVOKED_TOKEN + if isinstance(revoked_token, six.text_type): + revoked_token = revoked_token.encode('utf-8') + self.REVOKED_TOKEN_HASH = utils.hash_signed_token(revoked_token) + self.REVOKED_TOKEN_HASH_SHA256 = utils.hash_signed_token(revoked_token, + mode='sha256') + self.REVOKED_TOKEN_LIST = ( + {'revoked': [{'id': self.REVOKED_TOKEN_HASH, + 'expires': timeutils.utcnow()}]}) + self.REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(self.REVOKED_TOKEN_LIST) + + revoked_v3_token = self.REVOKED_v3_TOKEN + if isinstance(revoked_v3_token, six.text_type): + revoked_v3_token = revoked_v3_token.encode('utf-8') + self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(revoked_v3_token) + hash = utils.hash_signed_token(revoked_v3_token, mode='sha256') + self.REVOKED_v3_TOKEN_HASH_SHA256 = hash + self.REVOKED_v3_TOKEN_LIST = ( + {'revoked': [{'id': self.REVOKED_v3_TOKEN_HASH, + 'expires': timeutils.utcnow()}]}) + self.REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps( + self.REVOKED_v3_TOKEN_LIST) + + revoked_token_pkiz = self.REVOKED_TOKEN_PKIZ + if isinstance(revoked_token_pkiz, six.text_type): + revoked_token_pkiz = revoked_token_pkiz.encode('utf-8') + self.REVOKED_TOKEN_PKIZ_HASH = utils.hash_signed_token( + revoked_token_pkiz) + revoked_v3_token_pkiz = self.REVOKED_v3_TOKEN_PKIZ + if isinstance(revoked_v3_token_pkiz, six.text_type): + revoked_v3_token_pkiz = revoked_v3_token_pkiz.encode('utf-8') + self.REVOKED_v3_PKIZ_TOKEN_HASH = utils.hash_signed_token( + revoked_v3_token_pkiz) + + self.REVOKED_TOKEN_PKIZ_LIST = ( + {'revoked': [{'id': self.REVOKED_TOKEN_PKIZ_HASH, + 'expires': timeutils.utcnow()}, + {'id': self.REVOKED_v3_PKIZ_TOKEN_HASH, + 'expires': timeutils.utcnow()}, + ]}) + self.REVOKED_TOKEN_PKIZ_LIST_JSON = jsonutils.dumps( + self.REVOKED_TOKEN_PKIZ_LIST) + + self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token( + self.SIGNED_TOKEN_SCOPED) + self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token( + self.SIGNED_TOKEN_UNSCOPED) + self.SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token( + self.SIGNED_v3_TOKEN_SCOPED) + + self.SIGNED_TOKEN_SCOPED_PKIZ_KEY = cms.cms_hash_token( + self.SIGNED_TOKEN_SCOPED_PKIZ) + self.SIGNED_TOKEN_UNSCOPED_PKIZ_KEY = cms.cms_hash_token( + self.SIGNED_TOKEN_UNSCOPED_PKIZ) + self.SIGNED_v3_TOKEN_SCOPED_PKIZ_KEY = cms.cms_hash_token( + self.SIGNED_v3_TOKEN_SCOPED_PKIZ) + + self.INVALID_SIGNED_TOKEN = ( + "MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" + "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD" + "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" + "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" + "0000000000000000000000000000000000000000000000000000000000000000" + "1111111111111111111111111111111111111111111111111111111111111111" + "2222222222222222222222222222222222222222222222222222222222222222" + "3333333333333333333333333333333333333333333333333333333333333333" + "4444444444444444444444444444444444444444444444444444444444444444" + "5555555555555555555555555555555555555555555555555555555555555555" + "6666666666666666666666666666666666666666666666666666666666666666" + "7777777777777777777777777777777777777777777777777777777777777777" + "8888888888888888888888888888888888888888888888888888888888888888" + "9999999999999999999999999999999999999999999999999999999999999999" + "0000000000000000000000000000000000000000000000000000000000000000") + + self.INVALID_SIGNED_PKIZ_TOKEN = ( + "PKIZ_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" + "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" + "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD" + "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" + "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" + "0000000000000000000000000000000000000000000000000000000000000000" + "1111111111111111111111111111111111111111111111111111111111111111" + "2222222222222222222222222222222222222222222222222222222222222222" + "3333333333333333333333333333333333333333333333333333333333333333" + "4444444444444444444444444444444444444444444444444444444444444444" + "5555555555555555555555555555555555555555555555555555555555555555" + "6666666666666666666666666666666666666666666666666666666666666666" + "7777777777777777777777777777777777777777777777777777777777777777" + "8888888888888888888888888888888888888888888888888888888888888888" + "9999999999999999999999999999999999999999999999999999999999999999" + "0000000000000000000000000000000000000000000000000000000000000000") + + # JSON responses keyed by token ID + self.TOKEN_RESPONSES = {} + + # basic values + PROJECT_ID = 'tenant_id1' + PROJECT_NAME = 'tenant_name1' + USER_ID = 'user_id1' + USER_NAME = 'user_name1' + DOMAIN_ID = 'domain_id1' + DOMAIN_NAME = 'domain_name1' + ROLE_NAME1 = 'role1' + ROLE_NAME2 = 'role2' + + SERVICE_PROJECT_ID = 'service_project_id1' + SERVICE_PROJECT_NAME = 'service_project_name1' + SERVICE_USER_ID = 'service_user_id1' + SERVICE_USER_NAME = 'service_user_name1' + SERVICE_DOMAIN_ID = 'service_domain_id1' + SERVICE_DOMAIN_NAME = 'service_domain_name1' + SERVICE_ROLE_NAME1 = 'service_role1' + SERVICE_ROLE_NAME2 = 'service_role2' + + self.SERVICE_TYPE = 'identity' + self.UNVERSIONED_SERVICE_URL = 'http://keystone.server:5000/' + self.SERVICE_URL = self.UNVERSIONED_SERVICE_URL + 'v2.0' + + # Old Tokens + + self.TOKEN_RESPONSES[self.VALID_DIABLO_TOKEN] = { + 'access': { + 'token': { + 'id': self.VALID_DIABLO_TOKEN, + 'expires': '2020-01-01T00:00:10.000123Z', + 'tenantId': PROJECT_ID, + }, + 'user': { + 'id': USER_ID, + 'name': USER_NAME, + 'roles': [ + {'name': ROLE_NAME1}, + {'name': ROLE_NAME2}, + ], + }, + }, + } + + # Generated V2 Tokens + + token = fixture.V2Token(token_id=self.UUID_TOKEN_DEFAULT, + tenant_id=PROJECT_ID, + tenant_name=PROJECT_NAME, + user_id=USER_ID, + user_name=USER_NAME) + token.add_role(name=ROLE_NAME1) + token.add_role(name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint(public=self.SERVICE_URL) + self.TOKEN_RESPONSES[self.UUID_TOKEN_DEFAULT] = token + + token = fixture.V2Token(token_id=self.UUID_TOKEN_UNSCOPED, + user_id=USER_ID, + user_name=USER_NAME) + self.TOKEN_RESPONSES[self.UUID_TOKEN_UNSCOPED] = token + + token = fixture.V2Token(token_id='valid-token', + tenant_id=PROJECT_ID, + tenant_name=PROJECT_NAME, + user_id=USER_ID, + user_name=USER_NAME) + token.add_role(ROLE_NAME1) + token.add_role(ROLE_NAME2) + self.TOKEN_RESPONSES[self.UUID_TOKEN_NO_SERVICE_CATALOG] = token + + token = fixture.V2Token(token_id=self.SIGNED_TOKEN_SCOPED_KEY, + tenant_id=PROJECT_ID, + tenant_name=PROJECT_NAME, + user_id=USER_ID, + user_name=USER_NAME) + token.add_role(ROLE_NAME1) + token.add_role(ROLE_NAME2) + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = token + + token = fixture.V2Token(token_id=self.SIGNED_TOKEN_UNSCOPED_KEY, + user_id=USER_ID, + user_name=USER_NAME) + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_KEY] = token + + token = fixture.V2Token(token_id=self.UUID_TOKEN_BIND, + tenant_id=PROJECT_ID, + tenant_name=PROJECT_NAME, + user_id=USER_ID, + user_name=USER_NAME) + token.add_role(ROLE_NAME1) + token.add_role(ROLE_NAME2) + token['access']['token']['bind'] = {'kerberos': self.KERBEROS_BIND} + self.TOKEN_RESPONSES[self.UUID_TOKEN_BIND] = token + + token = fixture.V2Token(token_id=self.UUID_TOKEN_UNKNOWN_BIND, + tenant_id=PROJECT_ID, + tenant_name=PROJECT_NAME, + user_id=USER_ID, + user_name=USER_NAME) + token.add_role(ROLE_NAME1) + token.add_role(ROLE_NAME2) + token['access']['token']['bind'] = {'FOO': 'BAR'} + self.TOKEN_RESPONSES[self.UUID_TOKEN_UNKNOWN_BIND] = token + + token = fixture.V2Token(token_id=self.UUID_SERVICE_TOKEN_DEFAULT, + tenant_id=SERVICE_PROJECT_ID, + tenant_name=SERVICE_PROJECT_NAME, + user_id=SERVICE_USER_ID, + user_name=SERVICE_USER_NAME) + token.add_role(name=SERVICE_ROLE_NAME1) + token.add_role(name=SERVICE_ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint(public=self.SERVICE_URL) + self.TOKEN_RESPONSES[self.UUID_SERVICE_TOKEN_DEFAULT] = token + + # Generated V3 Tokens + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME, + project_id=PROJECT_ID, + project_name=PROJECT_NAME, + project_domain_id=DOMAIN_ID, + project_domain_name=DOMAIN_NAME) + token.add_role(id=ROLE_NAME1, name=ROLE_NAME1) + token.add_role(id=ROLE_NAME2, name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_DEFAULT] = token + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME) + self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_UNSCOPED] = token + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME, + domain_id=DOMAIN_ID, + domain_name=DOMAIN_NAME) + token.add_role(id=ROLE_NAME1, name=ROLE_NAME1) + token.add_role(id=ROLE_NAME2, name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_DOMAIN_SCOPED] = token + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME, + project_id=PROJECT_ID, + project_name=PROJECT_NAME, + project_domain_id=DOMAIN_ID, + project_domain_name=DOMAIN_NAME) + token.add_role(name=ROLE_NAME1) + token.add_role(name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY] = token + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME, + project_id=PROJECT_ID, + project_name=PROJECT_NAME, + project_domain_id=DOMAIN_ID, + project_domain_name=DOMAIN_NAME) + token.add_role(name=ROLE_NAME1) + token.add_role(name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + token['token']['bind'] = {'kerberos': self.KERBEROS_BIND} + self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_BIND] = token + + token = fixture.V3Token(user_id=USER_ID, + user_name=USER_NAME, + user_domain_id=DOMAIN_ID, + user_domain_name=DOMAIN_NAME, + project_id=PROJECT_ID, + project_name=PROJECT_NAME, + project_domain_id=DOMAIN_ID, + project_domain_name=DOMAIN_NAME) + token.add_role(name=ROLE_NAME1) + token.add_role(name=ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + token['token']['bind'] = {'FOO': 'BAR'} + self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_UNKNOWN_BIND] = token + + token = fixture.V3Token(user_id=SERVICE_USER_ID, + user_name=SERVICE_USER_NAME, + user_domain_id=SERVICE_DOMAIN_ID, + user_domain_name=SERVICE_DOMAIN_NAME, + project_id=SERVICE_PROJECT_ID, + project_name=SERVICE_PROJECT_NAME, + project_domain_id=SERVICE_DOMAIN_ID, + project_domain_name=SERVICE_DOMAIN_NAME) + token.add_role(id=SERVICE_ROLE_NAME1, + name=SERVICE_ROLE_NAME1) + token.add_role(id=SERVICE_ROLE_NAME2, + name=SERVICE_ROLE_NAME2) + svc = token.add_service(self.SERVICE_TYPE) + svc.add_endpoint('public', self.SERVICE_URL) + self.TOKEN_RESPONSES[self.v3_UUID_SERVICE_TOKEN_DEFAULT] = token + + # PKIZ tokens generally link to above tokens + + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_PKIZ_KEY] = ( + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY]) + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_PKIZ_KEY] = ( + self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_KEY]) + self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_PKIZ_KEY] = ( + self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY]) + + self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in + six.iteritems(self.TOKEN_RESPONSES)]) + + +EXAMPLES_RESOURCE = testresources.FixtureResource(Examples()) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py new file mode 100644 index 00000000..89e5aa44 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py @@ -0,0 +1,485 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import tempfile +import uuid + +import mock +from oslo_config import cfg +from pycadf import identifier +import testtools +from testtools import matchers +import webob + +from keystonemiddleware import audit + + +class FakeApp(object): + def __call__(self, env, start_response): + body = 'Some response' + start_response('200 OK', [ + ('Content-Type', 'text/plain'), + ('Content-Length', str(sum(map(len, body)))) + ]) + return [body] + + +class FakeFailingApp(object): + def __call__(self, env, start_response): + raise Exception('It happens!') + + +class BaseAuditMiddlewareTest(testtools.TestCase): + def setUp(self): + super(BaseAuditMiddlewareTest, self).setUp() + self.fd, self.audit_map = tempfile.mkstemp() + + with open(self.audit_map, "w") as f: + f.write("[custom_actions]\n") + f.write("reboot = start/reboot\n") + f.write("os-migrations/get = read\n\n") + f.write("[path_keywords]\n") + f.write("action = None\n") + f.write("os-hosts = host\n") + f.write("os-migrations = None\n") + f.write("reboot = None\n") + f.write("servers = server\n\n") + f.write("[service_endpoints]\n") + f.write("compute = service/compute") + + cfg.CONF([], project='keystonemiddleware') + + self.middleware = audit.AuditMiddleware( + FakeApp(), audit_map_file=self.audit_map, + service_name='pycadf') + + self.addCleanup(lambda: os.close(self.fd)) + self.addCleanup(cfg.CONF.reset) + + @staticmethod + def get_environ_header(req_type): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "publicURL": + "http://public_host:8774", + "internalURL": + "http://internal_host:8774", + "id": "resource_id"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed'} + env_headers['REQUEST_METHOD'] = req_type + return env_headers + + +@mock.patch('oslo.messaging.get_transport', mock.MagicMock()) +class AuditMiddlewareTest(BaseAuditMiddlewareTest): + + def test_api_request(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + self.middleware(req) + # Check first notification with only 'request' + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('pending', call_args[2]['outcome']) + self.assertNotIn('reason', call_args[2]) + self.assertNotIn('reporterchain', call_args[2]) + + # Check second notification with request + response + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('success', call_args[2]['outcome']) + self.assertIn('reason', call_args[2]) + self.assertIn('reporterchain', call_args[2]) + + def test_api_request_failure(self): + self.middleware = audit.AuditMiddleware( + FakeFailingApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + try: + self.middleware(req) + self.fail('Application exception has not been re-raised') + except Exception: + pass + # Check first notification with only 'request' + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('pending', call_args[2]['outcome']) + self.assertNotIn('reporterchain', call_args[2]) + + # Check second notification with request + response + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/foo/bar', call_args[2]['requestPath']) + self.assertEqual('unknown', call_args[2]['outcome']) + self.assertIn('reporterchain', call_args[2]) + + def test_process_request_fail(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + self.middleware._process_request(req) + self.assertTrue(notify.called) + + def test_process_response_fail(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + self.middleware._process_response(req, webob.response.Response()) + self.assertTrue(notify.called) + + def test_ignore_req_opt(self): + self.middleware = audit.AuditMiddleware(FakeApp(), + audit_map_file=self.audit_map, + ignore_req_list='get, PUT') + req = webob.Request.blank('/skip/foo', + environ=self.get_environ_header('GET')) + req1 = webob.Request.blank('/skip/foo', + environ=self.get_environ_header('PUT')) + req2 = webob.Request.blank('/accept/foo', + environ=self.get_environ_header('POST')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + # Check GET/PUT request does not send notification + self.middleware(req) + self.middleware(req1) + self.assertEqual([], notify.call_args_list) + + # Check non-GET/PUT request does send notification + self.middleware(req2) + self.assertThat(notify.call_args_list, matchers.HasLength(2)) + call_args = notify.call_args_list[0][0] + self.assertEqual('audit.http.request', call_args[1]) + self.assertEqual('/accept/foo', call_args[2]['requestPath']) + + call_args = notify.call_args_list[1][0] + self.assertEqual('audit.http.response', call_args[1]) + self.assertEqual('/accept/foo', call_args[2]['requestPath']) + + def test_api_request_no_messaging(self): + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('keystonemiddleware.audit.messaging', None): + with mock.patch('keystonemiddleware.audit._LOG.info') as log: + self.middleware(req) + # Check first notification with only 'request' + call_args = log.call_args_list[0][0] + self.assertEqual('audit.http.request', + call_args[1]['event_type']) + + # Check second notification with request + response + call_args = log.call_args_list[1][0] + self.assertEqual('audit.http.response', + call_args[1]['event_type']) + + def test_cadf_event_scoped_to_request(self): + middleware = audit.AuditMiddleware( + FakeApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + middleware(req) + self.assertIsNotNone(req.environ.get('cadf_event')) + + # ensure exact same event is used between request and response + self.assertEqual(notify.call_args_list[0][0][2]['id'], + notify.call_args_list[1][0][2]['id']) + + def test_cadf_event_scoped_to_request_on_error(self): + middleware = audit.AuditMiddleware( + FakeApp(), + audit_map_file=self.audit_map, + service_name='pycadf') + req = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info', + side_effect=Exception('error')) as notify: + middleware._process_request(req) + self.assertTrue(notify.called) + req2 = webob.Request.blank('/foo/bar', + environ=self.get_environ_header('GET')) + with mock.patch('oslo.messaging.Notifier.info') as notify: + middleware._process_response(req2, webob.response.Response()) + self.assertTrue(notify.called) + # ensure event is not the same across requests + self.assertNotEqual(req.environ['cadf_event'].id, + notify.call_args_list[0][0][2]['id']) + + +@mock.patch('oslo.messaging', mock.MagicMock()) +class AuditApiLogicTest(BaseAuditMiddlewareTest): + + def api_request(self, method, url): + req = webob.Request.blank(url, environ=self.get_environ_header(method), + remote_addr='192.168.0.1') + self.middleware._process_request(req) + return req + + def test_get_list(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['typeURI'], + 'http://schemas.dmtf.org/cloud/audit/1.0/event') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['eventType'], 'activity') + self.assertEqual(payload['target']['name'], 'nova') + self.assertEqual(payload['target']['id'], 'openstack:resource_id') + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(len(payload['target']['addresses']), 3) + self.assertEqual(payload['target']['addresses'][0]['name'], 'admin') + self.assertEqual(payload['target']['addresses'][0]['url'], + 'http://admin_host:8774') + self.assertEqual(payload['initiator']['id'], 'openstack:user_id') + self.assertEqual(payload['initiator']['name'], 'user_name') + self.assertEqual(payload['initiator']['project_id'], + 'openstack:tenant_id') + self.assertEqual(payload['initiator']['host']['address'], + '192.168.0.1') + self.assertEqual(payload['initiator']['typeURI'], + 'service/security/account/user') + self.assertNotEqual(payload['initiator']['credential']['token'], + 'token') + self.assertEqual(payload['initiator']['credential']['identity_status'], + 'Confirmed') + self.assertNotIn('reason', payload) + self.assertNotIn('reporterchain', payload) + self.assertEqual(payload['observer']['id'], 'target') + self.assertEqual(req.path, payload['requestPath']) + + def test_get_read(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/' + + str(uuid.uuid4())) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/server') + self.assertEqual(payload['action'], 'read') + self.assertEqual(payload['outcome'], 'pending') + + def test_get_unknown_endpoint(self): + req = self.api_request('GET', 'http://unknown:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['target']['name'], 'unknown') + self.assertEqual(payload['target']['id'], 'unknown') + self.assertEqual(payload['target']['typeURI'], 'unknown') + + def test_get_unknown_endpoint_default_set(self): + with open(self.audit_map, "w") as f: + f.write("[DEFAULT]\n") + f.write("target_endpoint_type = compute\n") + f.write("[path_keywords]\n") + f.write("servers = server\n\n") + f.write("[service_endpoints]\n") + f.write("compute = service/compute") + + self.middleware = audit.AuditMiddleware( + FakeApp(), audit_map_file=self.audit_map, + service_name='pycadf') + + req = self.api_request('GET', 'http://unknown:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['action'], 'read/list') + self.assertEqual(payload['outcome'], 'pending') + self.assertEqual(payload['target']['name'], 'nova') + self.assertEqual(payload['target']['id'], 'openstack:resource_id') + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + + def test_put(self): + req = self.api_request('PUT', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'update') + self.assertEqual(payload['outcome'], 'pending') + + def test_delete(self): + req = self.api_request('DELETE', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'delete') + self.assertEqual(payload['outcome'], 'pending') + + def test_head(self): + req = self.api_request('HEAD', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'read') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_update(self): + req = self.api_request('POST', + 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/' + + str(uuid.uuid4())) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/server') + self.assertEqual(payload['action'], 'update') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_create(self): + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers') + self.assertEqual(payload['action'], 'create') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_action(self): + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/action', + environ=self.get_environ_header('POST')) + req.body = b'{"createImage" : {"name" : "new-image","metadata": ' \ + b'{"ImageType": "Gold","ImageVersion": "2.0"}}}' + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/action') + self.assertEqual(payload['action'], 'update/createImage') + self.assertEqual(payload['outcome'], 'pending') + + def test_post_empty_body_action(self): + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers/action') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/servers/action') + self.assertEqual(payload['action'], 'create') + self.assertEqual(payload['outcome'], 'pending') + + def test_custom_action(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-hosts/' + + str(uuid.uuid4()) + '/reboot') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-hosts/host/reboot') + self.assertEqual(payload['action'], 'start/reboot') + self.assertEqual(payload['outcome'], 'pending') + + def test_custom_action_complex(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-migrations') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-migrations') + self.assertEqual(payload['action'], 'read') + req = self.api_request('POST', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/os-migrations') + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['typeURI'], + 'service/compute/os-migrations') + self.assertEqual(payload['action'], 'create') + + def test_response_mod_msg(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.middleware._process_response(req, webob.Response()) + payload2 = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['id'], payload2['id']) + self.assertEqual(payload['tags'], payload2['tags']) + self.assertEqual(payload2['outcome'], 'success') + self.assertEqual(payload2['reason']['reasonType'], 'HTTP') + self.assertEqual(payload2['reason']['reasonCode'], '200') + self.assertEqual(len(payload2['reporterchain']), 1) + self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier') + self.assertEqual(payload2['reporterchain'][0]['reporter']['id'], + 'target') + + def test_no_response(self): + req = self.api_request('GET', 'http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers') + payload = req.environ['cadf_event'].as_dict() + self.middleware._process_response(req, None) + payload2 = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['id'], payload2['id']) + self.assertEqual(payload['tags'], payload2['tags']) + self.assertEqual(payload2['outcome'], 'unknown') + self.assertNotIn('reason', payload2) + self.assertEqual(len(payload2['reporterchain']), 1) + self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier') + self.assertEqual(payload2['reporterchain'][0]['reporter']['id'], + 'target') + + def test_missing_req(self): + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=self.get_environ_header('GET')) + self.assertNotIn('cadf_event', req.environ) + self.middleware._process_response(req, webob.Response()) + self.assertIn('cadf_event', req.environ) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['outcome'], 'success') + self.assertEqual(payload['reason']['reasonType'], 'HTTP') + self.assertEqual(payload['reason']['reasonCode'], '200') + self.assertEqual(payload['observer']['id'], 'target') + + def test_missing_catalog_endpoint_id(self): + env_headers = {'HTTP_X_SERVICE_CATALOG': + '''[{"endpoints_links": [], + "endpoints": [{"adminURL": + "http://admin_host:8774", + "region": "RegionOne", + "publicURL": + "http://public_host:8774", + "internalURL": + "http://internal_host:8774"}], + "type": "compute", + "name": "nova"},]''', + 'HTTP_X_USER_ID': 'user_id', + 'HTTP_X_USER_NAME': 'user_name', + 'HTTP_X_AUTH_TOKEN': 'token', + 'HTTP_X_PROJECT_ID': 'tenant_id', + 'HTTP_X_IDENTITY_STATUS': 'Confirmed', + 'REQUEST_METHOD': 'GET'} + req = webob.Request.blank('http://admin_host:8774/v2/' + + str(uuid.uuid4()) + '/servers', + environ=env_headers) + self.middleware._process_request(req) + payload = req.environ['cadf_event'].as_dict() + self.assertEqual(payload['target']['id'], identifier.norm_ns('nova')) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py new file mode 100644 index 00000000..93e1b06e --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py @@ -0,0 +1,85 @@ +# Copyright (c) 2014 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pkg_resources +from testtools import matchers + +from keystonemiddleware import opts +from keystonemiddleware.tests.unit import utils + + +class OptsTestCase(utils.TestCase): + + def _test_list_auth_token_opts(self, result): + self.assertThat(result, matchers.HasLength(1)) + + for group in (g for (g, _l) in result): + self.assertEqual('keystone_authtoken', group) + + expected_opt_names = [ + 'auth_admin_prefix', + 'auth_host', + 'auth_port', + 'auth_protocol', + 'auth_uri', + 'identity_uri', + 'auth_version', + 'delay_auth_decision', + 'http_connect_timeout', + 'http_request_max_retries', + 'admin_token', + 'admin_user', + 'admin_password', + 'admin_tenant_name', + 'cache', + 'certfile', + 'keyfile', + 'cafile', + 'insecure', + 'signing_dir', + 'memcached_servers', + 'token_cache_time', + 'revocation_cache_time', + 'memcache_security_strategy', + 'memcache_secret_key', + 'memcache_use_advanced_pool', + 'memcache_pool_dead_retry', + 'memcache_pool_maxsize', + 'memcache_pool_unused_timeout', + 'memcache_pool_conn_get_timeout', + 'memcache_pool_socket_timeout', + 'include_service_catalog', + 'enforce_token_bind', + 'check_revocations_for_cached', + 'hash_algorithms' + ] + opt_names = [o.name for (g, l) in result for o in l] + self.assertThat(opt_names, matchers.HasLength(len(expected_opt_names))) + + for opt in opt_names: + self.assertIn(opt, expected_opt_names) + + def test_list_auth_token_opts(self): + self._test_list_auth_token_opts(opts.list_auth_token_opts()) + + def test_entry_point(self): + result = None + for ep in pkg_resources.iter_entry_points('oslo.config.opts'): + if ep.name == 'keystonemiddleware.auth_token': + list_fn = ep.load() + result = list_fn() + break + + self.assertIsNotNone(result) + self._test_list_auth_token_opts(result) diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py new file mode 100644 index 00000000..2bcdf894 --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py @@ -0,0 +1,235 @@ +# Copyright 2012 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_serialization import jsonutils +import requests +from requests_mock.contrib import fixture as rm_fixture +import six +import testtools +import webob + +from keystonemiddleware import s3_token +from keystonemiddleware.tests.unit import utils + + +GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID', + 'tenant': {'id': 'TENANT_ID'}}}} + + +class FakeApp(object): + """This represents a WSGI app protected by the auth_token middleware.""" + def __call__(self, env, start_response): + resp = webob.Response() + resp.environ = env + return resp(env, start_response) + + +class S3TokenMiddlewareTestBase(utils.TestCase): + + TEST_PROTOCOL = 'https' + TEST_HOST = 'fakehost' + TEST_PORT = 35357 + TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL, + TEST_HOST, + TEST_PORT) + + def setUp(self): + super(S3TokenMiddlewareTestBase, self).setUp() + + self.conf = { + 'auth_host': self.TEST_HOST, + 'auth_port': self.TEST_PORT, + 'auth_protocol': self.TEST_PROTOCOL, + } + + self.requests = self.useFixture(rm_fixture.Fixture()) + + def start_fake_response(self, status, headers): + self.response_status = int(status.split(' ', 1)[0]) + self.response_headers = dict(headers) + + +class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): + + def setUp(self): + super(S3TokenMiddlewareTestGood, self).setUp() + self.middleware = s3_token.S3Token(FakeApp(), self.conf) + + self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE) + + # Ignore the request and pass to the next middleware in the + # pipeline if no path has been specified. + def test_no_path_request(self): + req = webob.Request.blank('/') + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + + # Ignore the request and pass to the next middleware in the + # pipeline if no Authorization header has been specified + def test_without_authorization(self): + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + + def test_without_auth_storage_token(self): + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'badboy' + self.middleware(req.environ, self.start_fake_response) + self.assertEqual(self.response_status, 200) + + def test_authorized(self): + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID')) + self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') + + def test_authorized_http(self): + self.requests.post(self.TEST_URL.replace('https', 'http'), + status_code=201, + json=GOOD_RESPONSE) + + self.middleware = ( + s3_token.filter_factory({'auth_protocol': 'http', + 'auth_host': self.TEST_HOST, + 'auth_port': self.TEST_PORT})(FakeApp())) + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID')) + self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') + + def test_authorization_nova_toconnect(self): + req = webob.Request.blank('/v1/AUTH_swiftint/c/o') + req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + path = req.environ['PATH_INFO'] + self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID')) + + @mock.patch.object(requests, 'post') + def test_insecure(self, MOCK_REQUEST): + self.middleware = ( + s3_token.filter_factory({'insecure': True})(FakeApp())) + + text_return_value = jsonutils.dumps(GOOD_RESPONSE) + if six.PY3: + text_return_value = text_return_value.encode() + MOCK_REQUEST.return_value = utils.TestResponse({ + 'status_code': 201, + 'text': text_return_value}) + + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + + self.assertTrue(MOCK_REQUEST.called) + mock_args, mock_kwargs = MOCK_REQUEST.call_args + self.assertIs(mock_kwargs['verify'], False) + + +class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): + def setUp(self): + super(S3TokenMiddlewareTestBad, self).setUp() + self.middleware = s3_token.S3Token(FakeApp(), self.conf) + + def test_unauthorized_token(self): + ret = {"error": + {"message": "EC2 access key not found.", + "code": 401, + "title": "Unauthorized"}} + self.requests.post(self.TEST_URL, status_code=403, json=ret) + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + resp = req.get_response(self.middleware) + s3_denied_req = self.middleware._deny_request('AccessDenied') + self.assertEqual(resp.body, s3_denied_req.body) + self.assertEqual(resp.status_int, s3_denied_req.status_int) + + def test_bogus_authorization(self): + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'badboy' + req.headers['X-Storage-Token'] = 'token' + resp = req.get_response(self.middleware) + self.assertEqual(resp.status_int, 400) + s3_invalid_req = self.middleware._deny_request('InvalidURI') + self.assertEqual(resp.body, s3_invalid_req.body) + self.assertEqual(resp.status_int, s3_invalid_req.status_int) + + def test_fail_to_connect_to_keystone(self): + with mock.patch.object(self.middleware, '_json_request') as o: + s3_invalid_req = self.middleware._deny_request('InvalidURI') + o.side_effect = s3_token.ServiceError(s3_invalid_req) + + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + resp = req.get_response(self.middleware) + self.assertEqual(resp.body, s3_invalid_req.body) + self.assertEqual(resp.status_int, s3_invalid_req.status_int) + + def test_bad_reply(self): + self.requests.post(self.TEST_URL, status_code=201, text="") + + req = webob.Request.blank('/v1/AUTH_cfa/c/o') + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + resp = req.get_response(self.middleware) + s3_invalid_req = self.middleware._deny_request('InvalidURI') + self.assertEqual(resp.body, s3_invalid_req.body) + self.assertEqual(resp.status_int, s3_invalid_req.status_int) + + +class S3TokenMiddlewareTestUtil(testtools.TestCase): + def test_split_path_failed(self): + self.assertRaises(ValueError, s3_token._split_path, '') + self.assertRaises(ValueError, s3_token._split_path, '/') + self.assertRaises(ValueError, s3_token._split_path, '//') + self.assertRaises(ValueError, s3_token._split_path, '//a') + self.assertRaises(ValueError, s3_token._split_path, '/a/c') + self.assertRaises(ValueError, s3_token._split_path, '//c') + self.assertRaises(ValueError, s3_token._split_path, '/a/c/') + self.assertRaises(ValueError, s3_token._split_path, '/a//') + self.assertRaises(ValueError, s3_token._split_path, '/a', 2) + self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3) + self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3, True) + self.assertRaises(ValueError, s3_token._split_path, '/a/c/o/r', 3, 3) + self.assertRaises(ValueError, s3_token._split_path, '/a', 5, 4) + + def test_split_path_success(self): + self.assertEqual(s3_token._split_path('/a'), ['a']) + self.assertEqual(s3_token._split_path('/a/'), ['a']) + self.assertEqual(s3_token._split_path('/a/c', 2), ['a', 'c']) + self.assertEqual(s3_token._split_path('/a/c/o', 3), ['a', 'c', 'o']) + self.assertEqual(s3_token._split_path('/a/c/o/r', 3, 3, True), + ['a', 'c', 'o/r']) + self.assertEqual(s3_token._split_path('/a/c', 2, 3, True), + ['a', 'c', None]) + self.assertEqual(s3_token._split_path('/a/c/', 2), ['a', 'c']) + self.assertEqual(s3_token._split_path('/a/c/', 2, 3), ['a', 'c', '']) + + def test_split_path_invalid_path(self): + try: + s3_token._split_path('o\nn e', 2) + except ValueError as err: + self.assertEqual(str(err), 'Invalid path: o%0An%20e') + try: + s3_token._split_path('o\nn e', 2, 3, True) + except ValueError as err: + self.assertEqual(str(err), 'Invalid path: o%0An%20e') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py new file mode 100644 index 00000000..da6f347a --- /dev/null +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py @@ -0,0 +1,138 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import sys +import time + +import fixtures +import mock +import requests +import testtools +import uuid + + +class TestCase(testtools.TestCase): + TEST_DOMAIN_ID = '1' + TEST_DOMAIN_NAME = 'aDomain' + TEST_GROUP_ID = uuid.uuid4().hex + TEST_ROLE_ID = uuid.uuid4().hex + TEST_TENANT_ID = '1' + TEST_TENANT_NAME = 'aTenant' + TEST_TOKEN = 'aToken' + TEST_TRUST_ID = 'aTrust' + TEST_USER = 'test' + TEST_USER_ID = uuid.uuid4().hex + + TEST_ROOT_URL = 'http://127.0.0.1:5000/' + + def setUp(self): + super(TestCase, self).setUp() + self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG)) + self.time_patcher = mock.patch.object(time, 'time', lambda: 1234) + self.time_patcher.start() + + def tearDown(self): + self.time_patcher.stop() + super(TestCase, self).tearDown() + + +if tuple(sys.version_info)[0:2] < (2, 7): + + def assertDictEqual(self, d1, d2, msg=None): + # Simple version taken from 2.7 + self.assertIsInstance(d1, dict, + 'First argument is not a dictionary') + self.assertIsInstance(d2, dict, + 'Second argument is not a dictionary') + if d1 != d2: + if msg: + self.fail(msg) + else: + standardMsg = '%r != %r' % (d1, d2) + self.fail(standardMsg) + + TestCase.assertDictEqual = assertDictEqual + + +class TestResponse(requests.Response): + """Class used to wrap requests.Response and provide some + convenience to initialize with a dict. + """ + + def __init__(self, data): + self._text = None + super(TestResponse, self).__init__() + if isinstance(data, dict): + self.status_code = data.get('status_code', 200) + headers = data.get('headers') + if headers: + self.headers.update(headers) + # Fake the text attribute to streamline Response creation + # _content is defined by requests.Response + self._content = data.get('text') + else: + self.status_code = data + + def __eq__(self, other): + return self.__dict__ == other.__dict__ + + @property + def text(self): + return self.content + + +class DisableModuleFixture(fixtures.Fixture): + """A fixture to provide support for unloading/disabling modules.""" + + def __init__(self, module, *args, **kw): + super(DisableModuleFixture, self).__init__(*args, **kw) + self.module = module + self._finders = [] + self._cleared_modules = {} + + def tearDown(self): + super(DisableModuleFixture, self).tearDown() + for finder in self._finders: + sys.meta_path.remove(finder) + sys.modules.update(self._cleared_modules) + + def clear_module(self): + cleared_modules = {} + for fullname in sys.modules.keys(): + if (fullname == self.module or + fullname.startswith(self.module + '.')): + cleared_modules[fullname] = sys.modules.pop(fullname) + return cleared_modules + + def setUp(self): + """Ensure ImportError for the specified module.""" + + super(DisableModuleFixture, self).setUp() + + # Clear 'module' references in sys.modules + self._cleared_modules.update(self.clear_module()) + + finder = NoModuleFinder(self.module) + self._finders.append(finder) + sys.meta_path.insert(0, finder) + + +class NoModuleFinder(object): + """Disallow further imports of 'module'.""" + + def __init__(self, module): + self.module = module + + def find_module(self, fullname, path): + if fullname == self.module or fullname.startswith(self.module + '.'): + raise ImportError -- cgit 1.2.3-korg