1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import importlib
import os
import shutil

import fixtures
from oslo_config import cfg
from oslo_db import options as db_options
from oslo_db.sqlalchemy import migration

from keystone.common import sql
from keystone.common.sql import migration_helpers
from keystone.tests import unit as tests
CONF = cfg.CONF
def run_once(f):
    """Wrap ``f`` so that its body is executed at most once.

    The decorated function cannot expect any arguments, and its return
    value is discarded: the wrapper always returns ``None``.

    The wrapper exposes an ``already_ran`` attribute. It starts as
    ``False`` and is set to ``True`` only after ``f`` has returned, so a
    call in which ``f`` raises leaves the flag unset and the next call
    runs ``f`` again.

    :param f: a callable that takes no arguments.
    :returns: a zero-argument wrapper around ``f``.
    """
    @functools.wraps(f)
    def wrapper():
        if wrapper.already_ran:
            return
        f()
        # Only flag completion once f() has returned without raising.
        wrapper.already_ran = True

    wrapper.already_ran = False
    return wrapper
def _setup_database(extensions=None):
    """Put a fresh copy of the file-backed test database in place.

    Nothing is done when ``CONF.database.connection`` equals
    ``tests.IN_MEM_DB_CONN_STRING``. Otherwise any existing ``test.db`` is
    removed and then recreated in one of two ways:

    * if ``test.db.pristine`` already exists, it is copied over
      ``test.db``;
    * if not, ``migration.db_sync`` is run, followed by
      ``sync_database_to_version`` for each requested extension, and the
      resulting ``test.db`` is saved as ``test.db.pristine`` for later
      calls.

    :param extensions: optional iterable of extension names passed one at
        a time to ``migration_helpers.sync_database_to_version``; ``None``
        is treated as empty. It is only consulted when the pristine copy
        has to be built.
    """
    if CONF.database.connection == tests.IN_MEM_DB_CONN_STRING:
        return

    # NOTE(review): presumably paths inside the test tmp directory --
    # confirm against tests.dirs.tmp.
    db = tests.dirs.tmp('test.db')
    pristine = tests.dirs.tmp('test.db.pristine')

    if os.path.exists(db):
        os.unlink(db)

    if os.path.exists(pristine):
        # A pristine copy is already available: reuse it instead of
        # syncing the schema again.
        shutil.copyfile(pristine, db)
        return

    migration.db_sync(sql.get_engine(),
                      migration_helpers.find_migrate_repo())
    for extension in (extensions or []):
        migration_helpers.sync_database_to_version(extension=extension)
    shutil.copyfile(db, pristine)
# NOTE(I159): On every execution all the options will be cleared. This function
# must be called at every fixture initialization.
def initialize_sql_session():
    """Set the default database connection to the in-memory test database.

    Only the default value is set, via ``db_options.set_defaults``, to
    make sure the DB is located in the correct location while still
    allowing some test cases to override the connection.
    """
    db_options.set_defaults(CONF, connection=tests.IN_MEM_DB_CONN_STRING)
@run_once
def _load_sqlalchemy_models():
    """Find all modules containing SQLAlchemy models and import them.

    This creates more consistent, deterministic test runs because tables
    for all core and extension models are always created in the test
    database. We ensure this by importing all modules that contain model
    definitions.

    The database schema during test runs is created using reflection.
    Reflection is simply SQLAlchemy taking the model definitions for
    all models currently imported and making tables for each of them.
    The database schema created during test runs may vary between tests
    as more models are imported. Importing all models at the start of
    the test run avoids this problem.

    Every directory below the keystone package root whose path ends in
    ``backends`` and which contains a ``sql.py`` file is imported as
    ``keystone.<dotted path>.sql``.
    """
    keystone_root = os.path.normpath(os.path.join(
        os.path.dirname(__file__), '..', '..', '..'))
    for root, _dirs, files in os.walk(keystone_root):
        # NOTE(morganfainberg): Slice the keystone_root off the root to ensure
        # we do not end up with a module name like:
        # Users.home.openstack.keystone.assignment.backends.sql
        root = root[len(keystone_root):]
        if not root.endswith('backends') or 'sql.py' not in files:
            continue
        # The root will be prefixed with an instance of os.sep, which will
        # make the root after replacement '.<root>', the 'keystone' part
        # of the module path is always added to the front
        module_name = ('keystone.%s.sql' %
                       root.replace(os.sep, '.').lstrip('.'))
        # Imported purely for the side effect of loading the module; the
        # returned module object is not needed.
        importlib.import_module(module_name)
class Database(fixtures.Fixture):
    """A fixture for setting up and tearing down a database.

    Constructing the fixture sets the default connection options and
    imports the SQL backend modules. ``setUp`` prepares the database,
    creates the tables for ``sql.ModelBase.metadata`` and registers the
    matching cleanups.
    """

    def __init__(self, extensions=None):
        """Store the requested extensions and prepare SQL configuration.

        :param extensions: optional iterable of extension names, handed
            to ``_setup_database`` when the fixture is set up.
        """
        super(Database, self).__init__()
        self._extensions = extensions
        initialize_sql_session()
        _load_sqlalchemy_models()

    def setUp(self):
        """Prepare the database, create all tables and register cleanups."""
        super(Database, self).setUp()
        _setup_database(extensions=self._extensions)
        self.engine = sql.get_engine()
        sql.ModelBase.metadata.create_all(bind=self.engine)
        # NOTE(review): fixtures is expected to run cleanups in reverse
        # order of registration, so the tables would be dropped before
        # sql.cleanup runs -- confirm against the fixtures documentation.
        self.addCleanup(sql.cleanup)
        self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
|