1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
|
# Copyright 2014 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.common import dependency
from keystone.common import sql
from keystone import identity
from keystone.identity.mapping_backends import mapping as identity_mapping
class IDMapping(sql.ModelBase, sql.ModelDictMixin):
    """SQL model for a row of the ``id_mapping`` table.

    Each row associates a ``public_id`` (the primary key) with the
    combination of ``domain_id``, ``local_id`` and ``entity_type``.
    """

    __tablename__ = 'id_mapping'

    public_id = sql.Column(sql.String(64), primary_key=True)
    domain_id = sql.Column(sql.String(64), nullable=False)
    local_id = sql.Column(sql.String(64), nullable=False)

    # NOTE(henry-nash): Postgres requires a name to be defined for an Enum,
    # hence the explicit ``name`` argument below.
    entity_type = sql.Column(
        sql.Enum(
            identity_mapping.EntityType.USER,
            identity_mapping.EntityType.GROUP,
            name='entity_type'
        ),
        nullable=False
    )

    # A given (domain_id, local_id, entity_type) combination may be stored
    # at most once, so the same underlying values can never end up with
    # more than one mapping.
    __table_args__ = (
        sql.UniqueConstraint('domain_id', 'local_id', 'entity_type'),
    )
@dependency.requires('id_generator_api')
class Mapping(identity.MappingDriverV8):
    """Identity mapping driver that stores mappings in ``IDMapping`` rows."""

    def get_public_id(self, local_entity):
        """Look up the public ID stored for a local entity.

        :param local_entity: mapping providing the keys ``domain_id``,
                             ``local_id`` and ``entity_type``.
        :returns: the matching ``public_id``, or None when the lookup
                  raises ``sql.NotFound``.
        """
        # NOTE(henry-nash): Since the Public ID is regeneratable, rather
        # than search for the entry using the local entity values, we
        # could create the hash and do a PK lookup. However this would only
        # work if we hashed all the entries, even those that already generate
        # UUIDs, like SQL. Further, this would only work if the generation
        # algorithm was immutable (e.g. it had always been sha256).
        with sql.session_for_read() as session:
            lookup = session.query(IDMapping.public_id)
            # Narrow the query one column at a time, in a fixed order.
            for field in ('domain_id', 'local_id', 'entity_type'):
                lookup = lookup.filter_by(**{field: local_entity[field]})
            try:
                row = lookup.one()
            except sql.NotFound:
                return None
            return row.public_id

    def get_id_mapping(self, public_id):
        """Fetch the mapping stored under ``public_id``.

        :param public_id: primary key of the mapping row.
        :returns: the row converted with ``to_dict()``, or None when the
                  primary-key lookup yields nothing.
        """
        with sql.session_for_read() as session:
            found = session.query(IDMapping).get(public_id)
            if not found:
                return None
            return found.to_dict()

    def create_id_mapping(self, local_entity, public_id=None):
        """Store a new mapping for ``local_entity``.

        :param local_entity: mapping describing the local entity; it is
                             copied, so the caller's object is not modified.
        :param public_id: public ID to store. When None, one is obtained
                          from ``id_generator_api.generate_public_ID``.
        :returns: the public ID that was stored.
        """
        # Work on a copy so that adding 'public_id' does not leak back to
        # the caller.
        entity = local_entity.copy()
        with sql.session_for_write() as session:
            if public_id is None:
                public_id = self.id_generator_api.generate_public_ID(entity)
            entity['public_id'] = public_id
            session.add(IDMapping.from_dict(entity))
        return public_id

    def delete_id_mapping(self, public_id):
        """Delete the mapping stored under ``public_id``.

        A ``sql.NotFound`` raised by the delete is ignored.

        :param public_id: primary key of the mapping row to remove.
        """
        with sql.session_for_write() as session:
            doomed = session.query(IDMapping).filter_by(public_id=public_id)
            try:
                doomed.delete()
            except sql.NotFound:  # nosec
                # NOTE(morganfainberg): There is nothing to delete and nothing
                # to do.
                pass

    def purge_mappings(self, purge_filter):
        """Delete every mapping matching ``purge_filter``.

        :param purge_filter: mapping that may contain any of the keys
                             ``domain_id``, ``public_id``, ``local_id`` and
                             ``entity_type``. Each key present restricts the
                             delete to rows with that value; keys that are
                             absent add no restriction, so an empty filter
                             leaves the delete unrestricted.
        """
        with sql.session_for_write() as session:
            doomed = session.query(IDMapping)
            for column in ('domain_id', 'public_id', 'local_id',
                           'entity_type'):
                if column in purge_filter:
                    doomed = doomed.filter_by(
                        **{column: purge_filter[column]})
            doomed.delete()
|