aboutsummaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/tests
diff options
context:
space:
mode:
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/__init__.py0
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py0
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py0
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py73
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py102
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py2634
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py202
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py118
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py97
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py253
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py104
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py137
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py201
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py37
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py452
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py560
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py86
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py268
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py150
19 files changed, 0 insertions, 5474 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/__init__.py b/keystonemiddleware-moon/keystonemiddleware/tests/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/__init__.py
+++ /dev/null
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/__init__.py
+++ /dev/null
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py
deleted file mode 100644
index e69de29b..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/__init__.py
+++ /dev/null
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py
deleted file mode 100644
index d76572a8..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/base.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-
-import fixtures
-from oslo_config import cfg
-from oslo_config import fixture as cfg_fixture
-from requests_mock.contrib import fixture as rm_fixture
-import six
-import webob.dec
-
-from keystonemiddleware import auth_token
-from keystonemiddleware.tests.unit import utils
-
-
-class BaseAuthTokenTestCase(utils.BaseTestCase):
-
- def setUp(self):
- super(BaseAuthTokenTestCase, self).setUp()
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
- self.logger = fixtures.FakeLogger(level=logging.DEBUG)
- self.cfg = self.useFixture(cfg_fixture.Config(conf=cfg.ConfigOpts()))
-
- def create_middleware(self, cb, conf=None, use_global_conf=False):
-
- @webob.dec.wsgify
- def _do_cb(req):
- return cb(req)
-
- if use_global_conf:
- opts = conf or {}
- else:
- opts = {
- 'oslo_config_project': 'keystonemiddleware',
- 'oslo_config_config': self.cfg.conf,
- }
- opts.update(conf or {})
-
- return auth_token.AuthProtocol(_do_cb, opts)
-
- def create_simple_middleware(self,
- status='200 OK',
- body='',
- headers=None,
- **kwargs):
- def cb(req):
- resp = webob.Response(body, status)
- resp.headers.update(headers or {})
- return resp
-
- return self.create_middleware(cb, **kwargs)
-
- @classmethod
- def call(cls, middleware, method='GET', path='/', headers=None):
- req = webob.Request.blank(path)
- req.method = method
-
- for k, v in six.iteritems(headers or {}):
- req.headers[k] = v
-
- resp = req.get_response(middleware)
- resp.request = req
- return resp
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py
deleted file mode 100644
index d6ebc9a0..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import uuid
-
-from keystoneclient import auth
-from keystoneclient import fixture
-from keystoneclient import session
-from requests_mock.contrib import fixture as rm_fixture
-import six
-
-from keystonemiddleware.auth_token import _auth
-from keystonemiddleware.tests.unit import utils
-
-
-class DefaultAuthPluginTests(utils.BaseTestCase):
-
- def new_plugin(self, auth_host=None, auth_port=None, auth_protocol=None,
- auth_admin_prefix=None, admin_user=None,
- admin_password=None, admin_tenant_name=None,
- admin_token=None, identity_uri=None, log=None):
- if not log:
- log = self.logger
-
- return _auth.AuthTokenPlugin.load_from_options(
- auth_host=auth_host,
- auth_port=auth_port,
- auth_protocol=auth_protocol,
- auth_admin_prefix=auth_admin_prefix,
- admin_user=admin_user,
- admin_password=admin_password,
- admin_tenant_name=admin_tenant_name,
- admin_token=admin_token,
- identity_uri=identity_uri,
- log=log)
-
- def setUp(self):
- super(DefaultAuthPluginTests, self).setUp()
-
- self.stream = six.StringIO()
- self.logger = logging.getLogger(__name__)
- self.session = session.Session()
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def test_auth_uri_from_fragments(self):
- auth_protocol = 'http'
- auth_host = 'testhost'
- auth_port = 8888
- auth_admin_prefix = 'admin'
-
- expected = '%s://%s:%d/admin' % (auth_protocol, auth_host, auth_port)
-
- plugin = self.new_plugin(auth_host=auth_host,
- auth_protocol=auth_protocol,
- auth_port=auth_port,
- auth_admin_prefix=auth_admin_prefix)
-
- self.assertEqual(expected,
- plugin.get_endpoint(self.session,
- interface=auth.AUTH_INTERFACE))
-
- def test_identity_uri_overrides_fragments(self):
- identity_uri = 'http://testhost:8888/admin'
- plugin = self.new_plugin(identity_uri=identity_uri,
- auth_host='anotherhost',
- auth_port=9999,
- auth_protocol='ftp')
-
- self.assertEqual(identity_uri,
- plugin.get_endpoint(self.session,
- interface=auth.AUTH_INTERFACE))
-
- def test_with_admin_token(self):
- token = uuid.uuid4().hex
- plugin = self.new_plugin(identity_uri='http://testhost:8888/admin',
- admin_token=token)
- self.assertEqual(token, plugin.get_token(self.session))
-
- def test_with_user_pass(self):
- base_uri = 'http://testhost:8888/admin'
- token = fixture.V2Token()
- admin_tenant_name = uuid.uuid4().hex
-
- self.requests_mock.post(base_uri + '/v2.0/tokens',
- json=token)
-
- plugin = self.new_plugin(identity_uri=base_uri,
- admin_user=uuid.uuid4().hex,
- admin_password=uuid.uuid4().hex,
- admin_tenant_name=admin_tenant_name)
-
- self.assertEqual(token.token_id, plugin.get_token(self.session))
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
deleted file mode 100644
index e6a495f4..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_auth_token_middleware.py
+++ /dev/null
@@ -1,2634 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import json
-import logging
-import os
-import shutil
-import stat
-import tempfile
-import time
-import uuid
-import warnings
-
-import fixtures
-from keystoneclient import auth
-from keystoneclient.common import cms
-from keystoneclient import exceptions
-from keystoneclient import fixture
-from keystoneclient import session
-import mock
-from oslo_config import cfg
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from oslotest import createfile
-import six
-import testresources
-import testtools
-from testtools import matchers
-import webob
-import webob.dec
-
-from keystonemiddleware import auth_token
-from keystonemiddleware.auth_token import _base
-from keystonemiddleware.auth_token import _exceptions as exc
-from keystonemiddleware.auth_token import _revocations
-from keystonemiddleware.openstack.common import memorycache
-from keystonemiddleware.tests.unit.auth_token import base
-from keystonemiddleware.tests.unit import client_fixtures
-from keystonemiddleware.tests.unit import utils
-
-
-EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_name1',
- 'HTTP_X_USER_ID': 'user_id1',
- 'HTTP_X_USER_NAME': 'user_name1',
- 'HTTP_X_ROLES': 'role1,role2',
- 'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
- 'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
-}
-
-EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE = {
- 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Confirmed',
- 'HTTP_X_SERVICE_PROJECT_ID': 'service_project_id1',
- 'HTTP_X_SERVICE_PROJECT_NAME': 'service_project_name1',
- 'HTTP_X_SERVICE_USER_ID': 'service_user_id1',
- 'HTTP_X_SERVICE_USER_NAME': 'service_user_name1',
- 'HTTP_X_SERVICE_ROLES': 'service_role1,service_role2',
-}
-
-EXPECTED_V3_DEFAULT_ENV_ADDITIONS = {
- 'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
- 'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_USER_DOMAIN_NAME': 'domain_name1',
-}
-
-EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS = {
- 'HTTP_X_SERVICE_PROJECT_DOMAIN_ID': 'service_domain_id1',
- 'HTTP_X_SERVICE_PROJECT_DOMAIN_NAME': 'service_domain_name1',
- 'HTTP_X_SERVICE_USER_DOMAIN_ID': 'service_domain_id1',
- 'HTTP_X_SERVICE_USER_DOMAIN_NAME': 'service_domain_name1'
-}
-
-
-BASE_HOST = 'https://keystone.example.com:1234'
-BASE_URI = '%s/testadmin' % BASE_HOST
-FAKE_ADMIN_TOKEN_ID = 'admin_token2'
-FAKE_ADMIN_TOKEN = jsonutils.dumps(
- {'access': {'token': {'id': FAKE_ADMIN_TOKEN_ID,
- 'expires': '2022-10-03T16:58:01Z'}}})
-
-VERSION_LIST_v3 = fixture.DiscoveryList(href=BASE_URI)
-VERSION_LIST_v2 = fixture.DiscoveryList(v3=False, href=BASE_URI)
-
-ERROR_TOKEN = '7ae290c2a06244c4b41692eb4e9225f2'
-MEMCACHED_SERVERS = ['localhost:11211']
-MEMCACHED_AVAILABLE = None
-
-
-def memcached_available():
- """Do a sanity check against memcached.
-
- Returns ``True`` if the following conditions are met (otherwise, returns
- ``False``):
-
- - ``python-memcached`` is installed
- - a usable ``memcached`` instance is available via ``MEMCACHED_SERVERS``
- - the client is able to set and get a key/value pair
-
- """
- global MEMCACHED_AVAILABLE
-
- if MEMCACHED_AVAILABLE is None:
- try:
- import memcache
- c = memcache.Client(MEMCACHED_SERVERS)
- c.set('ping', 'pong', time=1)
- MEMCACHED_AVAILABLE = c.get('ping') == 'pong'
- except ImportError:
- MEMCACHED_AVAILABLE = False
-
- return MEMCACHED_AVAILABLE
-
-
-def cleanup_revoked_file(filename):
- try:
- os.remove(filename)
- except OSError:
- pass
-
-
-def strtime(at=None):
- at = at or timeutils.utcnow()
- return at.strftime(timeutils.PERFECT_TIME_FORMAT)
-
-
-class TimezoneFixture(fixtures.Fixture):
- @staticmethod
- def supported():
- # tzset is only supported on Unix.
- return hasattr(time, 'tzset')
-
- def __init__(self, new_tz):
- super(TimezoneFixture, self).__init__()
- self.tz = new_tz
- self.old_tz = os.environ.get('TZ')
-
- def setUp(self):
- super(TimezoneFixture, self).setUp()
- if not self.supported():
- raise NotImplementedError('timezone override is not supported.')
- os.environ['TZ'] = self.tz
- time.tzset()
- self.addCleanup(self.cleanup)
-
- def cleanup(self):
- if self.old_tz is not None:
- os.environ['TZ'] = self.old_tz
- elif 'TZ' in os.environ:
- del os.environ['TZ']
- time.tzset()
-
-
-class TimeFixture(fixtures.Fixture):
-
- def __init__(self, new_time, normalize=True):
- super(TimeFixture, self).__init__()
- if isinstance(new_time, six.string_types):
- new_time = timeutils.parse_isotime(new_time)
- if normalize:
- new_time = timeutils.normalize_time(new_time)
- self.new_time = new_time
-
- def setUp(self):
- super(TimeFixture, self).setUp()
- timeutils.set_time_override(self.new_time)
- self.addCleanup(timeutils.clear_time_override)
-
-
-class FakeApp(object):
- """This represents a WSGI app protected by the auth_token middleware."""
-
- SUCCESS = b'SUCCESS'
- FORBIDDEN = b'FORBIDDEN'
- expected_env = {}
-
- def __init__(self, expected_env=None, need_service_token=False):
- self.expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
-
- if expected_env:
- self.expected_env.update(expected_env)
-
- self.need_service_token = need_service_token
-
- @webob.dec.wsgify
- def __call__(self, req):
- for k, v in self.expected_env.items():
- assert req.environ[k] == v, '%s != %s' % (req.environ[k], v)
-
- resp = webob.Response()
-
- if (req.environ.get('HTTP_X_IDENTITY_STATUS') == 'Invalid' and
- req.environ['HTTP_X_SERVICE_IDENTITY_STATUS'] == 'Invalid'):
- # Simulate delayed auth forbidding access with arbitrary status
- # code to differentiate checking this code path
- resp.status = 419
- resp.body = FakeApp.FORBIDDEN
- elif req.environ.get('HTTP_X_SERVICE_IDENTITY_STATUS') == 'Invalid':
- # Simulate delayed auth forbidding access with arbitrary status
- # code to differentiate checking this code path
- resp.status = 420
- resp.body = FakeApp.FORBIDDEN
- elif req.environ['HTTP_X_IDENTITY_STATUS'] == 'Invalid':
- # Simulate delayed auth forbidding access
- resp.status = 403
- resp.body = FakeApp.FORBIDDEN
- elif (self.need_service_token is True and
- req.environ.get('HTTP_X_SERVICE_TOKEN') is None):
- # Simulate requiring composite auth
- # Arbitrary value to allow checking this code path
- resp.status = 418
- resp.body = FakeApp.FORBIDDEN
- else:
- resp.body = FakeApp.SUCCESS
-
- return resp
-
-
-class v3FakeApp(FakeApp):
- """This represents a v3 WSGI app protected by the auth_token middleware."""
-
- def __init__(self, expected_env=None, need_service_token=False):
-
- # with v3 additions, these are for the DEFAULT TOKEN
- v3_default_env_additions = dict(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
- if expected_env:
- v3_default_env_additions.update(expected_env)
- super(v3FakeApp, self).__init__(expected_env=v3_default_env_additions,
- need_service_token=need_service_token)
-
-
-class CompositeBase(object):
- """Base composite auth object with common service token environment."""
-
- def __init__(self, expected_env=None):
- comp_expected_env = dict(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
-
- if expected_env:
- comp_expected_env.update(expected_env)
-
- super(CompositeBase, self).__init__(
- expected_env=comp_expected_env, need_service_token=True)
-
-
-class CompositeFakeApp(CompositeBase, FakeApp):
- """A fake v2 WSGI app protected by composite auth_token middleware."""
-
- def __init__(self, expected_env):
- super(CompositeFakeApp, self).__init__(expected_env=expected_env)
-
-
-class v3CompositeFakeApp(CompositeBase, v3FakeApp):
- """A fake v3 WSGI app protected by composite auth_token middleware."""
-
- def __init__(self, expected_env=None):
-
- # with v3 additions, these are for the DEFAULT SERVICE TOKEN
- v3_default_service_env_additions = dict(
- EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS)
-
- if expected_env:
- v3_default_service_env_additions.update(expected_env)
-
- super(v3CompositeFakeApp, self).__init__(
- v3_default_service_env_additions)
-
-
-class BaseAuthTokenMiddlewareTest(base.BaseAuthTokenTestCase):
- """Base test class for auth_token middleware.
-
- All the tests allow for running with auth_token
- configured for receiving v2 or v3 tokens, with the
- choice being made by passing configuration data into
- setUp().
-
- The base class will, by default, run all the tests
- expecting v2 token formats. Child classes can override
- this to specify, for instance, v3 format.
-
- """
- def setUp(self, expected_env=None, auth_version=None, fake_app=None):
- super(BaseAuthTokenMiddlewareTest, self).setUp()
-
- self.expected_env = expected_env or dict()
- self.fake_app = fake_app or FakeApp
- self.middleware = None
-
- signing_dir = self._setup_signing_directory()
-
- self.conf = {
- 'identity_uri': 'https://keystone.example.com:1234/testadmin/',
- 'signing_dir': signing_dir,
- 'auth_version': auth_version,
- 'auth_uri': 'https://keystone.example.com:1234',
- 'admin_user': uuid.uuid4().hex,
- }
-
- self.auth_version = auth_version
- self.response_status = None
- self.response_headers = None
-
- # NOTE(gyee): For this test suite and for the stable liberty branch
- # only, we will ignore deprecated calls that keystonemiddleware makes.
- warnings.filterwarnings('ignore', category=DeprecationWarning,
- module='^keystonemiddleware\\.')
-
- def call_middleware(self, **kwargs):
- return self.call(self.middleware, **kwargs)
-
- def _setup_signing_directory(self):
- directory_name = self.useFixture(fixtures.TempDir()).path
-
- # Copy the sample certificate files into the temporary directory.
- for filename in ['cacert.pem', 'signing_cert.pem', ]:
- shutil.copy2(os.path.join(client_fixtures.CERTDIR, filename),
- os.path.join(directory_name, filename))
-
- return directory_name
-
- def set_middleware(self, expected_env=None, conf=None):
- """Configure the class ready to call the auth_token middleware.
-
- Set up the various fake items needed to run the middleware.
- Individual tests that need to further refine these can call this
- function to override the class defaults.
-
- """
- if conf:
- self.conf.update(conf)
-
- if expected_env:
- self.expected_env.update(expected_env)
-
- self.middleware = auth_token.AuthProtocol(
- self.fake_app(self.expected_env), self.conf)
-
- self.middleware._revocations._list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
-
- def update_expected_env(self, expected_env={}):
- self.middleware._app.expected_env.update(expected_env)
-
- def purge_token_expected_env(self):
- for key in six.iterkeys(self.token_expected_env):
- del self.middleware._app.expected_env[key]
-
- def purge_service_token_expected_env(self):
- for key in six.iterkeys(self.service_token_expected_env):
- del self.middleware._app.expected_env[key]
-
- def assertLastPath(self, path):
- if path:
- self.assertEqual(BASE_URI + path,
- self.requests_mock.last_request.url)
- else:
- self.assertIsNone(self.requests_mock.last_request)
-
-
-class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- """Auth Token middleware should understand Diablo keystone responses."""
- def setUp(self):
- # pre-diablo only had Tenant ID, which was also the Name
- expected_env = {
- 'HTTP_X_TENANT_ID': 'tenant_id1',
- 'HTTP_X_TENANT_NAME': 'tenant_id1',
- # now deprecated (diablo-compat)
- 'HTTP_X_TENANT': 'tenant_id1',
- }
-
- super(DiabloAuthTokenMiddlewareTest, self).setUp(
- expected_env=expected_env)
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post("%s/v2.0/tokens" % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.token_id = self.examples.VALID_DIABLO_TOKEN
- token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
-
- url = "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id)
- self.requests_mock.get(url, text=token_response)
-
- self.set_middleware()
-
- def test_valid_diablo_response(self):
- resp = self.call_middleware(headers={'X-Auth-Token': self.token_id})
- self.assertEqual(200, resp.status_int)
- self.assertIn('keystone.token_info', resp.request.environ)
-
-
-class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
- """These tests will not have the memcache module available."""
-
- def setUp(self):
- super(NoMemcacheAuthToken, self).setUp()
- self.useFixture(utils.DisableModuleFixture('memcache'))
-
- def test_nomemcache(self):
- conf = {
- 'admin_token': 'admin_token1',
- 'auth_host': 'keystone.example.com',
- 'auth_port': '1234',
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'auth_uri': 'https://keystone.example.com:1234',
- }
-
- auth_token.AuthProtocol(FakeApp(), conf)
-
-
-class CachePoolTest(BaseAuthTokenMiddlewareTest):
- def test_use_cache_from_env(self):
- """If `swift.cache` is set in the environment and `cache` is set in the
- config then the env cache is used.
- """
- env = {'swift.cache': 'CACHE_TEST'}
- conf = {
- 'cache': 'swift.cache'
- }
- self.set_middleware(conf=conf)
- self.middleware._token_cache.initialize(env)
- with self.middleware._token_cache._cache_pool.reserve() as cache:
- self.assertEqual(cache, 'CACHE_TEST')
-
- def test_not_use_cache_from_env(self):
- """If `swift.cache` is set in the environment but `cache` isn't set in
- the config then the env cache isn't used.
- """
- self.set_middleware()
- env = {'swift.cache': 'CACHE_TEST'}
- self.middleware._token_cache.initialize(env)
- with self.middleware._token_cache._cache_pool.reserve() as cache:
- self.assertNotEqual(cache, 'CACHE_TEST')
-
- def test_multiple_context_managers_share_single_client(self):
- self.set_middleware()
- token_cache = self.middleware._token_cache
- env = {}
- token_cache.initialize(env)
-
- caches = []
-
- with token_cache._cache_pool.reserve() as cache:
- caches.append(cache)
-
- with token_cache._cache_pool.reserve() as cache:
- caches.append(cache)
-
- self.assertIs(caches[0], caches[1])
- self.assertEqual(set(caches), set(token_cache._cache_pool))
-
- def test_nested_context_managers_create_multiple_clients(self):
- self.set_middleware()
- env = {}
- self.middleware._token_cache.initialize(env)
- token_cache = self.middleware._token_cache
-
- with token_cache._cache_pool.reserve() as outer_cache:
- with token_cache._cache_pool.reserve() as inner_cache:
- self.assertNotEqual(outer_cache, inner_cache)
-
- self.assertEqual(
- set([inner_cache, outer_cache]),
- set(token_cache._cache_pool))
-
-
-class GeneralAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """These tests are not affected by the token format
- (see CommonAuthTokenMiddlewareTest).
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_token_is_v2_accepts_v2(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertTrue(auth_token._token_is_v2(token_response))
-
- def test_token_is_v2_rejects_v3(self):
- token = self.examples.v3_UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertFalse(auth_token._token_is_v2(token_response))
-
- def test_token_is_v3_rejects_v2(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertFalse(auth_token._token_is_v3(token_response))
-
- def test_token_is_v3_accepts_v3(self):
- token = self.examples.v3_UUID_TOKEN_DEFAULT
- token_response = self.examples.TOKEN_RESPONSES[token]
- self.assertTrue(auth_token._token_is_v3(token_response))
-
- def test_fixed_cache_key_length(self):
- self.set_middleware()
- short_string = uuid.uuid4().hex
- long_string = 8 * uuid.uuid4().hex
-
- token_cache = self.middleware._token_cache
- hashed_short_string_key, context_ = token_cache._get_cache_key(
- short_string)
- hashed_long_string_key, context_ = token_cache._get_cache_key(
- long_string)
-
- # The hash keys should always match in length
- self.assertThat(hashed_short_string_key,
- matchers.HasLength(len(hashed_long_string_key)))
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_encrypt_cache_data(self):
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'encrypt',
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = b'my_token'
- data = 'this_data'
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache.get(token), data)
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_sign_cache_data(self):
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'mac',
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = b'my_token'
- data = 'this_data'
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache.get(token), data)
-
- @testtools.skipUnless(memcached_available(), 'memcached not available')
- def test_no_memcache_protection(self):
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_secret_key': 'mysecret'
- }
- self.set_middleware(conf=conf)
- token = 'my_token'
- data = 'this_data'
- token_cache = self.middleware._token_cache
- token_cache.initialize({})
- token_cache._cache_store(token, data)
- self.assertEqual(token_cache.get(token), data)
-
- def test_assert_valid_memcache_protection_config(self):
- # test missing memcache_secret_key
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'Encrypt'
- }
- self.assertRaises(exc.ConfigurationError, self.set_middleware,
- conf=conf)
- # test invalue memcache_security_strategy
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'whatever'
- }
- self.assertRaises(exc.ConfigurationError, self.set_middleware,
- conf=conf)
- # test missing memcache_secret_key
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'mac'
- }
- self.assertRaises(exc.ConfigurationError, self.set_middleware,
- conf=conf)
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'Encrypt',
- 'memcache_secret_key': ''
- }
- self.assertRaises(exc.ConfigurationError, self.set_middleware,
- conf=conf)
- conf = {
- 'memcached_servers': ','.join(MEMCACHED_SERVERS),
- 'memcache_security_strategy': 'mAc',
- 'memcache_secret_key': ''
- }
- self.assertRaises(exc.ConfigurationError, self.set_middleware,
- conf=conf)
-
- def test_config_revocation_cache_timeout(self):
- conf = {
- 'revocation_cache_time': '24',
- 'auth_uri': 'https://keystone.example.com:1234',
- 'admin_user': uuid.uuid4().hex
- }
- middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(middleware._revocations._cache_timeout,
- datetime.timedelta(seconds=24))
-
- def test_conf_values_type_convert(self):
- conf = {
- 'revocation_cache_time': '24',
- 'identity_uri': 'https://keystone.example.com:1234',
- 'include_service_catalog': '0',
- 'nonexsit_option': '0',
- }
-
- middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(datetime.timedelta(seconds=24),
- middleware._revocations._cache_timeout)
- self.assertEqual(False, middleware._include_service_catalog)
- self.assertEqual('0', middleware._conf['nonexsit_option'])
-
- def test_deprecated_conf_values(self):
- conf = {
- 'memcache_servers': ','.join(MEMCACHED_SERVERS),
- }
-
- middleware = auth_token.AuthProtocol(self.fake_app, conf)
- self.assertEqual(MEMCACHED_SERVERS,
- middleware._conf_get('memcached_servers'))
-
- def test_conf_values_type_convert_with_wrong_value(self):
- conf = {
- 'include_service_catalog': '123',
- }
- self.assertRaises(exc.ConfigurationError,
- auth_token.AuthProtocol, self.fake_app, conf)
-
- def test_auth_region_name(self):
- token = fixture.V3Token()
-
- auth_url = 'http://keystone-auth.example.com:5000'
- east_url = 'http://keystone-east.example.com:5000'
- west_url = 'http://keystone-west.example.com:5000'
-
- auth_versions = fixture.DiscoveryList(href=auth_url)
- east_versions = fixture.DiscoveryList(href=east_url)
- west_versions = fixture.DiscoveryList(href=west_url)
-
- s = token.add_service('identity')
- s.add_endpoint(interface='admin', url=east_url, region='east')
- s.add_endpoint(interface='admin', url=west_url, region='west')
-
- self.requests_mock.get(auth_url, json=auth_versions)
- self.requests_mock.get(east_url, json=east_versions)
- self.requests_mock.get(west_url, json=west_versions)
-
- self.requests_mock.post(
- '%s/v3/auth/tokens' % auth_url,
- headers={'X-Subject-Token': uuid.uuid4().hex},
- json=token)
-
- east_mock = self.requests_mock.get(
- '%s/v3/auth/tokens' % east_url,
- headers={'X-Subject-Token': uuid.uuid4().hex},
- json=fixture.V3Token())
-
- west_mock = self.requests_mock.get(
- '%s/v3/auth/tokens' % west_url,
- headers={'X-Subject-Token': uuid.uuid4().hex},
- json=fixture.V3Token())
-
- conf = {'auth_uri': auth_url,
- 'auth_url': auth_url + '/v3',
- 'auth_plugin': 'v3password',
- 'username': 'user',
- 'password': 'pass'}
-
- self.assertEqual(0, east_mock.call_count)
- self.assertEqual(0, west_mock.call_count)
-
- east_app = self.create_simple_middleware(conf=dict(region_name='east',
- **conf))
- self.call(east_app, headers={'X-Auth-Token': uuid.uuid4().hex})
-
- self.assertEqual(1, east_mock.call_count)
- self.assertEqual(0, west_mock.call_count)
-
- west_app = self.create_simple_middleware(conf=dict(region_name='west',
- **conf))
-
- self.call(west_app, headers={'X-Auth-Token': uuid.uuid4().hex})
-
- self.assertEqual(1, east_mock.call_count)
- self.assertEqual(1, west_mock.call_count)
-
-
-class CommonAuthTokenMiddlewareTest(object):
- """These tests are run once using v2 tokens and again using v3 tokens."""
-
- def test_init_does_not_call_http(self):
- conf = {
- 'revocation_cache_time': '1'
- }
- self.create_simple_middleware(conf=conf)
- self.assertLastPath(None)
-
- def test_auth_with_no_token_does_not_call_http(self):
- middleware = self.create_simple_middleware()
- resp = self.call(middleware)
- self.assertLastPath(None)
- self.assertEqual(401, resp.status_int)
-
- def test_init_by_ipv6Addr_auth_host(self):
- del self.conf['identity_uri']
- conf = {
- 'auth_host': '2001:2013:1:f101::1',
- 'auth_port': '1234',
- 'auth_protocol': 'http',
- 'auth_uri': None,
- 'auth_version': 'v3.0',
- }
- middleware = self.create_simple_middleware(conf=conf)
- self.assertEqual('http://[2001:2013:1:f101::1]:1234',
- middleware._auth_uri)
-
- def assert_valid_request_200(self, token, with_catalog=True):
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
- if with_catalog:
- self.assertTrue(resp.request.headers.get('X-Service-Catalog'))
- else:
- self.assertNotIn('X-Service-Catalog', resp.request.headers)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
- self.assertIn('keystone.token_info', resp.request.environ)
- return resp.request
-
- def test_valid_uuid_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- token = self.token_dict['uuid_token_default']
- self.assert_valid_request_200(token)
- self.assert_valid_last_url(token)
-
- def test_valid_uuid_request_with_auth_fragments(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = '/testadmin'
- self.set_middleware()
- self.assert_valid_request_200(self.token_dict['uuid_token_default'])
- self.assert_valid_last_url(self.token_dict['uuid_token_default'])
-
- def _test_cache_revoked(self, token, revoked_form=None):
- # When the token is cached and revoked, 401 is returned.
- self.middleware._check_revocations_for_cached = True
-
- # Token should be cached as ok after this.
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
-
- # Put it in revocation list.
- self.middleware._revocations._list = self.get_revocation_list_json(
- token_ids=[revoked_form or token])
-
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
-
- def test_cached_revoked_error(self):
- # When the token is cached and revocation list retrieval fails,
- # 503 is returned
- token = self.token_dict['uuid_token_default']
- self.middleware._check_revocations_for_cached = True
-
- # Token should be cached as ok after this.
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
-
- # Cause the revocation list to be fetched again next time so we can
- # test the case where that retrieval fails
- self.middleware._revocations._fetched_time = datetime.datetime.min
- with mock.patch.object(self.middleware._revocations, '_fetch',
- side_effect=exc.RevocationListError):
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(503, resp.status_int)
-
- def test_unexpected_exception_in_validate_offline(self):
- # When an unexpected exception is hit during _validate_offline,
- # 500 is returned
- token = self.token_dict['uuid_token_default']
- with mock.patch.object(self.middleware, '_validate_offline',
- side_effect=Exception):
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(500, resp.status_int)
-
- def test_cached_revoked_uuid(self):
- # When the UUID token is cached and revoked, 401 is returned.
- self._test_cache_revoked(self.token_dict['uuid_token_default'])
-
- def test_valid_signed_request(self):
- for _ in range(2): # Do it twice because first result was cached.
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_valid_signed_compressed_request(self):
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
- # ensure that signed requests do not generate HTTP traffic
- self.assertLastPath(None)
-
- def test_revoked_token_receives_401(self):
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
-
- token = self.token_dict['revoked_token']
- resp = self.call_middleware(headers={'X-Auth-Token': token})
-
- self.assertEqual(401, resp.status_int)
-
- def test_revoked_token_receives_401_sha256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- self.middleware._revocations._list = (
- self.get_revocation_list_json(mode='sha256'))
-
- token = self.token_dict['revoked_token']
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
-
- def test_cached_revoked_pki(self):
- # When the PKI token is cached and revoked, 401 is returned.
- token = self.token_dict['signed_token_scoped']
- revoked_form = cms.cms_hash_token(token)
- self._test_cache_revoked(token, revoked_form)
-
- def test_cached_revoked_pkiz(self):
- # When the PKIZ token is cached and revoked, 401 is returned.
- token = self.token_dict['signed_token_scoped_pkiz']
- revoked_form = cms.cms_hash_token(token)
- self._test_cache_revoked(token, revoked_form)
-
- def test_revoked_token_receives_401_md5_secondary(self):
- # When hash_algorithms has 'md5' as the secondary hash and the
- # revocation list contains the md5 hash for a token, that token is
- # considered revoked so returns 401.
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
-
- token = self.token_dict['revoked_token']
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
-
- def _test_revoked_hashed_token(self, token_name):
- # If hash_algorithms is set as ['sha256', 'md5'],
- # and check_revocations_for_cached is True,
- # and a token is in the cache because it was successfully validated
- # using the md5 hash, then
- # if the token is in the revocation list by md5 hash, it'll be
- # rejected and auth_token returns 401.
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.conf['check_revocations_for_cached'] = 'true'
- self.set_middleware()
-
- token = self.token_dict[token_name]
-
- # Put the token in the revocation list.
- token_hashed = cms.cms_hash_token(token)
- self.middleware._revocations._list = self.get_revocation_list_json(
- token_ids=[token_hashed])
-
- # First, request is using the hashed token, is valid so goes in
- # cache using the given hash.
- resp = self.call_middleware(headers={'X-Auth-Token': token_hashed})
- self.assertEqual(200, resp.status_int)
-
- # This time use the PKI(Z) token
- resp = self.call_middleware(headers={'X-Auth-Token': token})
-
- # Should find the token in the cache and revocation list.
- self.assertEqual(401, resp.status_int)
-
- def test_revoked_hashed_pki_token(self):
- self._test_revoked_hashed_token('signed_token_scoped')
-
- def test_revoked_hashed_pkiz_token(self):
- self._test_revoked_hashed_token('signed_token_scoped_pkiz')
-
- def test_revoked_pki_token_by_audit_id(self):
- # When the audit ID is in the revocation list, the token is invalid.
- self.set_middleware()
- token = self.token_dict['signed_token_scoped']
-
- # Put the token audit ID in the revocation list,
- # the entry will have a false token ID so the token ID doesn't match.
- fake_token_id = uuid.uuid4().hex
- # The audit_id value is in examples/pki/cms/auth_*_token_scoped.json.
- audit_id = 'SLIXlXQUQZWUi9VJrqdXqA'
- revocation_list_data = {
- 'revoked': [
- {
- 'id': fake_token_id,
- 'audit_id': audit_id
- },
- ]
- }
- self.middleware._revocations._list = jsonutils.dumps(
- revocation_list_data)
-
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
-
- def get_revocation_list_json(self, token_ids=None, mode=None):
- if token_ids is None:
- key = 'revoked_token_hash' + (('_' + mode) if mode else '')
- token_ids = [self.token_dict[key]]
- revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
- for x in token_ids]}
- return jsonutils.dumps(revocation_list)
-
- def test_is_signed_token_revoked_returns_false(self):
- # explicitly setting an empty revocation list here to document intent
- self.middleware._revocations._list = jsonutils.dumps(
- {"revoked": [], "extra": "success"})
- result = self.middleware._revocations._any_revoked(
- [self.token_dict['revoked_token_hash']])
- self.assertFalse(result)
-
- def test_is_signed_token_revoked_returns_true(self):
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
- result = self.middleware._revocations._any_revoked(
- [self.token_dict['revoked_token_hash']])
- self.assertTrue(result)
-
- def test_is_signed_token_revoked_returns_true_sha256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- self.middleware._revocations._list = (
- self.get_revocation_list_json(mode='sha256'))
- result = self.middleware._revocations._any_revoked(
- [self.token_dict['revoked_token_hash_sha256']])
- self.assertTrue(result)
-
- def test_verify_signed_token_raises_exception_for_revoked_token(self):
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
- self.assertRaises(exc.InvalidToken,
- self.middleware._verify_signed_token,
- self.token_dict['revoked_token'],
- [self.token_dict['revoked_token_hash']])
-
- def test_verify_signed_token_raises_exception_for_revoked_token_s256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- self.middleware._revocations._list = (
- self.get_revocation_list_json(mode='sha256'))
- self.assertRaises(exc.InvalidToken,
- self.middleware._verify_signed_token,
- self.token_dict['revoked_token'],
- [self.token_dict['revoked_token_hash_sha256'],
- self.token_dict['revoked_token_hash']])
-
- def test_verify_signed_token_raises_exception_for_revoked_pkiz_token(self):
- self.middleware._revocations._list = (
- self.examples.REVOKED_TOKEN_PKIZ_LIST_JSON)
- self.assertRaises(exc.InvalidToken,
- self.middleware._verify_pkiz_token,
- self.token_dict['revoked_token_pkiz'],
- [self.token_dict['revoked_token_pkiz_hash']])
-
- def assertIsValidJSON(self, text):
- json.loads(text)
-
- def test_verify_signed_token_succeeds_for_unrevoked_token(self):
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
- text = self.middleware._verify_signed_token(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_verify_signed_compressed_token_succeeds_for_unrevoked_token(self):
- self.middleware._revocations._list = (
- self.get_revocation_list_json())
- text = self.middleware._verify_pkiz_token(
- self.token_dict['signed_token_scoped_pkiz'],
- [self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_verify_signed_token_succeeds_for_unrevoked_token_sha256(self):
- self.conf['hash_algorithms'] = ','.join(['sha256', 'md5'])
- self.set_middleware()
- self.middleware._revocations._list = (
- self.get_revocation_list_json(mode='sha256'))
- text = self.middleware._verify_signed_token(
- self.token_dict['signed_token_scoped'],
- [self.token_dict['signed_token_scoped_hash_sha256'],
- self.token_dict['signed_token_scoped_hash']])
- self.assertIsValidJSON(text)
-
- def test_get_token_revocation_list_fetched_time_returns_min(self):
- self.middleware._revocations._fetched_time = None
-
- # Get rid of the revoked file
- revoked_path = self.middleware._signing_directory.calc_path(
- _revocations.Revocations._FILE_NAME)
- os.remove(revoked_path)
-
- self.assertEqual(self.middleware._revocations._fetched_time,
- datetime.datetime.min)
-
- # FIXME(blk-u): move the unit tests into unit/test_auth_token.py
- def test_get_token_revocation_list_fetched_time_returns_mtime(self):
- self.middleware._revocations._fetched_time = None
- revoked_path = self.middleware._signing_directory.calc_path(
- _revocations.Revocations._FILE_NAME)
- mtime = os.path.getmtime(revoked_path)
- fetched_time = datetime.datetime.utcfromtimestamp(mtime)
- self.assertEqual(fetched_time,
- self.middleware._revocations._fetched_time)
-
- @testtools.skipUnless(TimezoneFixture.supported(),
- 'TimezoneFixture not supported')
- def test_get_token_revocation_list_fetched_time_returns_utc(self):
- with TimezoneFixture('UTC-1'):
- self.middleware._revocations._list = jsonutils.dumps(
- self.examples.REVOCATION_LIST)
- self.middleware._revocations._fetched_time = None
- fetched_time = self.middleware._revocations._fetched_time
- self.assertTrue(timeutils.is_soon(fetched_time, 1))
-
- def test_get_token_revocation_list_fetched_time_returns_value(self):
- expected = self.middleware._revocations._fetched_time
- self.assertEqual(self.middleware._revocations._fetched_time,
- expected)
-
- def test_get_revocation_list_returns_fetched_list(self):
- # auth_token uses v2 to fetch this, so don't allow the v3
- # tests to override the fake http connection
- self.middleware._revocations._fetched_time = None
-
- # Get rid of the revoked file
- revoked_path = self.middleware._signing_directory.calc_path(
- _revocations.Revocations._FILE_NAME)
- os.remove(revoked_path)
-
- self.assertEqual(self.middleware._revocations._list,
- self.examples.REVOCATION_LIST)
-
- def test_get_revocation_list_returns_current_list_from_memory(self):
- self.assertEqual(self.middleware._revocations._list,
- self.middleware._revocations._list_prop)
-
- def test_get_revocation_list_returns_current_list_from_disk(self):
- in_memory_list = self.middleware._revocations._list
- self.middleware._revocations._list_prop = None
- self.assertEqual(self.middleware._revocations._list,
- in_memory_list)
-
- def test_invalid_revocation_list_raises_error(self):
- self.requests_mock.get(self.revocation_url, json={})
- self.assertRaises(exc.RevocationListError,
- self.middleware._revocations._fetch)
-
- def test_fetch_revocation_list(self):
- # auth_token uses v2 to fetch this, so don't allow the v3
- # tests to override the fake http connection
- fetched = jsonutils.loads(self.middleware._revocations._fetch())
- self.assertEqual(fetched, self.examples.REVOCATION_LIST)
-
- def test_request_invalid_uuid_token(self):
- # remember because we are testing the middleware we stub the connection
- # to the keystone server, but this is not what gets returned
- invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests_mock.get(invalid_uri, status_code=404)
-
- resp = self.call_middleware(headers={'X-Auth-Token': 'invalid-token'})
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_request_invalid_signed_token(self):
- token = self.examples.INVALID_SIGNED_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_request_invalid_signed_pkiz_token(self):
- token = self.examples.INVALID_SIGNED_PKIZ_TOKEN
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_request_no_token(self):
- resp = self.call_middleware()
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_request_no_token_http(self):
- resp = self.call_middleware(method='HEAD')
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_request_blank_token(self):
- resp = self.call_middleware(headers={'X-Auth-Token': ''})
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def _get_cached_token(self, token, mode='md5'):
- token_id = cms.cms_hash_token(token, mode=mode)
- return self.middleware._token_cache.get(token_id)
-
- def test_memcache(self):
- token = self.token_dict['signed_token_scoped']
- self.call_middleware(headers={'X-Auth-Token': token})
- self.assertIsNotNone(self._get_cached_token(token))
-
- def test_expired(self):
- token = self.token_dict['signed_token_scoped_expired']
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
-
- def test_memcache_set_invalid_uuid(self):
- invalid_uri = "%s/v2.0/tokens/invalid-token" % BASE_URI
- self.requests_mock.get(invalid_uri, status_code=404)
-
- token = 'invalid-token'
- self.call_middleware(headers={'X-Auth-Token': token})
- self.assertRaises(exc.InvalidToken, self._get_cached_token, token)
-
- def test_memcache_set_expired(self, extra_conf={}, extra_environ={}):
- token_cache_time = 10
- conf = {
- 'token_cache_time': '%s' % token_cache_time,
- }
- conf.update(extra_conf)
- self.set_middleware(conf=conf)
-
- token = self.token_dict['signed_token_scoped']
- self.call_middleware(headers={'X-Auth-Token': token})
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
- req.environ.update(extra_environ)
-
- now = datetime.datetime.utcnow()
- self.useFixture(TimeFixture(now))
- req.get_response(self.middleware)
- self.assertIsNotNone(self._get_cached_token(token))
-
- timeutils.advance_time_seconds(token_cache_time)
- self.assertIsNone(self._get_cached_token(token))
-
- def test_swift_memcache_set_expired(self):
- extra_conf = {'cache': 'swift.cache'}
- extra_environ = {'swift.cache': memorycache.Client()}
- self.test_memcache_set_expired(extra_conf, extra_environ)
-
- def test_http_error_not_cached_token(self):
- """Test to don't cache token as invalid on network errors.
-
- We use UUID tokens since they are the easiest one to reach
- get_http_connection.
- """
- self.middleware._http_request_max_retries = 0
- self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN})
- self.assertIsNone(self._get_cached_token(ERROR_TOKEN))
- self.assert_valid_last_url(ERROR_TOKEN)
-
- def test_http_request_max_retries(self):
- times_retry = 10
-
- conf = {'http_request_max_retries': '%s' % times_retry}
- self.set_middleware(conf=conf)
-
- with mock.patch('time.sleep') as mock_obj:
- self.call_middleware(headers={'X-Auth-Token': ERROR_TOKEN})
-
- self.assertEqual(mock_obj.call_count, times_retry)
-
- def test_nocatalog(self):
- conf = {
- 'include_service_catalog': 'False'
- }
- self.set_middleware(conf=conf)
- self.assert_valid_request_200(self.token_dict['uuid_token_default'],
- with_catalog=False)
-
- def assert_kerberos_bind(self, token, bind_level,
- use_kerberos=True, success=True):
- conf = {
- 'enforce_token_bind': bind_level,
- 'auth_version': self.auth_version,
- }
- self.set_middleware(conf=conf)
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = token
-
- if use_kerberos:
- if use_kerberos is True:
- req.environ['REMOTE_USER'] = self.examples.KERBEROS_BIND
- else:
- req.environ['REMOTE_USER'] = use_kerberos
-
- req.environ['AUTH_TYPE'] = 'Negotiate'
-
- resp = req.get_response(self.middleware)
-
- if success:
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
- self.assertIn('keystone.token_info', req.environ)
- self.assert_valid_last_url(token)
- else:
- self.assertEqual(401, resp.status_int)
- msg = "Keystone uri='https://keystone.example.com:1234'"
- self.assertEqual(msg, resp.headers['WWW-Authenticate'])
-
- def test_uuid_bind_token_disabled_with_kerb_user(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='disabled',
- use_kerberos=use_kerberos,
- success=True)
-
- def test_uuid_bind_token_disabled_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_permissive_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='permissive',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_permissive_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='permissive',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_permissive_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='permissive',
- use_kerberos=use_kerberos,
- success=True)
-
- def test_uuid_bind_token_permissive_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_strict_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='strict',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_strict_with_kerbout_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='strict',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_strict_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='strict',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_required_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='required',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_required_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='required',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_required_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='required',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_required_without_bind(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_default'],
- bind_level='required',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos=True,
- success=True)
-
- def test_uuid_bind_token_named_kerberos_without_kerb_user(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos=False,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_unknown_bind(self):
- token = self.token_dict['uuid_token_unknown_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='kerberos',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_without_bind(self):
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(self.token_dict['uuid_token_default'],
- bind_level='kerberos',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_uuid_bind_token_named_kerberos_with_incorrect_ticket(self):
- self.assert_kerberos_bind(self.token_dict['uuid_token_bind'],
- bind_level='kerberos',
- use_kerberos='ronald@MCDONALDS.COM',
- success=False)
-
- def test_uuid_bind_token_with_unknown_named_FOO(self):
- token = self.token_dict['uuid_token_bind']
-
- for use_kerberos in [True, False]:
- self.assert_kerberos_bind(token,
- bind_level='FOO',
- use_kerberos=use_kerberos,
- success=False)
-
- def test_caching_token_on_verify(self):
- # When the token is cached it isn't cached again when it's verified.
-
- # The token cache has to be initialized with our cache instance.
- self.middleware._token_cache._env_cache_name = 'cache'
- cache = memorycache.Client()
- self.middleware._token_cache.initialize(env={'cache': cache})
-
- # Mock cache.set since then the test can verify call_count.
- orig_cache_set = cache.set
- cache.set = mock.Mock(side_effect=orig_cache_set)
-
- token = self.token_dict['signed_token_scoped']
-
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
-
- self.assertThat(1, matchers.Equals(cache.set.call_count))
-
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
-
- # Assert that the token wasn't cached again.
- self.assertThat(1, matchers.Equals(cache.set.call_count))
-
- def test_auth_plugin(self):
-
- for service_url in (self.examples.UNVERSIONED_SERVICE_URL,
- self.examples.SERVICE_URL):
- self.requests_mock.get(service_url,
- json=VERSION_LIST_v3,
- status_code=300)
-
- token = self.token_dict['uuid_token_default']
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- token_auth = resp.request.environ['keystone.token_auth']
- endpoint_filter = {'service_type': self.examples.SERVICE_TYPE,
- 'version': 3}
-
- url = token_auth.get_endpoint(session.Session(), **endpoint_filter)
- self.assertEqual('%s/v3' % BASE_URI, url)
-
- self.assertTrue(token_auth.has_user_token)
- self.assertFalse(token_auth.has_service_token)
- self.assertIsNone(token_auth.service)
-
- def test_doesnt_auto_set_content_type(self):
- # webob will set content_type = 'text/html' by default if nothing is
- # provided. We don't want our middleware messing with the content type
- # of the underlying applications.
-
- text = uuid.uuid4().hex
-
- def _middleware(environ, start_response):
- start_response(200, [])
- return text
-
- def _start_response(status_code, headerlist, exc_info=None):
- self.assertIn('200', status_code) # will be '200 OK'
- self.assertEqual([], headerlist)
-
- m = auth_token.AuthProtocol(_middleware, self.conf)
-
- env = {'REQUEST_METHOD': 'GET',
- 'HTTP_X_AUTH_TOKEN': self.token_dict['uuid_token_default']}
-
- r = m(env, _start_response)
- self.assertEqual(text, r)
-
-
-class V2CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def __init__(self, *args, **kwargs):
- super(V2CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v2.0'
- self.fake_app = None
- self.ca_path = '/v2.0/certificates/ca'
- self.signing_path = '/v2.0/certificates/signing'
-
- def setUp(self):
- super(V2CertDownloadMiddlewareTest, self).setUp(
- auth_version=self.auth_version,
- fake_app=self.fake_app)
- self.base_dir = tempfile.mkdtemp()
- self.addCleanup(shutil.rmtree, self.base_dir)
- self.cert_dir = os.path.join(self.base_dir, 'certs')
- os.makedirs(self.cert_dir, stat.S_IRWXU)
- conf = {
- 'signing_dir': self.cert_dir,
- 'auth_version': self.auth_version,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- self.set_middleware(conf=conf)
-
- # Usually we supply a signed_dir with pre-installed certificates,
- # so invocation of /usr/bin/openssl succeeds. This time we give it
- # an empty directory, so it fails.
- def test_request_no_token_dummy(self):
- cms._ensure_subprocess()
-
- self.requests_mock.get('%s%s' % (BASE_URI, self.ca_path),
- status_code=404)
- self.requests_mock.get('%s%s' % (BASE_URI, self.signing_path),
- status_code=404)
- self.assertRaises(exceptions.CertificateConfigError,
- self.middleware._verify_signed_token,
- self.examples.SIGNED_TOKEN_SCOPED,
- [self.examples.SIGNED_TOKEN_SCOPED_HASH])
-
- def test_fetch_signing_cert(self):
- data = 'FAKE CERT'
- url = "%s%s" % (BASE_URI, self.signing_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_signing_cert()
-
- signing_cert_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CERT_FILE_NAME)
- with open(signing_cert_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_fetch_signing_ca(self):
- data = 'FAKE CA'
- url = "%s%s" % (BASE_URI, self.ca_path)
- self.requests_mock.get(url, text=data)
- self.middleware._fetch_ca_cert()
-
- ca_file_path = self.middleware._signing_directory.calc_path(
- self.middleware._SIGNING_CA_FILE_NAME)
- with open(ca_file_path, 'r') as f:
- self.assertEqual(f.read(), data)
-
- self.assertEqual(url, self.requests_mock.last_request.url)
-
- def test_prefix_trailing_slash(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = '/newadmin/'
-
- base_url = '%s/newadmin' % BASE_HOST
- ca_url = "%s%s" % (base_url, self.ca_path)
- signing_url = "%s%s" % (base_url, self.signing_path)
-
- self.requests_mock.get(base_url,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
- def test_without_prefix(self):
- del self.conf['identity_uri']
- self.conf['auth_protocol'] = 'https'
- self.conf['auth_host'] = 'keystone.example.com'
- self.conf['auth_port'] = '1234'
- self.conf['auth_admin_prefix'] = ''
-
- ca_url = "%s%s" % (BASE_HOST, self.ca_path)
- signing_url = "%s%s" % (BASE_HOST, self.signing_path)
-
- self.requests_mock.get(BASE_HOST,
- json=VERSION_LIST_v3,
- status_code=300)
- self.requests_mock.get(ca_url, text='FAKECA')
- self.requests_mock.get(signing_url, text='FAKECERT')
-
- self.set_middleware(conf=self.conf)
-
- self.middleware._fetch_ca_cert()
- self.assertEqual(ca_url, self.requests_mock.last_request.url)
-
- self.middleware._fetch_signing_cert()
- self.assertEqual(signing_url, self.requests_mock.last_request.url)
-
-
-class V3CertDownloadMiddlewareTest(V2CertDownloadMiddlewareTest):
-
- def __init__(self, *args, **kwargs):
- super(V3CertDownloadMiddlewareTest, self).__init__(*args, **kwargs)
- self.auth_version = 'v3.0'
- self.fake_app = v3FakeApp
- self.ca_path = '/v3/OS-SIMPLE-CERT/ca'
- self.signing_path = '/v3/OS-SIMPLE-CERT/certificates'
-
-
-def network_error_response(request, context):
- raise exceptions.ConnectionRefused("Network connection refused.")
-
-
-class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- CommonAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """v2 token specific tests.
-
- There are some differences between how the auth-token middleware handles
- v2 and v3 tokens over and above the token formats, namely:
-
- - A v3 keystone server will auto scope a token to a user's default project
- if no scope is specified. A v2 server assumes that the auth-token
- middleware will do that.
- - A v2 keystone server may issue a token without a catalog, even with a
- tenant
-
- The tests below were originally part of the generic AuthTokenMiddlewareTest
- class, but now, since they really are v2 specific, they are included here.
-
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v2AuthTokenMiddlewareTest, self).setUp()
-
- self.token_dict = {
- 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT,
- 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
- 'uuid_token_bind': self.examples.UUID_TOKEN_BIND,
- 'uuid_token_unknown_bind': self.examples.UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz': self.examples.SIGNED_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash': self.examples.SIGNED_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
- 'revoked_token': self.examples.REVOKED_TOKEN,
- 'revoked_token_pkiz': self.examples.REVOKED_TOKEN_PKIZ,
- 'revoked_token_pkiz_hash':
- self.examples.REVOKED_TOKEN_PKIZ_HASH,
- 'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH,
- 'revoked_token_hash_sha256':
- self.examples.REVOKED_TOKEN_HASH_SHA256,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.revocation_url = '%s/v2.0/tokens/revoked' % BASE_URI
- self.requests_mock.get(self.revocation_url,
- text=self.examples.SIGNED_REVOCATION_LIST)
-
- for token in (self.examples.UUID_TOKEN_DEFAULT,
- self.examples.UUID_TOKEN_UNSCOPED,
- self.examples.UUID_TOKEN_BIND,
- self.examples.UUID_TOKEN_UNKNOWN_BIND,
- self.examples.UUID_TOKEN_NO_SERVICE_CATALOG,
- self.examples.SIGNED_TOKEN_SCOPED_KEY,
- self.examples.SIGNED_TOKEN_SCOPED_PKIZ_KEY,):
- url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get(url, text=text)
-
- url = '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN)
- self.requests_mock.get(url, text=network_error_response)
-
- self.set_middleware()
-
- def assert_unscoped_default_tenant_auto_scopes(self, token):
- """Unscoped v2 requests with a default tenant should "auto-scope."
-
- The implied scope is the user's tenant ID.
-
- """
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
- self.assertIn('keystone.token_info', resp.request.environ)
-
- def assert_valid_last_url(self, token_id):
- self.assertLastPath("/v2.0/tokens/%s" % token_id)
-
- def test_default_tenant_uuid_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.UUID_TOKEN_DEFAULT)
-
- def test_default_tenant_signed_token(self):
- self.assert_unscoped_default_tenant_auto_scopes(
- self.examples.SIGNED_TOKEN_SCOPED)
-
- def assert_unscoped_token_receives_401(self, token):
- """Unscoped requests with no default tenant ID should be rejected."""
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='https://keystone.example.com:1234'",
- resp.headers['WWW-Authenticate'])
-
- def test_unscoped_uuid_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.UUID_TOKEN_UNSCOPED)
-
- def test_unscoped_pki_token_receives_401(self):
- self.assert_unscoped_token_receives_401(
- self.examples.SIGNED_TOKEN_UNSCOPED)
-
- def test_request_prevent_service_catalog_injection(self):
- token = self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
- resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
- 'X-Auth-Token': token})
-
- self.assertEqual(200, resp.status_int)
- self.assertFalse(resp.request.headers.get('X-Service-Catalog'))
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- def test_user_plugin_token_properties(self):
- token = self.examples.UUID_TOKEN_DEFAULT
- token_data = self.examples.TOKEN_RESPONSES[token]
-
- resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
- 'X-Auth-Token': token,
- 'X-Service-Token': token})
-
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- token_auth = resp.request.environ['keystone.token_auth']
-
- self.assertTrue(token_auth.has_user_token)
- self.assertTrue(token_auth.has_service_token)
-
- for t in [token_auth.user, token_auth.service]:
- self.assertEqual(token_data.user_id, t.user_id)
- self.assertEqual(token_data.tenant_id, t.project_id)
-
- self.assertThat(t.role_names, matchers.HasLength(2))
- self.assertIn('role1', t.role_names)
- self.assertIn('role2', t.role_names)
-
- self.assertIsNone(t.trust_id)
- self.assertIsNone(t.user_domain_id)
- self.assertIsNone(t.project_domain_id)
-
-
-class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def test_valid_uuid_request_forced_to_2_0(self):
- """Test forcing auth_token to use lower api version.
-
- By installing the v3 http hander, auth_token will be get
- a version list that looks like a v3 server - from which it
- would normally chose v3.0 as the auth version. However, here
- we specify v2.0 in the configuration - which should force
- auth_token to use that version instead.
-
- """
- conf = {
- 'auth_version': 'v2.0'
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- token = self.examples.UUID_TOKEN_DEFAULT
- url = "%s/v2.0/tokens/%s" % (BASE_URI, token)
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get(url, text=text)
-
- self.set_middleware(conf=conf)
-
- # This tests will only work is auth_token has chosen to use the
- # lower, v2, api version
- resp = self.call_middleware(headers={'X-Auth-Token': token})
- self.assertEqual(200, resp.status_int)
- self.assertEqual(url, self.requests_mock.last_request.url)
-
-
-class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
- CommonAuthTokenMiddlewareTest,
- testresources.ResourcedTestCase):
- """Test auth_token middleware with v3 tokens.
-
- Re-execute the AuthTokenMiddlewareTest class tests, but with the
- auth_token middleware configured to expect v3 tokens back from
- a keystone server.
-
- This is done by configuring the AuthTokenMiddlewareTest class via
- its Setup(), passing in v3 style data that will then be used by
- the tests themselves. This approach has been used to ensure we
- really are running the same tests for both v2 and v3 tokens.
-
- There a few additional specific test for v3 only:
-
- - We allow an unscoped token to be validated (as unscoped), where
- as for v2 tokens, the auth_token middleware is expected to try and
- auto-scope it (and fail if there is no default tenant)
- - Domain scoped tokens
-
- Since we don't specify an auth version for auth_token to use, by
- definition we are thefore implicitely testing that it will use
- the highest available auth version, i.e. v3.0
-
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v3AuthTokenMiddlewareTest, self).setUp(
- auth_version='v3.0',
- fake_app=v3FakeApp)
-
- self.token_dict = {
- 'uuid_token_default': self.examples.v3_UUID_TOKEN_DEFAULT,
- 'uuid_token_unscoped': self.examples.v3_UUID_TOKEN_UNSCOPED,
- 'uuid_token_bind': self.examples.v3_UUID_TOKEN_BIND,
- 'uuid_token_unknown_bind':
- self.examples.v3_UUID_TOKEN_UNKNOWN_BIND,
- 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
- 'signed_token_scoped_pkiz':
- self.examples.SIGNED_v3_TOKEN_SCOPED_PKIZ,
- 'signed_token_scoped_hash':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH,
- 'signed_token_scoped_hash_sha256':
- self.examples.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256,
- 'signed_token_scoped_expired':
- self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
- 'revoked_token': self.examples.REVOKED_v3_TOKEN,
- 'revoked_token_pkiz': self.examples.REVOKED_v3_TOKEN_PKIZ,
- 'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH,
- 'revoked_token_hash_sha256':
- self.examples.REVOKED_v3_TOKEN_HASH_SHA256,
- 'revoked_token_pkiz_hash':
- self.examples.REVOKED_v3_PKIZ_TOKEN_HASH,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v3,
- status_code=300)
-
- # TODO(jamielennox): auth_token middleware uses a v2 admin token
- # regardless of the auth_version that is set.
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.revocation_url = '%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI
- self.requests_mock.get(self.revocation_url,
- text=self.examples.SIGNED_REVOCATION_LIST)
-
- self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
- text=self.token_response,
- headers={'X-Subject-Token': uuid.uuid4().hex})
-
- self.set_middleware()
-
- def token_response(self, request, context):
- auth_id = request.headers.get('X-Auth-Token')
- token_id = request.headers.get('X-Subject-Token')
- self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
-
- if token_id == ERROR_TOKEN:
- raise exceptions.ConnectionRefused("Network connection refused.")
-
- try:
- response = self.examples.JSON_TOKEN_RESPONSES[token_id]
- except KeyError:
- response = ""
- context.status_code = 404
-
- return response
-
- def assert_valid_last_url(self, token_id):
- self.assertLastPath('/v3/auth/tokens')
-
- def test_valid_unscoped_uuid_request(self):
- # Remove items that won't be in an unscoped token
- delta_expected_env = {
- 'HTTP_X_PROJECT_ID': None,
- 'HTTP_X_PROJECT_NAME': None,
- 'HTTP_X_PROJECT_DOMAIN_ID': None,
- 'HTTP_X_PROJECT_DOMAIN_NAME': None,
- 'HTTP_X_TENANT_ID': None,
- 'HTTP_X_TENANT_NAME': None,
- 'HTTP_X_ROLES': '',
- 'HTTP_X_TENANT': None,
- 'HTTP_X_ROLE': '',
- }
- self.set_middleware(expected_env=delta_expected_env)
- self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED,
- with_catalog=False)
- self.assertLastPath('/v3/auth/tokens')
-
- def test_domain_scoped_uuid_request(self):
- # Modify items compared to default token for a domain scope
- delta_expected_env = {
- 'HTTP_X_DOMAIN_ID': 'domain_id1',
- 'HTTP_X_DOMAIN_NAME': 'domain_name1',
- 'HTTP_X_PROJECT_ID': None,
- 'HTTP_X_PROJECT_NAME': None,
- 'HTTP_X_PROJECT_DOMAIN_ID': None,
- 'HTTP_X_PROJECT_DOMAIN_NAME': None,
- 'HTTP_X_TENANT_ID': None,
- 'HTTP_X_TENANT_NAME': None,
- 'HTTP_X_TENANT': None
- }
- self.set_middleware(expected_env=delta_expected_env)
- self.assert_valid_request_200(
- self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
- self.assertLastPath('/v3/auth/tokens')
-
- def test_gives_v2_catalog(self):
- self.set_middleware()
- req = self.assert_valid_request_200(
- self.examples.SIGNED_v3_TOKEN_SCOPED)
-
- catalog = jsonutils.loads(req.headers['X-Service-Catalog'])
-
- for service in catalog:
- for endpoint in service['endpoints']:
- # no point checking everything, just that it's in v2 format
- self.assertIn('adminURL', endpoint)
- self.assertIn('publicURL', endpoint)
- self.assertIn('adminURL', endpoint)
-
- def test_fallback_to_online_validation_with_signing_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/certificates' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_ca_error(self):
- self.requests_mock.get('%s/v3/OS-SIMPLE-CERT/ca' % BASE_URI,
- status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_fallback_to_online_validation_with_revocation_list_error(self):
- self.requests_mock.get(self.revocation_url, status_code=404)
- self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
- self.assert_valid_request_200(
- self.token_dict['signed_token_scoped_pkiz'])
-
- def test_user_plugin_token_properties(self):
- token = self.examples.v3_UUID_TOKEN_DEFAULT
- token_data = self.examples.TOKEN_RESPONSES[token]
-
- resp = self.call_middleware(headers={'X-Service-Catalog': '[]',
- 'X-Auth-Token': token,
- 'X-Service-Token': token})
-
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- token_auth = resp.request.environ['keystone.token_auth']
-
- self.assertTrue(token_auth.has_user_token)
- self.assertTrue(token_auth.has_service_token)
-
- for t in [token_auth.user, token_auth.service]:
- self.assertEqual(token_data.user_id, t.user_id)
- self.assertEqual(token_data.project_id, t.project_id)
- self.assertEqual(token_data.user_domain_id, t.user_domain_id)
- self.assertEqual(token_data.project_domain_id, t.project_domain_id)
-
- self.assertThat(t.role_names, matchers.HasLength(2))
- self.assertIn('role1', t.role_names)
- self.assertIn('role2', t.role_names)
-
- self.assertIsNone(t.trust_id)
-
- def test_expire_stored_in_cache(self):
- # tests the upgrade path from storing a tuple vs just the data in the
- # cache. Can be removed in the future.
- token = 'mytoken'
- data = 'this_data'
- self.set_middleware()
- self.middleware._token_cache.initialize({})
- now = datetime.datetime.utcnow()
- delta = datetime.timedelta(hours=1)
- expires = strtime(at=(now + delta))
- self.middleware._token_cache.store(token, (data, expires))
- self.assertEqual(self.middleware._token_cache.get(token), data)
-
-
-class DelayedAuthTests(BaseAuthTokenMiddlewareTest):
-
- def test_header_in_401(self):
- body = uuid.uuid4().hex
- auth_uri = 'http://local.test'
- conf = {'delay_auth_decision': 'True',
- 'auth_version': 'v3.0',
- 'auth_uri': auth_uri}
-
- middleware = self.create_simple_middleware(status='401 Unauthorized',
- body=body,
- conf=conf)
- resp = self.call(middleware)
- self.assertEqual(six.b(body), resp.body)
-
- self.assertEqual(401, resp.status_int)
- self.assertEqual("Keystone uri='%s'" % auth_uri,
- resp.headers['WWW-Authenticate'])
-
- def test_delayed_auth_values(self):
- conf = {'auth_uri': 'http://local.test'}
- status = '401 Unauthorized'
-
- middleware = self.create_simple_middleware(status=status, conf=conf)
- self.assertFalse(middleware._delay_auth_decision)
-
- for v in ('True', '1', 'on', 'yes'):
- conf = {'delay_auth_decision': v,
- 'auth_uri': 'http://local.test'}
-
- middleware = self.create_simple_middleware(status=status,
- conf=conf)
- self.assertTrue(middleware._delay_auth_decision)
-
- for v in ('False', '0', 'no'):
- conf = {'delay_auth_decision': v,
- 'auth_uri': 'http://local.test'}
-
- middleware = self.create_simple_middleware(status=status,
- conf=conf)
- self.assertFalse(middleware._delay_auth_decision)
-
- def test_auth_plugin_with_no_tokens(self):
- body = uuid.uuid4().hex
- auth_uri = 'http://local.test'
- conf = {'delay_auth_decision': True, 'auth_uri': auth_uri}
-
- middleware = self.create_simple_middleware(body=body, conf=conf)
- resp = self.call(middleware)
- self.assertEqual(six.b(body), resp.body)
-
- token_auth = resp.request.environ['keystone.token_auth']
-
- self.assertFalse(token_auth.has_user_token)
- self.assertIsNone(token_auth.user)
- self.assertFalse(token_auth.has_service_token)
- self.assertIsNone(token_auth.service)
-
-
-class CommonCompositeAuthTests(object):
- """Test Composite authentication.
-
- Test the behaviour of adding a service-token.
- """
-
- def test_composite_auth_ok(self):
- token = self.token_dict['uuid_token_default']
- service_token = self.token_dict['uuid_service_token_default']
- fake_logger = fixtures.FakeLogger(level=logging.DEBUG)
- self.middleware.logger = self.useFixture(fake_logger)
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
- expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
- expected_env.update(EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
-
- # role list may get reordered, check for string pieces individually
- self.assertIn('Received request from user: ', fake_logger.output)
- self.assertIn('user_id %(HTTP_X_USER_ID)s, '
- 'project_id %(HTTP_X_TENANT_ID)s, '
- 'roles ' % expected_env, fake_logger.output)
- self.assertIn('service: user_id %(HTTP_X_SERVICE_USER_ID)s, '
- 'project_id %(HTTP_X_SERVICE_PROJECT_ID)s, '
- 'roles ' % expected_env, fake_logger.output)
-
- roles = ','.join([expected_env['HTTP_X_SERVICE_ROLES'],
- expected_env['HTTP_X_ROLES']])
-
- for r in roles.split(','):
- self.assertIn(r, fake_logger.output)
-
- def test_composite_auth_invalid_service_token(self):
- token = self.token_dict['uuid_token_default']
- service_token = 'invalid-service-token'
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual(b'Authentication required', resp.body)
-
- def test_composite_auth_no_service_token(self):
- self.purge_service_token_expected_env()
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
-
- # Ensure injection of service headers is not possible
- for key, value in six.iteritems(self.service_token_expected_env):
- header_key = key[len('HTTP_'):].replace('_', '-')
- req.headers[header_key] = value
- # Check arbitrary headers not removed
- req.headers['X-Foo'] = 'Bar'
- resp = req.get_response(self.middleware)
- for key in six.iterkeys(self.service_token_expected_env):
- header_key = key[len('HTTP_'):].replace('_', '-')
- self.assertFalse(req.headers.get(header_key))
- self.assertEqual('Bar', req.headers.get('X-Foo'))
- self.assertEqual(418, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
- def test_composite_auth_invalid_user_token(self):
- token = 'invalid-token'
- service_token = self.token_dict['uuid_service_token_default']
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual(b'Authentication required', resp.body)
-
- def test_composite_auth_no_user_token(self):
- service_token = self.token_dict['uuid_service_token_default']
- resp = self.call_middleware(headers={'X-Service-Token': service_token})
- self.assertEqual(401, resp.status_int)
- self.assertEqual(b'Authentication required', resp.body)
-
- def test_composite_auth_delay_ok(self):
- self.middleware._delay_auth_decision = True
- token = self.token_dict['uuid_token_default']
- service_token = self.token_dict['uuid_service_token_default']
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(200, resp.status_int)
- self.assertEqual(FakeApp.SUCCESS, resp.body)
-
- def test_composite_auth_delay_invalid_service_token(self):
- self.middleware._delay_auth_decision = True
- self.purge_service_token_expected_env()
- expected_env = {
- 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Invalid',
- }
- self.update_expected_env(expected_env)
-
- token = self.token_dict['uuid_token_default']
- service_token = 'invalid-service-token'
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(420, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
- def test_composite_auth_delay_invalid_service_and_user_tokens(self):
- self.middleware._delay_auth_decision = True
- self.purge_service_token_expected_env()
- self.purge_token_expected_env()
- expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Invalid',
- 'HTTP_X_SERVICE_IDENTITY_STATUS': 'Invalid',
- }
- self.update_expected_env(expected_env)
-
- token = 'invalid-token'
- service_token = 'invalid-service-token'
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(419, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
- def test_composite_auth_delay_no_service_token(self):
- self.middleware._delay_auth_decision = True
- self.purge_service_token_expected_env()
-
- req = webob.Request.blank('/')
- req.headers['X-Auth-Token'] = self.token_dict['uuid_token_default']
-
- # Ensure injection of service headers is not possible
- for key, value in six.iteritems(self.service_token_expected_env):
- header_key = key[len('HTTP_'):].replace('_', '-')
- req.headers[header_key] = value
- # Check arbitrary headers not removed
- req.headers['X-Foo'] = 'Bar'
- resp = req.get_response(self.middleware)
- for key in six.iterkeys(self.service_token_expected_env):
- header_key = key[len('HTTP_'):].replace('_', '-')
- self.assertFalse(req.headers.get(header_key))
- self.assertEqual('Bar', req.headers.get('X-Foo'))
- self.assertEqual(418, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
- def test_composite_auth_delay_invalid_user_token(self):
- self.middleware._delay_auth_decision = True
- self.purge_token_expected_env()
- expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Invalid',
- }
- self.update_expected_env(expected_env)
-
- token = 'invalid-token'
- service_token = self.token_dict['uuid_service_token_default']
- resp = self.call_middleware(headers={'X-Auth-Token': token,
- 'X-Service-Token': service_token})
- self.assertEqual(403, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
- def test_composite_auth_delay_no_user_token(self):
- self.middleware._delay_auth_decision = True
- self.purge_token_expected_env()
- expected_env = {
- 'HTTP_X_IDENTITY_STATUS': 'Invalid',
- }
- self.update_expected_env(expected_env)
-
- service_token = self.token_dict['uuid_service_token_default']
- resp = self.call_middleware(headers={'X-Service-Token': service_token})
- self.assertEqual(403, resp.status_int)
- self.assertEqual(FakeApp.FORBIDDEN, resp.body)
-
-
-class v2CompositeAuthTests(BaseAuthTokenMiddlewareTest,
- CommonCompositeAuthTests,
- testresources.ResourcedTestCase):
- """Test auth_token middleware with v2 token based composite auth.
-
- Execute the Composite auth class tests, but with the
- auth_token middleware configured to expect v2 tokens back from
- a keystone server.
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v2CompositeAuthTests, self).setUp(
- expected_env=EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE,
- fake_app=CompositeFakeApp)
-
- uuid_token_default = self.examples.UUID_TOKEN_DEFAULT
- uuid_service_token_default = self.examples.UUID_SERVICE_TOKEN_DEFAULT
- self.token_dict = {
- 'uuid_token_default': uuid_token_default,
- 'uuid_service_token_default': uuid_service_token_default,
- }
-
- self.requests_mock.get(BASE_URI,
- json=VERSION_LIST_v2,
- status_code=300)
-
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.requests_mock.get('%s/v2.0/tokens/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST,
- status_code=200)
-
- for token in (self.examples.UUID_TOKEN_DEFAULT,
- self.examples.UUID_SERVICE_TOKEN_DEFAULT,):
- text = self.examples.JSON_TOKEN_RESPONSES[token]
- self.requests_mock.get('%s/v2.0/tokens/%s' % (BASE_URI, token),
- text=text)
-
- for invalid_uri in ("%s/v2.0/tokens/invalid-token" % BASE_URI,
- "%s/v2.0/tokens/invalid-service-token" % BASE_URI):
- self.requests_mock.get(invalid_uri, text='', status_code=404)
-
- self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
- self.service_token_expected_env = dict(
- EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
- self.set_middleware()
-
-
-class v3CompositeAuthTests(BaseAuthTokenMiddlewareTest,
- CommonCompositeAuthTests,
- testresources.ResourcedTestCase):
- """Test auth_token middleware with v3 token based composite auth.
-
- Execute the Composite auth class tests, but with the
- auth_token middleware configured to expect v3 tokens back from
- a keystone server.
- """
-
- resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
-
- def setUp(self):
- super(v3CompositeAuthTests, self).setUp(
- auth_version='v3.0',
- fake_app=v3CompositeFakeApp)
-
- uuid_token_default = self.examples.v3_UUID_TOKEN_DEFAULT
- uuid_serv_token_default = self.examples.v3_UUID_SERVICE_TOKEN_DEFAULT
- self.token_dict = {
- 'uuid_token_default': uuid_token_default,
- 'uuid_service_token_default': uuid_serv_token_default,
- }
-
- self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
-
- # TODO(jamielennox): auth_token middleware uses a v2 admin token
- # regardless of the auth_version that is set.
- self.requests_mock.post('%s/v2.0/tokens' % BASE_URI,
- text=FAKE_ADMIN_TOKEN)
-
- self.requests_mock.get('%s/v3/auth/tokens/OS-PKI/revoked' % BASE_URI,
- text=self.examples.SIGNED_REVOCATION_LIST)
-
- self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
- text=self.token_response,
- headers={'X-Subject-Token': uuid.uuid4().hex})
-
- self.token_expected_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
- self.token_expected_env.update(EXPECTED_V3_DEFAULT_ENV_ADDITIONS)
- self.service_token_expected_env = dict(
- EXPECTED_V2_DEFAULT_SERVICE_ENV_RESPONSE)
- self.service_token_expected_env.update(
- EXPECTED_V3_DEFAULT_SERVICE_ENV_ADDITIONS)
- self.set_middleware()
-
- def token_response(self, request, context):
- auth_id = request.headers.get('X-Auth-Token')
- token_id = request.headers.get('X-Subject-Token')
- self.assertEqual(auth_id, FAKE_ADMIN_TOKEN_ID)
-
- status = 200
- response = ""
-
- if token_id == ERROR_TOKEN:
- raise exceptions.ConnectionRefused("Network connection refused.")
-
- try:
- response = self.examples.JSON_TOKEN_RESPONSES[token_id]
- except KeyError:
- status = 404
-
- context.status_code = status
- return response
-
-
-class OtherTests(BaseAuthTokenMiddlewareTest):
-
- def setUp(self):
- super(OtherTests, self).setUp()
- self.logger = self.useFixture(fixtures.FakeLogger())
-
- def test_unknown_server_versions(self):
- versions = fixture.DiscoveryList(v2=False, v3_id='v4', href=BASE_URI)
- self.set_middleware()
-
- self.requests_mock.get(BASE_URI, json=versions, status_code=300)
-
- resp = self.call_middleware(headers={'X-Auth-Token': uuid.uuid4().hex})
- self.assertEqual(503, resp.status_int)
-
- self.assertIn('versions [v3.0, v2.0]', self.logger.output)
-
- def _assert_auth_version(self, conf_version, identity_server_version):
- self.set_middleware(conf={'auth_version': conf_version})
- identity_server = self.middleware._create_identity_server()
- self.assertEqual(identity_server_version,
- identity_server.auth_version)
-
- def test_micro_version(self):
- self._assert_auth_version('v2', (2, 0))
- self._assert_auth_version('v2.0', (2, 0))
- self._assert_auth_version('v3', (3, 0))
- self._assert_auth_version('v3.0', (3, 0))
- self._assert_auth_version('v3.1', (3, 0))
- self._assert_auth_version('v3.2', (3, 0))
- self._assert_auth_version('v3.9', (3, 0))
- self._assert_auth_version('v3.3.1', (3, 0))
- self._assert_auth_version('v3.3.5', (3, 0))
-
- def test_default_auth_version(self):
- # VERSION_LIST_v3 contains both v2 and v3 version elements
- self.requests_mock.get(BASE_URI, json=VERSION_LIST_v3, status_code=300)
- self._assert_auth_version(None, (3, 0))
-
- # VERSION_LIST_v2 contains only v2 version elements
- self.requests_mock.get(BASE_URI, json=VERSION_LIST_v2, status_code=300)
- self._assert_auth_version(None, (2, 0))
-
- def test_unsupported_auth_version(self):
- # If the requested version isn't supported we will use v2
- self._assert_auth_version('v1', (2, 0))
- self._assert_auth_version('v10', (2, 0))
-
-
-class AuthProtocolLoadingTests(BaseAuthTokenMiddlewareTest):
-
- AUTH_URL = 'http://auth.url/prefix'
- DISC_URL = 'http://disc.url/prefix'
- KEYSTONE_BASE_URL = 'http://keystone.url/prefix'
- CRUD_URL = 'http://crud.url/prefix'
-
- # NOTE(jamielennox): use the /v2.0 prefix here because this is what's most
- # likely to be in the service catalog and we should be able to ignore it.
- KEYSTONE_URL = KEYSTONE_BASE_URL + '/v2.0'
-
- def setUp(self):
- super(AuthProtocolLoadingTests, self).setUp()
-
- self.project_id = uuid.uuid4().hex
-
- # first touch is to discover the available versions at the auth_url
- self.requests_mock.get(self.AUTH_URL,
- json=fixture.DiscoveryList(href=self.DISC_URL),
- status_code=300)
-
- # then we do discovery on the URL from the service catalog. In practice
- # this is mostly the same URL as before but test the full range.
- self.requests_mock.get(self.KEYSTONE_BASE_URL + '/',
- json=fixture.DiscoveryList(href=self.CRUD_URL),
- status_code=300)
-
- def good_request(self, app):
- # admin_token is the token that the service will get back from auth
- admin_token_id = uuid.uuid4().hex
- admin_token = fixture.V3Token(project_id=self.project_id)
- s = admin_token.add_service('identity', name='keystone')
- s.add_standard_endpoints(admin=self.KEYSTONE_URL)
-
- self.requests_mock.post(self.DISC_URL + '/v3/auth/tokens',
- json=admin_token,
- headers={'X-Subject-Token': admin_token_id})
-
- # user_token is the data from the user's inputted token
- user_token_id = uuid.uuid4().hex
- user_token = fixture.V3Token()
- user_token.set_project_scope()
-
- request_headers = {'X-Subject-Token': user_token_id,
- 'X-Auth-Token': admin_token_id}
-
- self.requests_mock.get(self.CRUD_URL + '/v3/auth/tokens',
- request_headers=request_headers,
- json=user_token,
- headers={'X-Subject-Token': uuid.uuid4().hex})
-
- resp = self.call(app, headers={'X-Auth-Token': user_token_id})
- self.assertEqual(200, resp.status_int)
- return resp
-
- def test_loading_password_plugin(self):
- # the password options aren't set on config until loading time, but we
- # need them set so we can override the values for testing, so force it
- opts = auth.get_plugin_options('password')
- self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP)
-
- project_id = uuid.uuid4().hex
-
- # Register the authentication options
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
-
- # configure the authentication options
- self.cfg.config(auth_plugin='password',
- username='testuser',
- password='testpass',
- auth_url=self.AUTH_URL,
- project_id=project_id,
- user_domain_id='userdomainid',
- group=_base.AUTHTOKEN_GROUP)
-
- body = uuid.uuid4().hex
- app = self.create_simple_middleware(body=body)
-
- resp = self.good_request(app)
- self.assertEqual(six.b(body), resp.body)
-
- @staticmethod
- def get_plugin(app):
- return app._identity_server._adapter.auth
-
- def test_invalid_plugin_fails_to_initialize(self):
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
- self.cfg.config(auth_plugin=uuid.uuid4().hex,
- group=_base.AUTHTOKEN_GROUP)
-
- self.assertRaises(
- exceptions.NoMatchingPlugin,
- self.create_simple_middleware)
-
- def test_plugin_loading_mixed_opts(self):
- # some options via override and some via conf
- opts = auth.get_plugin_options('password')
- self.cfg.register_opts(opts, group=_base.AUTHTOKEN_GROUP)
-
- username = 'testuser'
- password = 'testpass'
-
- # Register the authentication options
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
-
- # configure the authentication options
- self.cfg.config(auth_plugin='password',
- password=password,
- project_id=self.project_id,
- user_domain_id='userdomainid',
- group=_base.AUTHTOKEN_GROUP)
-
- conf = {'username': username, 'auth_url': self.AUTH_URL}
-
- body = uuid.uuid4().hex
- app = self.create_simple_middleware(body=body, conf=conf)
-
- resp = self.good_request(app)
- self.assertEqual(six.b(body), resp.body)
-
- plugin = self.get_plugin(app)
-
- self.assertEqual(self.AUTH_URL, plugin.auth_url)
- self.assertEqual(username, plugin._username)
- self.assertEqual(password, plugin._password)
- self.assertEqual(self.project_id, plugin._project_id)
-
- def test_plugin_loading_with_auth_section(self):
- # some options via override and some via conf
- section = 'testsection'
- username = 'testuser'
- password = 'testpass'
-
- auth.register_conf_options(self.cfg.conf, group=section)
- opts = auth.get_plugin_options('password')
- self.cfg.register_opts(opts, group=section)
-
- # Register the authentication options
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
-
- # configure the authentication options
- self.cfg.config(auth_section=section, group=_base.AUTHTOKEN_GROUP)
- self.cfg.config(auth_plugin='password',
- password=password,
- project_id=self.project_id,
- user_domain_id='userdomainid',
- group=section)
-
- conf = {'username': username, 'auth_url': self.AUTH_URL}
-
- body = uuid.uuid4().hex
- app = self.create_simple_middleware(body=body, conf=conf)
-
- resp = self.good_request(app)
- self.assertEqual(six.b(body), resp.body)
-
- plugin = self.get_plugin(app)
-
- self.assertEqual(self.AUTH_URL, plugin.auth_url)
- self.assertEqual(username, plugin._username)
- self.assertEqual(password, plugin._password)
- self.assertEqual(self.project_id, plugin._project_id)
-
-
-class TestAuthPluginUserAgentGeneration(BaseAuthTokenMiddlewareTest):
-
- def setUp(self):
- super(TestAuthPluginUserAgentGeneration, self).setUp()
- self.auth_url = uuid.uuid4().hex
- self.project_id = uuid.uuid4().hex
- self.username = uuid.uuid4().hex
- self.password = uuid.uuid4().hex
- self.section = uuid.uuid4().hex
- self.user_domain_id = uuid.uuid4().hex
-
- auth.register_conf_options(self.cfg.conf, group=self.section)
- opts = auth.get_plugin_options('password')
- self.cfg.register_opts(opts, group=self.section)
-
- # Register the authentication options
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
-
- # configure the authentication options
- self.cfg.config(auth_section=self.section, group=_base.AUTHTOKEN_GROUP)
- self.cfg.config(auth_plugin='password',
- password=self.password,
- project_id=self.project_id,
- user_domain_id=self.user_domain_id,
- group=self.section)
-
- def test_no_project_configured(self):
- ksm_version = uuid.uuid4().hex
- conf = {'username': self.username, 'auth_url': self.auth_url}
-
- app = self._create_app(conf, ksm_version)
- self._assert_user_agent(app, '', ksm_version)
-
- def test_project_in_configuration(self):
- project = uuid.uuid4().hex
- project_version = uuid.uuid4().hex
-
- conf = {'username': self.username,
- 'auth_url': self.auth_url,
- 'project': project}
- app = self._create_app(conf, project_version)
- project_with_version = '{0}/{1} '.format(project, project_version)
- self._assert_user_agent(app, project_with_version, project_version)
-
- def test_project_in_oslo_configuration(self):
- project = uuid.uuid4().hex
- project_version = uuid.uuid4().hex
-
- conf = {'username': self.username, 'auth_url': self.auth_url}
- with mock.patch.object(cfg.CONF, 'project', new=project, create=True):
- app = self._create_app(conf, project_version)
- project = '{0}/{1} '.format(project, project_version)
- self._assert_user_agent(app, project, project_version)
-
- def _create_app(self, conf, project_version):
- fake_pkg_resources = mock.Mock()
- fake_pkg_resources.get_distribution().version = project_version
-
- body = uuid.uuid4().hex
- with mock.patch('keystonemiddleware.auth_token.pkg_resources',
- new=fake_pkg_resources):
- return self.create_simple_middleware(body=body, conf=conf,
- use_global_conf=True)
-
- def _assert_user_agent(self, app, project, ksm_version):
- sess = app._identity_server._adapter.session
- expected_ua = ('{0}keystonemiddleware.auth_token/{1}'
- .format(project, ksm_version))
- self.assertEqual(expected_ua, sess.user_agent)
-
-
-class TestAuthPluginLocalOsloConfig(BaseAuthTokenMiddlewareTest):
- def test_project_in_local_oslo_configuration(self):
- options = {
- 'auth_plugin': 'password',
- 'auth_uri': uuid.uuid4().hex,
- 'password': uuid.uuid4().hex,
- }
-
- content = ("[keystone_authtoken]\n"
- "auth_plugin=%(auth_plugin)s\n"
- "auth_uri=%(auth_uri)s\n"
- "password=%(password)s\n" % options)
- conf_file_fixture = self.useFixture(
- createfile.CreateFileWithContent("my_app", content))
- conf = {'oslo_config_project': 'my_app',
- 'oslo_config_file': conf_file_fixture.path}
- app = self._create_app(conf, uuid.uuid4().hex)
- for option in options:
- self.assertEqual(options[option], app._conf_get(option))
-
- def _create_app(self, conf, project_version):
- fake_pkg_resources = mock.Mock()
- fake_pkg_resources.get_distribution().version = project_version
-
- body = uuid.uuid4().hex
- with mock.patch('keystonemiddleware.auth_token.pkg_resources',
- new=fake_pkg_resources):
- return self.create_simple_middleware(body=body, conf=conf)
-
-
-def load_tests(loader, tests, pattern):
- return testresources.OptimisingTestSuite(tests)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
deleted file mode 100644
index b213f546..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_base_middleware.py
+++ /dev/null
@@ -1,202 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import uuid
-
-from keystoneclient import fixture
-import mock
-import six
-import testtools
-import webob
-
-from keystonemiddleware import auth_token
-from keystonemiddleware.auth_token import _request
-
-
-class FakeApp(object):
-
- @webob.dec.wsgify
- def __call__(self, req):
- return webob.Response()
-
-
-class FetchingMiddleware(auth_token._BaseAuthProtocol):
-
- def __init__(self, app, token_dict={}, **kwargs):
- super(FetchingMiddleware, self).__init__(app, **kwargs)
- self.token_dict = token_dict
-
- def _fetch_token(self, token):
- try:
- return self.token_dict[token]
- except KeyError:
- raise auth_token.InvalidToken()
-
-
-class BaseAuthProtocolTests(testtools.TestCase):
-
- @mock.patch.multiple(auth_token._BaseAuthProtocol,
- process_request=mock.DEFAULT,
- process_response=mock.DEFAULT)
- def test_process_flow(self, process_request, process_response):
- m = auth_token._BaseAuthProtocol(FakeApp())
-
- process_request.return_value = None
- process_response.side_effect = lambda x: x
-
- req = webob.Request.blank('/', method='GET')
- resp = req.get_response(m)
-
- self.assertEqual(200, resp.status_code)
-
- self.assertEqual(1, process_request.call_count)
- self.assertIsInstance(process_request.call_args[0][0],
- _request._AuthTokenRequest)
-
- self.assertEqual(1, process_response.call_count)
- self.assertIsInstance(process_response.call_args[0][0], webob.Response)
-
- @classmethod
- def call(cls, middleware, method='GET', path='/', headers=None):
- req = webob.Request.blank(path)
- req.method = method
-
- for k, v in six.iteritems(headers or {}):
- req.headers[k] = v
-
- resp = req.get_response(middleware)
- resp.request = req
- return resp
-
- def test_good_v3_user_token(self):
- t = fixture.V3Token()
- t.set_project_scope()
- role = t.add_role()
-
- token_id = uuid.uuid4().hex
- token_dict = {token_id: t}
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual(token_id, req.headers['X-Auth-Token'])
-
- self.assertEqual('Confirmed', req.headers['X-Identity-Status'])
- self.assertNotIn('X-Service-Token', req.headers)
-
- p = req.environ['keystone.token_auth']
-
- self.assertTrue(p.has_user_token)
- self.assertFalse(p.has_service_token)
-
- self.assertEqual(t.project_id, p.user.project_id)
- self.assertEqual(t.project_domain_id, p.user.project_domain_id)
- self.assertEqual(t.user_id, p.user.user_id)
- self.assertEqual(t.user_domain_id, p.user.user_domain_id)
- self.assertIn(role['name'], p.user.role_names)
-
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb, token_dict)
- self.call(m, headers={'X-Auth-Token': token_id})
-
- def test_invalid_user_token(self):
- token_id = uuid.uuid4().hex
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual('Invalid', req.headers['X-Identity-Status'])
- self.assertEqual(token_id, req.headers['X-Auth-Token'])
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb)
- self.call(m, headers={'X-Auth-Token': token_id})
-
- def test_expired_user_token(self):
- t = fixture.V3Token()
- t.set_project_scope()
- t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
-
- token_id = uuid.uuid4().hex
- token_dict = {token_id: t}
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual('Invalid', req.headers['X-Identity-Status'])
- self.assertEqual(token_id, req.headers['X-Auth-Token'])
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb, token_dict=token_dict)
- self.call(m, headers={'X-Auth-Token': token_id})
-
- def test_good_v3_service_token(self):
- t = fixture.V3Token()
- t.set_project_scope()
- role = t.add_role()
-
- token_id = uuid.uuid4().hex
- token_dict = {token_id: t}
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual(token_id, req.headers['X-Service-Token'])
-
- self.assertEqual('Confirmed',
- req.headers['X-Service-Identity-Status'])
- self.assertNotIn('X-Auth-Token', req.headers)
-
- p = req.environ['keystone.token_auth']
-
- self.assertFalse(p.has_user_token)
- self.assertTrue(p.has_service_token)
-
- self.assertEqual(t.project_id, p.service.project_id)
- self.assertEqual(t.project_domain_id, p.service.project_domain_id)
- self.assertEqual(t.user_id, p.service.user_id)
- self.assertEqual(t.user_domain_id, p.service.user_domain_id)
- self.assertIn(role['name'], p.service.role_names)
-
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb, token_dict)
- self.call(m, headers={'X-Service-Token': token_id})
-
- def test_invalid_service_token(self):
- token_id = uuid.uuid4().hex
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual('Invalid',
- req.headers['X-Service-Identity-Status'])
- self.assertEqual(token_id, req.headers['X-Service-Token'])
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb)
- self.call(m, headers={'X-Service-Token': token_id})
-
- def test_expired_service_token(self):
- t = fixture.V3Token()
- t.set_project_scope()
- t.expires = datetime.datetime.utcnow() - datetime.timedelta(minutes=10)
-
- token_id = uuid.uuid4().hex
- token_dict = {token_id: t}
-
- @webob.dec.wsgify
- def _do_cb(req):
- self.assertEqual('Invalid',
- req.headers['X-Service-Identity-Status'])
- self.assertEqual(token_id, req.headers['X-Service-Token'])
- return webob.Response()
-
- m = FetchingMiddleware(_do_cb, token_dict=token_dict)
- self.call(m, headers={'X-Service-Token': token_id})
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py
deleted file mode 100644
index 074d1e5d..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_connection_pool.py
+++ /dev/null
@@ -1,118 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import time
-
-import mock
-from six.moves import queue
-import testtools
-from testtools import matchers
-
-from keystonemiddleware.auth_token import _memcache_pool
-from keystonemiddleware.tests.unit import utils
-
-
-class _TestConnectionPool(_memcache_pool.ConnectionPool):
- destroyed_value = 'destroyed'
-
- def _create_connection(self):
- return mock.MagicMock()
-
- def _destroy_connection(self, conn):
- conn(self.destroyed_value)
-
-
-class TestConnectionPool(utils.TestCase):
- def setUp(self):
- super(TestConnectionPool, self).setUp()
- self.unused_timeout = 10
- self.maxsize = 2
- self.connection_pool = _TestConnectionPool(
- maxsize=self.maxsize,
- unused_timeout=self.unused_timeout)
-
- def test_get_context_manager(self):
- self.assertThat(self.connection_pool.queue, matchers.HasLength(0))
- with self.connection_pool.acquire() as conn:
- self.assertEqual(1, self.connection_pool._acquired)
- self.assertEqual(0, self.connection_pool._acquired)
- self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
- self.assertEqual(conn, self.connection_pool.queue[0].connection)
-
- def test_cleanup_pool(self):
- self.test_get_context_manager()
- newtime = time.time() + self.unused_timeout * 2
- non_expired_connection = _memcache_pool._PoolItem(
- ttl=(newtime * 2),
- connection=mock.MagicMock())
- self.connection_pool.queue.append(non_expired_connection)
- self.assertThat(self.connection_pool.queue, matchers.HasLength(2))
- with mock.patch.object(time, 'time', return_value=newtime):
- conn = self.connection_pool.queue[0].connection
- with self.connection_pool.acquire():
- pass
- conn.assert_has_calls(
- [mock.call(self.connection_pool.destroyed_value)])
- self.assertThat(self.connection_pool.queue, matchers.HasLength(1))
- self.assertEqual(0, non_expired_connection.connection.call_count)
-
- def test_acquire_conn_exception_returns_acquired_count(self):
- class TestException(Exception):
- pass
-
- with mock.patch.object(_TestConnectionPool, '_create_connection',
- side_effect=TestException):
- with testtools.ExpectedException(TestException):
- with self.connection_pool.acquire():
- pass
- self.assertThat(self.connection_pool.queue,
- matchers.HasLength(0))
- self.assertEqual(0, self.connection_pool._acquired)
-
- def test_connection_pool_limits_maximum_connections(self):
- # NOTE(morganfainberg): To ensure we don't lockup tests until the
- # job limit, explicitly call .get_nowait() and .put_nowait() in this
- # case.
- conn1 = self.connection_pool.get_nowait()
- conn2 = self.connection_pool.get_nowait()
-
- # Use a nowait version to raise an Empty exception indicating we would
- # not get another connection until one is placed back into the queue.
- self.assertRaises(queue.Empty, self.connection_pool.get_nowait)
-
- # Place the connections back into the pool.
- self.connection_pool.put_nowait(conn1)
- self.connection_pool.put_nowait(conn2)
-
- # Make sure we can get a connection out of the pool again.
- self.connection_pool.get_nowait()
-
- def test_connection_pool_maximum_connection_get_timeout(self):
- connection_pool = _TestConnectionPool(
- maxsize=1,
- unused_timeout=self.unused_timeout,
- conn_get_timeout=0)
-
- def _acquire_connection():
- with connection_pool.acquire():
- pass
-
- # Make sure we've consumed the only available connection from the pool
- conn = connection_pool.get_nowait()
-
- self.assertRaises(_memcache_pool.ConnectionGetTimeoutException,
- _acquire_connection)
-
- # Put the connection back and ensure we can acquire the connection
- # after it is available.
- connection_pool.put_nowait(conn)
- _acquire_connection()
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py
deleted file mode 100644
index e9189831..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_memcache_crypt.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-
-from keystonemiddleware.auth_token import _memcache_crypt as memcache_crypt
-from keystonemiddleware.tests.unit import utils
-
-
-class MemcacheCryptPositiveTests(utils.BaseTestCase):
- def _setup_keys(self, strategy):
- return memcache_crypt.derive_keys(b'token', b'secret', strategy)
-
- def test_constant_time_compare(self):
- # make sure it works as a compare, the "constant time" aspect
- # isn't appropriate to test in unittests
- ctc = memcache_crypt.constant_time_compare
- self.assertTrue(ctc('abcd', 'abcd'))
- self.assertTrue(ctc('', ''))
- self.assertFalse(ctc('abcd', 'efgh'))
- self.assertFalse(ctc('abc', 'abcd'))
- self.assertFalse(ctc('abc', 'abc\x00'))
- self.assertFalse(ctc('', 'abc'))
-
- # For Python 3, we want to test these functions with both str and bytes
- # as input.
- if six.PY3:
- self.assertTrue(ctc(b'abcd', b'abcd'))
- self.assertTrue(ctc(b'', b''))
- self.assertFalse(ctc(b'abcd', b'efgh'))
- self.assertFalse(ctc(b'abc', b'abcd'))
- self.assertFalse(ctc(b'abc', b'abc\x00'))
- self.assertFalse(ctc(b'', b'abc'))
-
- def test_derive_keys(self):
- keys = self._setup_keys(b'strategy')
- self.assertEqual(len(keys['ENCRYPTION']),
- len(keys['CACHE_KEY']))
- self.assertEqual(len(keys['CACHE_KEY']),
- len(keys['MAC']))
- self.assertNotEqual(keys['ENCRYPTION'],
- keys['MAC'])
- self.assertIn('strategy', keys.keys())
-
- def test_key_strategy_diff(self):
- k1 = self._setup_keys(b'MAC')
- k2 = self._setup_keys(b'ENCRYPT')
- self.assertNotEqual(k1, k2)
-
- def test_sign_data(self):
- keys = self._setup_keys(b'MAC')
- sig = memcache_crypt.sign_data(keys['MAC'], b'data')
- self.assertEqual(len(sig), memcache_crypt.DIGEST_LENGTH_B64)
-
- def test_encryption(self):
- keys = self._setup_keys(b'ENCRYPT')
- # what you put in is what you get out
- for data in [b'data', b'1234567890123456', b'\x00\xFF' * 13
- ] + [six.int2byte(x % 256) * x for x in range(768)]:
- crypt = memcache_crypt.encrypt_data(keys['ENCRYPTION'], data)
- decrypt = memcache_crypt.decrypt_data(keys['ENCRYPTION'], crypt)
- self.assertEqual(data, decrypt)
- self.assertRaises(memcache_crypt.DecryptError,
- memcache_crypt.decrypt_data,
- keys['ENCRYPTION'], crypt[:-1])
-
- def test_protect_wrappers(self):
- data = b'My Pretty Little Data'
- for strategy in [b'MAC', b'ENCRYPT']:
- keys = self._setup_keys(strategy)
- protected = memcache_crypt.protect_data(keys, data)
- self.assertNotEqual(protected, data)
- if strategy == b'ENCRYPT':
- self.assertNotIn(data, protected)
- unprotected = memcache_crypt.unprotect_data(keys, protected)
- self.assertEqual(data, unprotected)
- self.assertRaises(memcache_crypt.InvalidMacError,
- memcache_crypt.unprotect_data,
- keys, protected[:-1])
- self.assertIsNone(memcache_crypt.unprotect_data(keys, None))
-
- def test_no_pycrypt(self):
- aes = memcache_crypt.AES
- memcache_crypt.AES = None
- self.assertRaises(memcache_crypt.CryptoUnavailableError,
- memcache_crypt.encrypt_data, 'token', 'secret',
- 'data')
- memcache_crypt.AES = aes
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
deleted file mode 100644
index 223433f8..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
+++ /dev/null
@@ -1,253 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import itertools
-import uuid
-
-from keystoneclient import access
-from keystoneclient import fixture
-
-from keystonemiddleware.auth_token import _request
-from keystonemiddleware.tests.unit import utils
-
-
-class RequestObjectTests(utils.TestCase):
-
- def setUp(self):
- super(RequestObjectTests, self).setUp()
- self.request = _request._AuthTokenRequest.blank('/')
-
- def test_setting_user_token_valid(self):
- self.assertNotIn('X-Identity-Status', self.request.headers)
-
- self.request.user_token_valid = True
- self.assertEqual('Confirmed',
- self.request.headers['X-Identity-Status'])
- self.assertTrue(self.request.user_token_valid)
-
- self.request.user_token_valid = False
- self.assertEqual('Invalid',
- self.request.headers['X-Identity-Status'])
- self.assertFalse(self.request.user_token_valid)
-
- def test_setting_service_token_valid(self):
- self.assertNotIn('X-Service-Identity-Status', self.request.headers)
-
- self.request.service_token_valid = True
- self.assertEqual('Confirmed',
- self.request.headers['X-Service-Identity-Status'])
- self.assertTrue(self.request.service_token_valid)
-
- self.request.service_token_valid = False
- self.assertEqual('Invalid',
- self.request.headers['X-Service-Identity-Status'])
- self.assertFalse(self.request.service_token_valid)
-
- def test_removing_headers(self):
- GOOD = ('X-Auth-Token',
- 'unknownstring',
- uuid.uuid4().hex)
-
- BAD = ('X-Domain-Id',
- 'X-Domain-Name',
- 'X-Project-Id',
- 'X-Project-Name',
- 'X-Project-Domain-Id',
- 'X-Project-Domain-Name',
- 'X-User-Id',
- 'X-User-Name',
- 'X-User-Domain-Id',
- 'X-User-Domain-Name',
- 'X-Roles',
- 'X-Identity-Status',
-
- 'X-Service-Domain-Id',
- 'X-Service-Domain-Name',
- 'X-Service-Project-Id',
- 'X-Service-Project-Name',
- 'X-Service-Project-Domain-Id',
- 'X-Service-Project-Domain-Name',
- 'X-Service-User-Id',
- 'X-Service-User-Name',
- 'X-Service-User-Domain-Id',
- 'X-Service-User-Domain-Name',
- 'X-Service-Roles',
- 'X-Service-Identity-Status',
-
- 'X-Service-Catalog',
-
- 'X-Role',
- 'X-User',
- 'X-Tenant-Id',
- 'X-Tenant-Name',
- 'X-Tenant',
- )
-
- header_vals = {}
-
- for header in itertools.chain(GOOD, BAD):
- v = uuid.uuid4().hex
- header_vals[header] = v
- self.request.headers[header] = v
-
- self.request.remove_auth_headers()
-
- for header in BAD:
- self.assertNotIn(header, self.request.headers)
-
- for header in GOOD:
- self.assertEqual(header_vals[header], self.request.headers[header])
-
- def _test_v3_headers(self, token, prefix):
- self.assertEqual(token.domain_id,
- self.request.headers['X%s-Domain-Id' % prefix])
- self.assertEqual(token.domain_name,
- self.request.headers['X%s-Domain-Name' % prefix])
- self.assertEqual(token.project_id,
- self.request.headers['X%s-Project-Id' % prefix])
- self.assertEqual(token.project_name,
- self.request.headers['X%s-Project-Name' % prefix])
- self.assertEqual(
- token.project_domain_id,
- self.request.headers['X%s-Project-Domain-Id' % prefix])
- self.assertEqual(
- token.project_domain_name,
- self.request.headers['X%s-Project-Domain-Name' % prefix])
-
- self.assertEqual(token.user_id,
- self.request.headers['X%s-User-Id' % prefix])
- self.assertEqual(token.user_name,
- self.request.headers['X%s-User-Name' % prefix])
- self.assertEqual(
- token.user_domain_id,
- self.request.headers['X%s-User-Domain-Id' % prefix])
- self.assertEqual(
- token.user_domain_name,
- self.request.headers['X%s-User-Domain-Name' % prefix])
-
- def test_project_scoped_user_headers(self):
- token = fixture.V3Token()
- token.set_project_scope()
- token_id = uuid.uuid4().hex
-
- auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
- self.request.set_user_headers(auth_ref, include_service_catalog=True)
-
- self._test_v3_headers(token, '')
-
- def test_project_scoped_service_headers(self):
- token = fixture.V3Token()
- token.set_project_scope()
- token_id = uuid.uuid4().hex
-
- auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
- self.request.set_service_headers(auth_ref)
-
- self._test_v3_headers(token, '-Service')
-
- def test_auth_type(self):
- self.assertIsNone(self.request.auth_type)
- self.request.environ['AUTH_TYPE'] = 'NeGoTiatE'
- self.assertEqual('negotiate', self.request.auth_type)
-
- def test_user_token(self):
- token = uuid.uuid4().hex
- self.assertIsNone(self.request.user_token)
- self.request.headers['X-Auth-Token'] = token
- self.assertEqual(token, self.request.user_token)
-
- def test_storage_token(self):
- storage_token = uuid.uuid4().hex
- user_token = uuid.uuid4().hex
-
- self.assertIsNone(self.request.user_token)
- self.request.headers['X-Storage-Token'] = storage_token
- self.assertEqual(storage_token, self.request.user_token)
- self.request.headers['X-Auth-Token'] = user_token
- self.assertEqual(user_token, self.request.user_token)
-
- def test_service_token(self):
- token = uuid.uuid4().hex
- self.assertIsNone(self.request.service_token)
- self.request.headers['X-Service-Token'] = token
- self.assertEqual(token, self.request.service_token)
-
- def test_token_auth(self):
- plugin = object()
-
- self.assertNotIn('keystone.token_auth', self.request.environ)
- self.request.token_auth = plugin
- self.assertIs(plugin, self.request.environ['keystone.token_auth'])
- self.assertIs(plugin, self.request.token_auth)
-
-
-class CatalogConversionTests(utils.TestCase):
-
- PUBLIC_URL = 'http://server:5000/v2.0'
- ADMIN_URL = 'http://admin:35357/v2.0'
- INTERNAL_URL = 'http://internal:5000/v2.0'
-
- REGION_ONE = 'RegionOne'
- REGION_TWO = 'RegionTwo'
- REGION_THREE = 'RegionThree'
-
- def test_basic_convert(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
- s.add_standard_endpoints(public=self.PUBLIC_URL,
- admin=self.ADMIN_URL,
- internal=self.INTERNAL_URL,
- region=self.REGION_ONE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = _request._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
- self.assertEqual(1, len(service['endpoints']))
- endpoints = service['endpoints'][0]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(4, len(endpoints))
- self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
- self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
- self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
- self.assertEqual(self.REGION_ONE, endpoints['region'])
-
- def test_multi_region(self):
- token = fixture.V3Token()
- s = token.add_service(type='identity')
-
- s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
- s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
- s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
-
- auth_ref = access.AccessInfo.factory(body=token)
- catalog_data = auth_ref.service_catalog.get_data()
- catalog = _request._v3_to_v2_catalog(catalog_data)
-
- self.assertEqual(1, len(catalog))
- service = catalog[0]
-
- # the 3 regions will come through as 3 separate endpoints
- expected = [{'internalURL': self.INTERNAL_URL,
- 'region': self.REGION_ONE},
- {'publicURL': self.PUBLIC_URL,
- 'region': self.REGION_TWO},
- {'adminURL': self.ADMIN_URL,
- 'region': self.REGION_THREE}]
-
- self.assertEqual('identity', service['type'])
- self.assertEqual(3, len(service['endpoints']))
- for e in expected:
- self.assertIn(e, expected)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py
deleted file mode 100644
index 258e195a..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_revocations.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Copyright 2014 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import datetime
-import json
-import shutil
-import uuid
-
-import mock
-
-from keystonemiddleware.auth_token import _exceptions as exc
-from keystonemiddleware.auth_token import _revocations
-from keystonemiddleware.auth_token import _signing_dir
-from keystonemiddleware.tests.unit import utils
-
-
-class RevocationsTests(utils.BaseTestCase):
-
- def _setup_revocations(self, revoked_list):
- directory_name = '/tmp/%s' % uuid.uuid4().hex
- signing_directory = _signing_dir.SigningDirectory(directory_name)
- self.addCleanup(shutil.rmtree, directory_name)
-
- identity_server = mock.Mock()
-
- verify_result_obj = {'revoked': revoked_list}
- cms_verify = mock.Mock(return_value=json.dumps(verify_result_obj))
-
- revocations = _revocations.Revocations(
- timeout=datetime.timedelta(1), signing_directory=signing_directory,
- identity_server=identity_server, cms_verify=cms_verify)
- return revocations
-
- def _check_with_list(self, revoked_list, token_ids):
- revoked_list = list({'id': r} for r in revoked_list)
- revocations = self._setup_revocations(revoked_list)
- revocations.check(token_ids)
-
- def test_check_empty_list(self):
- # When the identity server returns an empty list, a token isn't
- # revoked.
-
- revoked_tokens = []
- token_ids = [uuid.uuid4().hex]
- # No assert because this would raise
- self._check_with_list(revoked_tokens, token_ids)
-
- def test_check_revoked(self):
- # When the identity server returns a list with a token in it, that
- # token is revoked.
-
- token_id = uuid.uuid4().hex
- revoked_tokens = [token_id]
- token_ids = [token_id]
- self.assertRaises(exc.InvalidToken,
- self._check_with_list, revoked_tokens, token_ids)
-
- def test_check_by_audit_id_revoked(self):
- # When the audit ID is in the revocation list, InvalidToken is raised.
- audit_id = uuid.uuid4().hex
- revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': audit_id}]
- revocations = self._setup_revocations(revoked_list)
- self.assertRaises(exc.InvalidToken,
- revocations.check_by_audit_id, [audit_id])
-
- def test_check_by_audit_id_chain_revoked(self):
- # When the token's audit chain ID is in the revocation list,
- # InvalidToken is raised.
- revoked_audit_id = uuid.uuid4().hex
- revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': revoked_audit_id}]
- revocations = self._setup_revocations(revoked_list)
-
- token_audit_ids = [uuid.uuid4().hex, revoked_audit_id]
- self.assertRaises(exc.InvalidToken,
- revocations.check_by_audit_id, token_audit_ids)
-
- def test_check_by_audit_id_not_revoked(self):
- # When the audit ID is not in the revocation list no exception.
- revoked_list = [{'id': uuid.uuid4().hex, 'audit_id': uuid.uuid4().hex}]
- revocations = self._setup_revocations(revoked_list)
-
- audit_id = uuid.uuid4().hex
- revocations.check_by_audit_id([audit_id])
-
- def test_check_by_audit_id_no_audit_ids(self):
- # Older identity servers don't send audit_ids in the revocation list.
- # When this happens, check_by_audit_id still works, just doesn't
- # verify anything.
- revoked_list = [{'id': uuid.uuid4().hex}]
- revocations = self._setup_revocations(revoked_list)
-
- audit_id = uuid.uuid4().hex
- revocations.check_by_audit_id([audit_id])
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py
deleted file mode 100644
index b2ef95dd..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_signing_dir.py
+++ /dev/null
@@ -1,137 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import shutil
-import stat
-import uuid
-
-from keystonemiddleware.auth_token import _signing_dir
-from keystonemiddleware.tests.unit import utils
-
-
-class SigningDirectoryTests(utils.BaseTestCase):
-
- def test_directory_created_when_doesnt_exist(self):
- # When _SigningDirectory is created, if the directory doesn't exist
- # it's created with the expected permissions.
- tmp_name = uuid.uuid4().hex
- parent_directory = '/tmp/%s' % tmp_name
- directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
-
- # Directories are created by __init__.
- _signing_dir.SigningDirectory(directory_name)
- self.addCleanup(shutil.rmtree, parent_directory)
-
- self.assertTrue(os.path.isdir(directory_name))
- self.assertTrue(os.access(directory_name, os.W_OK))
- self.assertEqual(os.stat(directory_name).st_uid, os.getuid())
- self.assertEqual(stat.S_IMODE(os.stat(directory_name).st_mode),
- stat.S_IRWXU)
-
- def test_use_directory_already_exists(self):
- # The directory can already exist.
-
- tmp_name = uuid.uuid4().hex
- parent_directory = '/tmp/%s' % tmp_name
- directory_name = '/tmp/%s/%s' % ((tmp_name,) * 2)
- os.makedirs(directory_name, stat.S_IRWXU)
- self.addCleanup(shutil.rmtree, parent_directory)
-
- _signing_dir.SigningDirectory(directory_name)
-
- def test_write_file(self):
- # write_file when the file doesn't exist creates the file.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- file_path = signing_directory.calc_path(file_name)
- with open(file_path) as f:
- actual_contents = f.read()
-
- self.assertEqual(contents, actual_contents)
-
- def test_replace_file(self):
- # write_file when the file already exists overwrites it.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- orig_contents = self.getUniqueString()
- signing_directory.write_file(file_name, orig_contents)
-
- new_contents = self.getUniqueString()
- signing_directory.write_file(file_name, new_contents)
-
- file_path = signing_directory.calc_path(file_name)
- with open(file_path) as f:
- actual_contents = f.read()
-
- self.assertEqual(new_contents, actual_contents)
-
- def test_recreate_directory(self):
- # If the original directory is lost, it gets recreated when a file
- # is written.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- # Delete the directory.
- shutil.rmtree(signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- actual_contents = signing_directory.read_file(file_name)
- self.assertEqual(contents, actual_contents)
-
- def test_read_file(self):
- # Can read a file that was written.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- contents = self.getUniqueString()
- signing_directory.write_file(file_name, contents)
-
- actual_contents = signing_directory.read_file(file_name)
-
- self.assertEqual(contents, actual_contents)
-
- def test_read_file_doesnt_exist(self):
- # Show what happens when try to read a file that wasn't written.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- self.assertRaises(IOError, signing_directory.read_file, file_name)
-
- def test_calc_path(self):
- # calc_path returns the actual filename built from the directory name.
-
- signing_directory = _signing_dir.SigningDirectory()
- self.addCleanup(shutil.rmtree, signing_directory._directory_name)
-
- file_name = self.getUniqueString()
- actual_path = signing_directory.calc_path(file_name)
- expected_path = os.path.join(signing_directory._directory_name,
- file_name)
- self.assertEqual(expected_path, actual_path)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py
deleted file mode 100644
index 19d3d7a9..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_user_auth_plugin.py
+++ /dev/null
@@ -1,201 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-import warnings
-
-from keystoneclient import auth
-from keystoneclient import fixture
-
-from keystonemiddleware.auth_token import _base
-from keystonemiddleware.tests.unit.auth_token import base
-
-# NOTE(jamielennox): just some sample values that we can use for testing
-BASE_URI = 'https://keystone.example.com:1234'
-AUTH_URL = 'https://keystone.auth.com:1234'
-
-
-class BaseUserPluginTests(object):
-
- def configure_middleware(self,
- auth_plugin,
- group='keystone_authtoken',
- **kwargs):
- # NOTE(gyee): For this test suite and for the stable liberty branch
- # only, we will ignore deprecated calls that keystonemiddleware makes.
- warnings.filterwarnings('ignore', category=DeprecationWarning,
- module='^keystonemiddleware\\.')
-
- opts = auth.get_plugin_class(auth_plugin).get_options()
- self.cfg.register_opts(opts, group=group)
-
- # Since these tests cfg.config() themselves rather than waiting for
- # auth_token to do it on __init__ we need to register the base auth
- # options (e.g., auth_plugin)
- auth.register_conf_options(self.cfg.conf, group=_base.AUTHTOKEN_GROUP)
-
- self.cfg.config(group=group,
- auth_plugin=auth_plugin,
- **kwargs)
-
- def assertTokenDataEqual(self, token_id, token, token_data):
- self.assertEqual(token_id, token_data.auth_token)
- self.assertEqual(token.user_id, token_data.user_id)
- try:
- trust_id = token.trust_id
- except KeyError:
- trust_id = None
- self.assertEqual(trust_id, token_data.trust_id)
- self.assertEqual(self.get_role_names(token), token_data.role_names)
-
- def get_plugin(self, token_id, service_token_id=None):
- headers = {'X-Auth-Token': token_id}
-
- if service_token_id:
- headers['X-Service-Token'] = service_token_id
-
- m = self.create_simple_middleware()
-
- resp = self.call(m, headers=headers)
- self.assertEqual(200, resp.status_int)
- return resp.request.environ['keystone.token_auth']
-
- def test_user_information(self):
- token_id, token = self.get_token()
- plugin = self.get_plugin(token_id)
-
- self.assertTokenDataEqual(token_id, token, plugin.user)
- self.assertFalse(plugin.has_service_token)
- self.assertIsNone(plugin.service)
-
- def test_with_service_information(self):
- token_id, token = self.get_token()
- service_id, service = self.get_token()
-
- plugin = self.get_plugin(token_id, service_id)
-
- self.assertTokenDataEqual(token_id, token, plugin.user)
- self.assertTokenDataEqual(service_id, service, plugin.service)
-
-
-class V2UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
-
- def setUp(self):
- super(V2UserPluginTests, self).setUp()
-
- self.service_token = fixture.V2Token()
- self.service_token.set_scope()
- s = self.service_token.add_service('identity', name='keystone')
-
- s.add_endpoint(public=BASE_URI,
- admin=BASE_URI,
- internal=BASE_URI)
-
- self.configure_middleware(auth_plugin='v2password',
- auth_url='%s/v2.0/' % AUTH_URL,
- user_id=self.service_token.user_id,
- password=uuid.uuid4().hex,
- tenant_id=self.service_token.tenant_id)
-
- auth_discovery = fixture.DiscoveryList(href=AUTH_URL, v3=False)
- self.requests_mock.get(AUTH_URL, json=auth_discovery)
-
- base_discovery = fixture.DiscoveryList(href=BASE_URI, v3=False)
- self.requests_mock.get(BASE_URI, json=base_discovery)
-
- url = '%s/v2.0/tokens' % AUTH_URL
- self.requests_mock.post(url, json=self.service_token)
-
- def get_role_names(self, token):
- return set(x['name'] for x in token['access']['user'].get('roles', []))
-
- def get_token(self):
- token = fixture.V2Token()
- token.set_scope()
- token.add_role()
-
- request_headers = {'X-Auth-Token': self.service_token.token_id}
-
- url = '%s/v2.0/tokens/%s' % (BASE_URI, token.token_id)
- self.requests_mock.get(url,
- request_headers=request_headers,
- json=token)
-
- return token.token_id, token
-
- def assertTokenDataEqual(self, token_id, token, token_data):
- super(V2UserPluginTests, self).assertTokenDataEqual(token_id,
- token,
- token_data)
-
- self.assertEqual(token.tenant_id, token_data.project_id)
- self.assertIsNone(token_data.user_domain_id)
- self.assertIsNone(token_data.project_domain_id)
-
-
-class V3UserPluginTests(BaseUserPluginTests, base.BaseAuthTokenTestCase):
-
- def setUp(self):
- super(V3UserPluginTests, self).setUp()
-
- self.service_token_id = uuid.uuid4().hex
- self.service_token = fixture.V3Token()
- s = self.service_token.add_service('identity', name='keystone')
- s.add_standard_endpoints(public=BASE_URI,
- admin=BASE_URI,
- internal=BASE_URI)
-
- self.configure_middleware(auth_plugin='v3password',
- auth_url='%s/v3/' % AUTH_URL,
- user_id=self.service_token.user_id,
- password=uuid.uuid4().hex,
- project_id=self.service_token.project_id)
-
- auth_discovery = fixture.DiscoveryList(href=AUTH_URL)
- self.requests_mock.get(AUTH_URL, json=auth_discovery)
-
- base_discovery = fixture.DiscoveryList(href=BASE_URI)
- self.requests_mock.get(BASE_URI, json=base_discovery)
-
- self.requests_mock.post(
- '%s/v3/auth/tokens' % AUTH_URL,
- headers={'X-Subject-Token': self.service_token_id},
- json=self.service_token)
-
- def get_role_names(self, token):
- return set(x['name'] for x in token['token'].get('roles', []))
-
- def get_token(self):
- token_id = uuid.uuid4().hex
- token = fixture.V3Token()
- token.set_project_scope()
- token.add_role()
-
- request_headers = {'X-Auth-Token': self.service_token_id,
- 'X-Subject-Token': token_id}
- headers = {'X-Subject-Token': token_id}
-
- self.requests_mock.get('%s/v3/auth/tokens' % BASE_URI,
- request_headers=request_headers,
- headers=headers,
- json=token)
-
- return token_id, token
-
- def assertTokenDataEqual(self, token_id, token, token_data):
- super(V3UserPluginTests, self).assertTokenDataEqual(token_id,
- token,
- token_data)
-
- self.assertEqual(token.user_domain_id, token_data.user_domain_id)
- self.assertEqual(token.project_id, token_data.project_id)
- self.assertEqual(token.project_domain_id, token_data.project_domain_id)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py
deleted file mode 100644
index fcd1e628..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_utils.py
+++ /dev/null
@@ -1,37 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-from keystonemiddleware.auth_token import _utils
-
-
-class TokenEncodingTest(testtools.TestCase):
-
- def test_unquoted_token(self):
- self.assertEqual('foo%20bar', _utils.safe_quote('foo bar'))
-
- def test_quoted_token(self):
- self.assertEqual('foo%20bar', _utils.safe_quote('foo%20bar'))
-
- def test_messages_encoded_as_bytes(self):
- """Test that string are passed around as bytes for PY3."""
- msg = "This is an error"
-
- class FakeResp(_utils.MiniResp):
- def __init__(self, error, env):
- super(FakeResp, self).__init__(error, env)
-
- fake_resp = FakeResp(msg, dict(REQUEST_METHOD='GET'))
- # On Py2 .encode() don't do much but that's better than to
- # have a ifdef with six.PY3
- self.assertEqual(msg.encode(), fake_resp.body[0])
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py
deleted file mode 100644
index ee4111ec..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/client_fixtures.py
+++ /dev/null
@@ -1,452 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-
-import fixtures
-from keystoneclient.common import cms
-from keystoneclient import fixture
-from keystoneclient import utils
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-import six
-import testresources
-
-
-TESTDIR = os.path.dirname(os.path.abspath(__file__))
-ROOTDIR = os.path.normpath(os.path.join(TESTDIR, '..', '..', '..'))
-CERTDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'certs')
-CMSDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'cms')
-KEYDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'private')
-
-
-def _hash_signed_token_safe(signed_text, **kwargs):
- if isinstance(signed_text, six.text_type):
- signed_text = signed_text.encode('utf-8')
- return utils.hash_signed_token(signed_text, **kwargs)
-
-
-class Examples(fixtures.Fixture):
- """Example tokens and certs loaded from the examples directory.
-
- To use this class correctly, the module needs to override the test suite
- class to use testresources.OptimisingTestSuite (otherwise the files will
- be read on every test). This is done by defining a load_tests function
- in the module, like this:
-
- def load_tests(loader, tests, pattern):
- return testresources.OptimisingTestSuite(tests)
-
- (see http://docs.python.org/2/library/unittest.html#load-tests-protocol )
-
- """
-
- def setUp(self):
- super(Examples, self).setUp()
-
- # The data for several tests are signed using openssl and are stored in
- # files in the signing subdirectory. In order to keep the values
- # consistent between the tests and the signed documents, we read them
- # in for use in the tests.
- with open(os.path.join(CMSDIR, 'auth_token_scoped.json')) as f:
- self.TOKEN_SCOPED_DATA = cms.cms_to_token(f.read())
-
- with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
- self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
- self.SIGNED_TOKEN_SCOPED_HASH = _hash_signed_token_safe(
- self.SIGNED_TOKEN_SCOPED)
- self.SIGNED_TOKEN_SCOPED_HASH_SHA256 = _hash_signed_token_safe(
- self.SIGNED_TOKEN_SCOPED, mode='sha256')
- with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
- self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
- self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
- self.SIGNED_v3_TOKEN_SCOPED_HASH = _hash_signed_token_safe(
- self.SIGNED_v3_TOKEN_SCOPED)
- self.SIGNED_v3_TOKEN_SCOPED_HASH_SHA256 = _hash_signed_token_safe(
- self.SIGNED_v3_TOKEN_SCOPED, mode='sha256')
- with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
- self.REVOKED_TOKEN = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
- self.SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
- self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_token_scoped.pkiz')) as f:
- self.SIGNED_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_token_unscoped.pkiz')) as f:
- self.SIGNED_TOKEN_UNSCOPED_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pkiz')) as f:
- self.SIGNED_v3_TOKEN_SCOPED_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_token_revoked.pkiz')) as f:
- self.REVOKED_TOKEN_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR,
- 'auth_token_scoped_expired.pkiz')) as f:
- self.SIGNED_TOKEN_SCOPED_EXPIRED_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pkiz')) as f:
- self.REVOKED_v3_TOKEN_PKIZ = cms.cms_to_token(f.read())
- with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
- self.REVOCATION_LIST = jsonutils.loads(f.read())
- with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
- self.SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
-
- self.SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem')
- with open(self.SIGNING_CERT_FILE) as f:
- self.SIGNING_CERT = f.read()
-
- self.KERBEROS_BIND = 'USER@REALM'
-
- self.SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem')
- with open(self.SIGNING_KEY_FILE) as f:
- self.SIGNING_KEY = f.read()
-
- self.SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem')
- with open(self.SIGNING_CA_FILE) as f:
- self.SIGNING_CA = f.read()
-
- self.UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
- self.UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
- self.UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
- self.UUID_TOKEN_BIND = '3fc54048ad64405c98225ce0897af7c5'
- self.UUID_TOKEN_UNKNOWN_BIND = '8885fdf4d42e4fb9879e6379fa1eaf48'
- self.VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
- self.v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
- self.v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
- self.v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
- self.v3_UUID_TOKEN_BIND = '2f61f73e1c854cbb9534c487f9bd63c2'
- self.v3_UUID_TOKEN_UNKNOWN_BIND = '7ed9781b62cd4880b8d8c6788ab1d1e2'
-
- self.UUID_SERVICE_TOKEN_DEFAULT = 'fe4c0710ec2f492748596c1b53ab124'
- self.v3_UUID_SERVICE_TOKEN_DEFAULT = 'g431071bbc2f492748596c1b53cb229'
-
- revoked_token = self.REVOKED_TOKEN
- if isinstance(revoked_token, six.text_type):
- revoked_token = revoked_token.encode('utf-8')
- self.REVOKED_TOKEN_HASH = utils.hash_signed_token(revoked_token)
- self.REVOKED_TOKEN_HASH_SHA256 = utils.hash_signed_token(revoked_token,
- mode='sha256')
- self.REVOKED_TOKEN_LIST = (
- {'revoked': [{'id': self.REVOKED_TOKEN_HASH,
- 'expires': timeutils.utcnow()}]})
- self.REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(self.REVOKED_TOKEN_LIST)
-
- revoked_v3_token = self.REVOKED_v3_TOKEN
- if isinstance(revoked_v3_token, six.text_type):
- revoked_v3_token = revoked_v3_token.encode('utf-8')
- self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(revoked_v3_token)
- hash = utils.hash_signed_token(revoked_v3_token, mode='sha256')
- self.REVOKED_v3_TOKEN_HASH_SHA256 = hash
- self.REVOKED_v3_TOKEN_LIST = (
- {'revoked': [{'id': self.REVOKED_v3_TOKEN_HASH,
- 'expires': timeutils.utcnow()}]})
- self.REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(
- self.REVOKED_v3_TOKEN_LIST)
-
- revoked_token_pkiz = self.REVOKED_TOKEN_PKIZ
- if isinstance(revoked_token_pkiz, six.text_type):
- revoked_token_pkiz = revoked_token_pkiz.encode('utf-8')
- self.REVOKED_TOKEN_PKIZ_HASH = utils.hash_signed_token(
- revoked_token_pkiz)
- revoked_v3_token_pkiz = self.REVOKED_v3_TOKEN_PKIZ
- if isinstance(revoked_v3_token_pkiz, six.text_type):
- revoked_v3_token_pkiz = revoked_v3_token_pkiz.encode('utf-8')
- self.REVOKED_v3_PKIZ_TOKEN_HASH = utils.hash_signed_token(
- revoked_v3_token_pkiz)
-
- self.REVOKED_TOKEN_PKIZ_LIST = (
- {'revoked': [{'id': self.REVOKED_TOKEN_PKIZ_HASH,
- 'expires': timeutils.utcnow()},
- {'id': self.REVOKED_v3_PKIZ_TOKEN_HASH,
- 'expires': timeutils.utcnow()},
- ]})
- self.REVOKED_TOKEN_PKIZ_LIST_JSON = jsonutils.dumps(
- self.REVOKED_TOKEN_PKIZ_LIST)
-
- self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(
- self.SIGNED_TOKEN_SCOPED)
- self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(
- self.SIGNED_TOKEN_UNSCOPED)
- self.SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(
- self.SIGNED_v3_TOKEN_SCOPED)
-
- self.SIGNED_TOKEN_SCOPED_PKIZ_KEY = cms.cms_hash_token(
- self.SIGNED_TOKEN_SCOPED_PKIZ)
- self.SIGNED_TOKEN_UNSCOPED_PKIZ_KEY = cms.cms_hash_token(
- self.SIGNED_TOKEN_UNSCOPED_PKIZ)
- self.SIGNED_v3_TOKEN_SCOPED_PKIZ_KEY = cms.cms_hash_token(
- self.SIGNED_v3_TOKEN_SCOPED_PKIZ)
-
- self.INVALID_SIGNED_TOKEN = (
- "MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
- "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
- "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
- "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
- "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
- "0000000000000000000000000000000000000000000000000000000000000000"
- "1111111111111111111111111111111111111111111111111111111111111111"
- "2222222222222222222222222222222222222222222222222222222222222222"
- "3333333333333333333333333333333333333333333333333333333333333333"
- "4444444444444444444444444444444444444444444444444444444444444444"
- "5555555555555555555555555555555555555555555555555555555555555555"
- "6666666666666666666666666666666666666666666666666666666666666666"
- "7777777777777777777777777777777777777777777777777777777777777777"
- "8888888888888888888888888888888888888888888888888888888888888888"
- "9999999999999999999999999999999999999999999999999999999999999999"
- "0000000000000000000000000000000000000000000000000000000000000000")
-
- self.INVALID_SIGNED_PKIZ_TOKEN = (
- "PKIZ_AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
- "BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
- "CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
- "DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
- "EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
- "FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
- "0000000000000000000000000000000000000000000000000000000000000000"
- "1111111111111111111111111111111111111111111111111111111111111111"
- "2222222222222222222222222222222222222222222222222222222222222222"
- "3333333333333333333333333333333333333333333333333333333333333333"
- "4444444444444444444444444444444444444444444444444444444444444444"
- "5555555555555555555555555555555555555555555555555555555555555555"
- "6666666666666666666666666666666666666666666666666666666666666666"
- "7777777777777777777777777777777777777777777777777777777777777777"
- "8888888888888888888888888888888888888888888888888888888888888888"
- "9999999999999999999999999999999999999999999999999999999999999999"
- "0000000000000000000000000000000000000000000000000000000000000000")
-
- # JSON responses keyed by token ID
- self.TOKEN_RESPONSES = {}
-
- # basic values
- PROJECT_ID = 'tenant_id1'
- PROJECT_NAME = 'tenant_name1'
- USER_ID = 'user_id1'
- USER_NAME = 'user_name1'
- DOMAIN_ID = 'domain_id1'
- DOMAIN_NAME = 'domain_name1'
- ROLE_NAME1 = 'role1'
- ROLE_NAME2 = 'role2'
-
- SERVICE_PROJECT_ID = 'service_project_id1'
- SERVICE_PROJECT_NAME = 'service_project_name1'
- SERVICE_USER_ID = 'service_user_id1'
- SERVICE_USER_NAME = 'service_user_name1'
- SERVICE_DOMAIN_ID = 'service_domain_id1'
- SERVICE_DOMAIN_NAME = 'service_domain_name1'
- SERVICE_ROLE_NAME1 = 'service_role1'
- SERVICE_ROLE_NAME2 = 'service_role2'
-
- self.SERVICE_TYPE = 'identity'
- self.UNVERSIONED_SERVICE_URL = 'http://keystone.server:5000/'
- self.SERVICE_URL = self.UNVERSIONED_SERVICE_URL + 'v2.0'
-
- # Old Tokens
-
- self.TOKEN_RESPONSES[self.VALID_DIABLO_TOKEN] = {
- 'access': {
- 'token': {
- 'id': self.VALID_DIABLO_TOKEN,
- 'expires': '2020-01-01T00:00:10.000123Z',
- 'tenantId': PROJECT_ID,
- },
- 'user': {
- 'id': USER_ID,
- 'name': USER_NAME,
- 'roles': [
- {'name': ROLE_NAME1},
- {'name': ROLE_NAME2},
- ],
- },
- },
- }
-
- # Generated V2 Tokens
-
- token = fixture.V2Token(token_id=self.UUID_TOKEN_DEFAULT,
- tenant_id=PROJECT_ID,
- tenant_name=PROJECT_NAME,
- user_id=USER_ID,
- user_name=USER_NAME)
- token.add_role(name=ROLE_NAME1)
- token.add_role(name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint(public=self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.UUID_TOKEN_DEFAULT] = token
-
- token = fixture.V2Token(token_id=self.UUID_TOKEN_UNSCOPED,
- user_id=USER_ID,
- user_name=USER_NAME)
- self.TOKEN_RESPONSES[self.UUID_TOKEN_UNSCOPED] = token
-
- token = fixture.V2Token(token_id='valid-token',
- tenant_id=PROJECT_ID,
- tenant_name=PROJECT_NAME,
- user_id=USER_ID,
- user_name=USER_NAME)
- token.add_role(ROLE_NAME1)
- token.add_role(ROLE_NAME2)
- self.TOKEN_RESPONSES[self.UUID_TOKEN_NO_SERVICE_CATALOG] = token
-
- token = fixture.V2Token(token_id=self.SIGNED_TOKEN_SCOPED_KEY,
- tenant_id=PROJECT_ID,
- tenant_name=PROJECT_NAME,
- user_id=USER_ID,
- user_name=USER_NAME)
- token.add_role(ROLE_NAME1)
- token.add_role(ROLE_NAME2)
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = token
-
- token = fixture.V2Token(token_id=self.SIGNED_TOKEN_UNSCOPED_KEY,
- user_id=USER_ID,
- user_name=USER_NAME)
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_KEY] = token
-
- token = fixture.V2Token(token_id=self.UUID_TOKEN_BIND,
- tenant_id=PROJECT_ID,
- tenant_name=PROJECT_NAME,
- user_id=USER_ID,
- user_name=USER_NAME)
- token.add_role(ROLE_NAME1)
- token.add_role(ROLE_NAME2)
- token['access']['token']['bind'] = {'kerberos': self.KERBEROS_BIND}
- self.TOKEN_RESPONSES[self.UUID_TOKEN_BIND] = token
-
- token = fixture.V2Token(token_id=self.UUID_TOKEN_UNKNOWN_BIND,
- tenant_id=PROJECT_ID,
- tenant_name=PROJECT_NAME,
- user_id=USER_ID,
- user_name=USER_NAME)
- token.add_role(ROLE_NAME1)
- token.add_role(ROLE_NAME2)
- token['access']['token']['bind'] = {'FOO': 'BAR'}
- self.TOKEN_RESPONSES[self.UUID_TOKEN_UNKNOWN_BIND] = token
-
- token = fixture.V2Token(token_id=self.UUID_SERVICE_TOKEN_DEFAULT,
- tenant_id=SERVICE_PROJECT_ID,
- tenant_name=SERVICE_PROJECT_NAME,
- user_id=SERVICE_USER_ID,
- user_name=SERVICE_USER_NAME)
- token.add_role(name=SERVICE_ROLE_NAME1)
- token.add_role(name=SERVICE_ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint(public=self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.UUID_SERVICE_TOKEN_DEFAULT] = token
-
- # Generated V3 Tokens
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME,
- project_id=PROJECT_ID,
- project_name=PROJECT_NAME,
- project_domain_id=DOMAIN_ID,
- project_domain_name=DOMAIN_NAME)
- token.add_role(id=ROLE_NAME1, name=ROLE_NAME1)
- token.add_role(id=ROLE_NAME2, name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_DEFAULT] = token
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME)
- self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_UNSCOPED] = token
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME,
- domain_id=DOMAIN_ID,
- domain_name=DOMAIN_NAME)
- token.add_role(id=ROLE_NAME1, name=ROLE_NAME1)
- token.add_role(id=ROLE_NAME2, name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_DOMAIN_SCOPED] = token
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME,
- project_id=PROJECT_ID,
- project_name=PROJECT_NAME,
- project_domain_id=DOMAIN_ID,
- project_domain_name=DOMAIN_NAME)
- token.add_role(name=ROLE_NAME1)
- token.add_role(name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY] = token
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME,
- project_id=PROJECT_ID,
- project_name=PROJECT_NAME,
- project_domain_id=DOMAIN_ID,
- project_domain_name=DOMAIN_NAME)
- token.add_role(name=ROLE_NAME1)
- token.add_role(name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- token['token']['bind'] = {'kerberos': self.KERBEROS_BIND}
- self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_BIND] = token
-
- token = fixture.V3Token(user_id=USER_ID,
- user_name=USER_NAME,
- user_domain_id=DOMAIN_ID,
- user_domain_name=DOMAIN_NAME,
- project_id=PROJECT_ID,
- project_name=PROJECT_NAME,
- project_domain_id=DOMAIN_ID,
- project_domain_name=DOMAIN_NAME)
- token.add_role(name=ROLE_NAME1)
- token.add_role(name=ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- token['token']['bind'] = {'FOO': 'BAR'}
- self.TOKEN_RESPONSES[self.v3_UUID_TOKEN_UNKNOWN_BIND] = token
-
- token = fixture.V3Token(user_id=SERVICE_USER_ID,
- user_name=SERVICE_USER_NAME,
- user_domain_id=SERVICE_DOMAIN_ID,
- user_domain_name=SERVICE_DOMAIN_NAME,
- project_id=SERVICE_PROJECT_ID,
- project_name=SERVICE_PROJECT_NAME,
- project_domain_id=SERVICE_DOMAIN_ID,
- project_domain_name=SERVICE_DOMAIN_NAME)
- token.add_role(id=SERVICE_ROLE_NAME1,
- name=SERVICE_ROLE_NAME1)
- token.add_role(id=SERVICE_ROLE_NAME2,
- name=SERVICE_ROLE_NAME2)
- svc = token.add_service(self.SERVICE_TYPE)
- svc.add_endpoint('public', self.SERVICE_URL)
- self.TOKEN_RESPONSES[self.v3_UUID_SERVICE_TOKEN_DEFAULT] = token
-
- # PKIZ tokens generally link to above tokens
-
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_PKIZ_KEY] = (
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY])
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_PKIZ_KEY] = (
- self.TOKEN_RESPONSES[self.SIGNED_TOKEN_UNSCOPED_KEY])
- self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_PKIZ_KEY] = (
- self.TOKEN_RESPONSES[self.SIGNED_v3_TOKEN_SCOPED_KEY])
-
- self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
- six.iteritems(self.TOKEN_RESPONSES)])
-
-
-EXAMPLES_RESOURCE = testresources.FixtureResource(Examples())
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py
deleted file mode 100644
index fc761c0f..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_audit_middleware.py
+++ /dev/null
@@ -1,560 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import os
-import tempfile
-import uuid
-import warnings
-
-import mock
-from oslo_config import cfg
-from pycadf import identifier
-from testtools import matchers
-import webob
-
-from keystonemiddleware import audit
-from keystonemiddleware.tests.unit import utils
-
-
-class FakeApp(object):
- def __call__(self, env, start_response):
- body = 'Some response'
- start_response('200 OK', [
- ('Content-Type', 'text/plain'),
- ('Content-Length', str(sum(map(len, body))))
- ])
- return [body]
-
-
-class FakeFailingApp(object):
- def __call__(self, env, start_response):
- raise Exception('It happens!')
-
-
-class BaseAuditMiddlewareTest(utils.BaseTestCase):
- def setUp(self):
- super(BaseAuditMiddlewareTest, self).setUp()
- self.fd, self.audit_map = tempfile.mkstemp()
-
- with open(self.audit_map, "w") as f:
- f.write("[custom_actions]\n")
- f.write("reboot = start/reboot\n")
- f.write("os-migrations/get = read\n\n")
- f.write("[path_keywords]\n")
- f.write("action = None\n")
- f.write("os-hosts = host\n")
- f.write("os-migrations = None\n")
- f.write("reboot = None\n")
- f.write("servers = server\n\n")
- f.write("[service_endpoints]\n")
- f.write("compute = service/compute")
-
- cfg.CONF([], project='keystonemiddleware')
-
- self.middleware = audit.AuditMiddleware(
- FakeApp(), audit_map_file=self.audit_map,
- service_name='pycadf')
-
- # NOTE(stevemar): For this test suite and for the stable liberty branch
- # only, we will ignore deprecated calls that keystonemiddleware makes.
- warnings.filterwarnings('ignore', category=DeprecationWarning,
- module='^keystonemiddleware\\.')
-
- self.addCleanup(lambda: os.close(self.fd))
- self.addCleanup(cfg.CONF.reset)
-
- @staticmethod
- def get_environ_header(req_type):
- env_headers = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "publicURL":
- "http://public_host:8774",
- "internalURL":
- "http://internal_host:8774",
- "id": "resource_id"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed'}
- env_headers['REQUEST_METHOD'] = req_type
- return env_headers
-
-
-@mock.patch('oslo_messaging.get_transport', mock.MagicMock())
-class AuditMiddlewareTest(BaseAuditMiddlewareTest):
-
- def test_api_request(self):
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info') as notify:
- self.middleware(req)
- # Check first notification with only 'request'
- call_args = notify.call_args_list[0][0]
- self.assertEqual('audit.http.request', call_args[1])
- self.assertEqual('/foo/bar', call_args[2]['requestPath'])
- self.assertEqual('pending', call_args[2]['outcome'])
- self.assertNotIn('reason', call_args[2])
- self.assertNotIn('reporterchain', call_args[2])
-
- # Check second notification with request + response
- call_args = notify.call_args_list[1][0]
- self.assertEqual('audit.http.response', call_args[1])
- self.assertEqual('/foo/bar', call_args[2]['requestPath'])
- self.assertEqual('success', call_args[2]['outcome'])
- self.assertIn('reason', call_args[2])
- self.assertIn('reporterchain', call_args[2])
-
- def test_api_request_failure(self):
- self.middleware = audit.AuditMiddleware(
- FakeFailingApp(),
- audit_map_file=self.audit_map,
- service_name='pycadf')
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info') as notify:
- try:
- self.middleware(req)
- self.fail('Application exception has not been re-raised')
- except Exception:
- pass
- # Check first notification with only 'request'
- call_args = notify.call_args_list[0][0]
- self.assertEqual('audit.http.request', call_args[1])
- self.assertEqual('/foo/bar', call_args[2]['requestPath'])
- self.assertEqual('pending', call_args[2]['outcome'])
- self.assertNotIn('reporterchain', call_args[2])
-
- # Check second notification with request + response
- call_args = notify.call_args_list[1][0]
- self.assertEqual('audit.http.response', call_args[1])
- self.assertEqual('/foo/bar', call_args[2]['requestPath'])
- self.assertEqual('unknown', call_args[2]['outcome'])
- self.assertIn('reporterchain', call_args[2])
-
- def test_process_request_fail(self):
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info',
- side_effect=Exception('error')) as notify:
- self.middleware._process_request(req)
- self.assertTrue(notify.called)
-
- def test_process_response_fail(self):
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info',
- side_effect=Exception('error')) as notify:
- self.middleware._process_response(req, webob.response.Response())
- self.assertTrue(notify.called)
-
- def test_ignore_req_opt(self):
- self.middleware = audit.AuditMiddleware(FakeApp(),
- audit_map_file=self.audit_map,
- ignore_req_list='get, PUT')
- req = webob.Request.blank('/skip/foo',
- environ=self.get_environ_header('GET'))
- req1 = webob.Request.blank('/skip/foo',
- environ=self.get_environ_header('PUT'))
- req2 = webob.Request.blank('/accept/foo',
- environ=self.get_environ_header('POST'))
- with mock.patch('oslo_messaging.Notifier.info') as notify:
- # Check GET/PUT request does not send notification
- self.middleware(req)
- self.middleware(req1)
- self.assertEqual([], notify.call_args_list)
-
- # Check non-GET/PUT request does send notification
- self.middleware(req2)
- self.assertThat(notify.call_args_list, matchers.HasLength(2))
- call_args = notify.call_args_list[0][0]
- self.assertEqual('audit.http.request', call_args[1])
- self.assertEqual('/accept/foo', call_args[2]['requestPath'])
-
- call_args = notify.call_args_list[1][0]
- self.assertEqual('audit.http.response', call_args[1])
- self.assertEqual('/accept/foo', call_args[2]['requestPath'])
-
- def test_api_request_no_messaging(self):
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('keystonemiddleware.audit.messaging', None):
- with mock.patch('keystonemiddleware.audit._LOG.info') as log:
- self.middleware(req)
- # Check first notification with only 'request'
- call_args = log.call_args_list[0][0]
- self.assertEqual('audit.http.request',
- call_args[1]['event_type'])
-
- # Check second notification with request + response
- call_args = log.call_args_list[1][0]
- self.assertEqual('audit.http.response',
- call_args[1]['event_type'])
-
- def test_cadf_event_scoped_to_request(self):
- middleware = audit.AuditMiddleware(
- FakeApp(),
- audit_map_file=self.audit_map,
- service_name='pycadf')
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info') as notify:
- middleware(req)
- self.assertIsNotNone(req.environ.get('cadf_event'))
-
- # ensure exact same event is used between request and response
- self.assertEqual(notify.call_args_list[0][0][2]['id'],
- notify.call_args_list[1][0][2]['id'])
-
- def test_cadf_event_scoped_to_request_on_error(self):
- middleware = audit.AuditMiddleware(
- FakeApp(),
- audit_map_file=self.audit_map,
- service_name='pycadf')
- req = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info',
- side_effect=Exception('error')) as notify:
- middleware._process_request(req)
- self.assertTrue(notify.called)
- req2 = webob.Request.blank('/foo/bar',
- environ=self.get_environ_header('GET'))
- with mock.patch('oslo_messaging.Notifier.info') as notify:
- middleware._process_response(req2, webob.response.Response())
- self.assertTrue(notify.called)
- # ensure event is not the same across requests
- self.assertNotEqual(req.environ['cadf_event'].id,
- notify.call_args_list[0][0][2]['id'])
-
-
-@mock.patch('oslo_messaging.rpc', mock.MagicMock())
-class AuditApiLogicTest(BaseAuditMiddlewareTest):
-
- def api_request(self, method, url):
- req = webob.Request.blank(url, environ=self.get_environ_header(method),
- remote_addr='192.168.0.1')
- self.middleware._process_request(req)
- return req
-
- def test_get_list(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['typeURI'],
- 'http://schemas.dmtf.org/cloud/audit/1.0/event')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['eventType'], 'activity')
- self.assertEqual(payload['target']['name'], 'nova')
- self.assertEqual(payload['target']['id'], 'openstack:resource_id')
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(len(payload['target']['addresses']), 3)
- self.assertEqual(payload['target']['addresses'][0]['name'], 'admin')
- self.assertEqual(payload['target']['addresses'][0]['url'],
- 'http://admin_host:8774')
- self.assertEqual(payload['initiator']['id'], 'openstack:user_id')
- self.assertEqual(payload['initiator']['name'], 'user_name')
- self.assertEqual(payload['initiator']['project_id'],
- 'openstack:tenant_id')
- self.assertEqual(payload['initiator']['host']['address'],
- '192.168.0.1')
- self.assertEqual(payload['initiator']['typeURI'],
- 'service/security/account/user')
- self.assertNotEqual(payload['initiator']['credential']['token'],
- 'token')
- self.assertEqual(payload['initiator']['credential']['identity_status'],
- 'Confirmed')
- self.assertNotIn('reason', payload)
- self.assertNotIn('reporterchain', payload)
- self.assertEqual(payload['observer']['id'], 'target')
- self.assertEqual(req.path, payload['requestPath'])
-
- def test_get_read(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/'
- + str(uuid.uuid4()))
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/server')
- self.assertEqual(payload['action'], 'read')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_get_unknown_endpoint(self):
- req = self.api_request('GET', 'http://unknown:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['target']['name'], 'unknown')
- self.assertEqual(payload['target']['id'], 'unknown')
- self.assertEqual(payload['target']['typeURI'], 'unknown')
-
- def test_get_unknown_endpoint_default_set(self):
- with open(self.audit_map, "w") as f:
- f.write("[DEFAULT]\n")
- f.write("target_endpoint_type = compute\n")
- f.write("[path_keywords]\n")
- f.write("servers = server\n\n")
- f.write("[service_endpoints]\n")
- f.write("compute = service/compute")
-
- self.middleware = audit.AuditMiddleware(
- FakeApp(), audit_map_file=self.audit_map,
- service_name='pycadf')
-
- req = self.api_request('GET', 'http://unknown:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['action'], 'read/list')
- self.assertEqual(payload['outcome'], 'pending')
- self.assertEqual(payload['target']['name'], 'nova')
- self.assertEqual(payload['target']['id'], 'openstack:resource_id')
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
-
- def test_put(self):
- req = self.api_request('PUT', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'update')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_delete(self):
- req = self.api_request('DELETE', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'delete')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_head(self):
- req = self.api_request('HEAD', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'read')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_update(self):
- req = self.api_request('POST',
- 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/'
- + str(uuid.uuid4()))
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/server')
- self.assertEqual(payload['action'], 'update')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_create(self):
- req = self.api_request('POST', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers')
- self.assertEqual(payload['action'], 'create')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_action(self):
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/action',
- environ=self.get_environ_header('POST'))
- req.body = b'{"createImage" : {"name" : "new-image","metadata": ' \
- b'{"ImageType": "Gold","ImageVersion": "2.0"}}}'
- self.middleware._process_request(req)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/action')
- self.assertEqual(payload['action'], 'update/createImage')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_post_empty_body_action(self):
- req = self.api_request('POST', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers/action')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/servers/action')
- self.assertEqual(payload['action'], 'create')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_custom_action(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-hosts/'
- + str(uuid.uuid4()) + '/reboot')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-hosts/host/reboot')
- self.assertEqual(payload['action'], 'start/reboot')
- self.assertEqual(payload['outcome'], 'pending')
-
- def test_custom_action_complex(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-migrations')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-migrations')
- self.assertEqual(payload['action'], 'read')
- req = self.api_request('POST', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/os-migrations')
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['typeURI'],
- 'service/compute/os-migrations')
- self.assertEqual(payload['action'], 'create')
-
- def test_response_mod_msg(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.middleware._process_response(req, webob.Response())
- payload2 = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['id'], payload2['id'])
- self.assertEqual(payload['tags'], payload2['tags'])
- self.assertEqual(payload2['outcome'], 'success')
- self.assertEqual(payload2['reason']['reasonType'], 'HTTP')
- self.assertEqual(payload2['reason']['reasonCode'], '200')
- self.assertEqual(len(payload2['reporterchain']), 1)
- self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
- self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
- 'target')
-
- def test_no_response(self):
- req = self.api_request('GET', 'http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers')
- payload = req.environ['cadf_event'].as_dict()
- self.middleware._process_response(req, None)
- payload2 = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['id'], payload2['id'])
- self.assertEqual(payload['tags'], payload2['tags'])
- self.assertEqual(payload2['outcome'], 'unknown')
- self.assertNotIn('reason', payload2)
- self.assertEqual(len(payload2['reporterchain']), 1)
- self.assertEqual(payload2['reporterchain'][0]['role'], 'modifier')
- self.assertEqual(payload2['reporterchain'][0]['reporter']['id'],
- 'target')
-
- def test_missing_req(self):
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=self.get_environ_header('GET'))
- self.assertNotIn('cadf_event', req.environ)
- self.middleware._process_response(req, webob.Response())
- self.assertIn('cadf_event', req.environ)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['outcome'], 'success')
- self.assertEqual(payload['reason']['reasonType'], 'HTTP')
- self.assertEqual(payload['reason']['reasonCode'], '200')
- self.assertEqual(payload['observer']['id'], 'target')
-
- def test_missing_catalog_endpoint_id(self):
- env_headers = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "publicURL":
- "http://public_host:8774",
- "internalURL":
- "http://internal_host:8774"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'REQUEST_METHOD': 'GET'}
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=env_headers)
- self.middleware._process_request(req)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual(payload['target']['id'], identifier.norm_ns('nova'))
-
- def test_endpoint_missing_internal_url(self):
- env_headers = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "publicURL":
- "http://public_host:8774"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'REQUEST_METHOD': 'GET'}
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=env_headers)
- self.middleware._process_request(req)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual((payload['target']['addresses'][1]['url']), "unknown")
-
- def test_endpoint_missing_public_url(self):
- env_headers = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"adminURL":
- "http://admin_host:8774",
- "region": "RegionOne",
- "internalURL":
- "http://internal_host:8774"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'REQUEST_METHOD': 'GET'}
- req = webob.Request.blank('http://admin_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=env_headers)
- self.middleware._process_request(req)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual((payload['target']['addresses'][2]['url']), "unknown")
-
- def test_endpoint_missing_admin_url(self):
- env_headers = {'HTTP_X_SERVICE_CATALOG':
- '''[{"endpoints_links": [],
- "endpoints": [{"region": "RegionOne",
- "publicURL":
- "http://public_host:8774",
- "internalURL":
- "http://internal_host:8774"}],
- "type": "compute",
- "name": "nova"},]''',
- 'HTTP_X_USER_ID': 'user_id',
- 'HTTP_X_USER_NAME': 'user_name',
- 'HTTP_X_AUTH_TOKEN': 'token',
- 'HTTP_X_PROJECT_ID': 'tenant_id',
- 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
- 'REQUEST_METHOD': 'GET'}
- req = webob.Request.blank('http://public_host:8774/v2/'
- + str(uuid.uuid4()) + '/servers',
- environ=env_headers)
- self.middleware._process_request(req)
- payload = req.environ['cadf_event'].as_dict()
- self.assertEqual((payload['target']['addresses'][0]['url']), "unknown")
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py
deleted file mode 100644
index 9ddb8005..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_opts.py
+++ /dev/null
@@ -1,86 +0,0 @@
-# Copyright (c) 2014 OpenStack Foundation.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import stevedore
-from testtools import matchers
-
-from keystonemiddleware import opts
-from keystonemiddleware.tests.unit import utils
-
-
-class OptsTestCase(utils.TestCase):
-
- def _test_list_auth_token_opts(self, result):
- self.assertThat(result, matchers.HasLength(1))
-
- for group in (g for (g, _l) in result):
- self.assertEqual('keystone_authtoken', group)
-
- expected_opt_names = [
- 'auth_admin_prefix',
- 'auth_host',
- 'auth_port',
- 'auth_protocol',
- 'auth_uri',
- 'identity_uri',
- 'auth_version',
- 'delay_auth_decision',
- 'http_connect_timeout',
- 'http_request_max_retries',
- 'admin_token',
- 'admin_user',
- 'admin_password',
- 'admin_tenant_name',
- 'cache',
- 'certfile',
- 'keyfile',
- 'cafile',
- 'region_name',
- 'insecure',
- 'signing_dir',
- 'memcached_servers',
- 'token_cache_time',
- 'revocation_cache_time',
- 'memcache_security_strategy',
- 'memcache_secret_key',
- 'memcache_use_advanced_pool',
- 'memcache_pool_dead_retry',
- 'memcache_pool_maxsize',
- 'memcache_pool_unused_timeout',
- 'memcache_pool_conn_get_timeout',
- 'memcache_pool_socket_timeout',
- 'include_service_catalog',
- 'enforce_token_bind',
- 'check_revocations_for_cached',
- 'hash_algorithms'
- ]
- opt_names = [o.name for (g, l) in result for o in l]
- self.assertThat(opt_names, matchers.HasLength(len(expected_opt_names)))
-
- for opt in opt_names:
- self.assertIn(opt, expected_opt_names)
-
- def test_list_auth_token_opts(self):
- self._test_list_auth_token_opts(opts.list_auth_token_opts())
-
- def test_entry_point(self):
- em = stevedore.ExtensionManager('oslo.config.opts',
- invoke_on_load=True)
- for extension in em:
- if extension.name == 'keystonemiddleware.auth_token':
- break
- else:
- self.fail('keystonemiddleware.auth_token not found')
-
- self._test_list_auth_token_opts(extension.obj)
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py
deleted file mode 100644
index b0993886..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py
+++ /dev/null
@@ -1,268 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import mock
-from oslo_serialization import jsonutils
-import requests
-from requests_mock.contrib import fixture as rm_fixture
-import six
-from six.moves import urllib
-import webob
-
-from keystonemiddleware import s3_token
-from keystonemiddleware.tests.unit import utils
-
-
-GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID',
- 'tenant': {'id': 'TENANT_ID'}}}}
-
-
-class FakeApp(object):
- """This represents a WSGI app protected by the auth_token middleware."""
- def __call__(self, env, start_response):
- resp = webob.Response()
- resp.environ = env
- return resp(env, start_response)
-
-
-class S3TokenMiddlewareTestBase(utils.TestCase):
-
- TEST_PROTOCOL = 'https'
- TEST_HOST = 'fakehost'
- TEST_PORT = 35357
- TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL,
- TEST_HOST,
- TEST_PORT)
-
- def setUp(self):
- super(S3TokenMiddlewareTestBase, self).setUp()
-
- self.conf = {
- 'auth_host': self.TEST_HOST,
- 'auth_port': self.TEST_PORT,
- 'auth_protocol': self.TEST_PROTOCOL,
- }
-
- self.requests_mock = self.useFixture(rm_fixture.Fixture())
-
- def start_fake_response(self, status, headers):
- self.response_status = int(status.split(' ', 1)[0])
- self.response_headers = dict(headers)
-
-
-class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
-
- def setUp(self):
- super(S3TokenMiddlewareTestGood, self).setUp()
- self.middleware = s3_token.S3Token(FakeApp(), self.conf)
-
- self.requests_mock.post(self.TEST_URL,
- status_code=201,
- json=GOOD_RESPONSE)
-
- # Ignore the request and pass to the next middleware in the
- # pipeline if no path has been specified.
- def test_no_path_request(self):
- req = webob.Request.blank('/')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- # Ignore the request and pass to the next middleware in the
- # pipeline if no Authorization header has been specified
- def test_without_authorization(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- def test_without_auth_storage_token(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'badboy'
- self.middleware(req.environ, self.start_fake_response)
- self.assertEqual(self.response_status, 200)
-
- def test_authorized(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
-
- def test_authorized_http(self):
- self.requests_mock.post(self.TEST_URL.replace('https', 'http'),
- status_code=201,
- json=GOOD_RESPONSE)
-
- self.middleware = (
- s3_token.filter_factory({'auth_protocol': 'http',
- 'auth_host': self.TEST_HOST,
- 'auth_port': self.TEST_PORT})(FakeApp()))
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
- self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
-
- def test_authorization_nova_toconnect(self):
- req = webob.Request.blank('/v1/AUTH_swiftint/c/o')
- req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
- path = req.environ['PATH_INFO']
- self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
-
- @mock.patch.object(requests, 'post')
- def test_insecure(self, MOCK_REQUEST):
- self.middleware = (
- s3_token.filter_factory({'insecure': 'True'})(FakeApp()))
-
- text_return_value = jsonutils.dumps(GOOD_RESPONSE)
- if six.PY3:
- text_return_value = text_return_value.encode()
- MOCK_REQUEST.return_value = utils.TestResponse({
- 'status_code': 201,
- 'text': text_return_value})
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
-
- self.assertTrue(MOCK_REQUEST.called)
- mock_args, mock_kwargs = MOCK_REQUEST.call_args
- self.assertIs(mock_kwargs['verify'], False)
-
- def test_insecure_option(self):
- # insecure is passed as a string.
-
- # Some non-secure values.
- true_values = ['true', 'True', '1', 'yes']
- for val in true_values:
- config = {'insecure': val, 'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertIs(False, middleware._verify)
-
- # Some "secure" values, including unexpected value.
- false_values = ['false', 'False', '0', 'no', 'someweirdvalue']
- for val in false_values:
- config = {'insecure': val, 'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertEqual('false_ind', middleware._verify)
-
- # Default is secure.
- config = {'certfile': 'false_ind'}
- middleware = s3_token.filter_factory(config)(FakeApp())
- self.assertIs('false_ind', middleware._verify)
-
- def test_unicode_path(self):
- url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8')
- req = webob.Request.blank(urllib.parse.quote(url))
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- req.get_response(self.middleware)
-
-
-class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
- def setUp(self):
- super(S3TokenMiddlewareTestBad, self).setUp()
- self.middleware = s3_token.S3Token(FakeApp(), self.conf)
-
- def test_unauthorized_token(self):
- ret = {"error":
- {"message": "EC2 access key not found.",
- "code": 401,
- "title": "Unauthorized"}}
- self.requests_mock.post(self.TEST_URL, status_code=403, json=ret)
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- s3_denied_req = self.middleware._deny_request('AccessDenied')
- self.assertEqual(resp.body, s3_denied_req.body)
- self.assertEqual(resp.status_int, s3_denied_req.status_int)
-
- def test_bogus_authorization(self):
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'badboy'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- self.assertEqual(resp.status_int, 400)
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
- def test_fail_to_connect_to_keystone(self):
- with mock.patch.object(self.middleware, '_json_request') as o:
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- o.side_effect = s3_token.ServiceError(s3_invalid_req)
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
- def test_bad_reply(self):
- self.requests_mock.post(self.TEST_URL,
- status_code=201,
- text="<badreply>")
-
- req = webob.Request.blank('/v1/AUTH_cfa/c/o')
- req.headers['Authorization'] = 'access:signature'
- req.headers['X-Storage-Token'] = 'token'
- resp = req.get_response(self.middleware)
- s3_invalid_req = self.middleware._deny_request('InvalidURI')
- self.assertEqual(resp.body, s3_invalid_req.body)
- self.assertEqual(resp.status_int, s3_invalid_req.status_int)
-
-
-class S3TokenMiddlewareTestUtil(utils.BaseTestCase):
- def test_split_path_failed(self):
- self.assertRaises(ValueError, s3_token._split_path, '')
- self.assertRaises(ValueError, s3_token._split_path, '/')
- self.assertRaises(ValueError, s3_token._split_path, '//')
- self.assertRaises(ValueError, s3_token._split_path, '//a')
- self.assertRaises(ValueError, s3_token._split_path, '/a/c')
- self.assertRaises(ValueError, s3_token._split_path, '//c')
- self.assertRaises(ValueError, s3_token._split_path, '/a/c/')
- self.assertRaises(ValueError, s3_token._split_path, '/a//')
- self.assertRaises(ValueError, s3_token._split_path, '/a', 2)
- self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3)
- self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3, True)
- self.assertRaises(ValueError, s3_token._split_path, '/a/c/o/r', 3, 3)
- self.assertRaises(ValueError, s3_token._split_path, '/a', 5, 4)
-
- def test_split_path_success(self):
- self.assertEqual(s3_token._split_path('/a'), ['a'])
- self.assertEqual(s3_token._split_path('/a/'), ['a'])
- self.assertEqual(s3_token._split_path('/a/c', 2), ['a', 'c'])
- self.assertEqual(s3_token._split_path('/a/c/o', 3), ['a', 'c', 'o'])
- self.assertEqual(s3_token._split_path('/a/c/o/r', 3, 3, True),
- ['a', 'c', 'o/r'])
- self.assertEqual(s3_token._split_path('/a/c', 2, 3, True),
- ['a', 'c', None])
- self.assertEqual(s3_token._split_path('/a/c/', 2), ['a', 'c'])
- self.assertEqual(s3_token._split_path('/a/c/', 2, 3), ['a', 'c', ''])
-
- def test_split_path_invalid_path(self):
- try:
- s3_token._split_path('o\nn e', 2)
- except ValueError as err:
- self.assertEqual(str(err), 'Invalid path: o%0An%20e')
- try:
- s3_token._split_path('o\nn e', 2, 3, True)
- except ValueError as err:
- self.assertEqual(str(err), 'Invalid path: o%0An%20e')
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py
deleted file mode 100644
index 8c6c0e9a..00000000
--- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/utils.py
+++ /dev/null
@@ -1,150 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import logging
-import sys
-import time
-import warnings
-
-import fixtures
-import mock
-import oslotest.base as oslotest
-import requests
-import uuid
-
-
-class BaseTestCase(oslotest.BaseTestCase):
- def setUp(self):
- super(BaseTestCase, self).setUp()
-
- # If keystonemiddleware calls any deprecated function this will raise
- # an exception.
- warnings.filterwarnings('error', category=DeprecationWarning,
- module='^keystonemiddleware\\.')
- self.addCleanup(warnings.resetwarnings)
-
-
-class TestCase(BaseTestCase):
- TEST_DOMAIN_ID = '1'
- TEST_DOMAIN_NAME = 'aDomain'
- TEST_GROUP_ID = uuid.uuid4().hex
- TEST_ROLE_ID = uuid.uuid4().hex
- TEST_TENANT_ID = '1'
- TEST_TENANT_NAME = 'aTenant'
- TEST_TOKEN = 'aToken'
- TEST_TRUST_ID = 'aTrust'
- TEST_USER = 'test'
- TEST_USER_ID = uuid.uuid4().hex
-
- TEST_ROOT_URL = 'http://127.0.0.1:5000/'
-
- def setUp(self):
- super(TestCase, self).setUp()
- self.logger = self.useFixture(fixtures.FakeLogger(level=logging.DEBUG))
- self.time_patcher = mock.patch.object(time, 'time', lambda: 1234)
- self.time_patcher.start()
-
- def tearDown(self):
- self.time_patcher.stop()
- super(TestCase, self).tearDown()
-
-
-if tuple(sys.version_info)[0:2] < (2, 7):
-
- def assertDictEqual(self, d1, d2, msg=None):
- # Simple version taken from 2.7
- self.assertIsInstance(d1, dict,
- 'First argument is not a dictionary')
- self.assertIsInstance(d2, dict,
- 'Second argument is not a dictionary')
- if d1 != d2:
- if msg:
- self.fail(msg)
- else:
- standardMsg = '%r != %r' % (d1, d2)
- self.fail(standardMsg)
-
- TestCase.assertDictEqual = assertDictEqual
-
-
-class TestResponse(requests.Response):
- """Class used to wrap requests.Response and provide some
- convenience to initialize with a dict.
- """
-
- def __init__(self, data):
- self._text = None
- super(TestResponse, self).__init__()
- if isinstance(data, dict):
- self.status_code = data.get('status_code', 200)
- headers = data.get('headers')
- if headers:
- self.headers.update(headers)
- # Fake the text attribute to streamline Response creation
- # _content is defined by requests.Response
- self._content = data.get('text')
- else:
- self.status_code = data
-
- def __eq__(self, other):
- return self.__dict__ == other.__dict__
-
- @property
- def text(self):
- return self.content
-
-
-class DisableModuleFixture(fixtures.Fixture):
- """A fixture to provide support for unloading/disabling modules."""
-
- def __init__(self, module, *args, **kw):
- super(DisableModuleFixture, self).__init__(*args, **kw)
- self.module = module
- self._finders = []
- self._cleared_modules = {}
-
- def tearDown(self):
- super(DisableModuleFixture, self).tearDown()
- for finder in self._finders:
- sys.meta_path.remove(finder)
- sys.modules.update(self._cleared_modules)
-
- def clear_module(self):
- cleared_modules = {}
- for fullname in list(sys.modules.keys()):
- if (fullname == self.module or
- fullname.startswith(self.module + '.')):
- cleared_modules[fullname] = sys.modules.pop(fullname)
- return cleared_modules
-
- def setUp(self):
- """Ensure ImportError for the specified module."""
-
- super(DisableModuleFixture, self).setUp()
-
- # Clear 'module' references in sys.modules
- self._cleared_modules.update(self.clear_module())
-
- finder = NoModuleFinder(self.module)
- self._finders.append(finder)
- sys.meta_path.insert(0, finder)
-
-
-class NoModuleFinder(object):
- """Disallow further imports of 'module'."""
-
- def __init__(self, module):
- self.module = module
-
- def find_module(self, fullname, path):
- if fullname == self.module or fullname.startswith(self.module + '.'):
- raise ImportError