aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3.py681
1 files changed, 475 insertions, 206 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py
index 32c5e295..216d8c79 100644
--- a/keystone-moon/keystone/tests/unit/test_v3.py
+++ b/keystone-moon/keystone/tests/unit/test_v3.py
@@ -12,20 +12,25 @@
# License for the specific language governing permissions and limitations
# under the License.
-import datetime
import uuid
+import mock
from oslo_config import cfg
+import oslo_context.context
from oslo_serialization import jsonutils
from oslo_utils import timeutils
+from six.moves import http_client
from testtools import matchers
+import webtest
from keystone import auth
from keystone.common import authorization
from keystone.common import cache
+from keystone.common.validation import validators
from keystone import exception
from keystone import middleware
-from keystone.policy.backends import rules
+from keystone.middleware import auth as middleware_auth
+from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import rest
@@ -38,6 +43,7 @@ TIME_FORMAT = unit.TIME_FORMAT
class AuthTestMixin(object):
"""To hold auth building helper functions."""
+
def build_auth_scope(self, project_id=None, project_name=None,
project_domain_id=None, project_domain_name=None,
domain_id=None, domain_name=None, trust_id=None,
@@ -116,7 +122,127 @@ class AuthTestMixin(object):
class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
- AuthTestMixin):
+ common_auth.AuthTestMixin):
+
+ def generate_token_schema(self, domain_scoped=False, project_scoped=False):
+ """Return a dictionary of token properties to validate against."""
+ properties = {
+ 'audit_ids': {
+ 'type': 'array',
+ 'items': {
+ 'type': 'string',
+ },
+ 'minItems': 1,
+ 'maxItems': 2,
+ },
+ 'bind': {
+ 'type': 'object',
+ 'properties': {
+ 'kerberos': {
+ 'type': 'string',
+ },
+ },
+ 'required': ['kerberos'],
+ 'additionalProperties': False,
+ },
+ 'expires_at': {'type': 'string'},
+ 'issued_at': {'type': 'string'},
+ 'methods': {
+ 'type': 'array',
+ 'items': {
+ 'type': 'string',
+ },
+ },
+ 'user': {
+ 'type': 'object',
+ 'required': ['id', 'name', 'domain'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'name': {'type': 'string'},
+ 'domain': {
+ 'type': 'object',
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'name': {'type': 'string'}
+ },
+ 'required': ['id', 'name'],
+ 'additonalProperties': False,
+ }
+ },
+ 'additionalProperties': False,
+ }
+ }
+
+ if domain_scoped:
+ properties['catalog'] = {'type': 'array'}
+ properties['roles'] = {
+ 'type': 'array',
+ 'items': {
+ 'type': 'object',
+ 'properties': {
+ 'id': {'type': 'string', },
+ 'name': {'type': 'string', },
+ },
+ 'required': ['id', 'name', ],
+ 'additionalProperties': False,
+ },
+ 'minItems': 1,
+ }
+ properties['domain'] = {
+ 'domain': {
+ 'type': 'object',
+ 'required': ['id', 'name'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'name': {'type': 'string'}
+ },
+ 'additionalProperties': False
+ }
+ }
+ elif project_scoped:
+ properties['is_admin_project'] = {'type': 'boolean'}
+ properties['catalog'] = {'type': 'array'}
+ properties['roles'] = {'type': 'array'}
+ properties['project'] = {
+ 'type': ['object'],
+ 'required': ['id', 'name', 'domain'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'name': {'type': 'string'},
+ 'domain': {
+ 'type': ['object'],
+ 'required': ['id', 'name'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'name': {'type': 'string'}
+ },
+ 'additionalProperties': False
+ }
+ },
+ 'additionalProperties': False
+ }
+
+ schema = {
+ 'type': 'object',
+ 'properties': properties,
+ 'required': ['audit_ids', 'expires_at', 'issued_at', 'methods',
+ 'user'],
+ 'optional': ['bind'],
+ 'additionalProperties': False
+ }
+
+ if domain_scoped:
+ schema['required'].extend(['domain', 'roles'])
+ schema['optional'].append('catalog')
+ elif project_scoped:
+ schema['required'].append('project')
+ schema['optional'].append('bind')
+ schema['optional'].append('catalog')
+ schema['optional'].append('OS-TRUST:trust')
+ schema['optional'].append('is_admin_project')
+
+ return schema
+
def config_files(self):
config_files = super(RestfulTestCase, self).config_files()
config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
@@ -146,9 +272,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
pass
def setUp(self, app_conf='keystone'):
- """Setup for v3 Restful Test Cases.
-
- """
+ """Setup for v3 Restful Test Cases."""
new_paste_file = self.generate_paste_config()
self.addCleanup(self.remove_generated_paste_config)
if new_paste_file:
@@ -158,16 +282,9 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
self.empty_context = {'environment': {}}
- # Initialize the policy engine and allow us to write to a temp
- # file in each test to create the policies
- rules.reset()
-
- # drop the policy rules
- self.addCleanup(rules.reset)
-
def load_backends(self):
# ensure the cache region instance is setup
- cache.configure_cache_region(cache.REGION)
+ cache.configure_cache()
super(RestfulTestCase, self).load_backends()
@@ -183,53 +300,42 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
try:
self.resource_api.get_domain(DEFAULT_DOMAIN_ID)
except exception.DomainNotFound:
- domain = {'description': (u'Owns users and tenants (i.e. '
- u'projects) available on Identity '
- u'API v2.'),
- 'enabled': True,
- 'id': DEFAULT_DOMAIN_ID,
- 'name': u'Default'}
+ domain = unit.new_domain_ref(
+ description=(u'The default domain'),
+ id=DEFAULT_DOMAIN_ID,
+ name=u'Default')
self.resource_api.create_domain(DEFAULT_DOMAIN_ID, domain)
def load_sample_data(self):
self._populate_default_domain()
- self.domain_id = uuid.uuid4().hex
- self.domain = self.new_domain_ref()
- self.domain['id'] = self.domain_id
+ self.domain = unit.new_domain_ref()
+ self.domain_id = self.domain['id']
self.resource_api.create_domain(self.domain_id, self.domain)
- self.project_id = uuid.uuid4().hex
- self.project = self.new_project_ref(
- domain_id=self.domain_id)
- self.project['id'] = self.project_id
- self.resource_api.create_project(self.project_id, self.project)
+ self.project = unit.new_project_ref(domain_id=self.domain_id)
+ self.project_id = self.project['id']
+ self.project = self.resource_api.create_project(self.project_id,
+ self.project)
- self.user = self.new_user_ref(domain_id=self.domain_id)
- password = self.user['password']
- self.user = self.identity_api.create_user(self.user)
- self.user['password'] = password
+ self.user = unit.create_user(self.identity_api,
+ domain_id=self.domain_id)
self.user_id = self.user['id']
self.default_domain_project_id = uuid.uuid4().hex
- self.default_domain_project = self.new_project_ref(
+ self.default_domain_project = unit.new_project_ref(
domain_id=DEFAULT_DOMAIN_ID)
self.default_domain_project['id'] = self.default_domain_project_id
self.resource_api.create_project(self.default_domain_project_id,
self.default_domain_project)
- self.default_domain_user = self.new_user_ref(
+ self.default_domain_user = unit.create_user(
+ self.identity_api,
domain_id=DEFAULT_DOMAIN_ID)
- password = self.default_domain_user['password']
- self.default_domain_user = (
- self.identity_api.create_user(self.default_domain_user))
- self.default_domain_user['password'] = password
self.default_domain_user_id = self.default_domain_user['id']
# create & grant policy.json's default role for admin_required
- self.role_id = uuid.uuid4().hex
- self.role = self.new_role_ref()
- self.role['id'] = self.role_id
- self.role['name'] = 'admin'
+ self.role = unit.new_role_ref(name='admin')
+ self.role_id = self.role['id']
self.role_api.create_role(self.role_id, self.role)
self.assignment_api.add_role_to_user_and_project(
self.user_id, self.project_id, self.role_id)
@@ -240,81 +346,35 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
self.default_domain_user_id, self.project_id,
self.role_id)
- self.region_id = uuid.uuid4().hex
- self.region = self.new_region_ref()
- self.region['id'] = self.region_id
- self.catalog_api.create_region(
- self.region.copy())
-
- self.service_id = uuid.uuid4().hex
- self.service = self.new_service_ref()
- self.service['id'] = self.service_id
- self.catalog_api.create_service(
- self.service_id,
- self.service.copy())
-
- self.endpoint_id = uuid.uuid4().hex
- self.endpoint = self.new_endpoint_ref(service_id=self.service_id)
- self.endpoint['id'] = self.endpoint_id
- self.endpoint['region_id'] = self.region['id']
- self.catalog_api.create_endpoint(
- self.endpoint_id,
- self.endpoint.copy())
- # The server adds 'enabled' and defaults to True.
- self.endpoint['enabled'] = True
-
- def new_ref(self):
- """Populates a ref with attributes common to some API entities."""
- return unit.new_ref()
-
- def new_region_ref(self):
- return unit.new_region_ref()
-
- def new_service_ref(self):
- return unit.new_service_ref()
-
- def new_endpoint_ref(self, service_id, interface='public', **kwargs):
- return unit.new_endpoint_ref(
- service_id, interface=interface, default_region_id=self.region_id,
- **kwargs)
-
- def new_domain_ref(self):
- return unit.new_domain_ref()
-
- def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False):
- return unit.new_project_ref(domain_id=domain_id, parent_id=parent_id,
- is_domain=is_domain)
-
- def new_user_ref(self, domain_id, project_id=None):
- return unit.new_user_ref(domain_id, project_id=project_id)
-
- def new_group_ref(self, domain_id):
- return unit.new_group_ref(domain_id)
-
- def new_credential_ref(self, user_id, project_id=None, cred_type=None):
- return unit.new_credential_ref(user_id, project_id=project_id,
- cred_type=cred_type)
+ # Create "req_admin" user for simulating a real user instead of the
+ # admin_token_auth middleware
+ self.user_reqadmin = unit.create_user(self.identity_api,
+ DEFAULT_DOMAIN_ID)
+ self.assignment_api.add_role_to_user_and_project(
+ self.user_reqadmin['id'],
+ self.default_domain_project_id,
+ self.role_id)
- def new_role_ref(self):
- return unit.new_role_ref()
+ self.region = unit.new_region_ref()
+ self.region_id = self.region['id']
+ self.catalog_api.create_region(self.region)
- def new_policy_ref(self):
- return unit.new_policy_ref()
+ self.service = unit.new_service_ref()
+ self.service_id = self.service['id']
+ self.catalog_api.create_service(self.service_id, self.service.copy())
- def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None,
- impersonation=None, expires=None, role_ids=None,
- role_names=None, remaining_uses=None,
- allow_redelegation=False):
- return unit.new_trust_ref(
- trustor_user_id, trustee_user_id, project_id=project_id,
- impersonation=impersonation, expires=expires, role_ids=role_ids,
- role_names=role_names, remaining_uses=remaining_uses,
- allow_redelegation=allow_redelegation)
+ self.endpoint = unit.new_endpoint_ref(service_id=self.service_id,
+ interface='public',
+ region_id=self.region_id)
+ self.endpoint_id = self.endpoint['id']
+ self.catalog_api.create_endpoint(self.endpoint_id,
+ self.endpoint.copy())
+ # The server adds 'enabled' and defaults to True.
+ self.endpoint['enabled'] = True
def create_new_default_project_for_user(self, user_id, domain_id,
enable_project=True):
- ref = self.new_project_ref(domain_id=domain_id)
- ref['enabled'] = enable_project
+ ref = unit.new_project_ref(domain_id=domain_id, enabled=enable_project)
r = self.post('/projects', body={'project': ref})
project = self.assertValidProjectResponse(r, ref)
# set the user's preferred project
@@ -326,6 +386,34 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
return project
+ def get_admin_token(self):
+ """Convenience method so that we can test authenticated requests."""
+ r = self.admin_request(
+ method='POST',
+ path='/v3/auth/tokens',
+ body={
+ 'auth': {
+ 'identity': {
+ 'methods': ['password'],
+ 'password': {
+ 'user': {
+ 'name': self.user_reqadmin['name'],
+ 'password': self.user_reqadmin['password'],
+ 'domain': {
+ 'id': self.user_reqadmin['domain_id']
+ }
+ }
+ }
+ },
+ 'scope': {
+ 'project': {
+ 'id': self.default_domain_project_id,
+ }
+ }
+ }
+ })
+ return r.headers.get('X-Subject-Token')
+
def get_unscoped_token(self):
"""Convenience method so that we can test authenticated requests."""
r = self.admin_request(
@@ -407,11 +495,10 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
def get_requested_token(self, auth):
"""Request the specific token we want."""
-
- r = self.v3_authenticate_token(auth)
+ r = self.v3_create_token(auth)
return r.headers.get('X-Subject-Token')
- def v3_authenticate_token(self, auth, expected_status=201):
+ def v3_create_token(self, auth, expected_status=http_client.CREATED):
return self.admin_request(method='POST',
path='/v3/auth/tokens',
body=auth,
@@ -440,42 +527,31 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
return self.admin_request(path=path, token=token, **kwargs)
- def get(self, path, **kwargs):
- r = self.v3_request(method='GET', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 200)
- return r
+ def get(self, path, expected_status=http_client.OK, **kwargs):
+ return self.v3_request(path, method='GET',
+ expected_status=expected_status, **kwargs)
- def head(self, path, **kwargs):
- r = self.v3_request(method='HEAD', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
- self.assertEqual('', r.body)
+ def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ r = self.v3_request(path, method='HEAD',
+ expected_status=expected_status, **kwargs)
+ self.assertEqual(b'', r.body)
return r
- def post(self, path, **kwargs):
- r = self.v3_request(method='POST', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 201)
- return r
+ def post(self, path, expected_status=http_client.CREATED, **kwargs):
+ return self.v3_request(path, method='POST',
+ expected_status=expected_status, **kwargs)
- def put(self, path, **kwargs):
- r = self.v3_request(method='PUT', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
- return r
+ def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ return self.v3_request(path, method='PUT',
+ expected_status=expected_status, **kwargs)
- def patch(self, path, **kwargs):
- r = self.v3_request(method='PATCH', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 200)
- return r
+ def patch(self, path, expected_status=http_client.OK, **kwargs):
+ return self.v3_request(path, method='PATCH',
+ expected_status=expected_status, **kwargs)
- def delete(self, path, **kwargs):
- r = self.v3_request(method='DELETE', path=path, **kwargs)
- if 'expected_status' not in kwargs:
- self.assertResponseStatus(r, 204)
- return r
+ def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
+ return self.v3_request(path, method='DELETE',
+ expected_status=expected_status, **kwargs)
def assertValidErrorResponse(self, r):
resp = r.result
@@ -582,7 +658,6 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
except Exception:
msg = '%s is not a valid ISO 8601 extended format date time.' % dt
raise AssertionError(msg)
- self.assertIsInstance(dt, datetime.datetime)
def assertValidTokenResponse(self, r, user=None):
self.assertTrue(r.headers.get('X-Subject-Token'))
@@ -611,11 +686,10 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
token = self.assertValidTokenResponse(r, *args, **kwargs)
-
- self.assertNotIn('roles', token)
- self.assertNotIn('catalog', token)
- self.assertNotIn('project', token)
- self.assertNotIn('domain', token)
+ validator_object = validators.SchemaValidator(
+ self.generate_token_schema()
+ )
+ validator_object.validate(token)
return token
@@ -623,6 +697,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
require_catalog = kwargs.pop('require_catalog', True)
endpoint_filter = kwargs.pop('endpoint_filter', False)
ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0)
+ is_admin_project = kwargs.pop('is_admin_project', False)
token = self.assertValidTokenResponse(r, *args, **kwargs)
if require_catalog:
@@ -650,40 +725,66 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
self.assertIn('id', role)
self.assertIn('name', role)
+ if is_admin_project:
+ # NOTE(samueldmq): We want to explicitly test for boolean
+ self.assertIs(True, token['is_admin_project'])
+ else:
+ self.assertNotIn('is_admin_project', token)
+
return token
def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
- self.assertIn('project', token)
- self.assertIn('id', token['project'])
- self.assertIn('name', token['project'])
- self.assertIn('domain', token['project'])
- self.assertIn('id', token['project']['domain'])
- self.assertIn('name', token['project']['domain'])
+ project_scoped_token_schema = self.generate_token_schema(
+ project_scoped=True)
+
+ if token.get('OS-TRUST:trust'):
+ trust_properties = {
+ 'OS-TRUST:trust': {
+ 'type': ['object'],
+ 'required': ['id', 'impersonation', 'trustor_user',
+ 'trustee_user'],
+ 'properties': {
+ 'id': {'type': 'string'},
+ 'impersonation': {'type': 'boolean'},
+ 'trustor_user': {
+ 'type': 'object',
+ 'required': ['id'],
+ 'properties': {
+ 'id': {'type': 'string'}
+ },
+ 'additionalProperties': False
+ },
+ 'trustee_user': {
+ 'type': 'object',
+ 'required': ['id'],
+ 'properties': {
+ 'id': {'type': 'string'}
+ },
+ 'additionalProperties': False
+ }
+ },
+ 'additionalProperties': False
+ }
+ }
+ project_scoped_token_schema['properties'].update(trust_properties)
+
+ validator_object = validators.SchemaValidator(
+ project_scoped_token_schema)
+ validator_object.validate(token)
self.assertEqual(self.role_id, token['roles'][0]['id'])
return token
- def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs):
- token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs)
-
- trust = token.get('OS-TRUST:trust')
- self.assertIsNotNone(trust)
- self.assertIsNotNone(trust.get('id'))
- self.assertIsInstance(trust.get('impersonation'), bool)
- self.assertIsNotNone(trust.get('trustor_user'))
- self.assertIsNotNone(trust.get('trustee_user'))
- self.assertIsNotNone(trust['trustor_user'].get('id'))
- self.assertIsNotNone(trust['trustee_user'].get('id'))
-
def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
- self.assertIn('domain', token)
- self.assertIn('id', token['domain'])
- self.assertIn('name', token['domain'])
+ validator_object = validators.SchemaValidator(
+ self.generate_token_schema(domain_scoped=True)
+ )
+ validator_object.validate(token)
return token
@@ -876,7 +977,6 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
**kwargs)
def assertValidProject(self, entity, ref=None):
- self.assertIsNotNone(entity.get('domain_id'))
if ref:
self.assertEqual(ref['domain_id'], entity['domain_id'])
return entity
@@ -888,6 +988,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
resp,
'users',
self.assertValidUser,
+ keys_to_check=['name', 'enabled'],
*args,
**kwargs)
@@ -896,6 +997,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
resp,
'user',
self.assertValidUser,
+ keys_to_check=['name', 'enabled'],
*args,
**kwargs)
@@ -920,6 +1022,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
resp,
'groups',
self.assertValidGroup,
+ keys_to_check=['name', 'description', 'domain_id'],
*args,
**kwargs)
@@ -928,6 +1031,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
resp,
'group',
self.assertValidGroup,
+ keys_to_check=['name', 'description', 'domain_id'],
*args,
**kwargs)
@@ -979,6 +1083,21 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
*args,
**kwargs)
+ def assertRoleInListResponse(self, resp, ref, expected=1):
+ found_count = 0
+ for entity in resp.result.get('roles'):
+ try:
+ self.assertValidRole(entity, ref=ref)
+ except Exception:
+ # It doesn't match, so let's go onto the next one
+ pass
+ else:
+ found_count += 1
+ self.assertEqual(expected, found_count)
+
+ def assertRoleNotInListResponse(self, resp, ref):
+ self.assertRoleInListResponse(resp, ref=ref, expected=0)
+
def assertValidRoleResponse(self, resp, *args, **kwargs):
return self.assertValidResponse(
resp,
@@ -992,6 +1111,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
self.assertIsNotNone(entity.get('name'))
if ref:
self.assertEqual(ref['name'], entity['name'])
+ self.assertEqual(ref['domain_id'], entity['domain_id'])
return entity
# role assignment validation
@@ -1161,6 +1281,27 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
return entity
+ # Service providers (federation)
+
+ def assertValidServiceProvider(self, entity, ref=None, *args, **kwargs):
+
+ attributes = frozenset(['auth_url', 'id', 'enabled', 'description',
+ 'links', 'relay_state_prefix', 'sp_url'])
+ for attribute in attributes:
+ self.assertIsNotNone(entity.get(attribute))
+
+ def assertValidServiceProviderListResponse(self, resp, *args, **kwargs):
+ if kwargs.get('keys_to_check') is None:
+ kwargs['keys_to_check'] = ['auth_url', 'id', 'enabled',
+ 'description', 'relay_state_prefix',
+ 'sp_url']
+ return self.assertValidListResponse(
+ resp,
+ 'service_providers',
+ self.assertValidServiceProvider,
+ *args,
+ **kwargs)
+
def build_external_auth_request(self, remote_user,
remote_domain=None, auth_data=None,
kerberos=False):
@@ -1182,24 +1323,81 @@ class VersionTestCase(RestfulTestCase):
pass
+# NOTE(morganfainberg): To be removed when admin_token_auth is removed. This
+# has been split out to allow testing admin_token auth without enabling it
+# for other tests.
+class AuthContextMiddlewareAdminTokenTestCase(RestfulTestCase):
+ EXTENSION_TO_ADD = 'admin_token_auth'
+
+ def config_overrides(self):
+ super(AuthContextMiddlewareAdminTokenTestCase, self).config_overrides()
+ self.config_fixture.config(
+ admin_token='ADMIN')
+
+ # NOTE(morganfainberg): This is knowingly copied from below for simplicity
+ # during the deprecation cycle.
+ def _middleware_request(self, token, extra_environ=None):
+
+ def application(environ, start_response):
+ body = b'body'
+ headers = [('Content-Type', 'text/html; charset=utf8'),
+ ('Content-Length', str(len(body)))]
+ start_response('200 OK', headers)
+ return [body]
+
+ app = webtest.TestApp(middleware.AuthContextMiddleware(application),
+ extra_environ=extra_environ)
+ resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token})
+ self.assertEqual('body', resp.text) # just to make sure it worked
+ return resp.request
+
+ def test_admin_auth_context(self):
+ # test to make sure AuthContextMiddleware does not attempt to build the
+ # auth context if the admin_token middleware indicates it's admin
+ # already.
+ token_id = uuid.uuid4().hex # token doesn't matter.
+ # the admin_token middleware sets is_admin in the context.
+ extra_environ = {middleware.CONTEXT_ENV: {'is_admin': True}}
+ req = self._middleware_request(token_id, extra_environ)
+ auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
+ self.assertDictEqual({}, auth_context)
+
+ @mock.patch.object(middleware_auth.versionutils,
+ 'report_deprecated_feature')
+ def test_admin_token_auth_context_deprecated(self, mock_report_deprecated):
+ # For backwards compatibility AuthContextMiddleware will check that the
+ # admin token (as configured in the CONF file) is present and not
+ # attempt to build the auth context. This is deprecated.
+ req = self._middleware_request('ADMIN')
+ auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
+ self.assertDictEqual({}, auth_context)
+ self.assertEqual(1, mock_report_deprecated.call_count)
+
+
# NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py
# because we need the token
class AuthContextMiddlewareTestCase(RestfulTestCase):
- def _mock_request_object(self, token_id):
- class fake_req(object):
- headers = {middleware.AUTH_TOKEN_HEADER: token_id}
- environ = {}
+ def _middleware_request(self, token, extra_environ=None):
+
+ def application(environ, start_response):
+ body = b'body'
+ headers = [('Content-Type', 'text/html; charset=utf8'),
+ ('Content-Length', str(len(body)))]
+ start_response('200 OK', headers)
+ return [body]
- return fake_req()
+ app = webtest.TestApp(middleware.AuthContextMiddleware(application),
+ extra_environ=extra_environ)
+ resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token})
+ self.assertEqual(b'body', resp.body) # just to make sure it worked
+ return resp.request
def test_auth_context_build_by_middleware(self):
# test to make sure AuthContextMiddleware successful build the auth
# context from the incoming auth token
admin_token = self.get_scoped_token()
- req = self._mock_request_object(admin_token)
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
+ req = self._middleware_request(admin_token)
self.assertEqual(
self.user['id'],
req.environ.get(authorization.AUTH_CONTEXT_ENV)['user_id'])
@@ -1208,28 +1406,16 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
overridden_context = 'OVERRIDDEN_CONTEXT'
# this token should not be used
token = uuid.uuid4().hex
- req = self._mock_request_object(token)
- req.environ[authorization.AUTH_CONTEXT_ENV] = overridden_context
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
+
+ extra_environ = {authorization.AUTH_CONTEXT_ENV: overridden_context}
+ req = self._middleware_request(token, extra_environ=extra_environ)
# make sure overridden context take precedence
self.assertEqual(overridden_context,
req.environ.get(authorization.AUTH_CONTEXT_ENV))
- def test_admin_token_auth_context(self):
- # test to make sure AuthContextMiddleware does not attempt to build
- # auth context if the incoming auth token is the special admin token
- req = self._mock_request_object(CONF.admin_token)
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
- self.assertDictEqual(req.environ.get(authorization.AUTH_CONTEXT_ENV),
- {})
-
def test_unscoped_token_auth_context(self):
unscoped_token = self.get_unscoped_token()
- req = self._mock_request_object(unscoped_token)
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
+ req = self._middleware_request(unscoped_token)
for key in ['project_id', 'domain_id', 'domain_name']:
self.assertNotIn(
key,
@@ -1237,9 +1423,7 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
def test_project_scoped_token_auth_context(self):
project_scoped_token = self.get_scoped_token()
- req = self._mock_request_object(project_scoped_token)
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
+ req = self._middleware_request(project_scoped_token)
self.assertEqual(
self.project['id'],
req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id'])
@@ -1251,9 +1435,7 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
self.put(path=path)
domain_scoped_token = self.get_domain_scoped_token()
- req = self._mock_request_object(domain_scoped_token)
- application = None
- middleware.AuthContextMiddleware(application).process_request(req)
+ req = self._middleware_request(domain_scoped_token)
self.assertEqual(
self.domain['id'],
req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id'])
@@ -1261,6 +1443,30 @@ class AuthContextMiddlewareTestCase(RestfulTestCase):
self.domain['name'],
req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name'])
+ def test_oslo_context(self):
+ # After AuthContextMiddleware runs, an
+ # oslo_context.context.RequestContext was created so that its fields
+ # can be logged. This test validates that the RequestContext was
+ # created and the fields are set as expected.
+
+ # Use a scoped token so more fields can be set.
+ token = self.get_scoped_token()
+
+ # oslo_middleware RequestId middleware sets openstack.request_id.
+ request_id = uuid.uuid4().hex
+ environ = {'openstack.request_id': request_id}
+ self._middleware_request(token, extra_environ=environ)
+
+ req_context = oslo_context.context.get_current()
+ self.assertEqual(request_id, req_context.request_id)
+ self.assertEqual(token, req_context.auth_token)
+ self.assertEqual(self.user['id'], req_context.user)
+ self.assertEqual(self.project['id'], req_context.tenant)
+ self.assertIsNone(req_context.domain)
+ self.assertEqual(self.user['domain_id'], req_context.user_domain)
+ self.assertEqual(self.project['domain_id'], req_context.project_domain)
+ self.assertFalse(req_context.is_admin)
+
class JsonHomeTestMixin(object):
"""JSON Home test
@@ -1273,6 +1479,7 @@ class JsonHomeTestMixin(object):
data must be in the response.
"""
+
def test_get_json_home(self):
resp = self.get('/', convert=False,
headers={'Accept': 'application/json-home'})
@@ -1295,7 +1502,6 @@ class AssignmentTestMixin(object):
Available filters are: domain_id, project_id, user_id, group_id,
role_id and inherited_to_projects.
"""
-
query_params = '?effective' if effective else ''
for k, v in filters.items():
@@ -1320,7 +1526,6 @@ class AssignmentTestMixin(object):
Provided attributes are expected to contain: domain_id or project_id,
user_id or group_id, role_id and, optionally, inherited_to_projects.
"""
-
if attribs.get('domain_id'):
link = '/domains/' + attribs['domain_id']
else:
@@ -1338,13 +1543,13 @@ class AssignmentTestMixin(object):
return link
- def build_role_assignment_entity(self, link=None, **attribs):
+ def build_role_assignment_entity(
+ self, link=None, prior_role_link=None, **attribs):
"""Build and return a role assignment entity with provided attributes.
Provided attributes are expected to contain: domain_id or project_id,
user_id or group_id, role_id and, optionally, inherited_to_projects.
"""
-
entity = {'links': {'assignment': (
link or self.build_role_assignment_link(**attribs))}}
@@ -1368,4 +1573,68 @@ class AssignmentTestMixin(object):
if attribs.get('inherited_to_projects'):
entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
+ if prior_role_link:
+ entity['links']['prior_role'] = prior_role_link
+
+ return entity
+
+ def build_role_assignment_entity_include_names(self,
+ domain_ref=None,
+ role_ref=None,
+ group_ref=None,
+ user_ref=None,
+ project_ref=None,
+ inherited_assignment=None):
+ """Build and return a role assignment entity with provided attributes.
+
+ The expected attributes are: domain_ref or project_ref,
+ user_ref or group_ref, role_ref and, optionally, inherited_to_projects.
+ """
+ entity = {'links': {}}
+ attributes_for_links = {}
+ if project_ref:
+ dmn_name = self.resource_api.get_domain(
+ project_ref['domain_id'])['name']
+
+ entity['scope'] = {'project': {
+ 'id': project_ref['id'],
+ 'name': project_ref['name'],
+ 'domain': {
+ 'id': project_ref['domain_id'],
+ 'name': dmn_name}}}
+ attributes_for_links['project_id'] = project_ref['id']
+ else:
+ entity['scope'] = {'domain': {'id': domain_ref['id'],
+ 'name': domain_ref['name']}}
+ attributes_for_links['domain_id'] = domain_ref['id']
+ if user_ref:
+ dmn_name = self.resource_api.get_domain(
+ user_ref['domain_id'])['name']
+ entity['user'] = {'id': user_ref['id'],
+ 'name': user_ref['name'],
+ 'domain': {'id': user_ref['domain_id'],
+ 'name': dmn_name}}
+ attributes_for_links['user_id'] = user_ref['id']
+ else:
+ dmn_name = self.resource_api.get_domain(
+ group_ref['domain_id'])['name']
+ entity['group'] = {'id': group_ref['id'],
+ 'name': group_ref['name'],
+ 'domain': {
+ 'id': group_ref['domain_id'],
+ 'name': dmn_name}}
+ attributes_for_links['group_id'] = group_ref['id']
+
+ if role_ref:
+ entity['role'] = {'id': role_ref['id'],
+ 'name': role_ref['name']}
+ attributes_for_links['role_id'] = role_ref['id']
+
+ if inherited_assignment:
+ entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
+ attributes_for_links['inherited_to_projects'] = True
+
+ entity['links']['assignment'] = self.build_role_assignment_link(
+ **attributes_for_links)
+
return entity