aboutsummaryrefslogtreecommitdiffstats
path: root/keystone-moon/keystone/models/token_model.py
diff options
context:
space:
mode:
Diffstat (limited to 'keystone-moon/keystone/models/token_model.py')
-rw-r--r--keystone-moon/keystone/models/token_model.py339
1 files changed, 0 insertions, 339 deletions
diff --git a/keystone-moon/keystone/models/token_model.py b/keystone-moon/keystone/models/token_model.py
deleted file mode 100644
index 32e6b365..00000000
--- a/keystone-moon/keystone/models/token_model.py
+++ /dev/null
@@ -1,339 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-"""Unified in-memory token model."""
-
-from keystoneclient.common import cms
-from oslo_config import cfg
-from oslo_utils import reflection
-from oslo_utils import timeutils
-import six
-
-from keystone import exception
-from keystone.federation import constants
-from keystone.i18n import _
-
-CONF = cfg.CONF
-# supported token versions
-V2 = 'v2.0'
-V3 = 'v3.0'
-VERSIONS = frozenset([V2, V3])
-
-
-def _parse_and_normalize_time(time_data):
- if isinstance(time_data, six.string_types):
- time_data = timeutils.parse_isotime(time_data)
- return timeutils.normalize_time(time_data)
-
-
-class KeystoneToken(dict):
- """An in-memory representation that unifies v2 and v3 tokens."""
-
- # TODO(morganfainberg): Align this in-memory representation with the
- # objects in keystoneclient. This object should be eventually updated
- # to be the source of token data with the ability to emit any version
- # of the token instead of only consuming the token dict and providing
- # property accessors for the underlying data.
-
- def __init__(self, token_id, token_data):
- self.token_data = token_data
- if 'access' in token_data:
- super(KeystoneToken, self).__init__(**token_data['access'])
- self.version = V2
- elif 'token' in token_data and 'methods' in token_data['token']:
- super(KeystoneToken, self).__init__(**token_data['token'])
- self.version = V3
- else:
- raise exception.UnsupportedTokenVersionException()
- self.token_id = token_id
- self.short_id = cms.cms_hash_token(token_id,
- mode=CONF.token.hash_algorithm)
-
- if self.project_scoped and self.domain_scoped:
- raise exception.UnexpectedError(_('Found invalid token: scoped to '
- 'both project and domain.'))
-
- def __repr__(self):
- desc = ('<%(type)s (audit_id=%(audit_id)s, '
- 'audit_chain_id=%(audit_chain_id)s) at %(loc)s>')
- self_cls_name = reflection.get_class_name(self,
- fully_qualified=False)
- return desc % {'type': self_cls_name,
- 'audit_id': self.audit_id,
- 'audit_chain_id': self.audit_chain_id,
- 'loc': hex(id(self))}
-
- @property
- def expires(self):
- if self.version is V3:
- expires_at = self['expires_at']
- else:
- expires_at = self['token']['expires']
- return _parse_and_normalize_time(expires_at)
-
- @property
- def issued(self):
- if self.version is V3:
- issued_at = self['issued_at']
- else:
- issued_at = self['token']['issued_at']
- return _parse_and_normalize_time(issued_at)
-
- @property
- def audit_id(self):
- if self.version is V3:
- return self.get('audit_ids', [None])[0]
- return self['token'].get('audit_ids', [None])[0]
-
- @property
- def audit_chain_id(self):
- if self.version is V3:
- return self.get('audit_ids', [None])[-1]
- return self['token'].get('audit_ids', [None])[-1]
-
- @property
- def auth_token(self):
- return self.token_id
-
- @property
- def user_id(self):
- return self['user']['id']
-
- @property
- def user_name(self):
- return self['user']['name']
-
- @property
- def user_domain_name(self):
- try:
- if self.version == V3:
- return self['user']['domain']['name']
- elif 'user' in self:
- return "Default"
- except KeyError: # nosec
- # Do not raise KeyError, raise UnexpectedError
- pass
- raise exception.UnexpectedError()
-
- @property
- def user_domain_id(self):
- try:
- if self.version == V3:
- return self['user']['domain']['id']
- elif 'user' in self:
- return CONF.identity.default_domain_id
- except KeyError: # nosec
- # Do not raise KeyError, raise UnexpectedError
- pass
- raise exception.UnexpectedError()
-
- @property
- def domain_id(self):
- if self.version is V3:
- try:
- return self['domain']['id']
- except KeyError:
- # Do not raise KeyError, raise UnexpectedError
- raise exception.UnexpectedError()
- # No domain scoped tokens in V2.
- raise NotImplementedError()
-
- @property
- def domain_name(self):
- if self.version is V3:
- try:
- return self['domain']['name']
- except KeyError:
- # Do not raise KeyError, raise UnexpectedError
- raise exception.UnexpectedError()
- # No domain scoped tokens in V2.
- raise NotImplementedError()
-
- @property
- def project_id(self):
- try:
- if self.version is V3:
- return self['project']['id']
- else:
- return self['token']['tenant']['id']
- except KeyError:
- # Do not raise KeyError, raise UnexpectedError
- raise exception.UnexpectedError()
-
- @property
- def project_name(self):
- try:
- if self.version is V3:
- return self['project']['name']
- else:
- return self['token']['tenant']['name']
- except KeyError:
- # Do not raise KeyError, raise UnexpectedError
- raise exception.UnexpectedError()
-
- @property
- def project_domain_id(self):
- try:
- if self.version is V3:
- return self['project']['domain']['id']
- elif 'tenant' in self['token']:
- return CONF.identity.default_domain_id
- except KeyError: # nosec
- # Do not raise KeyError, raise UnexpectedError
- pass
-
- raise exception.UnexpectedError()
-
- @property
- def project_domain_name(self):
- try:
- if self.version is V3:
- return self['project']['domain']['name']
- if 'tenant' in self['token']:
- return 'Default'
- except KeyError: # nosec
- # Do not raise KeyError, raise UnexpectedError
- pass
-
- raise exception.UnexpectedError()
-
- @property
- def project_scoped(self):
- if self.version is V3:
- return 'project' in self
- else:
- return 'tenant' in self['token']
-
- @property
- def domain_scoped(self):
- if self.version is V3:
- return 'domain' in self
- return False
-
- @property
- def scoped(self):
- return self.project_scoped or self.domain_scoped
-
- @property
- def trust_id(self):
- if self.version is V3:
- return self.get('OS-TRUST:trust', {}).get('id')
- else:
- return self.get('trust', {}).get('id')
-
- @property
- def trust_scoped(self):
- if self.version is V3:
- return 'OS-TRUST:trust' in self
- else:
- return 'trust' in self
-
- @property
- def trustee_user_id(self):
- if self.version is V3:
- return self.get(
- 'OS-TRUST:trust', {}).get('trustee_user_id')
- else:
- return self.get('trust', {}).get('trustee_user_id')
-
- @property
- def trustor_user_id(self):
- if self.version is V3:
- return self.get(
- 'OS-TRUST:trust', {}).get('trustor_user_id')
- else:
- return self.get('trust', {}).get('trustor_user_id')
-
- @property
- def trust_impersonation(self):
- if self.version is V3:
- return self.get('OS-TRUST:trust', {}).get('impersonation')
- else:
- return self.get('trust', {}).get('impersonation')
-
- @property
- def oauth_scoped(self):
- return 'OS-OAUTH1' in self
-
- @property
- def oauth_access_token_id(self):
- if self.version is V3 and self.oauth_scoped:
- return self['OS-OAUTH1']['access_token_id']
- return None
-
- @property
- def oauth_consumer_id(self):
- if self.version is V3 and self.oauth_scoped:
- return self['OS-OAUTH1']['consumer_id']
- return None
-
- @property
- def role_ids(self):
- if self.version is V3:
- return [r['id'] for r in self.get('roles', [])]
- else:
- return self.get('metadata', {}).get('roles', [])
-
- @property
- def role_names(self):
- if self.version is V3:
- return [r['name'] for r in self.get('roles', [])]
- else:
- return [r['name'] for r in self['user'].get('roles', [])]
-
- @property
- def bind(self):
- if self.version is V3:
- return self.get('bind')
- return self.get('token', {}).get('bind')
-
- @property
- def is_federated_user(self):
- try:
- return (self.version is V3 and
- constants.FEDERATION in self['user'])
- except KeyError:
- raise exception.UnexpectedError()
-
- @property
- def federation_group_ids(self):
- if self.is_federated_user:
- if self.version is V3:
- try:
- groups = self['user'][constants.FEDERATION].get(
- 'groups', [])
- return [g['id'] for g in groups]
- except KeyError:
- raise exception.UnexpectedError()
- return []
-
- @property
- def federation_idp_id(self):
- if self.version is not V3 or not self.is_federated_user:
- return None
- return self['user'][constants.FEDERATION]['identity_provider']['id']
-
- @property
- def federation_protocol_id(self):
- if self.version is V3 and self.is_federated_user:
- return self['user'][constants.FEDERATION]['protocol']['id']
- return None
-
- @property
- def metadata(self):
- return self.get('metadata', {})
-
- @property
- def methods(self):
- if self.version is V3:
- return self.get('methods', [])
- return []