diff options
Diffstat (limited to 'keystone-moon/keystone/token/providers')
-rw-r--r-- | keystone-moon/keystone/token/providers/__init__.py | 0 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/common.py | 808 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/fernet/__init__.py | 13 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/fernet/core.py | 211 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/fernet/token_formatters.py | 677 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/fernet/utils.py | 270 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/pki.py | 66 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/pkiz.py | 64 | ||||
-rw-r--r-- | keystone-moon/keystone/token/providers/uuid.py | 41 |
9 files changed, 0 insertions, 2150 deletions
diff --git a/keystone-moon/keystone/token/providers/__init__.py b/keystone-moon/keystone/token/providers/__init__.py deleted file mode 100644 index e69de29b..00000000 --- a/keystone-moon/keystone/token/providers/__init__.py +++ /dev/null diff --git a/keystone-moon/keystone/token/providers/common.py b/keystone-moon/keystone/token/providers/common.py deleted file mode 100644 index 94729178..00000000 --- a/keystone-moon/keystone/token/providers/common.py +++ /dev/null @@ -1,808 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_config import cfg -from oslo_log import log -from oslo_serialization import jsonutils -import six -from six.moves.urllib import parse - -from keystone.common import controller as common_controller -from keystone.common import dependency -from keystone.common import utils -from keystone import exception -from keystone.federation import constants as federation_constants -from keystone.i18n import _, _LE -from keystone import token -from keystone.token import provider - - -LOG = log.getLogger(__name__) -CONF = cfg.CONF - - -@dependency.requires('catalog_api', 'resource_api', 'assignment_api') -class V2TokenDataHelper(object): - """Creates V2 token data.""" - - def v3_to_v2_token(self, v3_token_data): - """Convert v3 token data into v2.0 token data. - - This method expects a dictionary generated from - V3TokenDataHelper.get_token_data() and converts it to look like a v2.0 - token dictionary. - - :param v3_token_data: dictionary formatted for v3 tokens - :returns: dictionary formatted for v2 tokens - :raises keystone.exception.Unauthorized: If a specific token type is - not supported in v2. - - """ - token_data = {} - # Build v2 token - v3_token = v3_token_data['token'] - - # NOTE(lbragstad): Version 2.0 tokens don't know about any domain other - # than the default domain specified in the configuration. - domain_id = v3_token.get('domain', {}).get('id') - if domain_id and CONF.identity.default_domain_id != domain_id: - msg = ('Unable to validate domain-scoped tokens outside of the ' - 'default domain') - raise exception.Unauthorized(msg) - - token = {} - token['expires'] = v3_token.get('expires_at') - token['issued_at'] = v3_token.get('issued_at') - token['audit_ids'] = v3_token.get('audit_ids') - - if 'project' in v3_token: - # v3 token_data does not contain all tenant attributes - tenant = self.resource_api.get_project( - v3_token['project']['id']) - # Drop domain specific fields since v2 calls are not domain-aware. - token['tenant'] = common_controller.V2Controller.v3_to_v2_project( - tenant) - token_data['token'] = token - - # Build v2 user - v3_user = v3_token['user'] - - user = common_controller.V2Controller.v3_to_v2_user(v3_user) - - if 'OS-TRUST:trust' in v3_token: - msg = ('Unable to validate trust-scoped tokens using version v2.0 ' - 'API.') - raise exception.Unauthorized(msg) - - if 'OS-OAUTH1' in v3_token: - msg = ('Unable to validate Oauth tokens using the version v2.0 ' - 'API.') - raise exception.Unauthorized(msg) - - # Set user roles - user['roles'] = [] - role_ids = [] - for role in v3_token.get('roles', []): - role_ids.append(role.pop('id')) - user['roles'].append(role) - user['roles_links'] = [] - - token_data['user'] = user - - # Get and build v2 service catalog - token_data['serviceCatalog'] = [] - if 'tenant' in token: - catalog_ref = self.catalog_api.get_catalog( - user['id'], token['tenant']['id']) - if catalog_ref: - token_data['serviceCatalog'] = self.format_catalog(catalog_ref) - - # Build v2 metadata - metadata = {} - metadata['roles'] = role_ids - # Setting is_admin to keep consistency in v2 response - metadata['is_admin'] = 0 - token_data['metadata'] = metadata - - return {'access': token_data} - - @classmethod - def format_token(cls, token_ref, roles_ref=None, catalog_ref=None, - trust_ref=None): - audit_info = None - user_ref = token_ref['user'] - metadata_ref = token_ref['metadata'] - if roles_ref is None: - roles_ref = [] - expires = token_ref.get('expires', provider.default_expire_time()) - if expires is not None: - if not isinstance(expires, six.text_type): - expires = utils.isotime(expires) - - token_data = token_ref.get('token_data') - if token_data: - token_audit = token_data.get( - 'access', token_data).get('token', {}).get('audit_ids') - audit_info = token_audit - - if audit_info is None: - audit_info = provider.audit_info(token_ref.get('parent_audit_id')) - - o = {'access': {'token': {'id': token_ref['id'], - 'expires': expires, - 'issued_at': utils.isotime(subsecond=True), - 'audit_ids': audit_info - }, - 'user': {'id': user_ref['id'], - 'name': user_ref['name'], - 'username': user_ref['name'], - 'roles': roles_ref, - 'roles_links': metadata_ref.get('roles_links', - []) - } - } - } - if 'bind' in token_ref: - o['access']['token']['bind'] = token_ref['bind'] - if 'tenant' in token_ref and token_ref['tenant']: - token_ref['tenant']['enabled'] = True - o['access']['token']['tenant'] = token_ref['tenant'] - if catalog_ref is not None: - o['access']['serviceCatalog'] = V2TokenDataHelper.format_catalog( - catalog_ref) - if metadata_ref: - if 'is_admin' in metadata_ref: - o['access']['metadata'] = {'is_admin': - metadata_ref['is_admin']} - else: - o['access']['metadata'] = {'is_admin': 0} - if 'roles' in metadata_ref: - o['access']['metadata']['roles'] = metadata_ref['roles'] - if CONF.trust.enabled and trust_ref: - o['access']['trust'] = {'trustee_user_id': - trust_ref['trustee_user_id'], - 'id': trust_ref['id'], - 'trustor_user_id': - trust_ref['trustor_user_id'], - 'impersonation': - trust_ref['impersonation'] - } - return o - - @classmethod - def format_catalog(cls, catalog_ref): - """Munge catalogs from internal to output format. - - Internal catalogs look like:: - - {$REGION: { - {$SERVICE: { - $key1: $value1, - ... - } - } - } - - The legacy api wants them to look like:: - - [{'name': $SERVICE[name], - 'type': $SERVICE, - 'endpoints': [{ - 'tenantId': $tenant_id, - ... - 'region': $REGION, - }], - 'endpoints_links': [], - }] - - """ - if not catalog_ref: - return [] - - services = {} - for region, region_ref in catalog_ref.items(): - for service, service_ref in region_ref.items(): - new_service_ref = services.get(service, {}) - new_service_ref['name'] = service_ref.pop('name') - new_service_ref['type'] = service - new_service_ref['endpoints_links'] = [] - service_ref['region'] = region - - endpoints_ref = new_service_ref.get('endpoints', []) - endpoints_ref.append(service_ref) - - new_service_ref['endpoints'] = endpoints_ref - services[service] = new_service_ref - - return list(services.values()) - - -@dependency.requires('assignment_api', 'catalog_api', 'federation_api', - 'identity_api', 'resource_api', 'role_api', 'trust_api') -class V3TokenDataHelper(object): - """Token data helper.""" - - def __init__(self): - # Keep __init__ around to ensure dependency injection works. - super(V3TokenDataHelper, self).__init__() - - def _get_filtered_domain(self, domain_id): - domain_ref = self.resource_api.get_domain(domain_id) - return {'id': domain_ref['id'], 'name': domain_ref['name']} - - def _get_filtered_project(self, project_id): - project_ref = self.resource_api.get_project(project_id) - filtered_project = { - 'id': project_ref['id'], - 'name': project_ref['name']} - if project_ref['domain_id'] is not None: - filtered_project['domain'] = ( - self._get_filtered_domain(project_ref['domain_id'])) - else: - # Projects acting as a domain do not have a domain_id attribute - filtered_project['domain'] = None - return filtered_project - - def _populate_scope(self, token_data, domain_id, project_id): - if 'domain' in token_data or 'project' in token_data: - # scope already exist, no need to populate it again - return - - if domain_id: - token_data['domain'] = self._get_filtered_domain(domain_id) - if project_id: - token_data['project'] = self._get_filtered_project(project_id) - - def _populate_is_admin_project(self, token_data): - # TODO(ayoung): Support the ability for a project acting as a domain - # to be the admin project once the rest of the code for projects - # acting as domains is merged. Code will likely be: - # (r.admin_project_name == None and project['is_domain'] == True - # and project['name'] == r.admin_project_domain_name) - project = token_data['project'] - r = CONF.resource - if (project['name'] == r.admin_project_name and - project['domain']['name'] == r.admin_project_domain_name): - token_data['is_admin_project'] = True - - def _get_roles_for_user(self, user_id, domain_id, project_id): - roles = [] - if domain_id: - roles = self.assignment_api.get_roles_for_user_and_domain( - user_id, domain_id) - if project_id: - roles = self.assignment_api.get_roles_for_user_and_project( - user_id, project_id) - return [self.role_api.get_role(role_id) for role_id in roles] - - def populate_roles_for_groups(self, token_data, group_ids, - project_id=None, domain_id=None, - user_id=None): - """Populate roles basing on provided groups and project/domain - - Used for ephemeral users with dynamically assigned groups. - This method does not return anything, yet it modifies token_data in - place. - - :param token_data: a dictionary used for building token response - :param group_ids: list of group IDs a user is a member of - :param project_id: project ID to scope to - :param domain_id: domain ID to scope to - :param user_id: user ID - - :raises keystone.exception.Unauthorized: when no roles were found for a - (group_ids, project_id) or (group_ids, domain_id) pairs. - - """ - def check_roles(roles, user_id, project_id, domain_id): - # User was granted roles so simply exit this function. - if roles: - return - if project_id: - msg = _('User %(user_id)s has no access ' - 'to project %(project_id)s') % { - 'user_id': user_id, - 'project_id': project_id} - elif domain_id: - msg = _('User %(user_id)s has no access ' - 'to domain %(domain_id)s') % { - 'user_id': user_id, - 'domain_id': domain_id} - # Since no roles were found a user is not authorized to - # perform any operations. Raise an exception with - # appropriate error message. - raise exception.Unauthorized(msg) - - roles = self.assignment_api.get_roles_for_groups(group_ids, - project_id, - domain_id) - check_roles(roles, user_id, project_id, domain_id) - token_data['roles'] = roles - - def _populate_user(self, token_data, user_id, trust): - if 'user' in token_data: - # no need to repopulate user if it already exists - return - - user_ref = self.identity_api.get_user(user_id) - if CONF.trust.enabled and trust and 'OS-TRUST:trust' not in token_data: - trustor_user_ref = (self.identity_api.get_user( - trust['trustor_user_id'])) - try: - self.identity_api.assert_user_enabled(trust['trustor_user_id']) - except AssertionError: - raise exception.Forbidden(_('Trustor is disabled.')) - if trust['impersonation']: - user_ref = trustor_user_ref - token_data['OS-TRUST:trust'] = ( - { - 'id': trust['id'], - 'trustor_user': {'id': trust['trustor_user_id']}, - 'trustee_user': {'id': trust['trustee_user_id']}, - 'impersonation': trust['impersonation'] - }) - filtered_user = { - 'id': user_ref['id'], - 'name': user_ref['name'], - 'domain': self._get_filtered_domain(user_ref['domain_id'])} - token_data['user'] = filtered_user - - def _populate_oauth_section(self, token_data, access_token): - if access_token: - access_token_id = access_token['id'] - consumer_id = access_token['consumer_id'] - token_data['OS-OAUTH1'] = ({'access_token_id': access_token_id, - 'consumer_id': consumer_id}) - - def _populate_roles(self, token_data, user_id, domain_id, project_id, - trust, access_token): - if 'roles' in token_data: - # no need to repopulate roles - return - - if access_token: - filtered_roles = [] - authed_role_ids = jsonutils.loads(access_token['role_ids']) - all_roles = self.role_api.list_roles() - for role in all_roles: - for authed_role in authed_role_ids: - if authed_role == role['id']: - filtered_roles.append({'id': role['id'], - 'name': role['name']}) - token_data['roles'] = filtered_roles - return - - if CONF.trust.enabled and trust: - # If redelegated_trust_id is set, then we must traverse the - # trust_chain in order to determine who the original trustor is. We - # need to do this because the user ID of the original trustor helps - # us determine scope in the redelegated context. - if trust.get('redelegated_trust_id'): - trust_chain = self.trust_api.get_trust_pedigree(trust['id']) - token_user_id = trust_chain[-1]['trustor_user_id'] - else: - token_user_id = trust['trustor_user_id'] - - token_project_id = trust['project_id'] - # trusts do not support domains yet - token_domain_id = None - else: - token_user_id = user_id - token_project_id = project_id - token_domain_id = domain_id - - if token_domain_id or token_project_id: - filtered_roles = [] - if CONF.trust.enabled and trust: - # First expand out any roles that were in the trust to include - # any implied roles, whether global or domain specific - refs = [{'role_id': role['id']} for role in trust['roles']] - effective_trust_roles = ( - self.assignment_api.add_implied_roles(refs)) - # Now get the current role assignments for the trustor, - # including any domain specific roles. - assignment_list = self.assignment_api.list_role_assignments( - user_id=token_user_id, - project_id=token_project_id, - effective=True, strip_domain_roles=False) - current_effective_trustor_roles = ( - list(set([x['role_id'] for x in assignment_list]))) - # Go through each of the effective trust roles, making sure the - # trustor still has them, if any have been removed, then we - # will treat the trust as invalid - for trust_role in effective_trust_roles: - - match_roles = [x for x in current_effective_trustor_roles - if x == trust_role['role_id']] - if match_roles: - role = self.role_api.get_role(match_roles[0]) - if role['domain_id'] is None: - filtered_roles.append(role) - else: - raise exception.Forbidden( - _('Trustee has no delegated roles.')) - else: - for role in self._get_roles_for_user(token_user_id, - token_domain_id, - token_project_id): - filtered_roles.append({'id': role['id'], - 'name': role['name']}) - - # user has no project or domain roles, therefore access denied - if not filtered_roles: - if token_project_id: - msg = _('User %(user_id)s has no access ' - 'to project %(project_id)s') % { - 'user_id': user_id, - 'project_id': token_project_id} - else: - msg = _('User %(user_id)s has no access ' - 'to domain %(domain_id)s') % { - 'user_id': user_id, - 'domain_id': token_domain_id} - LOG.debug(msg) - raise exception.Unauthorized(msg) - - token_data['roles'] = filtered_roles - - def _populate_service_catalog(self, token_data, user_id, - domain_id, project_id, trust): - if 'catalog' in token_data: - # no need to repopulate service catalog - return - - if CONF.trust.enabled and trust: - user_id = trust['trustor_user_id'] - if project_id or domain_id: - service_catalog = self.catalog_api.get_v3_catalog( - user_id, project_id) - token_data['catalog'] = service_catalog - - def _populate_service_providers(self, token_data): - if 'service_providers' in token_data: - return - - service_providers = self.federation_api.get_enabled_service_providers() - if service_providers: - token_data['service_providers'] = service_providers - - def _populate_token_dates(self, token_data, expires=None, trust=None, - issued_at=None): - if not expires: - expires = provider.default_expire_time() - if not isinstance(expires, six.string_types): - expires = utils.isotime(expires, subsecond=True) - token_data['expires_at'] = expires - token_data['issued_at'] = (issued_at or - utils.isotime(subsecond=True)) - - def _populate_audit_info(self, token_data, audit_info=None): - if audit_info is None or isinstance(audit_info, six.string_types): - token_data['audit_ids'] = provider.audit_info(audit_info) - elif isinstance(audit_info, list): - token_data['audit_ids'] = audit_info - else: - msg = (_('Invalid audit info data type: %(data)s (%(type)s)') % - {'data': audit_info, 'type': type(audit_info)}) - LOG.error(msg) - raise exception.UnexpectedError(msg) - - def get_token_data(self, user_id, method_names, domain_id=None, - project_id=None, expires=None, trust=None, token=None, - include_catalog=True, bind=None, access_token=None, - issued_at=None, audit_info=None): - token_data = {'methods': method_names} - - # We've probably already written these to the token - if token: - for x in ('roles', 'user', 'catalog', 'project', 'domain'): - if x in token: - token_data[x] = token[x] - - if bind: - token_data['bind'] = bind - - self._populate_scope(token_data, domain_id, project_id) - if token_data.get('project'): - self._populate_is_admin_project(token_data) - self._populate_user(token_data, user_id, trust) - self._populate_roles(token_data, user_id, domain_id, project_id, trust, - access_token) - self._populate_audit_info(token_data, audit_info) - - if include_catalog: - self._populate_service_catalog(token_data, user_id, domain_id, - project_id, trust) - self._populate_service_providers(token_data) - self._populate_token_dates(token_data, expires=expires, trust=trust, - issued_at=issued_at) - self._populate_oauth_section(token_data, access_token) - return {'token': token_data} - - -@dependency.requires('catalog_api', 'identity_api', 'oauth_api', - 'resource_api', 'role_api', 'trust_api') -class BaseProvider(provider.Provider): - def __init__(self, *args, **kwargs): - super(BaseProvider, self).__init__(*args, **kwargs) - self.v3_token_data_helper = V3TokenDataHelper() - self.v2_token_data_helper = V2TokenDataHelper() - - def get_token_version(self, token_data): - if token_data and isinstance(token_data, dict): - if 'token_version' in token_data: - if token_data['token_version'] in token.provider.VERSIONS: - return token_data['token_version'] - # FIXME(morganfainberg): deprecate the following logic in future - # revisions. It is better to just specify the token_version in - # the token_data itself. This way we can support future versions - # that might have the same fields. - if 'access' in token_data: - return token.provider.V2 - if 'token' in token_data and 'methods' in token_data['token']: - return token.provider.V3 - raise exception.UnsupportedTokenVersionException() - - def issue_v2_token(self, token_ref, roles_ref=None, - catalog_ref=None): - if token_ref.get('bind') and not self._supports_bind_authentication: - msg = _('The configured token provider does not support bind ' - 'authentication.') - raise exception.NotImplemented(message=msg) - - metadata_ref = token_ref['metadata'] - trust_ref = None - if CONF.trust.enabled and metadata_ref and 'trust_id' in metadata_ref: - trust_ref = self.trust_api.get_trust(metadata_ref['trust_id']) - - token_data = self.v2_token_data_helper.format_token( - token_ref, roles_ref, catalog_ref, trust_ref) - token_id = self._get_token_id(token_data) - token_data['access']['token']['id'] = token_id - return token_id, token_data - - def _is_mapped_token(self, auth_context): - return (federation_constants.IDENTITY_PROVIDER in auth_context and - federation_constants.PROTOCOL in auth_context) - - def issue_v3_token(self, user_id, method_names, expires_at=None, - project_id=None, domain_id=None, auth_context=None, - trust=None, metadata_ref=None, include_catalog=True, - parent_audit_id=None): - if auth_context and auth_context.get('bind'): - # NOTE(lbragstad): Check if the token provider being used actually - # supports bind authentication methods before proceeding. - if not self._supports_bind_authentication: - raise exception.NotImplemented(_( - 'The configured token provider does not support bind ' - 'authentication.')) - - # for V2, trust is stashed in metadata_ref - if (CONF.trust.enabled and not trust and metadata_ref and - 'trust_id' in metadata_ref): - trust = self.trust_api.get_trust(metadata_ref['trust_id']) - - if CONF.trust.enabled and trust: - if user_id != trust['trustee_user_id']: - raise exception.Forbidden(_('User is not a trustee.')) - - token_ref = None - if auth_context and self._is_mapped_token(auth_context): - token_ref = self._handle_mapped_tokens( - auth_context, project_id, domain_id) - - access_token = None - if 'oauth1' in method_names: - access_token_id = auth_context['access_token_id'] - access_token = self.oauth_api.get_access_token(access_token_id) - - token_data = self.v3_token_data_helper.get_token_data( - user_id, - method_names, - domain_id=domain_id, - project_id=project_id, - expires=expires_at, - trust=trust, - bind=auth_context.get('bind') if auth_context else None, - token=token_ref, - include_catalog=include_catalog, - access_token=access_token, - audit_info=parent_audit_id) - - token_id = self._get_token_id(token_data) - return token_id, token_data - - def _handle_mapped_tokens(self, auth_context, project_id, domain_id): - user_id = auth_context['user_id'] - group_ids = auth_context['group_ids'] - idp = auth_context[federation_constants.IDENTITY_PROVIDER] - protocol = auth_context[federation_constants.PROTOCOL] - token_data = { - 'user': { - 'id': user_id, - 'name': parse.unquote(user_id), - federation_constants.FEDERATION: { - 'groups': [{'id': x} for x in group_ids], - 'identity_provider': {'id': idp}, - 'protocol': {'id': protocol} - }, - 'domain': { - 'id': CONF.federation.federated_domain_name, - 'name': CONF.federation.federated_domain_name - } - } - } - - if project_id or domain_id: - self.v3_token_data_helper.populate_roles_for_groups( - token_data, group_ids, project_id, domain_id, user_id) - - return token_data - - def _verify_token_ref(self, token_ref): - """Verify and return the given token_ref.""" - if not token_ref: - raise exception.Unauthorized() - return token_ref - - def _assert_is_not_federation_token(self, token_ref): - """Make sure we aren't using v2 auth on a federation token.""" - token_data = token_ref.get('token_data') - if (token_data and self.get_token_version(token_data) == - token.provider.V3): - if 'OS-FEDERATION' in token_data['token']['user']: - msg = _('Attempting to use OS-FEDERATION token with V2 ' - 'Identity Service, use V3 Authentication') - raise exception.Unauthorized(msg) - - def _assert_default_domain(self, token_ref): - """Make sure we are operating on default domain only.""" - if (token_ref.get('token_data') and - self.get_token_version(token_ref.get('token_data')) == - token.provider.V3): - # this is a V3 token - msg = _('Non-default domain is not supported') - # domain scoping is prohibited - if token_ref['token_data']['token'].get('domain'): - raise exception.Unauthorized( - _('Domain scoped token is not supported')) - # if token is scoped to trust, both trustor and trustee must - # be in the default domain. Furthermore, the delegated project - # must also be in the default domain - metadata_ref = token_ref['metadata'] - if CONF.trust.enabled and 'trust_id' in metadata_ref: - trust_ref = self.trust_api.get_trust(metadata_ref['trust_id']) - trustee_user_ref = self.identity_api.get_user( - trust_ref['trustee_user_id']) - if (trustee_user_ref['domain_id'] != - CONF.identity.default_domain_id): - raise exception.Unauthorized(msg) - trustor_user_ref = self.identity_api.get_user( - trust_ref['trustor_user_id']) - if (trustor_user_ref['domain_id'] != - CONF.identity.default_domain_id): - raise exception.Unauthorized(msg) - project_ref = self.resource_api.get_project( - trust_ref['project_id']) - if (project_ref['domain_id'] != - CONF.identity.default_domain_id): - raise exception.Unauthorized(msg) - - def validate_v2_token(self, token_ref): - try: - self._assert_is_not_federation_token(token_ref) - self._assert_default_domain(token_ref) - # FIXME(gyee): performance or correctness? Should we return the - # cached token or reconstruct it? Obviously if we are going with - # the cached token, any role, project, or domain name changes - # will not be reflected. One may argue that with PKI tokens, - # we are essentially doing cached token validation anyway. - # Lets go with the cached token strategy. Since token - # management layer is now pluggable, one can always provide - # their own implementation to suit their needs. - token_data = token_ref.get('token_data') - if (self.get_token_version(token_data) != token.provider.V2): - # Validate the V3 token as V2 - token_data = self.v2_token_data_helper.v3_to_v2_token( - token_data) - - trust_id = token_data['access'].get('trust', {}).get('id') - if trust_id: - msg = ('Unable to validate trust-scoped tokens using version ' - 'v2.0 API.') - raise exception.Unauthorized(msg) - - return token_data - except exception.ValidationError: - LOG.exception(_LE('Failed to validate token')) - token_id = token_ref['token_data']['access']['token']['id'] - raise exception.TokenNotFound(token_id=token_id) - - def validate_non_persistent_token(self, token_id): - try: - (user_id, methods, audit_ids, domain_id, project_id, trust_id, - federated_info, access_token_id, created_at, expires_at) = ( - self.token_formatter.validate_token(token_id)) - except exception.ValidationError as e: - raise exception.TokenNotFound(e) - - token_dict = None - trust_ref = None - if federated_info: - # NOTE(lbragstad): We need to rebuild information about the - # federated token as well as the federated token roles. This is - # because when we validate a non-persistent token, we don't have a - # token reference to pull the federated token information out of. - # As a result, we have to extract it from the token itself and - # rebuild the federated context. These private methods currently - # live in the keystone.token.providers.fernet.Provider() class. - token_dict = self._rebuild_federated_info(federated_info, user_id) - if project_id or domain_id: - self._rebuild_federated_token_roles(token_dict, federated_info, - user_id, project_id, - domain_id) - if trust_id: - trust_ref = self.trust_api.get_trust(trust_id) - - access_token = None - if access_token_id: - access_token = self.oauth_api.get_access_token(access_token_id) - - return self.v3_token_data_helper.get_token_data( - user_id, - method_names=methods, - domain_id=domain_id, - project_id=project_id, - issued_at=created_at, - expires=expires_at, - trust=trust_ref, - token=token_dict, - access_token=access_token, - audit_info=audit_ids) - - def validate_v3_token(self, token_ref): - # FIXME(gyee): performance or correctness? Should we return the - # cached token or reconstruct it? Obviously if we are going with - # the cached token, any role, project, or domain name changes - # will not be reflected. One may argue that with PKI tokens, - # we are essentially doing cached token validation anyway. - # Lets go with the cached token strategy. Since token - # management layer is now pluggable, one can always provide - # their own implementation to suit their needs. - - trust_id = token_ref.get('trust_id') - if trust_id: - # token trust validation - self.trust_api.get_trust(trust_id) - - token_data = token_ref.get('token_data') - if not token_data or 'token' not in token_data: - # token ref is created by V2 API - project_id = None - project_ref = token_ref.get('tenant') - if project_ref: - project_id = project_ref['id'] - - issued_at = token_ref['token_data']['access']['token']['issued_at'] - audit = token_ref['token_data']['access']['token'].get('audit_ids') - - token_data = self.v3_token_data_helper.get_token_data( - token_ref['user']['id'], - ['password', 'token'], - project_id=project_id, - bind=token_ref.get('bind'), - expires=token_ref['expires'], - issued_at=issued_at, - audit_info=audit) - return token_data diff --git a/keystone-moon/keystone/token/providers/fernet/__init__.py b/keystone-moon/keystone/token/providers/fernet/__init__.py deleted file mode 100644 index 953ef624..00000000 --- a/keystone-moon/keystone/token/providers/fernet/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from keystone.token.providers.fernet.core import * # noqa diff --git a/keystone-moon/keystone/token/providers/fernet/core.py b/keystone-moon/keystone/token/providers/fernet/core.py deleted file mode 100644 index ff6fe9cc..00000000 --- a/keystone-moon/keystone/token/providers/fernet/core.py +++ /dev/null @@ -1,211 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from oslo_config import cfg - -from keystone.common import dependency -from keystone.common import utils as ks_utils -from keystone.federation import constants as federation_constants -from keystone.token.providers import common -from keystone.token.providers.fernet import token_formatters as tf - - -CONF = cfg.CONF - - -@dependency.requires('trust_api', 'oauth_api') -class Provider(common.BaseProvider): - def __init__(self, *args, **kwargs): - super(Provider, self).__init__(*args, **kwargs) - - self.token_formatter = tf.TokenFormatter() - - def needs_persistence(self): - """Should the token be written to a backend.""" - return False - - def issue_v2_token(self, *args, **kwargs): - token_id, token_data = super(Provider, self).issue_v2_token( - *args, **kwargs) - self._build_issued_at_info(token_id, token_data) - return token_id, token_data - - def issue_v3_token(self, *args, **kwargs): - token_id, token_data = super(Provider, self).issue_v3_token( - *args, **kwargs) - self._build_issued_at_info(token_id, token_data) - return token_id, token_data - - def _build_issued_at_info(self, token_id, token_data): - # NOTE(roxanaghe, lbragstad): We must use the creation time that - # Fernet builds into it's token. The Fernet spec details that the - # token creation time is built into the token, outside of the payload - # provided by Keystone. This is the reason why we don't pass the - # issued_at time in the payload. This also means that we shouldn't - # return a token reference with a creation time that we created - # when Fernet uses a different creation time. We should use the - # creation time provided by Fernet because it's the creation time - # that we have to rely on when we validate the token. - fernet_creation_datetime_obj = self.token_formatter.creation_time( - token_id) - if token_data.get('access'): - token_data['access']['token']['issued_at'] = ks_utils.isotime( - at=fernet_creation_datetime_obj, subsecond=True) - else: - token_data['token']['issued_at'] = ks_utils.isotime( - at=fernet_creation_datetime_obj, subsecond=True) - - def _build_federated_info(self, token_data): - """Extract everything needed for federated tokens. - - This dictionary is passed to federated token formatters, which unpack - the values and build federated Fernet tokens. - - """ - token_data = token_data['token'] - try: - user = token_data['user'] - federation = user[federation_constants.FEDERATION] - idp_id = federation['identity_provider']['id'] - protocol_id = federation['protocol']['id'] - except KeyError: - # The token data doesn't have federated info, so we aren't dealing - # with a federated token and no federated info to build. - return - - group_ids = federation.get('groups') - - return {'group_ids': group_ids, - 'idp_id': idp_id, - 'protocol_id': protocol_id} - - def _rebuild_federated_info(self, federated_dict, user_id): - """Format federated information into the token reference. - - The federated_dict is passed back from the federated token formatters. - The responsibility of this method is to format the information passed - back from the token formatter into the token reference before - constructing the token data from the V3TokenDataHelper. - - """ - g_ids = federated_dict['group_ids'] - idp_id = federated_dict['idp_id'] - protocol_id = federated_dict['protocol_id'] - - federated_info = { - 'groups': g_ids, - 'identity_provider': {'id': idp_id}, - 'protocol': {'id': protocol_id} - } - - token_dict = { - 'user': { - federation_constants.FEDERATION: federated_info, - 'id': user_id, - 'name': user_id, - 'domain': {'id': CONF.federation.federated_domain_name, - 'name': CONF.federation.federated_domain_name, }, - } - } - - return token_dict - - def _rebuild_federated_token_roles(self, token_dict, federated_dict, - user_id, project_id, domain_id): - """Populate roles based on (groups, project/domain) pair. - - We must populate roles from (groups, project/domain) as ephemeral users - don't exist in the backend. Upon success, a ``roles`` key will be added - to ``token_dict``. - - :param token_dict: dictionary with data used for building token - :param federated_dict: federated information such as identity provider - protocol and set of group IDs - :param user_id: user ID - :param project_id: project ID the token is being scoped to - :param domain_id: domain ID the token is being scoped to - - """ - group_ids = [x['id'] for x in federated_dict['group_ids']] - self.v3_token_data_helper.populate_roles_for_groups( - token_dict, group_ids, project_id, domain_id, user_id) - - def _extract_v2_token_data(self, token_data): - user_id = token_data['access']['user']['id'] - expires_at = token_data['access']['token']['expires'] - audit_ids = token_data['access']['token'].get('audit_ids') - methods = ['password'] - if len(audit_ids) > 1: - methods.append('token') - project_id = token_data['access']['token'].get('tenant', {}).get('id') - domain_id = None - trust_id = None - access_token_id = None - federated_info = None - return (user_id, expires_at, audit_ids, methods, domain_id, project_id, - trust_id, access_token_id, federated_info) - - def _extract_v3_token_data(self, token_data): - """Extract information from a v3 token reference.""" - user_id = token_data['token']['user']['id'] - expires_at = token_data['token']['expires_at'] - audit_ids = token_data['token']['audit_ids'] - methods = token_data['token'].get('methods') - domain_id = token_data['token'].get('domain', {}).get('id') - project_id = token_data['token'].get('project', {}).get('id') - trust_id = token_data['token'].get('OS-TRUST:trust', {}).get('id') - access_token_id = token_data['token'].get('OS-OAUTH1', {}).get( - 'access_token_id') - federated_info = self._build_federated_info(token_data) - - return (user_id, expires_at, audit_ids, methods, domain_id, project_id, - trust_id, access_token_id, federated_info) - - def _get_token_id(self, token_data): - """Generate the token_id based upon the data in token_data. - - :param token_data: token information - :type token_data: dict - :rtype: six.text_type - - """ - # NOTE(lbragstad): Only v2.0 token responses include an 'access' - # attribute. - if token_data.get('access'): - (user_id, expires_at, audit_ids, methods, domain_id, project_id, - trust_id, access_token_id, federated_info) = ( - self._extract_v2_token_data(token_data)) - else: - (user_id, expires_at, audit_ids, methods, domain_id, project_id, - trust_id, access_token_id, federated_info) = ( - self._extract_v3_token_data(token_data)) - - return self.token_formatter.create_token( - user_id, - expires_at, - audit_ids, - methods=methods, - domain_id=domain_id, - project_id=project_id, - trust_id=trust_id, - federated_info=federated_info, - access_token_id=access_token_id - ) - - @property - def _supports_bind_authentication(self): - """Return if the token provider supports bind authentication methods. - - :returns: False - - """ - return False diff --git a/keystone-moon/keystone/token/providers/fernet/token_formatters.py b/keystone-moon/keystone/token/providers/fernet/token_formatters.py deleted file mode 100644 index dfdd06e8..00000000 --- a/keystone-moon/keystone/token/providers/fernet/token_formatters.py +++ /dev/null @@ -1,677 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import base64 -import datetime -import struct -import uuid - -from cryptography import fernet -import msgpack -from oslo_config import cfg -from oslo_log import log -from oslo_utils import timeutils -from six.moves import map -from six.moves import urllib - -from keystone.auth import plugins as auth_plugins -from keystone.common import utils as ks_utils -from keystone import exception -from keystone.i18n import _, _LI -from keystone.token import provider -from keystone.token.providers.fernet import utils - - -CONF = cfg.CONF -LOG = log.getLogger(__name__) - -# Fernet byte indexes as as computed by pypi/keyless_fernet and defined in -# https://github.com/fernet/spec -TIMESTAMP_START = 1 -TIMESTAMP_END = 9 - - -class TokenFormatter(object): - """Packs and unpacks payloads into tokens for transport.""" - - @property - def crypto(self): - """Return a cryptography instance. - - You can extend this class with a custom crypto @property to provide - your own token encoding / decoding. For example, using a different - cryptography library (e.g. ``python-keyczar``) or to meet arbitrary - security requirements. - - This @property just needs to return an object that implements - ``encrypt(plaintext)`` and ``decrypt(ciphertext)``. - - """ - keys = utils.load_keys() - - if not keys: - raise exception.KeysNotFound() - - fernet_instances = [fernet.Fernet(key) for key in keys] - return fernet.MultiFernet(fernet_instances) - - def pack(self, payload): - """Pack a payload for transport as a token. - - :type payload: six.binary_type - :rtype: six.text_type - - """ - # base64 padding (if any) is not URL-safe - return self.crypto.encrypt(payload).rstrip(b'=').decode('utf-8') - - def unpack(self, token): - """Unpack a token, and validate the payload. - - :type token: six.text_type - :rtype: six.binary_type - - """ - # TODO(lbragstad): Restore padding on token before decoding it. - # Initially in Kilo, Fernet tokens were returned to the user with - # padding appended to the token. Later in Liberty this padding was - # removed and restored in the Fernet provider. The following if - # statement ensures that we can validate tokens with and without token - # padding, in the event of an upgrade and the tokens that are issued - # throughout the upgrade. Remove this if statement when Mitaka opens - # for development and exclusively use the restore_padding() class - # method. - if token.endswith('%3D'): - token = urllib.parse.unquote(token) - else: - token = TokenFormatter.restore_padding(token) - - try: - return self.crypto.decrypt(token.encode('utf-8')) - except fernet.InvalidToken: - raise exception.ValidationError( - _('This is not a recognized Fernet token %s') % token) - - @classmethod - def restore_padding(cls, token): - """Restore padding based on token size. - - :param token: token to restore padding on - :type token: six.text_type - :returns: token with correct padding - - """ - # Re-inflate the padding - mod_returned = len(token) % 4 - if mod_returned: - missing_padding = 4 - mod_returned - token += '=' * missing_padding - return token - - @classmethod - def creation_time(cls, fernet_token): - """Returns the creation time of a valid Fernet token. - - :type fernet_token: six.text_type - - """ - fernet_token = TokenFormatter.restore_padding(fernet_token) - # fernet_token is six.text_type - - # Fernet tokens are base64 encoded, so we need to unpack them first - # urlsafe_b64decode() requires six.binary_type - token_bytes = base64.urlsafe_b64decode(fernet_token.encode('utf-8')) - - # slice into the byte array to get just the timestamp - timestamp_bytes = token_bytes[TIMESTAMP_START:TIMESTAMP_END] - - # convert those bytes to an integer - # (it's a 64-bit "unsigned long long int" in C) - timestamp_int = struct.unpack(">Q", timestamp_bytes)[0] - - # and with an integer, it's trivial to produce a datetime object - created_at = datetime.datetime.utcfromtimestamp(timestamp_int) - - return created_at - - def create_token(self, user_id, expires_at, audit_ids, methods=None, - domain_id=None, project_id=None, trust_id=None, - federated_info=None, access_token_id=None): - """Given a set of payload attributes, generate a Fernet token.""" - for payload_class in PAYLOAD_CLASSES: - if payload_class.create_arguments_apply( - project_id=project_id, domain_id=domain_id, - trust_id=trust_id, federated_info=federated_info, - access_token_id=access_token_id): - break - - version = payload_class.version - payload = payload_class.assemble( - user_id, methods, project_id, domain_id, expires_at, audit_ids, - trust_id, federated_info, access_token_id - ) - - versioned_payload = (version,) + payload - serialized_payload = msgpack.packb(versioned_payload) - token = self.pack(serialized_payload) - - # NOTE(lbragstad): We should warn against Fernet tokens that are over - # 255 characters in length. This is mostly due to persisting the tokens - # in a backend store of some kind that might have a limit of 255 - # characters. Even though Keystone isn't storing a Fernet token - # anywhere, we can't say it isn't being stored somewhere else with - # those kind of backend constraints. - if len(token) > 255: - LOG.info(_LI('Fernet token created with length of %d ' - 'characters, which exceeds 255 characters'), - len(token)) - - return token - - def validate_token(self, token): - """Validates a Fernet token and returns the payload attributes. - - :type token: six.text_type - - """ - serialized_payload = self.unpack(token) - versioned_payload = msgpack.unpackb(serialized_payload) - version, payload = versioned_payload[0], versioned_payload[1:] - - for payload_class in PAYLOAD_CLASSES: - if version == payload_class.version: - (user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id) = ( - payload_class.disassemble(payload)) - break - else: - # If the token_format is not recognized, raise ValidationError. - raise exception.ValidationError(_( - 'This is not a recognized Fernet payload version: %s') % - version) - - # rather than appearing in the payload, the creation time is encoded - # into the token format itself - created_at = TokenFormatter.creation_time(token) - created_at = ks_utils.isotime(at=created_at, subsecond=True) - expires_at = timeutils.parse_isotime(expires_at) - expires_at = ks_utils.isotime(at=expires_at, subsecond=True) - - return (user_id, methods, audit_ids, domain_id, project_id, trust_id, - federated_info, access_token_id, created_at, expires_at) - - -class BasePayload(object): - # each payload variant should have a unique version - version = None - - @classmethod - def create_arguments_apply(cls, **kwargs): - """Check the arguments to see if they apply to this payload variant. - - :returns: True if the arguments indicate that this payload class is - needed for the token otherwise returns False. - :rtype: bool - - """ - raise NotImplementedError() - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - """Assemble the payload of a token. - - :param user_id: identifier of the user in the token request - :param methods: list of authentication methods used - :param project_id: ID of the project to scope to - :param domain_id: ID of the domain to scope to - :param expires_at: datetime of the token's expiration - :param audit_ids: list of the token's audit IDs - :param trust_id: ID of the trust in effect - :param federated_info: dictionary containing group IDs, the identity - provider ID, protocol ID, and federated domain - ID - :param access_token_id: ID of the secret in OAuth1 authentication - :returns: the payload of a token - - """ - raise NotImplementedError() - - @classmethod - def disassemble(cls, payload): - """Disassemble an unscoped payload into the component data. - - The tuple consists of:: - - (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - * ``methods`` are the auth methods. - * federated_info is a dict contains the group IDs, the identity - provider ID, the protocol ID, and the federated domain ID - - Fields will be set to None if they didn't apply to this payload type. - - :param payload: this variant of payload - :returns: a tuple of the payloads component data - - """ - raise NotImplementedError() - - @classmethod - def convert_uuid_hex_to_bytes(cls, uuid_string): - """Compress UUID formatted strings to bytes. - - :param uuid_string: uuid string to compress to bytes - :returns: a byte representation of the uuid - - """ - uuid_obj = uuid.UUID(uuid_string) - return uuid_obj.bytes - - @classmethod - def convert_uuid_bytes_to_hex(cls, uuid_byte_string): - """Generate uuid.hex format based on byte string. - - :param uuid_byte_string: uuid string to generate from - :returns: uuid hex formatted string - - """ - uuid_obj = uuid.UUID(bytes=uuid_byte_string) - return uuid_obj.hex - - @classmethod - def _convert_time_string_to_float(cls, time_string): - """Convert a time formatted string to a float. - - :param time_string: time formatted string - :returns: a timestamp as a float - - """ - time_object = timeutils.parse_isotime(time_string) - return (timeutils.normalize_time(time_object) - - datetime.datetime.utcfromtimestamp(0)).total_seconds() - - @classmethod - def _convert_float_to_time_string(cls, time_float): - """Convert a floating point timestamp to a string. - - :param time_float: integer representing timestamp - :returns: a time formatted strings - - """ - time_object = datetime.datetime.utcfromtimestamp(time_float) - return ks_utils.isotime(time_object, subsecond=True) - - @classmethod - def attempt_convert_uuid_hex_to_bytes(cls, value): - """Attempt to convert value to bytes or return value. - - :param value: value to attempt to convert to bytes - :returns: tuple containing boolean indicating whether user_id was - stored as bytes and uuid value as bytes or the original value - - """ - try: - return (True, cls.convert_uuid_hex_to_bytes(value)) - except ValueError: - # this might not be a UUID, depending on the situation (i.e. - # federation) - return (False, value) - - -class UnscopedPayload(BasePayload): - version = 0 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return True - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - return (b_user_id, methods, expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - expires_at_str = cls._convert_float_to_time_string(payload[2]) - audit_ids = list(map(provider.base64_encode, payload[3])) - project_id = None - domain_id = None - trust_id = None - federated_info = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class DomainScopedPayload(BasePayload): - version = 1 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['domain_id'] - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - try: - b_domain_id = cls.convert_uuid_hex_to_bytes(domain_id) - except ValueError: - # the default domain ID is configurable, and probably isn't a UUID - if domain_id == CONF.identity.default_domain_id: - b_domain_id = domain_id - else: - raise - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - return (b_user_id, methods, b_domain_id, expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - try: - domain_id = cls.convert_uuid_bytes_to_hex(payload[2]) - except ValueError: - # the default domain ID is configurable, and probably isn't a UUID - if payload[2] == CONF.identity.default_domain_id: - domain_id = payload[2] - else: - raise - expires_at_str = cls._convert_float_to_time_string(payload[3]) - audit_ids = list(map(provider.base64_encode, payload[4])) - project_id = None - trust_id = None - federated_info = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class ProjectScopedPayload(BasePayload): - version = 2 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['project_id'] - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - b_project_id = cls.attempt_convert_uuid_hex_to_bytes(project_id) - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - return (b_user_id, methods, b_project_id, expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - (is_stored_as_bytes, project_id) = payload[2] - if is_stored_as_bytes: - project_id = cls.convert_uuid_bytes_to_hex(project_id) - expires_at_str = cls._convert_float_to_time_string(payload[3]) - audit_ids = list(map(provider.base64_encode, payload[4])) - domain_id = None - trust_id = None - federated_info = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class TrustScopedPayload(BasePayload): - version = 3 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['trust_id'] - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - b_project_id = cls.attempt_convert_uuid_hex_to_bytes(project_id) - b_trust_id = cls.convert_uuid_hex_to_bytes(trust_id) - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - - return (b_user_id, methods, b_project_id, expires_at_int, b_audit_ids, - b_trust_id) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - (is_stored_as_bytes, project_id) = payload[2] - if is_stored_as_bytes: - project_id = cls.convert_uuid_bytes_to_hex(project_id) - expires_at_str = cls._convert_float_to_time_string(payload[3]) - audit_ids = list(map(provider.base64_encode, payload[4])) - trust_id = cls.convert_uuid_bytes_to_hex(payload[5]) - domain_id = None - federated_info = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class FederatedUnscopedPayload(BasePayload): - version = 4 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['federated_info'] - - @classmethod - def pack_group_id(cls, group_dict): - return cls.attempt_convert_uuid_hex_to_bytes(group_dict['id']) - - @classmethod - def unpack_group_id(cls, group_id_in_bytes): - (is_stored_as_bytes, group_id) = group_id_in_bytes - if is_stored_as_bytes: - group_id = cls.convert_uuid_bytes_to_hex(group_id) - return {'id': group_id} - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - b_group_ids = list(map(cls.pack_group_id, - federated_info['group_ids'])) - b_idp_id = cls.attempt_convert_uuid_hex_to_bytes( - federated_info['idp_id']) - protocol_id = federated_info['protocol_id'] - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - - return (b_user_id, methods, b_group_ids, b_idp_id, protocol_id, - expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - group_ids = list(map(cls.unpack_group_id, payload[2])) - (is_stored_as_bytes, idp_id) = payload[3] - if is_stored_as_bytes: - idp_id = cls.convert_uuid_bytes_to_hex(idp_id) - protocol_id = payload[4] - expires_at_str = cls._convert_float_to_time_string(payload[5]) - audit_ids = list(map(provider.base64_encode, payload[6])) - federated_info = dict(group_ids=group_ids, idp_id=idp_id, - protocol_id=protocol_id) - project_id = None - domain_id = None - trust_id = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class FederatedScopedPayload(FederatedUnscopedPayload): - version = None - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - b_scope_id = cls.attempt_convert_uuid_hex_to_bytes( - project_id or domain_id) - b_group_ids = list(map(cls.pack_group_id, - federated_info['group_ids'])) - b_idp_id = cls.attempt_convert_uuid_hex_to_bytes( - federated_info['idp_id']) - protocol_id = federated_info['protocol_id'] - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - - return (b_user_id, methods, b_scope_id, b_group_ids, b_idp_id, - protocol_id, expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - (is_stored_as_bytes, scope_id) = payload[2] - if is_stored_as_bytes: - scope_id = cls.convert_uuid_bytes_to_hex(scope_id) - project_id = ( - scope_id - if cls.version == FederatedProjectScopedPayload.version else None) - domain_id = ( - scope_id - if cls.version == FederatedDomainScopedPayload.version else None) - group_ids = list(map(cls.unpack_group_id, payload[3])) - (is_stored_as_bytes, idp_id) = payload[4] - if is_stored_as_bytes: - idp_id = cls.convert_uuid_bytes_to_hex(idp_id) - protocol_id = payload[5] - expires_at_str = cls._convert_float_to_time_string(payload[6]) - audit_ids = list(map(provider.base64_encode, payload[7])) - federated_info = dict(idp_id=idp_id, protocol_id=protocol_id, - group_ids=group_ids) - trust_id = None - access_token_id = None - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -class FederatedProjectScopedPayload(FederatedScopedPayload): - version = 5 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['project_id'] and kwargs['federated_info'] - - -class FederatedDomainScopedPayload(FederatedScopedPayload): - version = 6 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['domain_id'] and kwargs['federated_info'] - - -class OauthScopedPayload(BasePayload): - version = 7 - - @classmethod - def create_arguments_apply(cls, **kwargs): - return kwargs['access_token_id'] - - @classmethod - def assemble(cls, user_id, methods, project_id, domain_id, expires_at, - audit_ids, trust_id, federated_info, access_token_id): - b_user_id = cls.attempt_convert_uuid_hex_to_bytes(user_id) - methods = auth_plugins.convert_method_list_to_integer(methods) - b_project_id = cls.attempt_convert_uuid_hex_to_bytes(project_id) - expires_at_int = cls._convert_time_string_to_float(expires_at) - b_audit_ids = list(map(provider.random_urlsafe_str_to_bytes, - audit_ids)) - b_access_token_id = cls.attempt_convert_uuid_hex_to_bytes( - access_token_id) - return (b_user_id, methods, b_project_id, b_access_token_id, - expires_at_int, b_audit_ids) - - @classmethod - def disassemble(cls, payload): - (is_stored_as_bytes, user_id) = payload[0] - if is_stored_as_bytes: - user_id = cls.convert_uuid_bytes_to_hex(user_id) - methods = auth_plugins.convert_integer_to_method_list(payload[1]) - (is_stored_as_bytes, project_id) = payload[2] - if is_stored_as_bytes: - project_id = cls.convert_uuid_bytes_to_hex(project_id) - (is_stored_as_bytes, access_token_id) = payload[3] - if is_stored_as_bytes: - access_token_id = cls.convert_uuid_bytes_to_hex(access_token_id) - expires_at_str = cls._convert_float_to_time_string(payload[4]) - audit_ids = list(map(provider.base64_encode, payload[5])) - domain_id = None - trust_id = None - federated_info = None - - return (user_id, methods, project_id, domain_id, expires_at_str, - audit_ids, trust_id, federated_info, access_token_id) - - -# For now, the order of the classes in the following list is important. This -# is because the way they test that the payload applies to them in -# the create_arguments_apply method requires that the previous ones rejected -# the payload arguments. For example, UnscopedPayload must be last since it's -# the catch-all after all the other payloads have been checked. -# TODO(blk-u): Clean up the create_arguments_apply methods so that they don't -# depend on the previous classes then these can be in any order. -PAYLOAD_CLASSES = [ - OauthScopedPayload, - TrustScopedPayload, - FederatedProjectScopedPayload, - FederatedDomainScopedPayload, - FederatedUnscopedPayload, - ProjectScopedPayload, - DomainScopedPayload, - UnscopedPayload, -] diff --git a/keystone-moon/keystone/token/providers/fernet/utils.py b/keystone-moon/keystone/token/providers/fernet/utils.py deleted file mode 100644 index 1c3552d4..00000000 --- a/keystone-moon/keystone/token/providers/fernet/utils.py +++ /dev/null @@ -1,270 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import os -import stat - -from cryptography import fernet -from oslo_config import cfg -from oslo_log import log - -from keystone.i18n import _LE, _LW, _LI - - -LOG = log.getLogger(__name__) - -CONF = cfg.CONF - - -def validate_key_repository(requires_write=False): - """Validate permissions on the key repository directory.""" - # NOTE(lbragstad): We shouldn't need to check if the directory was passed - # in as None because we don't set allow_no_values to True. - - # ensure current user has sufficient access to the key repository - is_valid = (os.access(CONF.fernet_tokens.key_repository, os.R_OK) and - os.access(CONF.fernet_tokens.key_repository, os.X_OK)) - if requires_write: - is_valid = (is_valid and - os.access(CONF.fernet_tokens.key_repository, os.W_OK)) - - if not is_valid: - LOG.error( - _LE('Either [fernet_tokens] key_repository does not exist or ' - 'Keystone does not have sufficient permission to access it: ' - '%s'), CONF.fernet_tokens.key_repository) - else: - # ensure the key repository isn't world-readable - stat_info = os.stat(CONF.fernet_tokens.key_repository) - if(stat_info.st_mode & stat.S_IROTH or - stat_info.st_mode & stat.S_IXOTH): - LOG.warning(_LW( - '[fernet_tokens] key_repository is world readable: %s'), - CONF.fernet_tokens.key_repository) - - return is_valid - - -def _convert_to_integers(id_value): - """Cast user and group system identifiers to integers.""" - # NOTE(lbragstad) os.chown() will raise a TypeError here if - # keystone_user_id and keystone_group_id are not integers. Let's - # cast them to integers if we can because it's possible to pass non-integer - # values into the fernet_setup utility. - try: - id_int = int(id_value) - except ValueError as e: - msg = _LE('Unable to convert Keystone user or group ID. Error: %s') - LOG.error(msg, e) - raise - - return id_int - - -def create_key_directory(keystone_user_id=None, keystone_group_id=None): - """If the configured key directory does not exist, attempt to create it.""" - if not os.access(CONF.fernet_tokens.key_repository, os.F_OK): - LOG.info(_LI( - '[fernet_tokens] key_repository does not appear to exist; ' - 'attempting to create it')) - - try: - os.makedirs(CONF.fernet_tokens.key_repository, 0o700) - except OSError: - LOG.error(_LE( - 'Failed to create [fernet_tokens] key_repository: either it ' - 'already exists or you don\'t have sufficient permissions to ' - 'create it')) - - if keystone_user_id and keystone_group_id: - os.chown( - CONF.fernet_tokens.key_repository, - keystone_user_id, - keystone_group_id) - elif keystone_user_id or keystone_group_id: - LOG.warning(_LW( - 'Unable to change the ownership of [fernet_tokens] ' - 'key_repository without a keystone user ID and keystone group ' - 'ID both being provided: %s') % - CONF.fernet_tokens.key_repository) - - -def _create_new_key(keystone_user_id, keystone_group_id): - """Securely create a new encryption key. - - Create a new key that is readable by the Keystone group and Keystone user. - """ - key = fernet.Fernet.generate_key() # key is bytes - - # This ensures the key created is not world-readable - old_umask = os.umask(0o177) - if keystone_user_id and keystone_group_id: - old_egid = os.getegid() - old_euid = os.geteuid() - os.setegid(keystone_group_id) - os.seteuid(keystone_user_id) - elif keystone_user_id or keystone_group_id: - LOG.warning(_LW( - 'Unable to change the ownership of the new key without a keystone ' - 'user ID and keystone group ID both being provided: %s') % - CONF.fernet_tokens.key_repository) - # Determine the file name of the new key - key_file = os.path.join(CONF.fernet_tokens.key_repository, '0') - try: - with open(key_file, 'w') as f: - f.write(key.decode('utf-8')) # convert key to str for the file. - finally: - # After writing the key, set the umask back to it's original value. Do - # the same with group and user identifiers if a Keystone group or user - # was supplied. - os.umask(old_umask) - if keystone_user_id and keystone_group_id: - os.seteuid(old_euid) - os.setegid(old_egid) - - LOG.info(_LI('Created a new key: %s'), key_file) - - -def initialize_key_repository(keystone_user_id=None, keystone_group_id=None): - """Create a key repository and bootstrap it with a key. - - :param keystone_user_id: User ID of the Keystone user. - :param keystone_group_id: Group ID of the Keystone user. - - """ - # make sure we have work to do before proceeding - if os.access(os.path.join(CONF.fernet_tokens.key_repository, '0'), - os.F_OK): - LOG.info(_LI('Key repository is already initialized; aborting.')) - return - - # bootstrap an existing key - _create_new_key(keystone_user_id, keystone_group_id) - - # ensure that we end up with a primary and secondary key - rotate_keys(keystone_user_id, keystone_group_id) - - -def rotate_keys(keystone_user_id=None, keystone_group_id=None): - """Create a new primary key and revoke excess active keys. - - :param keystone_user_id: User ID of the Keystone user. - :param keystone_group_id: Group ID of the Keystone user. - - Key rotation utilizes the following behaviors: - - - The highest key number is used as the primary key (used for encryption). - - All keys can be used for decryption. - - New keys are always created as key "0," which serves as a placeholder - before promoting it to be the primary key. - - This strategy allows you to safely perform rotation on one node in a - cluster, before syncing the results of the rotation to all other nodes - (during both key rotation and synchronization, all nodes must recognize all - primary keys). - - """ - # read the list of key files - key_files = dict() - for filename in os.listdir(CONF.fernet_tokens.key_repository): - path = os.path.join(CONF.fernet_tokens.key_repository, str(filename)) - if os.path.isfile(path): - try: - key_id = int(filename) - except ValueError: # nosec : name isn't a number, ignore the file. - pass - else: - key_files[key_id] = path - - LOG.info(_LI('Starting key rotation with %(count)s key files: %(list)s'), { - 'count': len(key_files), - 'list': list(key_files.values())}) - - # determine the number of the new primary key - current_primary_key = max(key_files.keys()) - LOG.info(_LI('Current primary key is: %s'), current_primary_key) - new_primary_key = current_primary_key + 1 - LOG.info(_LI('Next primary key will be: %s'), new_primary_key) - - # promote the next primary key to be the primary - os.rename( - os.path.join(CONF.fernet_tokens.key_repository, '0'), - os.path.join(CONF.fernet_tokens.key_repository, str(new_primary_key))) - key_files.pop(0) - key_files[new_primary_key] = os.path.join( - CONF.fernet_tokens.key_repository, - str(new_primary_key)) - LOG.info(_LI('Promoted key 0 to be the primary: %s'), new_primary_key) - - # add a new key to the rotation, which will be the *next* primary - _create_new_key(keystone_user_id, keystone_group_id) - - max_active_keys = CONF.fernet_tokens.max_active_keys - # check for bad configuration - if max_active_keys < 1: - LOG.warning(_LW( - '[fernet_tokens] max_active_keys must be at least 1 to maintain a ' - 'primary key.')) - max_active_keys = 1 - - # purge excess keys - - # Note that key_files doesn't contain the new active key that was created, - # only the old active keys. - keys = sorted(key_files.keys(), reverse=True) - while len(keys) > (max_active_keys - 1): - index_to_purge = keys.pop() - key_to_purge = key_files[index_to_purge] - LOG.info(_LI('Excess key to purge: %s'), key_to_purge) - os.remove(key_to_purge) - - -def load_keys(): - """Load keys from disk into a list. - - The first key in the list is the primary key used for encryption. All - other keys are active secondary keys that can be used for decrypting - tokens. - - """ - if not validate_key_repository(): - return [] - - # build a dictionary of key_number:encryption_key pairs - keys = dict() - for filename in os.listdir(CONF.fernet_tokens.key_repository): - path = os.path.join(CONF.fernet_tokens.key_repository, str(filename)) - if os.path.isfile(path): - with open(path, 'r') as key_file: - try: - key_id = int(filename) - except ValueError: # nosec : filename isn't a number, ignore - # this file since it's not a key. - pass - else: - keys[key_id] = key_file.read() - - if len(keys) != CONF.fernet_tokens.max_active_keys: - # If there haven't been enough key rotations to reach max_active_keys, - # or if the configured value of max_active_keys has changed since the - # last rotation, then reporting the discrepancy might be useful. Once - # the number of keys matches max_active_keys, this log entry is too - # repetitive to be useful. - LOG.info(_LI( - 'Loaded %(count)d encryption keys (max_active_keys=%(max)d) from: ' - '%(dir)s'), { - 'count': len(keys), - 'max': CONF.fernet_tokens.max_active_keys, - 'dir': CONF.fernet_tokens.key_repository}) - - # return the encryption_keys, sorted by key number, descending - return [keys[x] for x in sorted(keys.keys(), reverse=True)] diff --git a/keystone-moon/keystone/token/providers/pki.py b/keystone-moon/keystone/token/providers/pki.py deleted file mode 100644 index 6a5a2999..00000000 --- a/keystone-moon/keystone/token/providers/pki.py +++ /dev/null @@ -1,66 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Keystone PKI Token Provider""" - -from keystoneclient.common import cms -from oslo_config import cfg -from oslo_log import log -from oslo_log import versionutils -from oslo_serialization import jsonutils - -from keystone.common import environment -from keystone.common import utils -from keystone import exception -from keystone.i18n import _, _LE -from keystone.token.providers import common - - -CONF = cfg.CONF - -LOG = log.getLogger(__name__) - - -@versionutils.deprecated( - as_of=versionutils.deprecated.MITAKA, - what='the PKI token provider', - in_favor_of='the Fernet or UUID token providers') -class Provider(common.BaseProvider): - def _get_token_id(self, token_data): - try: - # force conversion to a string as the keystone client cms code - # produces unicode. This can be removed if the client returns - # str() - # TODO(ayoung): Make to a byte_str for Python3 - token_json = jsonutils.dumps(token_data, cls=utils.PKIEncoder) - token_id = str(cms.cms_sign_token(token_json, - CONF.signing.certfile, - CONF.signing.keyfile)) - return token_id - except environment.subprocess.CalledProcessError: - LOG.exception(_LE('Unable to sign token')) - raise exception.UnexpectedError(_( - 'Unable to sign token.')) - - @property - def _supports_bind_authentication(self): - """Return if the token provider supports bind authentication methods. - - :returns: True - """ - return True - - def needs_persistence(self): - """Should the token be written to a backend.""" - return True diff --git a/keystone-moon/keystone/token/providers/pkiz.py b/keystone-moon/keystone/token/providers/pkiz.py deleted file mode 100644 index 3e78d2e4..00000000 --- a/keystone-moon/keystone/token/providers/pkiz.py +++ /dev/null @@ -1,64 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Keystone Compressed PKI Token Provider""" - -from keystoneclient.common import cms -from oslo_config import cfg -from oslo_log import log -from oslo_log import versionutils -from oslo_serialization import jsonutils - -from keystone.common import environment -from keystone.common import utils -from keystone import exception -from keystone.i18n import _ -from keystone.token.providers import common - - -CONF = cfg.CONF - -LOG = log.getLogger(__name__) -ERROR_MESSAGE = _('Unable to sign token.') - - -@versionutils.deprecated( - as_of=versionutils.deprecated.MITAKA, - what='the PKIZ token provider', - in_favor_of='the Fernet or UUID token providers') -class Provider(common.BaseProvider): - def _get_token_id(self, token_data): - try: - # force conversion to a string as the keystone client cms code - # produces unicode. This can be removed if the client returns - # str() - # TODO(ayoung): Make to a byte_str for Python3 - token_json = jsonutils.dumps(token_data, cls=utils.PKIEncoder) - token_id = str(cms.pkiz_sign(token_json, - CONF.signing.certfile, - CONF.signing.keyfile)) - return token_id - except environment.subprocess.CalledProcessError: - LOG.exception(ERROR_MESSAGE) - raise exception.UnexpectedError(ERROR_MESSAGE) - - @property - def _supports_bind_authentication(self): - """Return if the token provider supports bind authentication methods. - - :returns: True - """ - return True - - def needs_persistence(self): - """Should the token be written to a backend.""" - return True diff --git a/keystone-moon/keystone/token/providers/uuid.py b/keystone-moon/keystone/token/providers/uuid.py deleted file mode 100644 index f9a91617..00000000 --- a/keystone-moon/keystone/token/providers/uuid.py +++ /dev/null @@ -1,41 +0,0 @@ -# Copyright 2013 OpenStack Foundation -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Keystone UUID Token Provider""" - -from __future__ import absolute_import - -import uuid - -from keystone.token.providers import common - - -class Provider(common.BaseProvider): - def __init__(self, *args, **kwargs): - super(Provider, self).__init__(*args, **kwargs) - - def _get_token_id(self, token_data): - return uuid.uuid4().hex - - @property - def _supports_bind_authentication(self): - """Return if the token provider supports bind authentication methods. - - :returns: True - """ - return True - - def needs_persistence(self): - """Should the token be written to a backend.""" - return True |