aboutsummaryrefslogtreecommitdiffstats
path: root/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
diff options
context:
space:
mode:
authorasteroide <thomas.duval@orange.com>2015-09-24 14:39:09 +0200
committerasteroide <thomas.duval@orange.com>2015-09-24 14:39:09 +0200
commit0be7a3d4e0647dc0d94a34e4fc2f8c364de46602 (patch)
tree14214bb0bbf2430b6ee0df387ddbdbf13c4c4d63 /keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
parente35decd4e989773c96a9abb263257291bd51ae1e (diff)
Update code from KeystoneMiddleware Github repository (Master).
Change-Id: Id28c5bf48b3dbb6c8a08e66411b5785029f6857d
Diffstat (limited to 'keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py')
-rw-r--r--keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py253
1 files changed, 253 insertions, 0 deletions
diff --git a/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
new file mode 100644
index 00000000..223433f8
--- /dev/null
+++ b/keystonemiddleware-moon/keystonemiddleware/tests/unit/auth_token/test_request.py
@@ -0,0 +1,253 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import itertools
+import uuid
+
+from keystoneclient import access
+from keystoneclient import fixture
+
+from keystonemiddleware.auth_token import _request
+from keystonemiddleware.tests.unit import utils
+
+
+class RequestObjectTests(utils.TestCase):
+
+ def setUp(self):
+ super(RequestObjectTests, self).setUp()
+ self.request = _request._AuthTokenRequest.blank('/')
+
+ def test_setting_user_token_valid(self):
+ self.assertNotIn('X-Identity-Status', self.request.headers)
+
+ self.request.user_token_valid = True
+ self.assertEqual('Confirmed',
+ self.request.headers['X-Identity-Status'])
+ self.assertTrue(self.request.user_token_valid)
+
+ self.request.user_token_valid = False
+ self.assertEqual('Invalid',
+ self.request.headers['X-Identity-Status'])
+ self.assertFalse(self.request.user_token_valid)
+
+ def test_setting_service_token_valid(self):
+ self.assertNotIn('X-Service-Identity-Status', self.request.headers)
+
+ self.request.service_token_valid = True
+ self.assertEqual('Confirmed',
+ self.request.headers['X-Service-Identity-Status'])
+ self.assertTrue(self.request.service_token_valid)
+
+ self.request.service_token_valid = False
+ self.assertEqual('Invalid',
+ self.request.headers['X-Service-Identity-Status'])
+ self.assertFalse(self.request.service_token_valid)
+
+ def test_removing_headers(self):
+ GOOD = ('X-Auth-Token',
+ 'unknownstring',
+ uuid.uuid4().hex)
+
+ BAD = ('X-Domain-Id',
+ 'X-Domain-Name',
+ 'X-Project-Id',
+ 'X-Project-Name',
+ 'X-Project-Domain-Id',
+ 'X-Project-Domain-Name',
+ 'X-User-Id',
+ 'X-User-Name',
+ 'X-User-Domain-Id',
+ 'X-User-Domain-Name',
+ 'X-Roles',
+ 'X-Identity-Status',
+
+ 'X-Service-Domain-Id',
+ 'X-Service-Domain-Name',
+ 'X-Service-Project-Id',
+ 'X-Service-Project-Name',
+ 'X-Service-Project-Domain-Id',
+ 'X-Service-Project-Domain-Name',
+ 'X-Service-User-Id',
+ 'X-Service-User-Name',
+ 'X-Service-User-Domain-Id',
+ 'X-Service-User-Domain-Name',
+ 'X-Service-Roles',
+ 'X-Service-Identity-Status',
+
+ 'X-Service-Catalog',
+
+ 'X-Role',
+ 'X-User',
+ 'X-Tenant-Id',
+ 'X-Tenant-Name',
+ 'X-Tenant',
+ )
+
+ header_vals = {}
+
+ for header in itertools.chain(GOOD, BAD):
+ v = uuid.uuid4().hex
+ header_vals[header] = v
+ self.request.headers[header] = v
+
+ self.request.remove_auth_headers()
+
+ for header in BAD:
+ self.assertNotIn(header, self.request.headers)
+
+ for header in GOOD:
+ self.assertEqual(header_vals[header], self.request.headers[header])
+
+ def _test_v3_headers(self, token, prefix):
+ self.assertEqual(token.domain_id,
+ self.request.headers['X%s-Domain-Id' % prefix])
+ self.assertEqual(token.domain_name,
+ self.request.headers['X%s-Domain-Name' % prefix])
+ self.assertEqual(token.project_id,
+ self.request.headers['X%s-Project-Id' % prefix])
+ self.assertEqual(token.project_name,
+ self.request.headers['X%s-Project-Name' % prefix])
+ self.assertEqual(
+ token.project_domain_id,
+ self.request.headers['X%s-Project-Domain-Id' % prefix])
+ self.assertEqual(
+ token.project_domain_name,
+ self.request.headers['X%s-Project-Domain-Name' % prefix])
+
+ self.assertEqual(token.user_id,
+ self.request.headers['X%s-User-Id' % prefix])
+ self.assertEqual(token.user_name,
+ self.request.headers['X%s-User-Name' % prefix])
+ self.assertEqual(
+ token.user_domain_id,
+ self.request.headers['X%s-User-Domain-Id' % prefix])
+ self.assertEqual(
+ token.user_domain_name,
+ self.request.headers['X%s-User-Domain-Name' % prefix])
+
+ def test_project_scoped_user_headers(self):
+ token = fixture.V3Token()
+ token.set_project_scope()
+ token_id = uuid.uuid4().hex
+
+ auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
+ self.request.set_user_headers(auth_ref, include_service_catalog=True)
+
+ self._test_v3_headers(token, '')
+
+ def test_project_scoped_service_headers(self):
+ token = fixture.V3Token()
+ token.set_project_scope()
+ token_id = uuid.uuid4().hex
+
+ auth_ref = access.AccessInfo.factory(token_id=token_id, body=token)
+ self.request.set_service_headers(auth_ref)
+
+ self._test_v3_headers(token, '-Service')
+
+ def test_auth_type(self):
+ self.assertIsNone(self.request.auth_type)
+ self.request.environ['AUTH_TYPE'] = 'NeGoTiatE'
+ self.assertEqual('negotiate', self.request.auth_type)
+
+ def test_user_token(self):
+ token = uuid.uuid4().hex
+ self.assertIsNone(self.request.user_token)
+ self.request.headers['X-Auth-Token'] = token
+ self.assertEqual(token, self.request.user_token)
+
+ def test_storage_token(self):
+ storage_token = uuid.uuid4().hex
+ user_token = uuid.uuid4().hex
+
+ self.assertIsNone(self.request.user_token)
+ self.request.headers['X-Storage-Token'] = storage_token
+ self.assertEqual(storage_token, self.request.user_token)
+ self.request.headers['X-Auth-Token'] = user_token
+ self.assertEqual(user_token, self.request.user_token)
+
+ def test_service_token(self):
+ token = uuid.uuid4().hex
+ self.assertIsNone(self.request.service_token)
+ self.request.headers['X-Service-Token'] = token
+ self.assertEqual(token, self.request.service_token)
+
+ def test_token_auth(self):
+ plugin = object()
+
+ self.assertNotIn('keystone.token_auth', self.request.environ)
+ self.request.token_auth = plugin
+ self.assertIs(plugin, self.request.environ['keystone.token_auth'])
+ self.assertIs(plugin, self.request.token_auth)
+
+
+class CatalogConversionTests(utils.TestCase):
+
+ PUBLIC_URL = 'http://server:5000/v2.0'
+ ADMIN_URL = 'http://admin:35357/v2.0'
+ INTERNAL_URL = 'http://internal:5000/v2.0'
+
+ REGION_ONE = 'RegionOne'
+ REGION_TWO = 'RegionTwo'
+ REGION_THREE = 'RegionThree'
+
+ def test_basic_convert(self):
+ token = fixture.V3Token()
+ s = token.add_service(type='identity')
+ s.add_standard_endpoints(public=self.PUBLIC_URL,
+ admin=self.ADMIN_URL,
+ internal=self.INTERNAL_URL,
+ region=self.REGION_ONE)
+
+ auth_ref = access.AccessInfo.factory(body=token)
+ catalog_data = auth_ref.service_catalog.get_data()
+ catalog = _request._v3_to_v2_catalog(catalog_data)
+
+ self.assertEqual(1, len(catalog))
+ service = catalog[0]
+ self.assertEqual(1, len(service['endpoints']))
+ endpoints = service['endpoints'][0]
+
+ self.assertEqual('identity', service['type'])
+ self.assertEqual(4, len(endpoints))
+ self.assertEqual(self.PUBLIC_URL, endpoints['publicURL'])
+ self.assertEqual(self.ADMIN_URL, endpoints['adminURL'])
+ self.assertEqual(self.INTERNAL_URL, endpoints['internalURL'])
+ self.assertEqual(self.REGION_ONE, endpoints['region'])
+
+ def test_multi_region(self):
+ token = fixture.V3Token()
+ s = token.add_service(type='identity')
+
+ s.add_endpoint('internal', self.INTERNAL_URL, region=self.REGION_ONE)
+ s.add_endpoint('public', self.PUBLIC_URL, region=self.REGION_TWO)
+ s.add_endpoint('admin', self.ADMIN_URL, region=self.REGION_THREE)
+
+ auth_ref = access.AccessInfo.factory(body=token)
+ catalog_data = auth_ref.service_catalog.get_data()
+ catalog = _request._v3_to_v2_catalog(catalog_data)
+
+ self.assertEqual(1, len(catalog))
+ service = catalog[0]
+
+ # the 3 regions will come through as 3 separate endpoints
+ expected = [{'internalURL': self.INTERNAL_URL,
+ 'region': self.REGION_ONE},
+ {'publicURL': self.PUBLIC_URL,
+ 'region': self.REGION_TWO},
+ {'adminURL': self.ADMIN_URL,
+ 'region': self.REGION_THREE}]
+
+ self.assertEqual('identity', service['type'])
+ self.assertEqual(3, len(service['endpoints']))
+ for e in expected:
+ self.assertIn(e, expected)