aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/tests/unit/test_v3.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_v3.py')
-rw-r--r--keystone-moon/keystone/tests/unit/test_v3.py1640
1 files changed, 0 insertions, 1640 deletions
diff --git a/keystone-moon/keystone/tests/unit/test_v3.py b/keystone-moon/keystone/tests/unit/test_v3.py
deleted file mode 100644
index 216d8c79..00000000
--- a/keystone-moon/keystone/tests/unit/test_v3.py
+++ /dev/null
@@ -1,1640 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-import mock
-from oslo_config import cfg
-import oslo_context.context
-from oslo_serialization import jsonutils
-from oslo_utils import timeutils
-from six.moves import http_client
-from testtools import matchers
-import webtest
-
-from keystone import auth
-from keystone.common import authorization
-from keystone.common import cache
-from keystone.common.validation import validators
-from keystone import exception
-from keystone import middleware
-from keystone.middleware import auth as middleware_auth
-from keystone.tests.common import auth as common_auth
-from keystone.tests import unit
-from keystone.tests.unit import rest
-
-
-CONF = cfg.CONF
-DEFAULT_DOMAIN_ID = 'default'
-
-TIME_FORMAT = unit.TIME_FORMAT
-
-
-class AuthTestMixin(object):
- """To hold auth building helper functions."""
-
- def build_auth_scope(self, project_id=None, project_name=None,
- project_domain_id=None, project_domain_name=None,
- domain_id=None, domain_name=None, trust_id=None,
- unscoped=None):
- scope_data = {}
- if unscoped:
- scope_data['unscoped'] = {}
- if project_id or project_name:
- scope_data['project'] = {}
- if project_id:
- scope_data['project']['id'] = project_id
- else:
- scope_data['project']['name'] = project_name
- if project_domain_id or project_domain_name:
- project_domain_json = {}
- if project_domain_id:
- project_domain_json['id'] = project_domain_id
- else:
- project_domain_json['name'] = project_domain_name
- scope_data['project']['domain'] = project_domain_json
- if domain_id or domain_name:
- scope_data['domain'] = {}
- if domain_id:
- scope_data['domain']['id'] = domain_id
- else:
- scope_data['domain']['name'] = domain_name
- if trust_id:
- scope_data['OS-TRUST:trust'] = {}
- scope_data['OS-TRUST:trust']['id'] = trust_id
- return scope_data
-
- def build_password_auth(self, user_id=None, username=None,
- user_domain_id=None, user_domain_name=None,
- password=None):
- password_data = {'user': {}}
- if user_id:
- password_data['user']['id'] = user_id
- else:
- password_data['user']['name'] = username
- if user_domain_id or user_domain_name:
- password_data['user']['domain'] = {}
- if user_domain_id:
- password_data['user']['domain']['id'] = user_domain_id
- else:
- password_data['user']['domain']['name'] = user_domain_name
- password_data['user']['password'] = password
- return password_data
-
- def build_token_auth(self, token):
- return {'id': token}
-
- def build_authentication_request(self, token=None, user_id=None,
- username=None, user_domain_id=None,
- user_domain_name=None, password=None,
- kerberos=False, **kwargs):
- """Build auth dictionary.
-
- It will create an auth dictionary based on all the arguments
- that it receives.
- """
- auth_data = {}
- auth_data['identity'] = {'methods': []}
- if kerberos:
- auth_data['identity']['methods'].append('kerberos')
- auth_data['identity']['kerberos'] = {}
- if token:
- auth_data['identity']['methods'].append('token')
- auth_data['identity']['token'] = self.build_token_auth(token)
- if user_id or username:
- auth_data['identity']['methods'].append('password')
- auth_data['identity']['password'] = self.build_password_auth(
- user_id, username, user_domain_id, user_domain_name, password)
- if kwargs:
- auth_data['scope'] = self.build_auth_scope(**kwargs)
- return {'auth': auth_data}
-
-
-class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
- common_auth.AuthTestMixin):
-
- def generate_token_schema(self, domain_scoped=False, project_scoped=False):
- """Return a dictionary of token properties to validate against."""
- properties = {
- 'audit_ids': {
- 'type': 'array',
- 'items': {
- 'type': 'string',
- },
- 'minItems': 1,
- 'maxItems': 2,
- },
- 'bind': {
- 'type': 'object',
- 'properties': {
- 'kerberos': {
- 'type': 'string',
- },
- },
- 'required': ['kerberos'],
- 'additionalProperties': False,
- },
- 'expires_at': {'type': 'string'},
- 'issued_at': {'type': 'string'},
- 'methods': {
- 'type': 'array',
- 'items': {
- 'type': 'string',
- },
- },
- 'user': {
- 'type': 'object',
- 'required': ['id', 'name', 'domain'],
- 'properties': {
- 'id': {'type': 'string'},
- 'name': {'type': 'string'},
- 'domain': {
- 'type': 'object',
- 'properties': {
- 'id': {'type': 'string'},
- 'name': {'type': 'string'}
- },
- 'required': ['id', 'name'],
- 'additonalProperties': False,
- }
- },
- 'additionalProperties': False,
- }
- }
-
- if domain_scoped:
- properties['catalog'] = {'type': 'array'}
- properties['roles'] = {
- 'type': 'array',
- 'items': {
- 'type': 'object',
- 'properties': {
- 'id': {'type': 'string', },
- 'name': {'type': 'string', },
- },
- 'required': ['id', 'name', ],
- 'additionalProperties': False,
- },
- 'minItems': 1,
- }
- properties['domain'] = {
- 'domain': {
- 'type': 'object',
- 'required': ['id', 'name'],
- 'properties': {
- 'id': {'type': 'string'},
- 'name': {'type': 'string'}
- },
- 'additionalProperties': False
- }
- }
- elif project_scoped:
- properties['is_admin_project'] = {'type': 'boolean'}
- properties['catalog'] = {'type': 'array'}
- properties['roles'] = {'type': 'array'}
- properties['project'] = {
- 'type': ['object'],
- 'required': ['id', 'name', 'domain'],
- 'properties': {
- 'id': {'type': 'string'},
- 'name': {'type': 'string'},
- 'domain': {
- 'type': ['object'],
- 'required': ['id', 'name'],
- 'properties': {
- 'id': {'type': 'string'},
- 'name': {'type': 'string'}
- },
- 'additionalProperties': False
- }
- },
- 'additionalProperties': False
- }
-
- schema = {
- 'type': 'object',
- 'properties': properties,
- 'required': ['audit_ids', 'expires_at', 'issued_at', 'methods',
- 'user'],
- 'optional': ['bind'],
- 'additionalProperties': False
- }
-
- if domain_scoped:
- schema['required'].extend(['domain', 'roles'])
- schema['optional'].append('catalog')
- elif project_scoped:
- schema['required'].append('project')
- schema['optional'].append('bind')
- schema['optional'].append('catalog')
- schema['optional'].append('OS-TRUST:trust')
- schema['optional'].append('is_admin_project')
-
- return schema
-
- def config_files(self):
- config_files = super(RestfulTestCase, self).config_files()
- config_files.append(unit.dirs.tests_conf('backend_sql.conf'))
- return config_files
-
- def get_extensions(self):
- extensions = set(['revoke'])
- if hasattr(self, 'EXTENSION_NAME'):
- extensions.add(self.EXTENSION_NAME)
- return extensions
-
- def generate_paste_config(self):
- new_paste_file = None
- try:
- new_paste_file = unit.generate_paste_config(self.EXTENSION_TO_ADD)
- except AttributeError:
- # no need to report this error here, as most tests will not have
- # EXTENSION_TO_ADD defined.
- pass
- finally:
- return new_paste_file
-
- def remove_generated_paste_config(self):
- try:
- unit.remove_generated_paste_config(self.EXTENSION_TO_ADD)
- except AttributeError:
- pass
-
- def setUp(self, app_conf='keystone'):
- """Setup for v3 Restful Test Cases."""
- new_paste_file = self.generate_paste_config()
- self.addCleanup(self.remove_generated_paste_config)
- if new_paste_file:
- app_conf = 'config:%s' % (new_paste_file)
-
- super(RestfulTestCase, self).setUp(app_conf=app_conf)
-
- self.empty_context = {'environment': {}}
-
- def load_backends(self):
- # ensure the cache region instance is setup
- cache.configure_cache()
-
- super(RestfulTestCase, self).load_backends()
-
- def load_fixtures(self, fixtures):
- self.load_sample_data()
-
- def _populate_default_domain(self):
- if CONF.database.connection == unit.IN_MEM_DB_CONN_STRING:
- # NOTE(morganfainberg): If an in-memory db is being used, be sure
- # to populate the default domain, this is typically done by
- # a migration, but the in-mem db uses model definitions to create
- # the schema (no migrations are run).
- try:
- self.resource_api.get_domain(DEFAULT_DOMAIN_ID)
- except exception.DomainNotFound:
- domain = unit.new_domain_ref(
- description=(u'The default domain'),
- id=DEFAULT_DOMAIN_ID,
- name=u'Default')
- self.resource_api.create_domain(DEFAULT_DOMAIN_ID, domain)
-
- def load_sample_data(self):
- self._populate_default_domain()
- self.domain = unit.new_domain_ref()
- self.domain_id = self.domain['id']
- self.resource_api.create_domain(self.domain_id, self.domain)
-
- self.project = unit.new_project_ref(domain_id=self.domain_id)
- self.project_id = self.project['id']
- self.project = self.resource_api.create_project(self.project_id,
- self.project)
-
- self.user = unit.create_user(self.identity_api,
- domain_id=self.domain_id)
- self.user_id = self.user['id']
-
- self.default_domain_project_id = uuid.uuid4().hex
- self.default_domain_project = unit.new_project_ref(
- domain_id=DEFAULT_DOMAIN_ID)
- self.default_domain_project['id'] = self.default_domain_project_id
- self.resource_api.create_project(self.default_domain_project_id,
- self.default_domain_project)
-
- self.default_domain_user = unit.create_user(
- self.identity_api,
- domain_id=DEFAULT_DOMAIN_ID)
- self.default_domain_user_id = self.default_domain_user['id']
-
- # create & grant policy.json's default role for admin_required
- self.role = unit.new_role_ref(name='admin')
- self.role_id = self.role['id']
- self.role_api.create_role(self.role_id, self.role)
- self.assignment_api.add_role_to_user_and_project(
- self.user_id, self.project_id, self.role_id)
- self.assignment_api.add_role_to_user_and_project(
- self.default_domain_user_id, self.default_domain_project_id,
- self.role_id)
- self.assignment_api.add_role_to_user_and_project(
- self.default_domain_user_id, self.project_id,
- self.role_id)
-
- # Create "req_admin" user for simulating a real user instead of the
- # admin_token_auth middleware
- self.user_reqadmin = unit.create_user(self.identity_api,
- DEFAULT_DOMAIN_ID)
- self.assignment_api.add_role_to_user_and_project(
- self.user_reqadmin['id'],
- self.default_domain_project_id,
- self.role_id)
-
- self.region = unit.new_region_ref()
- self.region_id = self.region['id']
- self.catalog_api.create_region(self.region)
-
- self.service = unit.new_service_ref()
- self.service_id = self.service['id']
- self.catalog_api.create_service(self.service_id, self.service.copy())
-
- self.endpoint = unit.new_endpoint_ref(service_id=self.service_id,
- interface='public',
- region_id=self.region_id)
- self.endpoint_id = self.endpoint['id']
- self.catalog_api.create_endpoint(self.endpoint_id,
- self.endpoint.copy())
- # The server adds 'enabled' and defaults to True.
- self.endpoint['enabled'] = True
-
- def create_new_default_project_for_user(self, user_id, domain_id,
- enable_project=True):
- ref = unit.new_project_ref(domain_id=domain_id, enabled=enable_project)
- r = self.post('/projects', body={'project': ref})
- project = self.assertValidProjectResponse(r, ref)
- # set the user's preferred project
- body = {'user': {'default_project_id': project['id']}}
- r = self.patch('/users/%(user_id)s' % {
- 'user_id': user_id},
- body=body)
- self.assertValidUserResponse(r)
-
- return project
-
- def get_admin_token(self):
- """Convenience method so that we can test authenticated requests."""
- r = self.admin_request(
- method='POST',
- path='/v3/auth/tokens',
- body={
- 'auth': {
- 'identity': {
- 'methods': ['password'],
- 'password': {
- 'user': {
- 'name': self.user_reqadmin['name'],
- 'password': self.user_reqadmin['password'],
- 'domain': {
- 'id': self.user_reqadmin['domain_id']
- }
- }
- }
- },
- 'scope': {
- 'project': {
- 'id': self.default_domain_project_id,
- }
- }
- }
- })
- return r.headers.get('X-Subject-Token')
-
- def get_unscoped_token(self):
- """Convenience method so that we can test authenticated requests."""
- r = self.admin_request(
- method='POST',
- path='/v3/auth/tokens',
- body={
- 'auth': {
- 'identity': {
- 'methods': ['password'],
- 'password': {
- 'user': {
- 'name': self.user['name'],
- 'password': self.user['password'],
- 'domain': {
- 'id': self.user['domain_id']
- }
- }
- }
- }
- }
- })
- return r.headers.get('X-Subject-Token')
-
- def get_scoped_token(self):
- """Convenience method so that we can test authenticated requests."""
- r = self.admin_request(
- method='POST',
- path='/v3/auth/tokens',
- body={
- 'auth': {
- 'identity': {
- 'methods': ['password'],
- 'password': {
- 'user': {
- 'name': self.user['name'],
- 'password': self.user['password'],
- 'domain': {
- 'id': self.user['domain_id']
- }
- }
- }
- },
- 'scope': {
- 'project': {
- 'id': self.project['id'],
- }
- }
- }
- })
- return r.headers.get('X-Subject-Token')
-
- def get_domain_scoped_token(self):
- """Convenience method for requesting domain scoped token."""
- r = self.admin_request(
- method='POST',
- path='/v3/auth/tokens',
- body={
- 'auth': {
- 'identity': {
- 'methods': ['password'],
- 'password': {
- 'user': {
- 'name': self.user['name'],
- 'password': self.user['password'],
- 'domain': {
- 'id': self.user['domain_id']
- }
- }
- }
- },
- 'scope': {
- 'domain': {
- 'id': self.domain['id'],
- }
- }
- }
- })
- return r.headers.get('X-Subject-Token')
-
- def get_requested_token(self, auth):
- """Request the specific token we want."""
- r = self.v3_create_token(auth)
- return r.headers.get('X-Subject-Token')
-
- def v3_create_token(self, auth, expected_status=http_client.CREATED):
- return self.admin_request(method='POST',
- path='/v3/auth/tokens',
- body=auth,
- expected_status=expected_status)
-
- def v3_noauth_request(self, path, **kwargs):
- # request does not require auth token header
- path = '/v3' + path
- return self.admin_request(path=path, **kwargs)
-
- def v3_request(self, path, **kwargs):
- # check to see if caller requires token for the API call.
- if kwargs.pop('noauth', None):
- return self.v3_noauth_request(path, **kwargs)
-
- # Check if the caller has passed in auth details for
- # use in requesting the token
- auth_arg = kwargs.pop('auth', None)
- if auth_arg:
- token = self.get_requested_token(auth_arg)
- else:
- token = kwargs.pop('token', None)
- if not token:
- token = self.get_scoped_token()
- path = '/v3' + path
-
- return self.admin_request(path=path, token=token, **kwargs)
-
- def get(self, path, expected_status=http_client.OK, **kwargs):
- return self.v3_request(path, method='GET',
- expected_status=expected_status, **kwargs)
-
- def head(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- r = self.v3_request(path, method='HEAD',
- expected_status=expected_status, **kwargs)
- self.assertEqual(b'', r.body)
- return r
-
- def post(self, path, expected_status=http_client.CREATED, **kwargs):
- return self.v3_request(path, method='POST',
- expected_status=expected_status, **kwargs)
-
- def put(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- return self.v3_request(path, method='PUT',
- expected_status=expected_status, **kwargs)
-
- def patch(self, path, expected_status=http_client.OK, **kwargs):
- return self.v3_request(path, method='PATCH',
- expected_status=expected_status, **kwargs)
-
- def delete(self, path, expected_status=http_client.NO_CONTENT, **kwargs):
- return self.v3_request(path, method='DELETE',
- expected_status=expected_status, **kwargs)
-
- def assertValidErrorResponse(self, r):
- resp = r.result
- self.assertIsNotNone(resp.get('error'))
- self.assertIsNotNone(resp['error'].get('code'))
- self.assertIsNotNone(resp['error'].get('title'))
- self.assertIsNotNone(resp['error'].get('message'))
- self.assertEqual(int(resp['error']['code']), r.status_code)
-
- def assertValidListLinks(self, links, resource_url=None):
- self.assertIsNotNone(links)
- self.assertIsNotNone(links.get('self'))
- self.assertThat(links['self'], matchers.StartsWith('http://localhost'))
-
- if resource_url:
- self.assertThat(links['self'], matchers.EndsWith(resource_url))
-
- self.assertIn('next', links)
- if links['next'] is not None:
- self.assertThat(links['next'],
- matchers.StartsWith('http://localhost'))
-
- self.assertIn('previous', links)
- if links['previous'] is not None:
- self.assertThat(links['previous'],
- matchers.StartsWith('http://localhost'))
-
- def assertValidListResponse(self, resp, key, entity_validator, ref=None,
- expected_length=None, keys_to_check=None,
- resource_url=None):
- """Make assertions common to all API list responses.
-
- If a reference is provided, it's ID will be searched for in the
- response, and asserted to be equal.
-
- """
- entities = resp.result.get(key)
- self.assertIsNotNone(entities)
-
- if expected_length is not None:
- self.assertEqual(expected_length, len(entities))
- elif ref is not None:
- # we're at least expecting the ref
- self.assertNotEmpty(entities)
-
- # collections should have relational links
- self.assertValidListLinks(resp.result.get('links'),
- resource_url=resource_url)
-
- for entity in entities:
- self.assertIsNotNone(entity)
- self.assertValidEntity(entity, keys_to_check=keys_to_check)
- entity_validator(entity)
- if ref:
- entity = [x for x in entities if x['id'] == ref['id']][0]
- self.assertValidEntity(entity, ref=ref,
- keys_to_check=keys_to_check)
- entity_validator(entity, ref)
- return entities
-
- def assertValidResponse(self, resp, key, entity_validator, *args,
- **kwargs):
- """Make assertions common to all API responses."""
- entity = resp.result.get(key)
- self.assertIsNotNone(entity)
- keys = kwargs.pop('keys_to_check', None)
- self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs)
- entity_validator(entity, *args, **kwargs)
- return entity
-
- def assertValidEntity(self, entity, ref=None, keys_to_check=None):
- """Make assertions common to all API entities.
-
- If a reference is provided, the entity will also be compared against
- the reference.
- """
- if keys_to_check is not None:
- keys = keys_to_check
- else:
- keys = ['name', 'description', 'enabled']
-
- for k in ['id'] + keys:
- msg = '%s unexpectedly None in %s' % (k, entity)
- self.assertIsNotNone(entity.get(k), msg)
-
- self.assertIsNotNone(entity.get('links'))
- self.assertIsNotNone(entity['links'].get('self'))
- self.assertThat(entity['links']['self'],
- matchers.StartsWith('http://localhost'))
- self.assertIn(entity['id'], entity['links']['self'])
-
- if ref:
- for k in keys:
- msg = '%s not equal: %s != %s' % (k, ref[k], entity[k])
- self.assertEqual(ref[k], entity[k])
-
- return entity
-
- # auth validation
-
- def assertValidISO8601ExtendedFormatDatetime(self, dt):
- try:
- return timeutils.parse_strtime(dt, fmt=TIME_FORMAT)
- except Exception:
- msg = '%s is not a valid ISO 8601 extended format date time.' % dt
- raise AssertionError(msg)
-
- def assertValidTokenResponse(self, r, user=None):
- self.assertTrue(r.headers.get('X-Subject-Token'))
- token = r.result['token']
-
- self.assertIsNotNone(token.get('expires_at'))
- expires_at = self.assertValidISO8601ExtendedFormatDatetime(
- token['expires_at'])
- self.assertIsNotNone(token.get('issued_at'))
- issued_at = self.assertValidISO8601ExtendedFormatDatetime(
- token['issued_at'])
- self.assertTrue(issued_at < expires_at)
-
- self.assertIn('user', token)
- self.assertIn('id', token['user'])
- self.assertIn('name', token['user'])
- self.assertIn('domain', token['user'])
- self.assertIn('id', token['user']['domain'])
-
- if user is not None:
- self.assertEqual(user['id'], token['user']['id'])
- self.assertEqual(user['name'], token['user']['name'])
- self.assertEqual(user['domain_id'], token['user']['domain']['id'])
-
- return token
-
- def assertValidUnscopedTokenResponse(self, r, *args, **kwargs):
- token = self.assertValidTokenResponse(r, *args, **kwargs)
- validator_object = validators.SchemaValidator(
- self.generate_token_schema()
- )
- validator_object.validate(token)
-
- return token
-
- def assertValidScopedTokenResponse(self, r, *args, **kwargs):
- require_catalog = kwargs.pop('require_catalog', True)
- endpoint_filter = kwargs.pop('endpoint_filter', False)
- ep_filter_assoc = kwargs.pop('ep_filter_assoc', 0)
- is_admin_project = kwargs.pop('is_admin_project', False)
- token = self.assertValidTokenResponse(r, *args, **kwargs)
-
- if require_catalog:
- endpoint_num = 0
- self.assertIn('catalog', token)
-
- if isinstance(token['catalog'], list):
- # only test JSON
- for service in token['catalog']:
- for endpoint in service['endpoints']:
- self.assertNotIn('enabled', endpoint)
- self.assertNotIn('legacy_endpoint_id', endpoint)
- self.assertNotIn('service_id', endpoint)
- endpoint_num += 1
-
- # sub test for the OS-EP-FILTER extension enabled
- if endpoint_filter:
- self.assertEqual(ep_filter_assoc, endpoint_num)
- else:
- self.assertNotIn('catalog', token)
-
- self.assertIn('roles', token)
- self.assertTrue(token['roles'])
- for role in token['roles']:
- self.assertIn('id', role)
- self.assertIn('name', role)
-
- if is_admin_project:
- # NOTE(samueldmq): We want to explicitly test for boolean
- self.assertIs(True, token['is_admin_project'])
- else:
- self.assertNotIn('is_admin_project', token)
-
- return token
-
- def assertValidProjectScopedTokenResponse(self, r, *args, **kwargs):
- token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
-
- project_scoped_token_schema = self.generate_token_schema(
- project_scoped=True)
-
- if token.get('OS-TRUST:trust'):
- trust_properties = {
- 'OS-TRUST:trust': {
- 'type': ['object'],
- 'required': ['id', 'impersonation', 'trustor_user',
- 'trustee_user'],
- 'properties': {
- 'id': {'type': 'string'},
- 'impersonation': {'type': 'boolean'},
- 'trustor_user': {
- 'type': 'object',
- 'required': ['id'],
- 'properties': {
- 'id': {'type': 'string'}
- },
- 'additionalProperties': False
- },
- 'trustee_user': {
- 'type': 'object',
- 'required': ['id'],
- 'properties': {
- 'id': {'type': 'string'}
- },
- 'additionalProperties': False
- }
- },
- 'additionalProperties': False
- }
- }
- project_scoped_token_schema['properties'].update(trust_properties)
-
- validator_object = validators.SchemaValidator(
- project_scoped_token_schema)
- validator_object.validate(token)
-
- self.assertEqual(self.role_id, token['roles'][0]['id'])
-
- return token
-
- def assertValidDomainScopedTokenResponse(self, r, *args, **kwargs):
- token = self.assertValidScopedTokenResponse(r, *args, **kwargs)
-
- validator_object = validators.SchemaValidator(
- self.generate_token_schema(domain_scoped=True)
- )
- validator_object.validate(token)
-
- return token
-
- def assertEqualTokens(self, a, b):
- """Assert that two tokens are equal.
-
- Compare two tokens except for their ids. This also truncates
- the time in the comparison.
- """
- def normalize(token):
- del token['token']['expires_at']
- del token['token']['issued_at']
- return token
-
- a_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
- a['token']['expires_at'])
- b_expires_at = self.assertValidISO8601ExtendedFormatDatetime(
- b['token']['expires_at'])
- self.assertCloseEnoughForGovernmentWork(a_expires_at, b_expires_at)
-
- a_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
- a['token']['issued_at'])
- b_issued_at = self.assertValidISO8601ExtendedFormatDatetime(
- b['token']['issued_at'])
- self.assertCloseEnoughForGovernmentWork(a_issued_at, b_issued_at)
-
- return self.assertDictEqual(normalize(a), normalize(b))
-
- # catalog validation
-
- def assertValidCatalogResponse(self, resp, *args, **kwargs):
- self.assertEqual(set(['catalog', 'links']), set(resp.json.keys()))
- self.assertValidCatalog(resp.json['catalog'])
- self.assertIn('links', resp.json)
- self.assertIsInstance(resp.json['links'], dict)
- self.assertEqual(['self'], list(resp.json['links'].keys()))
- self.assertEqual(
- 'http://localhost/v3/auth/catalog',
- resp.json['links']['self'])
-
- def assertValidCatalog(self, entity):
- self.assertIsInstance(entity, list)
- self.assertTrue(len(entity) > 0)
- for service in entity:
- self.assertIsNotNone(service.get('id'))
- self.assertIsNotNone(service.get('name'))
- self.assertIsNotNone(service.get('type'))
- self.assertNotIn('enabled', service)
- self.assertTrue(len(service['endpoints']) > 0)
- for endpoint in service['endpoints']:
- self.assertIsNotNone(endpoint.get('id'))
- self.assertIsNotNone(endpoint.get('interface'))
- self.assertIsNotNone(endpoint.get('url'))
- self.assertNotIn('enabled', endpoint)
- self.assertNotIn('legacy_endpoint_id', endpoint)
- self.assertNotIn('service_id', endpoint)
-
- # region validation
-
- def assertValidRegionListResponse(self, resp, *args, **kwargs):
- # NOTE(jaypipes): I have to pass in a blank keys_to_check parameter
- # below otherwise the base assertValidEntity method
- # tries to find a "name" and an "enabled" key in the
- # returned ref dicts. The issue is, I don't understand
- # how the service and endpoint entity assertions below
- # actually work (they don't raise assertions), since
- # AFAICT, the service and endpoint tables don't have
- # a "name" column either... :(
- return self.assertValidListResponse(
- resp,
- 'regions',
- self.assertValidRegion,
- keys_to_check=[],
- *args,
- **kwargs)
-
- def assertValidRegionResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'region',
- self.assertValidRegion,
- keys_to_check=[],
- *args,
- **kwargs)
-
- def assertValidRegion(self, entity, ref=None):
- self.assertIsNotNone(entity.get('description'))
- if ref:
- self.assertEqual(ref['description'], entity['description'])
- return entity
-
- # service validation
-
- def assertValidServiceListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'services',
- self.assertValidService,
- *args,
- **kwargs)
-
- def assertValidServiceResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'service',
- self.assertValidService,
- *args,
- **kwargs)
-
- def assertValidService(self, entity, ref=None):
- self.assertIsNotNone(entity.get('type'))
- self.assertIsInstance(entity.get('enabled'), bool)
- if ref:
- self.assertEqual(ref['type'], entity['type'])
- return entity
-
- # endpoint validation
-
- def assertValidEndpointListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'endpoints',
- self.assertValidEndpoint,
- *args,
- **kwargs)
-
- def assertValidEndpointResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'endpoint',
- self.assertValidEndpoint,
- *args,
- **kwargs)
-
- def assertValidEndpoint(self, entity, ref=None):
- self.assertIsNotNone(entity.get('interface'))
- self.assertIsNotNone(entity.get('service_id'))
- self.assertIsInstance(entity['enabled'], bool)
-
- # this is intended to be an unexposed implementation detail
- self.assertNotIn('legacy_endpoint_id', entity)
-
- if ref:
- self.assertEqual(ref['interface'], entity['interface'])
- self.assertEqual(ref['service_id'], entity['service_id'])
- if ref.get('region') is not None:
- self.assertEqual(ref['region_id'], entity.get('region_id'))
-
- return entity
-
- # domain validation
-
- def assertValidDomainListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'domains',
- self.assertValidDomain,
- *args,
- **kwargs)
-
- def assertValidDomainResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'domain',
- self.assertValidDomain,
- *args,
- **kwargs)
-
- def assertValidDomain(self, entity, ref=None):
- if ref:
- pass
- return entity
-
- # project validation
-
- def assertValidProjectListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'projects',
- self.assertValidProject,
- *args,
- **kwargs)
-
- def assertValidProjectResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'project',
- self.assertValidProject,
- *args,
- **kwargs)
-
- def assertValidProject(self, entity, ref=None):
- if ref:
- self.assertEqual(ref['domain_id'], entity['domain_id'])
- return entity
-
- # user validation
-
- def assertValidUserListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'users',
- self.assertValidUser,
- keys_to_check=['name', 'enabled'],
- *args,
- **kwargs)
-
- def assertValidUserResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'user',
- self.assertValidUser,
- keys_to_check=['name', 'enabled'],
- *args,
- **kwargs)
-
- def assertValidUser(self, entity, ref=None):
- self.assertIsNotNone(entity.get('domain_id'))
- self.assertIsNotNone(entity.get('email'))
- self.assertIsNone(entity.get('password'))
- self.assertNotIn('tenantId', entity)
- if ref:
- self.assertEqual(ref['domain_id'], entity['domain_id'])
- self.assertEqual(ref['email'], entity['email'])
- if 'default_project_id' in ref:
- self.assertIsNotNone(ref['default_project_id'])
- self.assertEqual(ref['default_project_id'],
- entity['default_project_id'])
- return entity
-
- # group validation
-
- def assertValidGroupListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'groups',
- self.assertValidGroup,
- keys_to_check=['name', 'description', 'domain_id'],
- *args,
- **kwargs)
-
- def assertValidGroupResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'group',
- self.assertValidGroup,
- keys_to_check=['name', 'description', 'domain_id'],
- *args,
- **kwargs)
-
- def assertValidGroup(self, entity, ref=None):
- self.assertIsNotNone(entity.get('name'))
- if ref:
- self.assertEqual(ref['name'], entity['name'])
- return entity
-
- # credential validation
-
- def assertValidCredentialListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'credentials',
- self.assertValidCredential,
- keys_to_check=['blob', 'user_id', 'type'],
- *args,
- **kwargs)
-
- def assertValidCredentialResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'credential',
- self.assertValidCredential,
- keys_to_check=['blob', 'user_id', 'type'],
- *args,
- **kwargs)
-
- def assertValidCredential(self, entity, ref=None):
- self.assertIsNotNone(entity.get('user_id'))
- self.assertIsNotNone(entity.get('blob'))
- self.assertIsNotNone(entity.get('type'))
- if ref:
- self.assertEqual(ref['user_id'], entity['user_id'])
- self.assertEqual(ref['blob'], entity['blob'])
- self.assertEqual(ref['type'], entity['type'])
- self.assertEqual(ref.get('project_id'), entity.get('project_id'))
- return entity
-
- # role validation
-
- def assertValidRoleListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'roles',
- self.assertValidRole,
- keys_to_check=['name'],
- *args,
- **kwargs)
-
- def assertRoleInListResponse(self, resp, ref, expected=1):
- found_count = 0
- for entity in resp.result.get('roles'):
- try:
- self.assertValidRole(entity, ref=ref)
- except Exception:
- # It doesn't match, so let's go onto the next one
- pass
- else:
- found_count += 1
- self.assertEqual(expected, found_count)
-
- def assertRoleNotInListResponse(self, resp, ref):
- self.assertRoleInListResponse(resp, ref=ref, expected=0)
-
- def assertValidRoleResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'role',
- self.assertValidRole,
- keys_to_check=['name'],
- *args,
- **kwargs)
-
- def assertValidRole(self, entity, ref=None):
- self.assertIsNotNone(entity.get('name'))
- if ref:
- self.assertEqual(ref['name'], entity['name'])
- self.assertEqual(ref['domain_id'], entity['domain_id'])
- return entity
-
- # role assignment validation
-
- def assertValidRoleAssignmentListResponse(self, resp, expected_length=None,
- resource_url=None):
- entities = resp.result.get('role_assignments')
-
- if expected_length:
- self.assertEqual(expected_length, len(entities))
-
- # Collections should have relational links
- self.assertValidListLinks(resp.result.get('links'),
- resource_url=resource_url)
-
- for entity in entities:
- self.assertIsNotNone(entity)
- self.assertValidRoleAssignment(entity)
- return entities
-
- def assertValidRoleAssignment(self, entity, ref=None):
- # A role should be present
- self.assertIsNotNone(entity.get('role'))
- self.assertIsNotNone(entity['role'].get('id'))
-
- # Only one of user or group should be present
- if entity.get('user'):
- self.assertIsNone(entity.get('group'))
- self.assertIsNotNone(entity['user'].get('id'))
- else:
- self.assertIsNotNone(entity.get('group'))
- self.assertIsNotNone(entity['group'].get('id'))
-
- # A scope should be present and have only one of domain or project
- self.assertIsNotNone(entity.get('scope'))
-
- if entity['scope'].get('project'):
- self.assertIsNone(entity['scope'].get('domain'))
- self.assertIsNotNone(entity['scope']['project'].get('id'))
- else:
- self.assertIsNotNone(entity['scope'].get('domain'))
- self.assertIsNotNone(entity['scope']['domain'].get('id'))
-
- # An assignment link should be present
- self.assertIsNotNone(entity.get('links'))
- self.assertIsNotNone(entity['links'].get('assignment'))
-
- if ref:
- links = ref.pop('links')
- try:
- self.assertDictContainsSubset(ref, entity)
- self.assertIn(links['assignment'],
- entity['links']['assignment'])
- finally:
- if links:
- ref['links'] = links
-
- def assertRoleAssignmentInListResponse(self, resp, ref, expected=1):
-
- found_count = 0
- for entity in resp.result.get('role_assignments'):
- try:
- self.assertValidRoleAssignment(entity, ref=ref)
- except Exception:
- # It doesn't match, so let's go onto the next one
- pass
- else:
- found_count += 1
- self.assertEqual(expected, found_count)
-
- def assertRoleAssignmentNotInListResponse(self, resp, ref):
- self.assertRoleAssignmentInListResponse(resp, ref=ref, expected=0)
-
- # policy validation
-
- def assertValidPolicyListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'policies',
- self.assertValidPolicy,
- *args,
- **kwargs)
-
- def assertValidPolicyResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'policy',
- self.assertValidPolicy,
- *args,
- **kwargs)
-
- def assertValidPolicy(self, entity, ref=None):
- self.assertIsNotNone(entity.get('blob'))
- self.assertIsNotNone(entity.get('type'))
- if ref:
- self.assertEqual(ref['blob'], entity['blob'])
- self.assertEqual(ref['type'], entity['type'])
- return entity
-
- # trust validation
-
- def assertValidTrustListResponse(self, resp, *args, **kwargs):
- return self.assertValidListResponse(
- resp,
- 'trusts',
- self.assertValidTrustSummary,
- keys_to_check=['trustor_user_id',
- 'trustee_user_id',
- 'impersonation'],
- *args,
- **kwargs)
-
- def assertValidTrustResponse(self, resp, *args, **kwargs):
- return self.assertValidResponse(
- resp,
- 'trust',
- self.assertValidTrust,
- keys_to_check=['trustor_user_id',
- 'trustee_user_id',
- 'impersonation'],
- *args,
- **kwargs)
-
- def assertValidTrustSummary(self, entity, ref=None):
- return self.assertValidTrust(entity, ref, summary=True)
-
- def assertValidTrust(self, entity, ref=None, summary=False):
- self.assertIsNotNone(entity.get('trustor_user_id'))
- self.assertIsNotNone(entity.get('trustee_user_id'))
- self.assertIsNotNone(entity.get('impersonation'))
-
- self.assertIn('expires_at', entity)
- if entity['expires_at'] is not None:
- self.assertValidISO8601ExtendedFormatDatetime(entity['expires_at'])
-
- if summary:
- # Trust list contains no roles, but getting a specific
- # trust by ID provides the detailed response containing roles
- self.assertNotIn('roles', entity)
- self.assertIn('project_id', entity)
- else:
- for role in entity['roles']:
- self.assertIsNotNone(role)
- self.assertValidEntity(role, keys_to_check=['name'])
- self.assertValidRole(role)
-
- self.assertValidListLinks(entity.get('roles_links'))
-
- # always disallow role xor project_id (neither or both is allowed)
- has_roles = bool(entity.get('roles'))
- has_project = bool(entity.get('project_id'))
- self.assertFalse(has_roles ^ has_project)
-
- if ref:
- self.assertEqual(ref['trustor_user_id'], entity['trustor_user_id'])
- self.assertEqual(ref['trustee_user_id'], entity['trustee_user_id'])
- self.assertEqual(ref['project_id'], entity['project_id'])
- if entity.get('expires_at') or ref.get('expires_at'):
- entity_exp = self.assertValidISO8601ExtendedFormatDatetime(
- entity['expires_at'])
- ref_exp = self.assertValidISO8601ExtendedFormatDatetime(
- ref['expires_at'])
- self.assertCloseEnoughForGovernmentWork(entity_exp, ref_exp)
- else:
- self.assertEqual(ref.get('expires_at'),
- entity.get('expires_at'))
-
- return entity
-
- # Service providers (federation)
-
- def assertValidServiceProvider(self, entity, ref=None, *args, **kwargs):
-
- attributes = frozenset(['auth_url', 'id', 'enabled', 'description',
- 'links', 'relay_state_prefix', 'sp_url'])
- for attribute in attributes:
- self.assertIsNotNone(entity.get(attribute))
-
- def assertValidServiceProviderListResponse(self, resp, *args, **kwargs):
- if kwargs.get('keys_to_check') is None:
- kwargs['keys_to_check'] = ['auth_url', 'id', 'enabled',
- 'description', 'relay_state_prefix',
- 'sp_url']
- return self.assertValidListResponse(
- resp,
- 'service_providers',
- self.assertValidServiceProvider,
- *args,
- **kwargs)
-
- def build_external_auth_request(self, remote_user,
- remote_domain=None, auth_data=None,
- kerberos=False):
- context = {'environment': {'REMOTE_USER': remote_user,
- 'AUTH_TYPE': 'Negotiate'}}
- if remote_domain:
- context['environment']['REMOTE_DOMAIN'] = remote_domain
- if not auth_data:
- auth_data = self.build_authentication_request(
- kerberos=kerberos)['auth']
- no_context = None
- auth_info = auth.controllers.AuthInfo.create(no_context, auth_data)
- auth_context = {'extras': {}, 'method_names': []}
- return context, auth_info, auth_context
-
-
-class VersionTestCase(RestfulTestCase):
- def test_get_version(self):
- pass
-
-
-# NOTE(morganfainberg): To be removed when admin_token_auth is removed. This
-# has been split out to allow testing admin_token auth without enabling it
-# for other tests.
-class AuthContextMiddlewareAdminTokenTestCase(RestfulTestCase):
- EXTENSION_TO_ADD = 'admin_token_auth'
-
- def config_overrides(self):
- super(AuthContextMiddlewareAdminTokenTestCase, self).config_overrides()
- self.config_fixture.config(
- admin_token='ADMIN')
-
- # NOTE(morganfainberg): This is knowingly copied from below for simplicity
- # during the deprecation cycle.
- def _middleware_request(self, token, extra_environ=None):
-
- def application(environ, start_response):
- body = b'body'
- headers = [('Content-Type', 'text/html; charset=utf8'),
- ('Content-Length', str(len(body)))]
- start_response('200 OK', headers)
- return [body]
-
- app = webtest.TestApp(middleware.AuthContextMiddleware(application),
- extra_environ=extra_environ)
- resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token})
- self.assertEqual('body', resp.text) # just to make sure it worked
- return resp.request
-
- def test_admin_auth_context(self):
- # test to make sure AuthContextMiddleware does not attempt to build the
- # auth context if the admin_token middleware indicates it's admin
- # already.
- token_id = uuid.uuid4().hex # token doesn't matter.
- # the admin_token middleware sets is_admin in the context.
- extra_environ = {middleware.CONTEXT_ENV: {'is_admin': True}}
- req = self._middleware_request(token_id, extra_environ)
- auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
- self.assertDictEqual({}, auth_context)
-
- @mock.patch.object(middleware_auth.versionutils,
- 'report_deprecated_feature')
- def test_admin_token_auth_context_deprecated(self, mock_report_deprecated):
- # For backwards compatibility AuthContextMiddleware will check that the
- # admin token (as configured in the CONF file) is present and not
- # attempt to build the auth context. This is deprecated.
- req = self._middleware_request('ADMIN')
- auth_context = req.environ.get(authorization.AUTH_CONTEXT_ENV)
- self.assertDictEqual({}, auth_context)
- self.assertEqual(1, mock_report_deprecated.call_count)
-
-
-# NOTE(gyee): test AuthContextMiddleware here instead of test_middleware.py
-# because we need the token
-class AuthContextMiddlewareTestCase(RestfulTestCase):
-
- def _middleware_request(self, token, extra_environ=None):
-
- def application(environ, start_response):
- body = b'body'
- headers = [('Content-Type', 'text/html; charset=utf8'),
- ('Content-Length', str(len(body)))]
- start_response('200 OK', headers)
- return [body]
-
- app = webtest.TestApp(middleware.AuthContextMiddleware(application),
- extra_environ=extra_environ)
- resp = app.get('/', headers={middleware.AUTH_TOKEN_HEADER: token})
- self.assertEqual(b'body', resp.body) # just to make sure it worked
- return resp.request
-
- def test_auth_context_build_by_middleware(self):
- # test to make sure AuthContextMiddleware successful build the auth
- # context from the incoming auth token
- admin_token = self.get_scoped_token()
- req = self._middleware_request(admin_token)
- self.assertEqual(
- self.user['id'],
- req.environ.get(authorization.AUTH_CONTEXT_ENV)['user_id'])
-
- def test_auth_context_override(self):
- overridden_context = 'OVERRIDDEN_CONTEXT'
- # this token should not be used
- token = uuid.uuid4().hex
-
- extra_environ = {authorization.AUTH_CONTEXT_ENV: overridden_context}
- req = self._middleware_request(token, extra_environ=extra_environ)
- # make sure overridden context take precedence
- self.assertEqual(overridden_context,
- req.environ.get(authorization.AUTH_CONTEXT_ENV))
-
- def test_unscoped_token_auth_context(self):
- unscoped_token = self.get_unscoped_token()
- req = self._middleware_request(unscoped_token)
- for key in ['project_id', 'domain_id', 'domain_name']:
- self.assertNotIn(
- key,
- req.environ.get(authorization.AUTH_CONTEXT_ENV))
-
- def test_project_scoped_token_auth_context(self):
- project_scoped_token = self.get_scoped_token()
- req = self._middleware_request(project_scoped_token)
- self.assertEqual(
- self.project['id'],
- req.environ.get(authorization.AUTH_CONTEXT_ENV)['project_id'])
-
- def test_domain_scoped_token_auth_context(self):
- # grant the domain role to user
- path = '/domains/%s/users/%s/roles/%s' % (
- self.domain['id'], self.user['id'], self.role['id'])
- self.put(path=path)
-
- domain_scoped_token = self.get_domain_scoped_token()
- req = self._middleware_request(domain_scoped_token)
- self.assertEqual(
- self.domain['id'],
- req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_id'])
- self.assertEqual(
- self.domain['name'],
- req.environ.get(authorization.AUTH_CONTEXT_ENV)['domain_name'])
-
- def test_oslo_context(self):
- # After AuthContextMiddleware runs, an
- # oslo_context.context.RequestContext was created so that its fields
- # can be logged. This test validates that the RequestContext was
- # created and the fields are set as expected.
-
- # Use a scoped token so more fields can be set.
- token = self.get_scoped_token()
-
- # oslo_middleware RequestId middleware sets openstack.request_id.
- request_id = uuid.uuid4().hex
- environ = {'openstack.request_id': request_id}
- self._middleware_request(token, extra_environ=environ)
-
- req_context = oslo_context.context.get_current()
- self.assertEqual(request_id, req_context.request_id)
- self.assertEqual(token, req_context.auth_token)
- self.assertEqual(self.user['id'], req_context.user)
- self.assertEqual(self.project['id'], req_context.tenant)
- self.assertIsNone(req_context.domain)
- self.assertEqual(self.user['domain_id'], req_context.user_domain)
- self.assertEqual(self.project['domain_id'], req_context.project_domain)
- self.assertFalse(req_context.is_admin)
-
-
-class JsonHomeTestMixin(object):
- """JSON Home test
-
- Mixin this class to provide a test for the JSON-Home response for an
- extension.
-
- The base class must set JSON_HOME_DATA to a dict of relationship URLs
- (rels) to the JSON-Home data for the relationship. The rels and associated
- data must be in the response.
-
- """
-
- def test_get_json_home(self):
- resp = self.get('/', convert=False,
- headers={'Accept': 'application/json-home'})
- self.assertThat(resp.headers['Content-Type'],
- matchers.Equals('application/json-home'))
- resp_data = jsonutils.loads(resp.body)
-
- # Check that the example relationships are present.
- for rel in self.JSON_HOME_DATA:
- self.assertThat(resp_data['resources'][rel],
- matchers.Equals(self.JSON_HOME_DATA[rel]))
-
-
-class AssignmentTestMixin(object):
- """To hold assignment helper functions."""
-
- def build_role_assignment_query_url(self, effective=False, **filters):
- """Build and return a role assignment query url with provided params.
-
- Available filters are: domain_id, project_id, user_id, group_id,
- role_id and inherited_to_projects.
- """
- query_params = '?effective' if effective else ''
-
- for k, v in filters.items():
- query_params += '?' if not query_params else '&'
-
- if k == 'inherited_to_projects':
- query_params += 'scope.OS-INHERIT:inherited_to=projects'
- else:
- if k in ['domain_id', 'project_id']:
- query_params += 'scope.'
- elif k not in ['user_id', 'group_id', 'role_id']:
- raise ValueError(
- 'Invalid key \'%s\' in provided filters.' % k)
-
- query_params += '%s=%s' % (k.replace('_', '.'), v)
-
- return '/role_assignments%s' % query_params
-
- def build_role_assignment_link(self, **attribs):
- """Build and return a role assignment link with provided attributes.
-
- Provided attributes are expected to contain: domain_id or project_id,
- user_id or group_id, role_id and, optionally, inherited_to_projects.
- """
- if attribs.get('domain_id'):
- link = '/domains/' + attribs['domain_id']
- else:
- link = '/projects/' + attribs['project_id']
-
- if attribs.get('user_id'):
- link += '/users/' + attribs['user_id']
- else:
- link += '/groups/' + attribs['group_id']
-
- link += '/roles/' + attribs['role_id']
-
- if attribs.get('inherited_to_projects'):
- return '/OS-INHERIT%s/inherited_to_projects' % link
-
- return link
-
- def build_role_assignment_entity(
- self, link=None, prior_role_link=None, **attribs):
- """Build and return a role assignment entity with provided attributes.
-
- Provided attributes are expected to contain: domain_id or project_id,
- user_id or group_id, role_id and, optionally, inherited_to_projects.
- """
- entity = {'links': {'assignment': (
- link or self.build_role_assignment_link(**attribs))}}
-
- if attribs.get('domain_id'):
- entity['scope'] = {'domain': {'id': attribs['domain_id']}}
- else:
- entity['scope'] = {'project': {'id': attribs['project_id']}}
-
- if attribs.get('user_id'):
- entity['user'] = {'id': attribs['user_id']}
-
- if attribs.get('group_id'):
- entity['links']['membership'] = ('/groups/%s/users/%s' %
- (attribs['group_id'],
- attribs['user_id']))
- else:
- entity['group'] = {'id': attribs['group_id']}
-
- entity['role'] = {'id': attribs['role_id']}
-
- if attribs.get('inherited_to_projects'):
- entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
-
- if prior_role_link:
- entity['links']['prior_role'] = prior_role_link
-
- return entity
-
- def build_role_assignment_entity_include_names(self,
- domain_ref=None,
- role_ref=None,
- group_ref=None,
- user_ref=None,
- project_ref=None,
- inherited_assignment=None):
- """Build and return a role assignment entity with provided attributes.
-
- The expected attributes are: domain_ref or project_ref,
- user_ref or group_ref, role_ref and, optionally, inherited_to_projects.
- """
- entity = {'links': {}}
- attributes_for_links = {}
- if project_ref:
- dmn_name = self.resource_api.get_domain(
- project_ref['domain_id'])['name']
-
- entity['scope'] = {'project': {
- 'id': project_ref['id'],
- 'name': project_ref['name'],
- 'domain': {
- 'id': project_ref['domain_id'],
- 'name': dmn_name}}}
- attributes_for_links['project_id'] = project_ref['id']
- else:
- entity['scope'] = {'domain': {'id': domain_ref['id'],
- 'name': domain_ref['name']}}
- attributes_for_links['domain_id'] = domain_ref['id']
- if user_ref:
- dmn_name = self.resource_api.get_domain(
- user_ref['domain_id'])['name']
- entity['user'] = {'id': user_ref['id'],
- 'name': user_ref['name'],
- 'domain': {'id': user_ref['domain_id'],
- 'name': dmn_name}}
- attributes_for_links['user_id'] = user_ref['id']
- else:
- dmn_name = self.resource_api.get_domain(
- group_ref['domain_id'])['name']
- entity['group'] = {'id': group_ref['id'],
- 'name': group_ref['name'],
- 'domain': {
- 'id': group_ref['domain_id'],
- 'name': dmn_name}}
- attributes_for_links['group_id'] = group_ref['id']
-
- if role_ref:
- entity['role'] = {'id': role_ref['id'],
- 'name': role_ref['name']}
- attributes_for_links['role_id'] = role_ref['id']
-
- if inherited_assignment:
- entity['scope']['OS-INHERIT:inherited_to'] = 'projects'
- attributes_for_links['inherited_to_projects'] = True
-
- entity['links']['assignment'] = self.build_role_assignment_link(
- **attributes_for_links)
-
- return entity