diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3.py | 681 |
1 files changed, 475 insertions, 206 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py index 32c5e295..216d8c79 100644 --- a/keystone-moon/keystone/tests/unit/test_v3.py +++ b/keystone-moon/keystone/tests/unit/test_v3.py @@ -12,20 +12,25 @@ # License for the specific language governing permissions and limitations # under the License. -import datetime import uuid +import mock from oslo_config import cfg +import oslo_context.context from oslo_serialization import jsonutils from oslo_utils import timeutils +from six.moves import http_client from testtools import matchers +import webtest from keystone import auth from keystone.common import authorization from keystone.common import cache +from keystone.common.validation import validators from keystone import exception from keystone import middleware -from keystone.policy.backends import rules +from keystone.middleware import auth as middleware_auth +from keystone.tests.common import auth as common_auth from keystone.tests import unit from keystone.tests.unit import rest @@ -38,6 +43,7 @@ TIME_FORMAT = unit.TIME_FORMAT class AuthTestMixin(object): """To hold auth building helper functions.""" + def build_auth_scope(self, project_id=None, project_name=None, project_domain_id=None, project_domain_name=None, domain_id=None, domain_name=None, trust_id=None, @@ -116,7 +122,127 @@ class AuthTestMixin(object): class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, - AuthTestMixin): + common_auth.AuthTestMixin): + + def generate_token_schema(self, domain_scoped=False, project_scoped=False): + """Return a dictionary of token properties to validate against.""" + properties = { + 'audit_ids': { + 'type': 'array', + 'items': { + 'type': 'string', + }, + 'minItems': 1, + 'maxItems': 2, + }, + 'bind': { + 'type': 'object', + 'properties': { + 'kerberos': { + 'type': 'string', + }, + }, + 'required': ['kerberos'], + 'additionalProperties': False, + }, + 'expires_at': {'type': 'string'}, + 'issued_at': {'type': 'string'}, + 'methods': { + 'type': 'array', + 'items': { + 'type': 'string', + }, + }, + 'user': { + 'type': 'object', + 'required': ['id', 'name', 'domain'], + 'properties': { + 'id': {'type': 'string'}, + 'name': {'type': 'string'}, + 'domain': { + 'type': 'object', + 'properties': { + 'id': {'type': 'string'}, + 'name': {'type': 'string'} + }, + 'required': ['id', 'name'], + 'additonalProperties': False, + } + }, + 'additionalProperties': False, + } + } + + if domain_scoped: + properties['catalog'] = {'type': 'array'} + properties['roles'] = { + 'type': 'array', + 'items': { + 'type': 'object', + 'properties': { + 'id': {'type': 'string', }, + 'name': {'type': 'string', }, + }, + 'required': ['id', 'name', ], + 'additionalProperties': False, + }, + 'minItems': 1, + } + properties['domain'] = { + 'domain': { + 'type': 'object', + 'required': ['id', 'name'], + 'properties': { + 'id': {'type': 'string'}, + 'name': {'type': 'string'} + }, + 'additionalProperties': False + } + } + elif project_scoped: + properties['is_admin_project'] = {'type': 'boolean'} + properties['catalog'] = {'type': 'array'} + properties['roles'] = {'type': 'array'} + properties['project'] = { + 'type': ['object'], + 'required': ['id', 'name', 'domain'], + 'properties': { + 'id': {'type': 'string'}, + 'name': {'type': 'string'}, + 'domain': { + 'type': ['object'], + 'required': ['id', 'name'], + 'properties': { + 'id': {'type': 'string'}, + 'name': {'type': 'string'} + }, + 'additionalProperties': False + } + }, + 'additionalProperties': False + } + + schema = { + 'type': 'object', + 'properties': properties, + 'required': ['audit_ids', 'expires_at', 'issued_at', 'methods', + 'user'], + 'optional': ['bind'], + 'additionalProperties': False + } + + if domain_scoped: + schema['required'].extend(['domain', 'roles']) + schema['optional'].append('catalog') + elif project_scoped: + schema['required'].append('project') + schema['optional'].append('bind') + schema['optional'].append('catalog') + schema['optional'].append('OS-TRUST:trust') + schema['optional'].append('is_admin_project') + + return schema + def config_files(self): config_files = super(RestfulTestCase, self).config_files() config_files.append(unit.dirs.tests_conf('backend_sql.conf')) @@ -146,9 +272,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, pass def setUp(self, app_conf='keystone'): - """Setup for v3 Restful Test Cases. - - """ + """Setup for v3 Restful Test Cases.""" new_paste_file = self.generate_paste_config() self.addCleanup(self.remove_generated_paste_config) if new_paste_file: @@ -158,16 +282,9 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, self.empty_context = {'environment': {}} - # Initialize the policy engine and allow us to write to a temp - # file in each test to create the policies - rules.reset() - - # drop the policy rules - self.addCleanup(rules.reset) - def load_backends(self): # ensure the cache region instance is setup - cache.configure_cache_region(cache.REGION) + cache.configure_cache() super(RestfulTestCase, self).load_backends() @@ -183,53 +300,42 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, try: self.resource_api.get_domain(DEFAULT_DOMAIN_ID) except exception.DomainNotFound: - domain = {'description': (u'Owns users and tenants (i.e. ' - u'projects) available on Identity ' - u'API v2.'), - 'enabled': True, - 'id': DEFAULT_DOMAIN_ID, - 'name': u'Default'} + domain = unit.new_domain_ref( + description=(u'The default domain'), + id=DEFAULT_DOMAIN_ID, + name=u'Default') self.resource_api.create_domain(DEFAULT_DOMAIN_ID, domain) def load_sample_data(self): self._populate_default_domain() - self.domain_id = uuid.uuid4().hex - self.domain = self.new_domain_ref() - self.domain['id'] = self.domain_id + self.domain = unit.new_domain_ref() + self.domain_id = self.domain['id'] self.resource_api.create_domain(self.domain_id, self.domain) - self.project_id = uuid.uuid4().hex - self.project = self.new_project_ref( - domain_id=self.domain_id) - self.project['id'] = self.project_id - self.resource_api.create_project(self.project_id, self.project) + self.project = unit.new_project_ref(domain_id=self.domain_id) + self.project_id = self.project['id'] + self.project = self.resource_api.create_project(self.project_id, + self.project) - self.user = self.new_user_ref(domain_id=self.domain_id) - password = self.user['password'] - self.user = self.identity_api.create_user(self.user) - self.user['password'] = password + self.user = unit.create_user(self.identity_api, + domain_id=self.domain_id) self.user_id = self.user['id'] self.default_domain_project_id = uuid.uuid4().hex - self.default_domain_project = self.new_project_ref( + self.default_domain_project = unit.new_project_ref( domain_id=DEFAULT_DOMAIN_ID) self.default_domain_project['id'] = self.default_domain_project_id self.resource_api.create_project(self.default_domain_project_id, self.default_domain_project) - self.default_domain_user = self.new_user_ref( + self.default_domain_user = unit.create_user( + self.identity_api, domain_id=DEFAULT_DOMAIN_ID) - password = self.default_domain_user['password'] - self.default_domain_user = ( - self.identity_api.create_user(self.default_domain_user)) - self.default_domain_user['password'] = password self.default_domain_user_id = self.default_domain_user['id'] # create & grant policy.json's default role for admin_required - self.role_id = uuid.uuid4().hex - self.role = self.new_role_ref() - self.role['id'] = self.role_id - self.role['name'] = 'admin' + self.role = unit.new_role_ref(name='admin') + self.role_id = self.role['id'] self.role_api.create_role(self.role_id, self.role) self.assignment_api.add_role_to_user_and_project( self.user_id, self.project_id, self.role_id) @@ -240,81 +346,35 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, self.default_domain_user_id, self.project_id, self.role_id) - self.region_id = uuid.uuid4().hex - self.region = self.new_region_ref() - self.region['id'] = self.region_id - self.catalog_api.create_region( - self.region.copy()) - - self.service_id = uuid.uuid4().hex - self.service = self.new_service_ref() - self.service['id'] = self.service_id - self.catalog_api.create_service( - self.service_id, - self.service.copy()) - - self.endpoint_id = uuid.uuid4().hex - self.endpoint = self.new_endpoint_ref(service_id=self.service_id) - self.endpoint['id'] = self.endpoint_id - self.endpoint['region_id'] = self.region['id'] - self.catalog_api.create_endpoint( - self.endpoint_id, - self.endpoint.copy()) - # The server adds 'enabled' and defaults to True. - self.endpoint['enabled'] = True - - def new_ref(self): - """Populates a ref with attributes common to some API entities.""" - return unit.new_ref() - - def new_region_ref(self): - return unit.new_region_ref() - - def new_service_ref(self): - return unit.new_service_ref() - - def new_endpoint_ref(self, service_id, interface='public', **kwargs): - return unit.new_endpoint_ref( - service_id, interface=interface, default_region_id=self.region_id, - **kwargs) - - def new_domain_ref(self): - return unit.new_domain_ref() - - def new_project_ref(self, domain_id=None, parent_id=None, is_domain=False): - return unit.new_project_ref(domain_id=domain_id, parent_id=parent_id, - is_domain=is_domain) - - def new_user_ref(self, domain_id, project_id=None): - return unit.new_user_ref(domain_id, project_id=project_id) - - def new_group_ref(self, domain_id): - return unit.new_group_ref(domain_id) - - def new_credential_ref(self, user_id, project_id=None, cred_type=None): - return unit.new_credential_ref(user_id, project_id=project_id, - cred_type=cred_type) + # Create "req_admin" user for simulating a real user instead of the + # admin_token_auth middleware + self.user_reqadmin = unit.create_user(self.identity_api, + DEFAULT_DOMAIN_ID) + self.assignment_api.add_role_to_user_and_project( + self.user_reqadmin['id'], + self.default_domain_project_id, + self.role_id) - def new_role_ref(self): - return unit.new_role_ref() + self.region = unit.new_region_ref() + self.region_id = self.region['id'] + self.catalog_api.create_region(self.region) - def new_policy_ref(self): - return unit.new_policy_ref() + self.service = unit.new_service_ref() + self.service_id = self.service['id'] + self.catalog_api.create_service(self.service_id, self.service.copy()) - def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None, - impersonation=None, expires=None, role_ids=None, - role_names=None, remaining_uses=None, - allow_redelegation=False): - return unit.new_trust_ref( - trustor_user_id, trustee_user_id, project_id=project_id, - impersonation=impersonation, expires=expires, role_ids=role_ids, - role_names=role_names, remaining_uses=remaining_uses, - allow_redelegation=allow_redelegation) + self.endpoint = unit.new_endpoint_ref(service_id=self.service_id, + interface='public', + region_id=self.region_id) + self.endpoint_id = self.endpoint['id'] + self.catalog_api.create_endpoint(self.endpoint_id, + self.endpoint.copy()) + # The server adds 'enabled' and defaults to True. + self.endpoint['enabled'] = True def create_new_default_project_for_user(self, user_id, domain_id, enable_project=True): - ref = self.new_project_ref(domain_id=domain_id) - ref['enabled'] = enable_project + ref = unit.new_project_ref(domain_id=domain_id, enabled=enable_project) r = self.post('/projects', body={'project': ref}) project = self.assertValidProjectResponse(r, ref) # set the user's preferred project @@ -326,6 +386,34 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, return project + def get_admin_token(self): + """Convenience method so that we can test authenticated requests.""" + r = self.admin_request( + method='POST', + path='/v3/auth/tokens', + body={ + 'auth': { + 'identity': { + 'methods': ['password'], + 'password': { + 'user': { + 'name': self.user_reqadmin['name'], + 'password': self.user_reqadmin['password'], + 'domain': { + 'id': self.user_reqadmin['domain_id'] + } + } + } + }, + 'scope': { + 'project': { + 'id': self.default_domain_project_id, + } + } + } + }) + return r.headers.get('X-Subject-Token') + def get_unscoped_token(self): """Convenience method so that we can test authenticated requests.""" r = self.admin_request( @@ -407,11 +495,10 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, def get_requested_token(self, auth): """Request the specific token we want.""" - - r = self.v3_authenticate_token(auth) + r = self.v3_create_token(auth) return r.headers.get('X-Subject-Token') - def v3_authenticate_token(self, auth, expected_status=201): + def v3_create_token(self, auth, expected_status=http_client.CREATED): return self.admin_request(method='POST', path='/v3/auth/tokens', body=auth, @@ -440,42 +527,31 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, return self.admin_request(path=path, token=token, **kwargs) - def get(self, path, **kwargs): - r = self.v3_request(method='GET', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 200) - return r + def get(self, path, expected_status=http_client.OK, **kwargs): + return self.v3_request(path, method='GET', + expected_status=expected_status, **kwargs) - def head(self, path, **kwargs): - r = self.v3_request(method='HEAD', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) - self.assertEqual('', r.body) + def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + r = self.v3_request(path, method='HEAD', + expected_status=expected_status, **kwargs) + self.assertEqual(b'', r.body) return r - def post(self, path, **kwargs): - r = self.v3_request(method='POST', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 201) - return r + def post(self, path, expected_status=http_client.CREATED, **kwargs): + return self.v3_request(path, method='POST', + expected_status=expected_status, **kwargs) - def put(self, path, **kwargs): - r = self.v3_request(method='PUT', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) - return r + def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + return self.v3_request(path, method='PUT', + expected_status=expected_status, **kwargs) - def patch(self, path, **kwargs): - r = self.v3_request(method='PATCH', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 200) - return r + def patch(self, path, expected_status=http_client.OK, **kwargs): + return self.v3_request(path, method='PATCH', + expected_status=expected_status, **kwargs) - def delete(self, path, **kwargs): - r = self.v3_request(method='DELETE', path=path, **kwargs) - if 'expected_status' not in kwargs: - self.assertResponseStatus(r, 204) - return r + def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs): + return self.v3_request(path, method='DELETE', + expected_status=expected_status, **kwargs) def assertValidErrorResponse(self, r): resp = r.result @@ -582,7 +658,6 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, except Exception: msg = '%s is not a valid ISO 8601 extended format date time.' % dt raise AssertionError(msg) - self.assertIsInstance(dt, datetime.datetime) def assertValidTokenResponse(self, r, user=None): self.assertTrue(r.headers.get('X-Subject-Token')) @@ -611,11 +686,10 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, def assertValidUnscopedTokenResponse(self, r, *args, **kwargs): token = self.assertValidTokenResponse(r, *args, **kwargs) - - self.assertNotIn('roles', token) - self.assertNotIn('catalog', token) - self.assertNotIn('project', token) - self.assertNotIn('domain', token) + validator_object = validators.SchemaValidator( + self.generate_token_schema() + ) + validator_object.validate(token) return token @@ -623,6 +697,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, require_catalog = kwargs.pop('require_catalog', True) endpoint_filter = kwargs.pop('endpoint_filter', False) ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0) + is_admin_project = kwargs.pop('is_admin_project', False) token = self.assertValidTokenResponse(r, *args, **kwargs) if require_catalog: @@ -650,40 +725,66 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, self.assertIn('id', role) self.assertIn('name', role) + if is_admin_project: + # NOTE(samueldmq): We want to explicitly test for boolean + self.assertIs(True, token['is_admin_project']) + else: + self.assertNotIn('is_admin_project', token) + return token def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs): token = self.assertValidScopedTokenResponse(r, *args, **kwargs) - self.assertIn('project', token) - self.assertIn('id', token['project']) - self.assertIn('name', token['project']) - self.assertIn('domain', token['project']) - self.assertIn('id', token['project']['domain']) - self.assertIn('name', token['project']['domain']) + project_scoped_token_schema = self.generate_token_schema( + project_scoped=True) + + if token.get('OS-TRUST:trust'): + trust_properties = { + 'OS-TRUST:trust': { + 'type': ['object'], + 'required': ['id', 'impersonation', 'trustor_user', + 'trustee_user'], + 'properties': { + 'id': {'type': 'string'}, + 'impersonation': {'type': 'boolean'}, + 'trustor_user': { + 'type': 'object', + 'required': ['id'], + 'properties': { + 'id': {'type': 'string'} + }, + 'additionalProperties': False + }, + 'trustee_user': { + 'type': 'object', + 'required': ['id'], + 'properties': { + 'id': {'type': 'string'} + }, + 'additionalProperties': False + } + }, + 'additionalProperties': False + } + } + project_scoped_token_schema['properties'].update(trust_properties) + + validator_object = validators.SchemaValidator( + project_scoped_token_schema) + validator_object.validate(token) self.assertEqual(self.role_id, token['roles'][0]['id']) return token - def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs): - token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs) - - trust = token.get('OS-TRUST:trust') - self.assertIsNotNone(trust) - self.assertIsNotNone(trust.get('id')) - self.assertIsInstance(trust.get('impersonation'), bool) - self.assertIsNotNone(trust.get('trustor_user')) - self.assertIsNotNone(trust.get('trustee_user')) - self.assertIsNotNone(trust['trustor_user'].get('id')) - self.assertIsNotNone(trust['trustee_user'].get('id')) - def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs): token = self.assertValidScopedTokenResponse(r, *args, **kwargs) - self.assertIn('domain', token) - self.assertIn('id', token['domain']) - self.assertIn('name', token['domain']) + validator_object = validators.SchemaValidator( + self.generate_token_schema(domain_scoped=True) + ) + validator_object.validate(token) return token @@ -876,7 +977,6 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, **kwargs) def assertValidProject(self, entity, ref=None): - self.assertIsNotNone(entity.get('domain_id')) if ref: self.assertEqual(ref['domain_id'], entity['domain_id']) return entity @@ -888,6 +988,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, resp, 'users', self.assertValidUser, + keys_to_check=['name', 'enabled'], *args, **kwargs) @@ -896,6 +997,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, resp, 'user', self.assertValidUser, + keys_to_check=['name', 'enabled'], *args, **kwargs) @@ -920,6 +1022,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, resp, 'groups', self.assertValidGroup, + keys_to_check=['name', 'description', 'domain_id'], *args, **kwargs) @@ -928,6 +1031,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, resp, 'group', self.assertValidGroup, + keys_to_check=['name', 'description', 'domain_id'], *args, **kwargs) @@ -979,6 +1083,21 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, *args, **kwargs) + def assertRoleInListResponse(self, resp, ref, expected=1): + found_count = 0 + for entity in resp.result.get('roles'): + try: + self.assertValidRole(entity, ref=ref) + except Exception: + # It doesn't match, so let's go onto the next one + pass + else: + found_count += 1 + self.assertEqual(expected, found_count) + + def assertRoleNotInListResponse(self, resp, ref): + self.assertRoleInListResponse(resp, ref=ref, expected=0) + def assertValidRoleResponse(self, resp, *args, **kwargs): return self.assertValidResponse( resp, @@ -992,6 +1111,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, self.assertIsNotNone(entity.get('name')) if ref: self.assertEqual(ref['name'], entity['name']) + self.assertEqual(ref['domain_id'], entity['domain_id']) return entity # role assignment validation @@ -1161,6 +1281,27 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, return entity + # Service providers (federation) + + def assertValidServiceProvider(self, entity, ref=None, *args, **kwargs): + + attributes = frozenset(['auth_url', 'id', 'enabled', 'description', + 'links', 'relay_state_prefix', 'sp_url']) + for attribute in attributes: + self.assertIsNotNone(entity.get(attribute)) + + def assertValidServiceProviderListResponse(self, resp, *args, **kwargs): + if kwargs.get('keys_to_check') is None: + kwargs['keys_to_check'] = ['auth_url', 'id', 'enabled', + 'description', 'relay_state_prefix', + 'sp_url'] + return self.assertValidListResponse( + resp, + 'service_providers', + self.assertValidServiceProvider, + *args, + **kwargs) + def build_external_auth_request(self, remote_user, remote_domain=None, auth_data=None, kerberos=False): @@ -1182,24 +1323,81 @@ class VersionTestCase(RestfulTestCase): pass +# NOTE(morganfainberg): To be removed when admin_token_auth is removed. This +# has been split out to allow testing admin_token auth without enabling it +# for other tests. +class AuthContextMiddlewareAdminTokenTestCase(RestfulTestCase): + EXTENSION_TO_ADD = 'admin_token_auth' + + def config_overrides(self): + super(AuthContextMiddlewareAdminTokenTestCase, self).config_overrides() + self.config_fixture.config( + admin_token='ADMIN') + + # NOTE(morganfainberg): This is knowingly copied from below for simplicity + # during the deprecation cycle. + def _middleware_request(self, token, extra_environ=None): + + def application(environ, start_response): + body = b'body' + headers = [('Content-Type', 'text/html; charset=utf8'), + ('Content-Length', str(len(body)))] + start_response('200 OK', headers) + return [body] + + app = webtest.TestApp(middleware.AuthContextMiddleware(application), + extra_environ=extra_environ) + resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token}) + self.assertEqual('body', resp.text) # just to make sure it worked + return resp.request + + def test_admin_auth_context(self): + # test to make sure AuthContextMiddleware does not attempt to build the + # auth context if the admin_token middleware indicates it's admin + # already. + token_id = uuid.uuid4().hex # token doesn't matter. + # the admin_token middleware sets is_admin in the context. + extra_environ = {middleware.CONTEXT_ENV: {'is_admin': True}} + req = self._middleware_request(token_id, extra_environ) + auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV) + self.assertDictEqual({}, auth_context) + + @mock.patch.object(middleware_auth.versionutils, + 'report_deprecated_feature') + def test_admin_token_auth_context_deprecated(self, mock_report_deprecated): + # For backwards compatibility AuthContextMiddleware will check that the + # admin token (as configured in the CONF file) is present and not + # attempt to build the auth context. This is deprecated. + req = self._middleware_request('ADMIN') + auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV) + self.assertDictEqual({}, auth_context) + self.assertEqual(1, mock_report_deprecated.call_count) + + # NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py # because we need the token class AuthContextMiddlewareTestCase(RestfulTestCase): - def _mock_request_object(self, token_id): - class fake_req(object): - headers = {middleware.AUTH_TOKEN_HEADER: token_id} - environ = {} + def _middleware_request(self, token, extra_environ=None): + + def application(environ, start_response): + body = b'body' + headers = [('Content-Type', 'text/html; charset=utf8'), + ('Content-Length', str(len(body)))] + start_response('200 OK', headers) + return [body] - return fake_req() + app = webtest.TestApp(middleware.AuthContextMiddleware(application), + extra_environ=extra_environ) + resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token}) + self.assertEqual(b'body', resp.body) # just to make sure it worked + return resp.request def test_auth_context_build_by_middleware(self): # test to make sure AuthContextMiddleware successful build the auth # context from the incoming auth token admin_token = self.get_scoped_token() - req = self._mock_request_object(admin_token) - application = None - middleware.AuthContextMiddleware(application).process_request(req) + req = self._middleware_request(admin_token) self.assertEqual( self.user['id'], req.environ.get(authorization.AUTH_CONTEXT_ENV)['user_id']) @@ -1208,28 +1406,16 @@ class AuthContextMiddlewareTestCase(RestfulTestCase): overridden_context = 'OVERRIDDEN_CONTEXT' # this token should not be used token = uuid.uuid4().hex - req = self._mock_request_object(token) - req.environ[authorization.AUTH_CONTEXT_ENV] = overridden_context - application = None - middleware.AuthContextMiddleware(application).process_request(req) + + extra_environ = {authorization.AUTH_CONTEXT_ENV: overridden_context} + req = self._middleware_request(token, extra_environ=extra_environ) # make sure overridden context take precedence self.assertEqual(overridden_context, req.environ.get(authorization.AUTH_CONTEXT_ENV)) - def test_admin_token_auth_context(self): - # test to make sure AuthContextMiddleware does not attempt to build - # auth context if the incoming auth token is the special admin token - req = self._mock_request_object(CONF.admin_token) - application = None - middleware.AuthContextMiddleware(application).process_request(req) - self.assertDictEqual(req.environ.get(authorization.AUTH_CONTEXT_ENV), - {}) - def test_unscoped_token_auth_context(self): unscoped_token = self.get_unscoped_token() - req = self._mock_request_object(unscoped_token) - application = None - middleware.AuthContextMiddleware(application).process_request(req) + req = self._middleware_request(unscoped_token) for key in ['project_id', 'domain_id', 'domain_name']: self.assertNotIn( key, @@ -1237,9 +1423,7 @@ class AuthContextMiddlewareTestCase(RestfulTestCase): def test_project_scoped_token_auth_context(self): project_scoped_token = self.get_scoped_token() - req = self._mock_request_object(project_scoped_token) - application = None - middleware.AuthContextMiddleware(application).process_request(req) + req = self._middleware_request(project_scoped_token) self.assertEqual( self.project['id'], req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id']) @@ -1251,9 +1435,7 @@ class AuthContextMiddlewareTestCase(RestfulTestCase): self.put(path=path) domain_scoped_token = self.get_domain_scoped_token() - req = self._mock_request_object(domain_scoped_token) - application = None - middleware.AuthContextMiddleware(application).process_request(req) + req = self._middleware_request(domain_scoped_token) self.assertEqual( self.domain['id'], req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id']) @@ -1261,6 +1443,30 @@ class AuthContextMiddlewareTestCase(RestfulTestCase): self.domain['name'], req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name']) + def test_oslo_context(self): + # After AuthContextMiddleware runs, an + # oslo_context.context.RequestContext was created so that its fields + # can be logged. This test validates that the RequestContext was + # created and the fields are set as expected. + + # Use a scoped token so more fields can be set. + token = self.get_scoped_token() + + # oslo_middleware RequestId middleware sets openstack.request_id. + request_id = uuid.uuid4().hex + environ = {'openstack.request_id': request_id} + self._middleware_request(token, extra_environ=environ) + + req_context = oslo_context.context.get_current() + self.assertEqual(request_id, req_context.request_id) + self.assertEqual(token, req_context.auth_token) + self.assertEqual(self.user['id'], req_context.user) + self.assertEqual(self.project['id'], req_context.tenant) + self.assertIsNone(req_context.domain) + self.assertEqual(self.user['domain_id'], req_context.user_domain) + self.assertEqual(self.project['domain_id'], req_context.project_domain) + self.assertFalse(req_context.is_admin) + class JsonHomeTestMixin(object): """JSON Home test @@ -1273,6 +1479,7 @@ class JsonHomeTestMixin(object): data must be in the response. """ + def test_get_json_home(self): resp = self.get('/', convert=False, headers={'Accept': 'application/json-home'}) @@ -1295,7 +1502,6 @@ class AssignmentTestMixin(object): Available filters are: domain_id, project_id, user_id, group_id, role_id and inherited_to_projects. """ - query_params = '?effective' if effective else '' for k, v in filters.items(): @@ -1320,7 +1526,6 @@ class AssignmentTestMixin(object): Provided attributes are expected to contain: domain_id or project_id, user_id or group_id, role_id and, optionally, inherited_to_projects. """ - if attribs.get('domain_id'): link = '/domains/' + attribs['domain_id'] else: @@ -1338,13 +1543,13 @@ class AssignmentTestMixin(object): return link - def build_role_assignment_entity(self, link=None, **attribs): + def build_role_assignment_entity( + self, link=None, prior_role_link=None, **attribs): """Build and return a role assignment entity with provided attributes. Provided attributes are expected to contain: domain_id or project_id, user_id or group_id, role_id and, optionally, inherited_to_projects. """ - entity = {'links': {'assignment': ( link or self.build_role_assignment_link(**attribs))}} @@ -1368,4 +1573,68 @@ class AssignmentTestMixin(object): if attribs.get('inherited_to_projects'): entity['scope']['OS-INHERIT:inherited_to'] = 'projects' + if prior_role_link: + entity['links']['prior_role'] = prior_role_link + + return entity + + def build_role_assignment_entity_include_names(self, + domain_ref=None, + role_ref=None, + group_ref=None, + user_ref=None, + project_ref=None, + inherited_assignment=None): + """Build and return a role assignment entity with provided attributes. + + The expected attributes are: domain_ref or project_ref, + user_ref or group_ref, role_ref and, optionally, inherited_to_projects. + """ + entity = {'links': {}} + attributes_for_links = {} + if project_ref: + dmn_name = self.resource_api.get_domain( + project_ref['domain_id'])['name'] + + entity['scope'] = {'project': { + 'id': project_ref['id'], + 'name': project_ref['name'], + 'domain': { + 'id': project_ref['domain_id'], + 'name': dmn_name}}} + attributes_for_links['project_id'] = project_ref['id'] + else: + entity['scope'] = {'domain': {'id': domain_ref['id'], + 'name': domain_ref['name']}} + attributes_for_links['domain_id'] = domain_ref['id'] + if user_ref: + dmn_name = self.resource_api.get_domain( + user_ref['domain_id'])['name'] + entity['user'] = {'id': user_ref['id'], + 'name': user_ref['name'], + 'domain': {'id': user_ref['domain_id'], + 'name': dmn_name}} + attributes_for_links['user_id'] = user_ref['id'] + else: + dmn_name = self.resource_api.get_domain( + group_ref['domain_id'])['name'] + entity['group'] = {'id': group_ref['id'], + 'name': group_ref['name'], + 'domain': { + 'id': group_ref['domain_id'], + 'name': dmn_name}} + attributes_for_links['group_id'] = group_ref['id'] + + if role_ref: + entity['role'] = {'id': role_ref['id'], + 'name': role_ref['name']} + attributes_for_links['role_id'] = role_ref['id'] + + if inherited_assignment: + entity['scope']['OS-INHERIT:inherited_to'] = 'projects' + attributes_for_links['inherited_to_projects'] = True + + entity['links']['assignment'] = self.build_role_assignment_link( + **attributes_for_links) + return entity |