From 0be7a3d4e0647dc0d94a34e4fc2f8c364de46602 Mon Sep 17 00:00:00 2001 From: asteroide Date: Thu, 24 Sep 2015 14:39:09 +0200 Subject: Update code from KeystoneMiddleware Github repository (Master). Change-Id: Id28c5bf48b3dbb6c8a08e66411b5785029f6857d --- .../tests/unit/test_s3_token_middleware.py | 53 ++++++++++++++++++---- 1 file changed, 43 insertions(+), 10 deletions(-) (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py') diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py index 2bcdf894..b0993886 100644 --- a/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py +++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/test_s3_token_middleware.py @@ -17,7 +17,7 @@ from oslo_serialization import jsonutils import requests from requests_mock.contrib import fixture as rm_fixture import six -import testtools +from six.moves import urllib import webob from keystonemiddleware import s3_token @@ -54,7 +54,7 @@ class S3TokenMiddlewareTestBase(utils.TestCase): 'auth_protocol': self.TEST_PROTOCOL, } - self.requests = self.useFixture(rm_fixture.Fixture()) + self.requests_mock = self.useFixture(rm_fixture.Fixture()) def start_fake_response(self, status, headers): self.response_status = int(status.split(' ', 1)[0]) @@ -67,7 +67,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): super(S3TokenMiddlewareTestGood, self).setUp() self.middleware = s3_token.S3Token(FakeApp(), self.conf) - self.requests.post(self.TEST_URL, status_code=201, json=GOOD_RESPONSE) + self.requests_mock.post(self.TEST_URL, + status_code=201, + json=GOOD_RESPONSE) # Ignore the request and pass to the next middleware in the # pipeline if no path has been specified. @@ -98,9 +100,9 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID') def test_authorized_http(self): - self.requests.post(self.TEST_URL.replace('https', 'http'), - status_code=201, - json=GOOD_RESPONSE) + self.requests_mock.post(self.TEST_URL.replace('https', 'http'), + status_code=201, + json=GOOD_RESPONSE) self.middleware = ( s3_token.filter_factory({'auth_protocol': 'http', @@ -124,7 +126,7 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): @mock.patch.object(requests, 'post') def test_insecure(self, MOCK_REQUEST): self.middleware = ( - s3_token.filter_factory({'insecure': True})(FakeApp())) + s3_token.filter_factory({'insecure': 'True'})(FakeApp())) text_return_value = jsonutils.dumps(GOOD_RESPONSE) if six.PY3: @@ -142,6 +144,35 @@ class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase): mock_args, mock_kwargs = MOCK_REQUEST.call_args self.assertIs(mock_kwargs['verify'], False) + def test_insecure_option(self): + # insecure is passed as a string. + + # Some non-secure values. + true_values = ['true', 'True', '1', 'yes'] + for val in true_values: + config = {'insecure': val, 'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertIs(False, middleware._verify) + + # Some "secure" values, including unexpected value. + false_values = ['false', 'False', '0', 'no', 'someweirdvalue'] + for val in false_values: + config = {'insecure': val, 'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertEqual('false_ind', middleware._verify) + + # Default is secure. + config = {'certfile': 'false_ind'} + middleware = s3_token.filter_factory(config)(FakeApp()) + self.assertIs('false_ind', middleware._verify) + + def test_unicode_path(self): + url = u'/v1/AUTH_cfa/c/euro\u20ac'.encode('utf8') + req = webob.Request.blank(urllib.parse.quote(url)) + req.headers['Authorization'] = 'access:signature' + req.headers['X-Storage-Token'] = 'token' + req.get_response(self.middleware) + class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): def setUp(self): @@ -153,7 +184,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): {"message": "EC2 access key not found.", "code": 401, "title": "Unauthorized"}} - self.requests.post(self.TEST_URL, status_code=403, json=ret) + self.requests_mock.post(self.TEST_URL, status_code=403, json=ret) req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' req.headers['X-Storage-Token'] = 'token' @@ -185,7 +216,9 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): self.assertEqual(resp.status_int, s3_invalid_req.status_int) def test_bad_reply(self): - self.requests.post(self.TEST_URL, status_code=201, text="") + self.requests_mock.post(self.TEST_URL, + status_code=201, + text="") req = webob.Request.blank('/v1/AUTH_cfa/c/o') req.headers['Authorization'] = 'access:signature' @@ -196,7 +229,7 @@ class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase): self.assertEqual(resp.status_int, s3_invalid_req.status_int) -class S3TokenMiddlewareTestUtil(testtools.TestCase): +class S3TokenMiddlewareTestUtil(utils.BaseTestCase): def test_split_path_failed(self): self.assertRaises(ValueError, s3_token._split_path, '') self.assertRaises(ValueError, s3_token._split_path, '/') -- cgit 1.2.3-korg