diff options
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py')
-rw-r--r-- | keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py | 268 |
1 files changed, 0 insertions, 268 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py deleted file mode 100644 index b0993886..00000000 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py +++ /dev/null @@ -1,268 +0,0 @@ -# Copyright 2012 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import mock -from oslo_serialization import jsonutils -import requests -from requests_mock.contrib import fixture as rm_fixture -import six -from six.moves import urllib -import webob - -from keystonemiddleware import s3_token -from keystonemiddleware.tests.unit import utils - - -GOOD_RESPONSE = {'access': {'token': {'id': 'TOKEN_ID', - 'tenant': {'id': 'TENANT_ID'}}}} - - -class FakeApp(object): - """This represents a WSGI app protected by the auth_token middleware.""" - def __call__(self, env, start_response): - resp = webob.Response() - resp.environ = env - return resp(env, start_response) - - -class S3TokenMiddlewareTestBase(utils.TestCase): - - TEST_PROTOCOL = 'https' - TEST_HOST = 'fakehost' - TEST_PORT = 35357 - TEST_URL = '%s://%s:%d/v2.0/s3tokens' % (TEST_PROTOCOL, - TEST_HOST, - TEST_PORT) - - def setUp(self): - super(S3TokenMiddlewareTestBase, self).setUp() - - self.conf = { - 'auth_host': self.TEST_HOST, - 'auth_port': self.TEST_PORT, - 'auth_protocol': self.TEST_PROTOCOL, - } - - self.requests_mock = self.useFixture(rm_fixture.Fixture()) - - def start_fake_response(self, status, headers): - self.response_status = int(status.split(' ', 1)[0]) - self.response_headers = dict(headers) - - -class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): - - def setUp(self): - super(S3TokenMiddlewareTestGood, self).setUp() - self.middleware = s3_token.S3Token(FakeApp(), self.conf) - - self.requests_mock.post(self.TEST_URL, - status_code=201, - json=GOOD_RESPONSE) - - # Ignore the request and pass to the next middleware in the - # pipeline if no path has been specified. - def test_no_path_request(self): - req = webob.Request.blank('/') - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - - # Ignore the request and pass to the next middleware in the - # pipeline if no Authorization header has been specified - def test_without_authorization(self): - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - - def test_without_auth_storage_token(self): - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'badboy' - self.middleware(req.environ, self.start_fake_response) - self.assertEqual(self.response_status, 200) - - def test_authorized(self): - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - req.get_response(self.middleware) - self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID')) - self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') - - def test_authorized_http(self): - self.requests_mock.post(self.TEST_URL.replace('https', 'http'), - status_code=201, - json=GOOD_RESPONSE) - - self.middleware = ( - s3_token.filter_factory({'auth_protocol': 'http', - 'auth_host': self.TEST_HOST, - 'auth_port': self.TEST_PORT})(FakeApp())) - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - req.get_response(self.middleware) - self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID')) - self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') - - def test_authorization_nova_toconnect(self): - req = webob.Request.blank('/v1/AUTH_swiftint/c/o') - req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature' - req.headers['X-Storage-Token'] = 'token' - req.get_response(self.middleware) - path = req.environ['PATH_INFO'] - self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID')) - - @mock.patch.object(requests, 'post') - def test_insecure(self, MOCK_REQUEST): - self.middleware = ( - s3_token.filter_factory({'insecure': 'True'})(FakeApp())) - - text_return_value = jsonutils.dumps(GOOD_RESPONSE) - if six.PY3: - text_return_value = text_return_value.encode() - MOCK_REQUEST.return_value = utils.TestResponse({ - 'status_code': 201, - 'text': text_return_value}) - - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - req.get_response(self.middleware) - - self.assertTrue(MOCK_REQUEST.called) - mock_args, mock_kwargs = MOCK_REQUEST.call_args - self.assertIs(mock_kwargs['verify'], False) - - def test_insecure_option(self): - # insecure is passed as a string. - - # Some non-secure values. - true_values = ['true', 'True', '1', 'yes'] - for val in true_values: - config = {'insecure': val, 'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) - self.assertIs(False, middleware._verify) - - # Some "secure" values, including unexpected value. - false_values = ['false', 'False', '0', 'no', 'someweirdvalue'] - for val in false_values: - config = {'insecure': val, 'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) - self.assertEqual('false_ind', middleware._verify) - - # Default is secure. - config = {'certfile': 'false_ind'} - middleware = s3_token.filter_factory(config)(FakeApp()) - self.assertIs('false_ind', middleware._verify) - - def test_unicode_path(self): - url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8') - req = webob.Request.blank(urllib.parse.quote(url)) - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - req.get_response(self.middleware) - - -class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): - def setUp(self): - super(S3TokenMiddlewareTestBad, self).setUp() - self.middleware = s3_token.S3Token(FakeApp(), self.conf) - - def test_unauthorized_token(self): - ret = {"error": - {"message": "EC2 access key not found.", - "code": 401, - "title": "Unauthorized"}} - self.requests_mock.post(self.TEST_URL, status_code=403, json=ret) - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - resp = req.get_response(self.middleware) - s3_denied_req = self.middleware._deny_request('AccessDenied') - self.assertEqual(resp.body, s3_denied_req.body) - self.assertEqual(resp.status_int, s3_denied_req.status_int) - - def test_bogus_authorization(self): - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'badboy' - req.headers['X-Storage-Token'] = 'token' - resp = req.get_response(self.middleware) - self.assertEqual(resp.status_int, 400) - s3_invalid_req = self.middleware._deny_request('InvalidURI') - self.assertEqual(resp.body, s3_invalid_req.body) - self.assertEqual(resp.status_int, s3_invalid_req.status_int) - - def test_fail_to_connect_to_keystone(self): - with mock.patch.object(self.middleware, '_json_request') as o: - s3_invalid_req = self.middleware._deny_request('InvalidURI') - o.side_effect = s3_token.ServiceError(s3_invalid_req) - - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - resp = req.get_response(self.middleware) - self.assertEqual(resp.body, s3_invalid_req.body) - self.assertEqual(resp.status_int, s3_invalid_req.status_int) - - def test_bad_reply(self): - self.requests_mock.post(self.TEST_URL, - status_code=201, - text="<badreply>") - - req = webob.Request.blank('/v1/AUTH_cfa/c/o') - req.headers['Authorization'] = 'access:signature' - req.headers['X-Storage-Token'] = 'token' - resp = req.get_response(self.middleware) - s3_invalid_req = self.middleware._deny_request('InvalidURI') - self.assertEqual(resp.body, s3_invalid_req.body) - self.assertEqual(resp.status_int, s3_invalid_req.status_int) - - -class S3TokenMiddlewareTestUtil(utils.BaseTestCase): - def test_split_path_failed(self): - self.assertRaises(ValueError, s3_token._split_path, '') - self.assertRaises(ValueError, s3_token._split_path, '/') - self.assertRaises(ValueError, s3_token._split_path, '//') - self.assertRaises(ValueError, s3_token._split_path, '//a') - self.assertRaises(ValueError, s3_token._split_path, '/a/c') - self.assertRaises(ValueError, s3_token._split_path, '//c') - self.assertRaises(ValueError, s3_token._split_path, '/a/c/') - self.assertRaises(ValueError, s3_token._split_path, '/a//') - self.assertRaises(ValueError, s3_token._split_path, '/a', 2) - self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3) - self.assertRaises(ValueError, s3_token._split_path, '/a', 2, 3, True) - self.assertRaises(ValueError, s3_token._split_path, '/a/c/o/r', 3, 3) - self.assertRaises(ValueError, s3_token._split_path, '/a', 5, 4) - - def test_split_path_success(self): - self.assertEqual(s3_token._split_path('/a'), ['a']) - self.assertEqual(s3_token._split_path('/a/'), ['a']) - self.assertEqual(s3_token._split_path('/a/c', 2), ['a', 'c']) - self.assertEqual(s3_token._split_path('/a/c/o', 3), ['a', 'c', 'o']) - self.assertEqual(s3_token._split_path('/a/c/o/r', 3, 3, True), - ['a', 'c', 'o/r']) - self.assertEqual(s3_token._split_path('/a/c', 2, 3, True), - ['a', 'c', None]) - self.assertEqual(s3_token._split_path('/a/c/', 2), ['a', 'c']) - self.assertEqual(s3_token._split_path('/a/c/', 2, 3), ['a', 'c', '']) - - def test_split_path_invalid_path(self): - try: - s3_token._split_path('o\nn e', 2) - except ValueError as err: - self.assertEqual(str(err), 'Invalid path: o%0An%20e') - try: - s3_token._split_path('o\nn e', 2, 3, True) - except ValueError as err: - self.assertEqual(str(err), 'Invalid path: o%0An%20e') |