diff options
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r-- | keystone-moon/keystone/tests/unit/test_v3.py | 1283 |
1 files changed, 1283 insertions, 0 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py new file mode 100644 index 00000000..f6d6ed93 --- /dev/null +++ b/keystone-moon/keystone/tests/unit/test_v3.py @@ -0,0 +1,1283 @@ +# Copyright 2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import datetime +import uuid + +from oslo_config import cfg +from oslo_serialization import jsonutils +from oslo_utils import timeutils +import six +from testtools import matchers + +from keystone import auth +from keystone.common import authorization +from keystone.common import cache +from keystone import exception +from keystone import middleware +from keystone.policy.backends import rules +from keystone.tests import unit as tests +from keystone.tests.unit import rest + + +CONF = cfg.CONF +DEFAULT_DOMAIN_ID = 'default' + +TIME_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ' + + +class AuthTestMixin(object): + """To hold auth building helper functions.""" + def build_auth_scope(self, project_id=None, project_name=None, + project_domain_id=None, project_domain_name=None, + domain_id=None, domain_name=None, trust_id=None, + unscoped=None): + scope_data = {} + if unscoped: + scope_data['unscoped'] = {} + if project_id or project_name: + scope_data['project'] = {} + if project_id: + scope_data['project']['id'] = project_id + else: + scope_data['project']['name'] = project_name + if project_domain_id or project_domain_name: + project_domain_json = {} + if project_domain_id: + project_domain_json['id'] = project_domain_id + else: + project_domain_json['name'] = project_domain_name + scope_data['project']['domain'] = project_domain_json + if domain_id or domain_name: + scope_data['domain'] = {} + if domain_id: + scope_data['domain']['id'] = domain_id + else: + scope_data['domain']['name'] = domain_name + if trust_id: + scope_data['OS-TRUST:trust'] = {} + scope_data['OS-TRUST:trust']['id'] = trust_id + return scope_data + + def build_password_auth(self, user_id=None, username=None, + user_domain_id=None, user_domain_name=None, + password=None): + password_data = {'user': {}} + if user_id: + password_data['user']['id'] = user_id + else: + password_data['user']['name'] = username + if user_domain_id or user_domain_name: + password_data['user']['domain'] = {} + if user_domain_id: + password_data['user']['domain']['id'] = user_domain_id + else: + password_data['user']['domain']['name'] = user_domain_name + password_data['user']['password'] = password + return password_data + + def build_token_auth(self, token): + return {'id': token} + + def build_authentication_request(self, token=None, user_id=None, + username=None, user_domain_id=None, + user_domain_name=None, password=None, + kerberos=False, **kwargs): + """Build auth dictionary. + + It will create an auth dictionary based on all the arguments + that it receives. + """ + auth_data = {} + auth_data['identity'] = {'methods': []} + if kerberos: + auth_data['identity']['methods'].append('kerberos') + auth_data['identity']['kerberos'] = {} + if token: + auth_data['identity']['methods'].append('token') + auth_data['identity']['token'] = self.build_token_auth(token) + if user_id or username: + auth_data['identity']['methods'].append('password') + auth_data['identity']['password'] = self.build_password_auth( + user_id, username, user_domain_id, user_domain_name, password) + if kwargs: + auth_data['scope'] = self.build_auth_scope(**kwargs) + return {'auth': auth_data} + + +class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase, + AuthTestMixin): + def config_files(self): + config_files = super(RestfulTestCase, self).config_files() + config_files.append(tests.dirs.tests_conf('backend_sql.conf')) + return config_files + + def get_extensions(self): + extensions = set(['revoke']) + if hasattr(self, 'EXTENSION_NAME'): + extensions.add(self.EXTENSION_NAME) + return extensions + + def generate_paste_config(self): + new_paste_file = None + try: + new_paste_file = tests.generate_paste_config(self.EXTENSION_TO_ADD) + except AttributeError: + # no need to report this error here, as most tests will not have + # EXTENSION_TO_ADD defined. + pass + finally: + return new_paste_file + + def remove_generated_paste_config(self): + try: + tests.remove_generated_paste_config(self.EXTENSION_TO_ADD) + except AttributeError: + pass + + def setUp(self, app_conf='keystone'): + """Setup for v3 Restful Test Cases. + + """ + new_paste_file = self.generate_paste_config() + self.addCleanup(self.remove_generated_paste_config) + if new_paste_file: + app_conf = 'config:%s' % (new_paste_file) + + super(RestfulTestCase, self).setUp(app_conf=app_conf) + + self.empty_context = {'environment': {}} + + # Initialize the policy engine and allow us to write to a temp + # file in each test to create the policies + rules.reset() + + # drop the policy rules + self.addCleanup(rules.reset) + + def load_backends(self): + # ensure the cache region instance is setup + cache.configure_cache_region(cache.REGION) + + super(RestfulTestCase, self).load_backends() + + def load_fixtures(self, fixtures): + self.load_sample_data() + + def _populate_default_domain(self): + if CONF.database.connection == tests.IN_MEM_DB_CONN_STRING: + # NOTE(morganfainberg): If an in-memory db is being used, be sure + # to populate the default domain, this is typically done by + # a migration, but the in-mem db uses model definitions to create + # the schema (no migrations are run). + try: + self.resource_api.get_domain(DEFAULT_DOMAIN_ID) + except exception.DomainNotFound: + domain = {'description': (u'Owns users and tenants (i.e. ' + u'projects) available on Identity ' + u'API v2.'), + 'enabled': True, + 'id': DEFAULT_DOMAIN_ID, + 'name': u'Default'} + self.resource_api.create_domain(DEFAULT_DOMAIN_ID, domain) + + def load_sample_data(self): + self._populate_default_domain() + self.domain_id = uuid.uuid4().hex + self.domain = self.new_domain_ref() + self.domain['id'] = self.domain_id + self.resource_api.create_domain(self.domain_id, self.domain) + + self.project_id = uuid.uuid4().hex + self.project = self.new_project_ref( + domain_id=self.domain_id) + self.project['id'] = self.project_id + self.resource_api.create_project(self.project_id, self.project) + + self.user = self.new_user_ref(domain_id=self.domain_id) + password = self.user['password'] + self.user = self.identity_api.create_user(self.user) + self.user['password'] = password + self.user_id = self.user['id'] + + self.default_domain_project_id = uuid.uuid4().hex + self.default_domain_project = self.new_project_ref( + domain_id=DEFAULT_DOMAIN_ID) + self.default_domain_project['id'] = self.default_domain_project_id + self.resource_api.create_project(self.default_domain_project_id, + self.default_domain_project) + + self.default_domain_user = self.new_user_ref( + domain_id=DEFAULT_DOMAIN_ID) + password = self.default_domain_user['password'] + self.default_domain_user = ( + self.identity_api.create_user(self.default_domain_user)) + self.default_domain_user['password'] = password + self.default_domain_user_id = self.default_domain_user['id'] + + # create & grant policy.json's default role for admin_required + self.role_id = uuid.uuid4().hex + self.role = self.new_role_ref() + self.role['id'] = self.role_id + self.role['name'] = 'admin' + self.role_api.create_role(self.role_id, self.role) + self.assignment_api.add_role_to_user_and_project( + self.user_id, self.project_id, self.role_id) + self.assignment_api.add_role_to_user_and_project( + self.default_domain_user_id, self.default_domain_project_id, + self.role_id) + self.assignment_api.add_role_to_user_and_project( + self.default_domain_user_id, self.project_id, + self.role_id) + + self.region_id = uuid.uuid4().hex + self.region = self.new_region_ref() + self.region['id'] = self.region_id + self.catalog_api.create_region( + self.region.copy()) + + self.service_id = uuid.uuid4().hex + self.service = self.new_service_ref() + self.service['id'] = self.service_id + self.catalog_api.create_service( + self.service_id, + self.service.copy()) + + self.endpoint_id = uuid.uuid4().hex + self.endpoint = self.new_endpoint_ref(service_id=self.service_id) + self.endpoint['id'] = self.endpoint_id + self.endpoint['region_id'] = self.region['id'] + self.catalog_api.create_endpoint( + self.endpoint_id, + self.endpoint.copy()) + # The server adds 'enabled' and defaults to True. + self.endpoint['enabled'] = True + + def new_ref(self): + """Populates a ref with attributes common to all API entities.""" + return { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'description': uuid.uuid4().hex, + 'enabled': True} + + def new_region_ref(self): + ref = self.new_ref() + # Region doesn't have name or enabled. + del ref['name'] + del ref['enabled'] + ref['parent_region_id'] = None + return ref + + def new_service_ref(self): + ref = self.new_ref() + ref['type'] = uuid.uuid4().hex + return ref + + def new_endpoint_ref(self, service_id, interface='public', **kwargs): + ref = self.new_ref() + del ref['enabled'] # enabled is optional + ref['interface'] = interface + ref['service_id'] = service_id + ref['url'] = 'https://' + uuid.uuid4().hex + '.com' + ref['region_id'] = self.region_id + ref.update(kwargs) + return ref + + def new_domain_ref(self): + ref = self.new_ref() + return ref + + def new_project_ref(self, domain_id, parent_id=None): + ref = self.new_ref() + ref['domain_id'] = domain_id + ref['parent_id'] = parent_id + return ref + + def new_user_ref(self, domain_id, project_id=None): + ref = self.new_ref() + ref['domain_id'] = domain_id + ref['email'] = uuid.uuid4().hex + ref['password'] = uuid.uuid4().hex + if project_id: + ref['default_project_id'] = project_id + return ref + + def new_group_ref(self, domain_id): + ref = self.new_ref() + ref['domain_id'] = domain_id + return ref + + def new_credential_ref(self, user_id, project_id=None, cred_type=None): + ref = dict() + ref['id'] = uuid.uuid4().hex + ref['user_id'] = user_id + if cred_type == 'ec2': + ref['type'] = 'ec2' + ref['blob'] = {'blah': 'test'} + else: + ref['type'] = 'cert' + ref['blob'] = uuid.uuid4().hex + if project_id: + ref['project_id'] = project_id + return ref + + def new_role_ref(self): + ref = self.new_ref() + # Roles don't have a description or the enabled flag + del ref['description'] + del ref['enabled'] + return ref + + def new_policy_ref(self): + ref = self.new_ref() + ref['blob'] = uuid.uuid4().hex + ref['type'] = uuid.uuid4().hex + return ref + + def new_trust_ref(self, trustor_user_id, trustee_user_id, project_id=None, + impersonation=None, expires=None, role_ids=None, + role_names=None, remaining_uses=None, + allow_redelegation=False): + ref = dict() + ref['id'] = uuid.uuid4().hex + ref['trustor_user_id'] = trustor_user_id + ref['trustee_user_id'] = trustee_user_id + ref['impersonation'] = impersonation or False + ref['project_id'] = project_id + ref['remaining_uses'] = remaining_uses + ref['allow_redelegation'] = allow_redelegation + + if isinstance(expires, six.string_types): + ref['expires_at'] = expires + elif isinstance(expires, dict): + ref['expires_at'] = timeutils.strtime( + timeutils.utcnow() + datetime.timedelta(**expires), + fmt=TIME_FORMAT) + elif expires is None: + pass + else: + raise NotImplementedError('Unexpected value for "expires"') + + role_ids = role_ids or [] + role_names = role_names or [] + if role_ids or role_names: + ref['roles'] = [] + for role_id in role_ids: + ref['roles'].append({'id': role_id}) + for role_name in role_names: + ref['roles'].append({'name': role_name}) + + return ref + + def create_new_default_project_for_user(self, user_id, domain_id, + enable_project=True): + ref = self.new_project_ref(domain_id=domain_id) + ref['enabled'] = enable_project + r = self.post('/projects', body={'project': ref}) + project = self.assertValidProjectResponse(r, ref) + # set the user's preferred project + body = {'user': {'default_project_id': project['id']}} + r = self.patch('/users/%(user_id)s' % { + 'user_id': user_id}, + body=body) + self.assertValidUserResponse(r) + + return project + + def get_scoped_token(self): + """Convenience method so that we can test authenticated requests.""" + r = self.admin_request( + method='POST', + path='/v3/auth/tokens', + body={ + 'auth': { + 'identity': { + 'methods': ['password'], + 'password': { + 'user': { + 'name': self.user['name'], + 'password': self.user['password'], + 'domain': { + 'id': self.user['domain_id'] + } + } + } + }, + 'scope': { + 'project': { + 'id': self.project['id'], + } + } + } + }) + return r.headers.get('X-Subject-Token') + + def get_requested_token(self, auth): + """Request the specific token we want.""" + + r = self.v3_authenticate_token(auth) + return r.headers.get('X-Subject-Token') + + def v3_authenticate_token(self, auth, expected_status=201): + return self.admin_request(method='POST', + path='/v3/auth/tokens', + body=auth, + expected_status=expected_status) + + def v3_noauth_request(self, path, **kwargs): + # request does not require auth token header + path = '/v3' + path + return self.admin_request(path=path, **kwargs) + + def v3_request(self, path, **kwargs): + # check to see if caller requires token for the API call. + if kwargs.pop('noauth', None): + return self.v3_noauth_request(path, **kwargs) + + # Check if the caller has passed in auth details for + # use in requesting the token + auth_arg = kwargs.pop('auth', None) + if auth_arg: + token = self.get_requested_token(auth_arg) + else: + token = kwargs.pop('token', None) + if not token: + token = self.get_scoped_token() + path = '/v3' + path + + return self.admin_request(path=path, token=token, **kwargs) + + def get(self, path, **kwargs): + r = self.v3_request(method='GET', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 200) + return r + + def head(self, path, **kwargs): + r = self.v3_request(method='HEAD', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) + self.assertEqual('', r.body) + return r + + def post(self, path, **kwargs): + r = self.v3_request(method='POST', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 201) + return r + + def put(self, path, **kwargs): + r = self.v3_request(method='PUT', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) + return r + + def patch(self, path, **kwargs): + r = self.v3_request(method='PATCH', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 200) + return r + + def delete(self, path, **kwargs): + r = self.v3_request(method='DELETE', path=path, **kwargs) + if 'expected_status' not in kwargs: + self.assertResponseStatus(r, 204) + return r + + def assertValidErrorResponse(self, r): + resp = r.result + self.assertIsNotNone(resp.get('error')) + self.assertIsNotNone(resp['error'].get('code')) + self.assertIsNotNone(resp['error'].get('title')) + self.assertIsNotNone(resp['error'].get('message')) + self.assertEqual(int(resp['error']['code']), r.status_code) + + def assertValidListLinks(self, links, resource_url=None): + self.assertIsNotNone(links) + self.assertIsNotNone(links.get('self')) + self.assertThat(links['self'], matchers.StartsWith('http://localhost')) + + if resource_url: + self.assertThat(links['self'], matchers.EndsWith(resource_url)) + + self.assertIn('next', links) + if links['next'] is not None: + self.assertThat(links['next'], + matchers.StartsWith('http://localhost')) + + self.assertIn('previous', links) + if links['previous'] is not None: + self.assertThat(links['previous'], + matchers.StartsWith('http://localhost')) + + def assertValidListResponse(self, resp, key, entity_validator, ref=None, + expected_length=None, keys_to_check=None, + resource_url=None): + """Make assertions common to all API list responses. + + If a reference is provided, it's ID will be searched for in the + response, and asserted to be equal. + + """ + entities = resp.result.get(key) + self.assertIsNotNone(entities) + + if expected_length is not None: + self.assertEqual(expected_length, len(entities)) + elif ref is not None: + # we're at least expecting the ref + self.assertNotEmpty(entities) + + # collections should have relational links + self.assertValidListLinks(resp.result.get('links'), + resource_url=resource_url) + + for entity in entities: + self.assertIsNotNone(entity) + self.assertValidEntity(entity, keys_to_check=keys_to_check) + entity_validator(entity) + if ref: + entity = [x for x in entities if x['id'] == ref['id']][0] + self.assertValidEntity(entity, ref=ref, + keys_to_check=keys_to_check) + entity_validator(entity, ref) + return entities + + def assertValidResponse(self, resp, key, entity_validator, *args, + **kwargs): + """Make assertions common to all API responses.""" + entity = resp.result.get(key) + self.assertIsNotNone(entity) + keys = kwargs.pop('keys_to_check', None) + self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs) + entity_validator(entity, *args, **kwargs) + return entity + + def assertValidEntity(self, entity, ref=None, keys_to_check=None): + """Make assertions common to all API entities. + + If a reference is provided, the entity will also be compared against + the reference. + """ + if keys_to_check is not None: + keys = keys_to_check + else: + keys = ['name', 'description', 'enabled'] + + for k in ['id'] + keys: + msg = '%s unexpectedly None in %s' % (k, entity) + self.assertIsNotNone(entity.get(k), msg) + + self.assertIsNotNone(entity.get('links')) + self.assertIsNotNone(entity['links'].get('self')) + self.assertThat(entity['links']['self'], + matchers.StartsWith('http://localhost')) + self.assertIn(entity['id'], entity['links']['self']) + + if ref: + for k in keys: + msg = '%s not equal: %s != %s' % (k, ref[k], entity[k]) + self.assertEqual(ref[k], entity[k]) + + return entity + + def assertDictContainsSubset(self, expected, actual): + """"Asserts if dictionary actual is a superset of expected. + + Tests whether the key/value pairs in dictionary actual are a superset + of those in expected. + + """ + for k, v in expected.iteritems(): + self.assertIn(k, actual) + if isinstance(v, dict): + self.assertDictContainsSubset(v, actual[k]) + else: + self.assertEqual(v, actual[k]) + + # auth validation + + def assertValidISO8601ExtendedFormatDatetime(self, dt): + try: + return timeutils.parse_strtime(dt, fmt=TIME_FORMAT) + except Exception: + msg = '%s is not a valid ISO 8601 extended format date time.' % dt + raise AssertionError(msg) + self.assertIsInstance(dt, datetime.datetime) + + def assertValidTokenResponse(self, r, user=None): + self.assertTrue(r.headers.get('X-Subject-Token')) + token = r.result['token'] + + self.assertIsNotNone(token.get('expires_at')) + expires_at = self.assertValidISO8601ExtendedFormatDatetime( + token['expires_at']) + self.assertIsNotNone(token.get('issued_at')) + issued_at = self.assertValidISO8601ExtendedFormatDatetime( + token['issued_at']) + self.assertTrue(issued_at < expires_at) + + self.assertIn('user', token) + self.assertIn('id', token['user']) + self.assertIn('name', token['user']) + self.assertIn('domain', token['user']) + self.assertIn('id', token['user']['domain']) + + if user is not None: + self.assertEqual(user['id'], token['user']['id']) + self.assertEqual(user['name'], token['user']['name']) + self.assertEqual(user['domain_id'], token['user']['domain']['id']) + + return token + + def assertValidUnscopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidTokenResponse(r, *args, **kwargs) + + self.assertNotIn('roles', token) + self.assertNotIn('catalog', token) + self.assertNotIn('project', token) + self.assertNotIn('domain', token) + + return token + + def assertValidScopedTokenResponse(self, r, *args, **kwargs): + require_catalog = kwargs.pop('require_catalog', True) + endpoint_filter = kwargs.pop('endpoint_filter', False) + ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0) + token = self.assertValidTokenResponse(r, *args, **kwargs) + + if require_catalog: + endpoint_num = 0 + self.assertIn('catalog', token) + + if isinstance(token['catalog'], list): + # only test JSON + for service in token['catalog']: + for endpoint in service['endpoints']: + self.assertNotIn('enabled', endpoint) + self.assertNotIn('legacy_endpoint_id', endpoint) + self.assertNotIn('service_id', endpoint) + endpoint_num += 1 + + # sub test for the OS-EP-FILTER extension enabled + if endpoint_filter: + self.assertEqual(ep_filter_assoc, endpoint_num) + else: + self.assertNotIn('catalog', token) + + self.assertIn('roles', token) + self.assertTrue(token['roles']) + for role in token['roles']: + self.assertIn('id', role) + self.assertIn('name', role) + + return token + + def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('project', token) + self.assertIn('id', token['project']) + self.assertIn('name', token['project']) + self.assertIn('domain', token['project']) + self.assertIn('id', token['project']['domain']) + self.assertIn('name', token['project']['domain']) + + self.assertEqual(self.role_id, token['roles'][0]['id']) + + return token + + def assertValidProjectTrustScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidProjectScopedTokenResponse(r, *args, **kwargs) + + trust = token.get('OS-TRUST:trust') + self.assertIsNotNone(trust) + self.assertIsNotNone(trust.get('id')) + self.assertIsInstance(trust.get('impersonation'), bool) + self.assertIsNotNone(trust.get('trustor_user')) + self.assertIsNotNone(trust.get('trustee_user')) + self.assertIsNotNone(trust['trustor_user'].get('id')) + self.assertIsNotNone(trust['trustee_user'].get('id')) + + def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs): + token = self.assertValidScopedTokenResponse(r, *args, **kwargs) + + self.assertIn('domain', token) + self.assertIn('id', token['domain']) + self.assertIn('name', token['domain']) + + return token + + def assertEqualTokens(self, a, b): + """Assert that two tokens are equal. + + Compare two tokens except for their ids. This also truncates + the time in the comparison. + """ + def normalize(token): + del token['token']['expires_at'] + del token['token']['issued_at'] + return token + + a_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['expires_at']) + b_expires_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['expires_at']) + self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at) + + a_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + a['token']['issued_at']) + b_issued_at = self.assertValidISO8601ExtendedFormatDatetime( + b['token']['issued_at']) + self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at) + + return self.assertDictEqual(normalize(a), normalize(b)) + + # catalog validation + + def assertValidCatalogResponse(self, resp, *args, **kwargs): + self.assertEqual(set(['catalog', 'links']), set(resp.json.keys())) + self.assertValidCatalog(resp.json['catalog']) + self.assertIn('links', resp.json) + self.assertIsInstance(resp.json['links'], dict) + self.assertEqual(['self'], resp.json['links'].keys()) + self.assertEqual( + 'http://localhost/v3/auth/catalog', + resp.json['links']['self']) + + def assertValidCatalog(self, entity): + self.assertIsInstance(entity, list) + self.assertTrue(len(entity) > 0) + for service in entity: + self.assertIsNotNone(service.get('id')) + self.assertIsNotNone(service.get('name')) + self.assertIsNotNone(service.get('type')) + self.assertNotIn('enabled', service) + self.assertTrue(len(service['endpoints']) > 0) + for endpoint in service['endpoints']: + self.assertIsNotNone(endpoint.get('id')) + self.assertIsNotNone(endpoint.get('interface')) + self.assertIsNotNone(endpoint.get('url')) + self.assertNotIn('enabled', endpoint) + self.assertNotIn('legacy_endpoint_id', endpoint) + self.assertNotIn('service_id', endpoint) + + # region validation + + def assertValidRegionListResponse(self, resp, *args, **kwargs): + # NOTE(jaypipes): I have to pass in a blank keys_to_check parameter + # below otherwise the base assertValidEntity method + # tries to find a "name" and an "enabled" key in the + # returned ref dicts. The issue is, I don't understand + # how the service and endpoint entity assertions below + # actually work (they don't raise assertions), since + # AFAICT, the service and endpoint tables don't have + # a "name" column either... :( + return self.assertValidListResponse( + resp, + 'regions', + self.assertValidRegion, + keys_to_check=[], + *args, + **kwargs) + + def assertValidRegionResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'region', + self.assertValidRegion, + keys_to_check=[], + *args, + **kwargs) + + def assertValidRegion(self, entity, ref=None): + self.assertIsNotNone(entity.get('description')) + if ref: + self.assertEqual(ref['description'], entity['description']) + return entity + + # service validation + + def assertValidServiceListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'services', + self.assertValidService, + *args, + **kwargs) + + def assertValidServiceResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'service', + self.assertValidService, + *args, + **kwargs) + + def assertValidService(self, entity, ref=None): + self.assertIsNotNone(entity.get('type')) + self.assertIsInstance(entity.get('enabled'), bool) + if ref: + self.assertEqual(ref['type'], entity['type']) + return entity + + # endpoint validation + + def assertValidEndpointListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'endpoints', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpointResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'endpoint', + self.assertValidEndpoint, + *args, + **kwargs) + + def assertValidEndpoint(self, entity, ref=None): + self.assertIsNotNone(entity.get('interface')) + self.assertIsNotNone(entity.get('service_id')) + self.assertIsInstance(entity['enabled'], bool) + + # this is intended to be an unexposed implementation detail + self.assertNotIn('legacy_endpoint_id', entity) + + if ref: + self.assertEqual(ref['interface'], entity['interface']) + self.assertEqual(ref['service_id'], entity['service_id']) + if ref.get('region') is not None: + self.assertEqual(ref['region_id'], entity.get('region_id')) + + return entity + + # domain validation + + def assertValidDomainListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'domains', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomainResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'domain', + self.assertValidDomain, + *args, + **kwargs) + + def assertValidDomain(self, entity, ref=None): + if ref: + pass + return entity + + # project validation + + def assertValidProjectListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'projects', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProjectResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'project', + self.assertValidProject, + *args, + **kwargs) + + def assertValidProject(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + return entity + + # user validation + + def assertValidUserListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'users', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUserResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'user', + self.assertValidUser, + *args, + **kwargs) + + def assertValidUser(self, entity, ref=None): + self.assertIsNotNone(entity.get('domain_id')) + self.assertIsNotNone(entity.get('email')) + self.assertIsNone(entity.get('password')) + self.assertNotIn('tenantId', entity) + if ref: + self.assertEqual(ref['domain_id'], entity['domain_id']) + self.assertEqual(ref['email'], entity['email']) + if 'default_project_id' in ref: + self.assertIsNotNone(ref['default_project_id']) + self.assertEqual(ref['default_project_id'], + entity['default_project_id']) + return entity + + # group validation + + def assertValidGroupListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'groups', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroupResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'group', + self.assertValidGroup, + *args, + **kwargs) + + def assertValidGroup(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # credential validation + + def assertValidCredentialListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'credentials', + self.assertValidCredential, + keys_to_check=['blob', 'user_id', 'type'], + *args, + **kwargs) + + def assertValidCredentialResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'credential', + self.assertValidCredential, + keys_to_check=['blob', 'user_id', 'type'], + *args, + **kwargs) + + def assertValidCredential(self, entity, ref=None): + self.assertIsNotNone(entity.get('user_id')) + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['user_id'], entity['user_id']) + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + self.assertEqual(ref.get('project_id'), entity.get('project_id')) + return entity + + # role validation + + def assertValidRoleListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'roles', + self.assertValidRole, + keys_to_check=['name'], + *args, + **kwargs) + + def assertValidRoleResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'role', + self.assertValidRole, + keys_to_check=['name'], + *args, + **kwargs) + + def assertValidRole(self, entity, ref=None): + self.assertIsNotNone(entity.get('name')) + if ref: + self.assertEqual(ref['name'], entity['name']) + return entity + + # role assignment validation + + def assertValidRoleAssignmentListResponse(self, resp, expected_length=None, + resource_url=None): + entities = resp.result.get('role_assignments') + + if expected_length: + self.assertEqual(expected_length, len(entities)) + + # Collections should have relational links + self.assertValidListLinks(resp.result.get('links'), + resource_url=resource_url) + + for entity in entities: + self.assertIsNotNone(entity) + self.assertValidRoleAssignment(entity) + return entities + + def assertValidRoleAssignment(self, entity, ref=None): + # A role should be present + self.assertIsNotNone(entity.get('role')) + self.assertIsNotNone(entity['role'].get('id')) + + # Only one of user or group should be present + if entity.get('user'): + self.assertIsNone(entity.get('group')) + self.assertIsNotNone(entity['user'].get('id')) + else: + self.assertIsNotNone(entity.get('group')) + self.assertIsNotNone(entity['group'].get('id')) + + # A scope should be present and have only one of domain or project + self.assertIsNotNone(entity.get('scope')) + + if entity['scope'].get('project'): + self.assertIsNone(entity['scope'].get('domain')) + self.assertIsNotNone(entity['scope']['project'].get('id')) + else: + self.assertIsNotNone(entity['scope'].get('domain')) + self.assertIsNotNone(entity['scope']['domain'].get('id')) + + # An assignment link should be present + self.assertIsNotNone(entity.get('links')) + self.assertIsNotNone(entity['links'].get('assignment')) + + if ref: + links = ref.pop('links') + try: + self.assertDictContainsSubset(ref, entity) + self.assertIn(links['assignment'], + entity['links']['assignment']) + finally: + if links: + ref['links'] = links + + def assertRoleAssignmentInListResponse(self, resp, ref, expected=1): + + found_count = 0 + for entity in resp.result.get('role_assignments'): + try: + self.assertValidRoleAssignment(entity, ref=ref) + except Exception: + # It doesn't match, so let's go onto the next one + pass + else: + found_count += 1 + self.assertEqual(expected, found_count) + + def assertRoleAssignmentNotInListResponse(self, resp, ref): + self.assertRoleAssignmentInListResponse(resp, ref=ref, expected=0) + + # policy validation + + def assertValidPolicyListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'policies', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicyResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'policy', + self.assertValidPolicy, + *args, + **kwargs) + + def assertValidPolicy(self, entity, ref=None): + self.assertIsNotNone(entity.get('blob')) + self.assertIsNotNone(entity.get('type')) + if ref: + self.assertEqual(ref['blob'], entity['blob']) + self.assertEqual(ref['type'], entity['type']) + return entity + + # trust validation + + def assertValidTrustListResponse(self, resp, *args, **kwargs): + return self.assertValidListResponse( + resp, + 'trusts', + self.assertValidTrustSummary, + keys_to_check=['trustor_user_id', + 'trustee_user_id', + 'impersonation'], + *args, + **kwargs) + + def assertValidTrustResponse(self, resp, *args, **kwargs): + return self.assertValidResponse( + resp, + 'trust', + self.assertValidTrust, + keys_to_check=['trustor_user_id', + 'trustee_user_id', + 'impersonation'], + *args, + **kwargs) + + def assertValidTrustSummary(self, entity, ref=None): + return self.assertValidTrust(entity, ref, summary=True) + + def assertValidTrust(self, entity, ref=None, summary=False): + self.assertIsNotNone(entity.get('trustor_user_id')) + self.assertIsNotNone(entity.get('trustee_user_id')) + self.assertIsNotNone(entity.get('impersonation')) + + self.assertIn('expires_at', entity) + if entity['expires_at'] is not None: + self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at']) + + if summary: + # Trust list contains no roles, but getting a specific + # trust by ID provides the detailed response containing roles + self.assertNotIn('roles', entity) + self.assertIn('project_id', entity) + else: + for role in entity['roles']: + self.assertIsNotNone(role) + self.assertValidEntity(role, keys_to_check=['name']) + self.assertValidRole(role) + + self.assertValidListLinks(entity.get('roles_links')) + + # always disallow role xor project_id (neither or both is allowed) + has_roles = bool(entity.get('roles')) + has_project = bool(entity.get('project_id')) + self.assertFalse(has_roles ^ has_project) + + if ref: + self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id']) + self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id']) + self.assertEqual(ref['project_id'], entity['project_id']) + if entity.get('expires_at') or ref.get('expires_at'): + entity_exp = self.assertValidISO8601ExtendedFormatDatetime( + entity['expires_at']) + ref_exp = self.assertValidISO8601ExtendedFormatDatetime( + ref['expires_at']) + self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp) + else: + self.assertEqual(ref.get('expires_at'), + entity.get('expires_at')) + + return entity + + def build_external_auth_request(self, remote_user, + remote_domain=None, auth_data=None, + kerberos=False): + context = {'environment': {'REMOTE_USER': remote_user, + 'AUTH_TYPE': 'Negotiate'}} + if remote_domain: + context['environment']['REMOTE_DOMAIN'] = remote_domain + if not auth_data: + auth_data = self.build_authentication_request( + kerberos=kerberos)['auth'] + no_context = None + auth_info = auth.controllers.AuthInfo.create(no_context, auth_data) + auth_context = {'extras': {}, 'method_names': []} + return context, auth_info, auth_context + + +class VersionTestCase(RestfulTestCase): + def test_get_version(self): + pass + + +# NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py +# because we need the token +class AuthContextMiddlewareTestCase(RestfulTestCase): + def _mock_request_object(self, token_id): + + class fake_req(object): + headers = {middleware.AUTH_TOKEN_HEADER: token_id} + environ = {} + + return fake_req() + + def test_auth_context_build_by_middleware(self): + # test to make sure AuthContextMiddleware successful build the auth + # context from the incoming auth token + admin_token = self.get_scoped_token() + req = self._mock_request_object(admin_token) + application = None + middleware.AuthContextMiddleware(application).process_request(req) + self.assertEqual( + self.user['id'], + req.environ.get(authorization.AUTH_CONTEXT_ENV)['user_id']) + + def test_auth_context_override(self): + overridden_context = 'OVERRIDDEN_CONTEXT' + # this token should not be used + token = uuid.uuid4().hex + req = self._mock_request_object(token) + req.environ[authorization.AUTH_CONTEXT_ENV] = overridden_context + application = None + middleware.AuthContextMiddleware(application).process_request(req) + # make sure overridden context take precedence + self.assertEqual(overridden_context, + req.environ.get(authorization.AUTH_CONTEXT_ENV)) + + def test_admin_token_auth_context(self): + # test to make sure AuthContextMiddleware does not attempt to build + # auth context if the incoming auth token is the special admin token + req = self._mock_request_object(CONF.admin_token) + application = None + middleware.AuthContextMiddleware(application).process_request(req) + self.assertDictEqual(req.environ.get(authorization.AUTH_CONTEXT_ENV), + {}) + + +class JsonHomeTestMixin(object): + """JSON Home test + + Mixin this class to provide a test for the JSON-Home response for an + extension. + + The base class must set JSON_HOME_DATA to a dict of relationship URLs + (rels) to the JSON-Home data for the relationship. The rels and associated + data must be in the response. + + """ + def test_get_json_home(self): + resp = self.get('/', convert=False, + headers={'Accept': 'application/json-home'}) + self.assertThat(resp.headers['Content-Type'], + matchers.Equals('application/json-home')) + resp_data = jsonutils.loads(resp.body) + + # Check that the example relationships are present. + for rel in self.JSON_HOME_DATA: + self.assertThat(resp_data['resources'][rel], + matchers.Equals(self.JSON_HOME_DATA[rel])) |