Diffstat (limited to 'keystone-moon/keystone/tests/unit/test_sql_upgrade.py')
-rw-r--r-- keystone-moon/keystone/tests/unit/test_sql_upgrade.py | 1234
1 file changed, 770 insertions(+), 464 deletions(-)
diff --git a/keystone-moon/keystone/tests/unit/test_sql_upgrade.py b/keystone-moon/keystone/tests/unit/test_sql_upgrade.py
index d617d445..5ca12f66 100644
--- a/keystone-moon/keystone/tests/unit/test_sql_upgrade.py
+++ b/keystone-moon/keystone/tests/unit/test_sql_upgrade.py
@@ -29,11 +29,13 @@ WARNING::
all data will be lost.
"""
-import copy
import json
import uuid
+import migrate
from migrate.versioning import api as versioning_api
+from migrate.versioning import repository
+import mock
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import migration
@@ -41,12 +43,10 @@ from oslo_db.sqlalchemy import session as db_session
from sqlalchemy.engine import reflection
import sqlalchemy.exc
from sqlalchemy import schema
+from testtools import matchers
from keystone.common import sql
-from keystone.common.sql import migrate_repo
from keystone.common.sql import migration_helpers
-from keystone.contrib import federation
-from keystone.contrib import revoke
from keystone import exception
from keystone.tests import unit
from keystone.tests.unit import default_fixtures
@@ -54,7 +54,6 @@ from keystone.tests.unit.ksfixtures import database
CONF = cfg.CONF
-DEFAULT_DOMAIN_ID = CONF.identity.default_domain_id
# NOTE(morganfainberg): This should be updated when each DB migration collapse
# is done to mirror the expected structure of the DB in the format of
@@ -67,8 +66,8 @@ INITIAL_TABLE_STRUCTURE = {
'id', 'name', 'enabled', 'extra',
],
'endpoint': [
- 'id', 'legacy_endpoint_id', 'interface', 'region', 'service_id', 'url',
- 'enabled', 'extra',
+ 'id', 'legacy_endpoint_id', 'interface', 'region_id', 'service_id',
+ 'url', 'enabled', 'extra',
],
'group': [
'id', 'domain_id', 'name', 'description', 'extra',
@@ -78,6 +77,7 @@ INITIAL_TABLE_STRUCTURE = {
],
'project': [
'id', 'name', 'extra', 'description', 'enabled', 'domain_id',
+ 'parent_id',
],
'role': [
'id', 'name', 'extra',
@@ -108,23 +108,82 @@ INITIAL_TABLE_STRUCTURE = {
'assignment': [
'type', 'actor_id', 'target_id', 'role_id', 'inherited',
],
-}
-
-
-INITIAL_EXTENSION_TABLE_STRUCTURE = {
- 'revocation_event': [
- 'id', 'domain_id', 'project_id', 'user_id', 'role_id',
- 'trust_id', 'consumer_id', 'access_token_id',
- 'issued_before', 'expires_at', 'revoked_at', 'audit_id',
- 'audit_chain_id',
+ 'id_mapping': [
+ 'public_id', 'domain_id', 'local_id', 'entity_type',
+ ],
+ 'whitelisted_config': [
+ 'domain_id', 'group', 'option', 'value',
+ ],
+ 'sensitive_config': [
+ 'domain_id', 'group', 'option', 'value',
],
}
-EXTENSIONS = {'federation': federation,
- 'revoke': revoke}
+
+# Test migration_helpers.get_init_version separately to ensure it works
+# before it is used in the SqlUpgrade tests.
+class MigrationHelpersGetInitVersionTests(unit.TestCase):
+ @mock.patch.object(repository, 'Repository')
+ def test_get_init_version_no_path(self, repo):
+ migrate_versions = mock.MagicMock()
+ # make a version list starting with zero. `get_init_version` will
+ # return None for this value.
+ migrate_versions.versions.versions = list(range(0, 5))
+ repo.return_value = migrate_versions
+
+ # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid
+ # an exception.
+ with mock.patch('os.path.isdir', return_value=True):
+ # since 0 is the smallest version, expect None
+ version = migration_helpers.get_init_version()
+ self.assertIsNone(version)
+
+ # check that the default path was used as the first argument to the
+ # first invocation of repo. Cannot match the full path because it is
+ # based on where the test is run.
+ param = repo.call_args_list[0][0][0]
+ self.assertTrue(param.endswith('/sql/migrate_repo'))
+
+ @mock.patch.object(repository, 'Repository')
+ def test_get_init_version_with_path_initial_version_0(self, repo):
+ migrate_versions = mock.MagicMock()
+ # make a version list starting with zero. `get_init_version` will
+ # return None for this value.
+ migrate_versions.versions.versions = list(range(0, 5))
+ repo.return_value = migrate_versions
+
+ # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid
+ # an exception.
+ with mock.patch('os.path.isdir', return_value=True):
+ path = '/keystone/migrate_repo/'
+
+ # since 0 is the smallest version, expect None
+ version = migration_helpers.get_init_version(abs_path=path)
+ self.assertIsNone(version)
+
+ @mock.patch.object(repository, 'Repository')
+ def test_get_init_version_with_path(self, repo):
+ initial_version = 10
+
+ migrate_versions = mock.MagicMock()
+ migrate_versions.versions.versions = list(range(initial_version + 1,
+ initial_version + 5))
+ repo.return_value = migrate_versions
+
+ # os.path.isdir() is called by `find_migrate_repo()`. Mock it to avoid
+ # an exception.
+ with mock.patch('os.path.isdir', return_value=True):
+ path = '/keystone/migrate_repo/'
+
+ version = migration_helpers.get_init_version(abs_path=path)
+ self.assertEqual(initial_version, version)
class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
+ # Override this in subclasses. The default of zero covers tests such
+ # as extension upgrades.
+ _initial_db_version = 0
+
def initialize_sql(self):
self.metadata = sqlalchemy.MetaData()
self.metadata.bind = self.engine
@@ -139,6 +198,7 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
def setUp(self):
super(SqlMigrateBase, self).setUp()
+ self.load_backends()
database.initialize_sql_session()
conn_str = CONF.database.connection
if (conn_str != unit.IN_MEM_DB_CONN_STRING and
@@ -155,7 +215,9 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
connection='sqlite:///%s' % db_file)
# create and share a single sqlalchemy engine for testing
- self.engine = sql.get_engine()
+ with sql.session_for_write() as session:
+ self.engine = session.get_bind()
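+ # session_for_write() yields a session already bound to an engine;
+ # grabbing that bind is what lets the whole test class share a
+ # single engine (and a single test database).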
+ self.addCleanup(self.cleanup_instance('engine'))
self.Session = db_session.get_maker(self.engine, autocommit=False)
self.addCleanup(sqlalchemy.orm.session.Session.close_all)
@@ -164,7 +226,8 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
self.repo_package())
self.schema = versioning_api.ControlledSchema.create(
self.engine,
- self.repo_path, self.initial_db_version)
+ self.repo_path,
+ self._initial_db_version)
# auto-detect the highest available schema version in the migrate_repo
self.max_version = self.schema.repository.version().version
@@ -229,6 +292,23 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
else:
raise AssertionError('Table "%s" already exists' % table_name)
+ def assertTableCountsMatch(self, table1_name, table2_name):
+ try:
+ table1 = self.select_table(table1_name)
+ except sqlalchemy.exc.NoSuchTableError:
+ raise AssertionError('Table "%s" does not exist' % table1_name)
+ try:
+ table2 = self.select_table(table2_name)
+ except sqlalchemy.exc.NoSuchTableError:
+ raise AssertionError('Table "%s" does not exist' % table2_name)
+ session = self.Session()
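+ # table.count() compiles to SELECT count(*) FROM <table>;
+ # scalar() unwraps the single-row, single-column result.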
+ table1_count = session.execute(table1.count()).scalar()
+ table2_count = session.execute(table2.count()).scalar()
+ if table1_count != table2_count:
+ raise AssertionError('Table counts do not match: {0} ({1}), {2} '
+ '({3})'.format(table1_name, table1_count,
+ table2_name, table2_count))
+
def upgrade(self, *args, **kwargs):
self._migrate(*args, **kwargs)
@@ -257,50 +337,30 @@ class SqlMigrateBase(unit.SQLDriverOverrides, unit.TestCase):
self.assertItemsEqual(expected_cols, actual_cols,
'%s table' % table_name)
- @property
- def initial_db_version(self):
- return getattr(self, '_initial_db_version', 0)
-
class SqlUpgradeTests(SqlMigrateBase):
-
- _initial_db_version = migrate_repo.DB_INIT_VERSION
+ _initial_db_version = migration_helpers.get_init_version()
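+ # get_init_version() derives the starting version from the scripts in
+ # the migrate_repo (one less than the lowest script number), replacing
+ # the hard-coded migrate_repo.DB_INIT_VERSION used before this change.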
def test_blank_db_to_start(self):
self.assertTableDoesNotExist('user')
def test_start_version_db_init_version(self):
- version = migration.db_version(sql.get_engine(), self.repo_path,
- migrate_repo.DB_INIT_VERSION)
+ with sql.session_for_write() as session:
+ version = migration.db_version(session.get_bind(), self.repo_path,
+ self._initial_db_version)
self.assertEqual(
- migrate_repo.DB_INIT_VERSION,
+ self._initial_db_version,
version,
- 'DB is not at version %s' % migrate_repo.DB_INIT_VERSION)
+ 'DB is not at version %s' % self._initial_db_version)
def test_upgrade_add_initial_tables(self):
- self.upgrade(migrate_repo.DB_INIT_VERSION + 1)
+ self.upgrade(self._initial_db_version + 1)
self.check_initial_table_structure()
def check_initial_table_structure(self):
for table in INITIAL_TABLE_STRUCTURE:
self.assertTableColumns(table, INITIAL_TABLE_STRUCTURE[table])
- # Ensure the default domain was properly created.
- default_domain = migration_helpers.get_default_domain()
-
- meta = sqlalchemy.MetaData()
- meta.bind = self.engine
-
- domain_table = sqlalchemy.Table('domain', meta, autoload=True)
-
- session = self.Session()
- q = session.query(domain_table)
- refs = q.all()
-
- self.assertEqual(1, len(refs))
- for k in default_domain.keys():
- self.assertEqual(default_domain[k], getattr(refs[0], k))
-
def insert_dict(self, session, table_name, d, table=None):
"""Naively inserts key-value pairs into a table, given a dictionary."""
if table is None:
@@ -312,127 +372,43 @@ class SqlUpgradeTests(SqlMigrateBase):
session.execute(insert)
session.commit()
- def test_id_mapping(self):
- self.upgrade(50)
- self.assertTableDoesNotExist('id_mapping')
- self.upgrade(51)
- self.assertTableExists('id_mapping')
-
- def test_region_url_upgrade(self):
- self.upgrade(52)
- self.assertTableColumns('region',
- ['id', 'description', 'parent_region_id',
- 'extra', 'url'])
-
- def test_endpoint_region_upgrade_columns(self):
- self.upgrade(53)
- self.assertTableColumns('endpoint',
- ['id', 'legacy_endpoint_id', 'interface',
- 'service_id', 'url', 'extra', 'enabled',
- 'region_id'])
- region_table = sqlalchemy.Table('region', self.metadata, autoload=True)
- self.assertEqual(255, region_table.c.id.type.length)
- self.assertEqual(255, region_table.c.parent_region_id.type.length)
- endpoint_table = sqlalchemy.Table('endpoint',
- self.metadata,
- autoload=True)
- self.assertEqual(255, endpoint_table.c.region_id.type.length)
-
- def test_endpoint_region_migration(self):
- self.upgrade(52)
- session = self.Session()
- _small_region_name = '0' * 30
- _long_region_name = '0' * 255
- _clashing_region_name = '0' * 70
-
- def add_service():
- service_id = uuid.uuid4().hex
-
- service = {
- 'id': service_id,
- 'type': uuid.uuid4().hex
- }
-
- self.insert_dict(session, 'service', service)
-
- return service_id
-
- def add_endpoint(service_id, region):
- endpoint_id = uuid.uuid4().hex
-
- endpoint = {
- 'id': endpoint_id,
- 'interface': uuid.uuid4().hex[:8],
- 'service_id': service_id,
- 'url': uuid.uuid4().hex,
- 'region': region
- }
- self.insert_dict(session, 'endpoint', endpoint)
-
- return endpoint_id
-
- _service_id_ = add_service()
- add_endpoint(_service_id_, region=_long_region_name)
- add_endpoint(_service_id_, region=_long_region_name)
- add_endpoint(_service_id_, region=_clashing_region_name)
- add_endpoint(_service_id_, region=_small_region_name)
- add_endpoint(_service_id_, region=None)
-
- # upgrade to 53
- session.close()
- self.upgrade(53)
- session = self.Session()
- self.metadata.clear()
+ def test_kilo_squash(self):
+ self.upgrade(67)
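+
+ # The Kilo squash collapsed a run of individual migrations, so this
+ # single upgrade to 67 stands in for the step-by-step checks in the
+ # removed per-migration tests below.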
- region_table = sqlalchemy.Table('region', self.metadata, autoload=True)
- self.assertEqual(1, session.query(region_table).
- filter_by(id=_long_region_name).count())
- self.assertEqual(1, session.query(region_table).
- filter_by(id=_clashing_region_name).count())
- self.assertEqual(1, session.query(region_table).
- filter_by(id=_small_region_name).count())
+ # In 053 the sizes of the ID and parent region ID columns were changed
+ table = sqlalchemy.Table('region', self.metadata, autoload=True)
+ self.assertEqual(255, table.c.id.type.length)
+ self.assertEqual(255, table.c.parent_region_id.type.length)
+ table = sqlalchemy.Table('endpoint', self.metadata, autoload=True)
+ self.assertEqual(255, table.c.region_id.type.length)
- endpoint_table = sqlalchemy.Table('endpoint',
- self.metadata,
- autoload=True)
- self.assertEqual(5, session.query(endpoint_table).count())
- self.assertEqual(2, session.query(endpoint_table).
- filter_by(region_id=_long_region_name).count())
- self.assertEqual(1, session.query(endpoint_table).
- filter_by(region_id=_clashing_region_name).count())
- self.assertEqual(1, session.query(endpoint_table).
- filter_by(region_id=_small_region_name).count())
-
- def test_add_actor_id_index(self):
- self.upgrade(53)
- self.upgrade(54)
+ # In 054 an index was created for the actor_id of the assignment table
table = sqlalchemy.Table('assignment', self.metadata, autoload=True)
index_data = [(idx.name, list(idx.columns.keys()))
for idx in table.indexes]
self.assertIn(('ix_actor_id', ['actor_id']), index_data)
- def test_token_user_id_and_trust_id_index_upgrade(self):
- self.upgrade(54)
- self.upgrade(55)
+ # In 055 indexes were created for user and trust IDs in the token table
table = sqlalchemy.Table('token', self.metadata, autoload=True)
index_data = [(idx.name, list(idx.columns.keys()))
for idx in table.indexes]
self.assertIn(('ix_token_user_id', ['user_id']), index_data)
self.assertIn(('ix_token_trust_id', ['trust_id']), index_data)
- def test_project_parent_id_upgrade(self):
- self.upgrade(61)
- self.assertTableColumns('project',
- ['id', 'name', 'extra', 'description',
- 'enabled', 'domain_id', 'parent_id'])
+ # In 062 the role ID foreign key was removed from the assignment table
+ if self.engine.name == "mysql":
+ self.assertFalse(self.does_fk_exist('assignment', 'role_id'))
- def test_drop_assignment_role_fk(self):
- self.upgrade(61)
- self.assertTrue(self.does_fk_exist('assignment', 'role_id'))
- self.upgrade(62)
+ # In 064 the domain ID FK was removed from the group and user tables
if self.engine.name != 'sqlite':
# sqlite does not support FK deletions (or enforcement)
- self.assertFalse(self.does_fk_exist('assignment', 'role_id'))
+ self.assertFalse(self.does_fk_exist('group', 'domain_id'))
+ self.assertFalse(self.does_fk_exist('user', 'domain_id'))
+
+ # In 067 the role ID index was removed from the assignment table
+ if self.engine.name == "mysql":
+ self.assertFalse(self.does_index_exist('assignment',
+ 'assignment_role_id_fkey'))
def test_insert_assignment_inherited_pk(self):
ASSIGNMENT_TABLE_NAME = 'assignment'
@@ -502,7 +478,6 @@ class SqlUpgradeTests(SqlMigrateBase):
def does_pk_exist(self, table, pk_column):
"""Checks whether a column is primary key on a table."""
-
inspector = reflection.Inspector.from_engine(self.engine)
pk_columns = inspector.get_pk_constraint(table)['constrained_columns']
@@ -515,119 +490,164 @@ class SqlUpgradeTests(SqlMigrateBase):
return True
return False
- def test_drop_region_url_upgrade(self):
- self.upgrade(63)
- self.assertTableColumns('region',
- ['id', 'description', 'parent_region_id',
- 'extra'])
-
- def test_domain_fk(self):
- self.upgrade(63)
- self.assertTrue(self.does_fk_exist('group', 'domain_id'))
- self.assertTrue(self.does_fk_exist('user', 'domain_id'))
- self.upgrade(64)
- if self.engine.name != 'sqlite':
- # sqlite does not support FK deletions (or enforcement)
- self.assertFalse(self.does_fk_exist('group', 'domain_id'))
- self.assertFalse(self.does_fk_exist('user', 'domain_id'))
-
- def test_add_domain_config(self):
- whitelisted_table = 'whitelisted_config'
- sensitive_table = 'sensitive_config'
- self.upgrade(64)
- self.assertTableDoesNotExist(whitelisted_table)
- self.assertTableDoesNotExist(sensitive_table)
- self.upgrade(65)
- self.assertTableColumns(whitelisted_table,
- ['domain_id', 'group', 'option', 'value'])
- self.assertTableColumns(sensitive_table,
- ['domain_id', 'group', 'option', 'value'])
-
- def test_fixup_service_name_value_upgrade(self):
- """Update service name data from `extra` to empty string."""
- def add_service(**extra_data):
- service_id = uuid.uuid4().hex
-
- service = {
- 'id': service_id,
- 'type': uuid.uuid4().hex,
- 'extra': json.dumps(extra_data),
- }
-
- self.insert_dict(session, 'service', service)
-
- return service_id
-
- self.upgrade(65)
- session = self.Session()
-
- # Services with extra values having a random attribute and
- # different combinations of name
- random_attr_name = uuid.uuid4().hex
- random_attr_value = uuid.uuid4().hex
- random_attr_str = "%s='%s'" % (random_attr_name, random_attr_value)
- random_attr_no_name = {random_attr_name: random_attr_value}
- random_attr_no_name_str = "%s='%s'" % (random_attr_name,
- random_attr_value)
- random_attr_name_value = {random_attr_name: random_attr_value,
- 'name': 'myname'}
- random_attr_name_value_str = 'name=myname,%s' % random_attr_str
- random_attr_name_empty = {random_attr_name: random_attr_value,
- 'name': ''}
- random_attr_name_empty_str = 'name=,%s' % random_attr_str
- random_attr_name_none = {random_attr_name: random_attr_value,
- 'name': None}
- random_attr_name_none_str = 'name=None,%s' % random_attr_str
-
- services = [
- (add_service(**random_attr_no_name),
- random_attr_name_empty, random_attr_no_name_str),
- (add_service(**random_attr_name_value),
- random_attr_name_value, random_attr_name_value_str),
- (add_service(**random_attr_name_empty),
- random_attr_name_empty, random_attr_name_empty_str),
- (add_service(**random_attr_name_none),
- random_attr_name_empty, random_attr_name_none_str),
- ]
-
- # NOTE(viktors): Add a service with empty extra field
- self.insert_dict(session, 'service',
- {'id': uuid.uuid4().hex, 'type': uuid.uuid4().hex})
-
- session.close()
- self.upgrade(66)
- session = self.Session()
-
- # Verify that the services have the expected values.
- self.metadata.clear()
- service_table = sqlalchemy.Table('service', self.metadata,
- autoload=True)
-
- def fetch_service_extra(service_id):
- cols = [service_table.c.extra]
- f = service_table.c.id == service_id
- s = sqlalchemy.select(cols).where(f)
- service = session.execute(s).fetchone()
- return json.loads(service.extra)
-
- for service_id, exp_extra, msg in services:
- extra = fetch_service_extra(service_id)
- self.assertDictEqual(exp_extra, extra, msg)
-
- def _does_index_exist(self, table_name, index_name):
+ def does_index_exist(self, table_name, index_name):
meta = sqlalchemy.MetaData(bind=self.engine)
- table = sqlalchemy.Table('assignment', meta, autoload=True)
+ table = sqlalchemy.Table(table_name, meta, autoload=True)
return index_name in [idx.name for idx in table.indexes]
- def test_drop_assignment_role_id_index_mysql(self):
- self.upgrade(66)
- if self.engine.name == "mysql":
- self.assertTrue(self._does_index_exist('assignment',
- 'assignment_role_id_fkey'))
- self.upgrade(67)
- if self.engine.name == "mysql":
- self.assertFalse(self._does_index_exist('assignment',
- 'assignment_role_id_fkey'))
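+    # On MySQL a UNIQUE constraint is implemented as a unique index, so
+    # reflection reports it via table.indexes rather than
+    # table.constraints; the tests below branch on engine.name for this.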
+ def does_constraint_exist(self, table_name, constraint_name):
+ meta = sqlalchemy.MetaData(bind=self.engine)
+ table = sqlalchemy.Table(table_name, meta, autoload=True)
+ return constraint_name in [con.name for con in table.constraints]
+
+ def test_endpoint_policy_upgrade(self):
+ self.assertTableDoesNotExist('policy_association')
+ self.upgrade(81)
+ self.assertTableColumns('policy_association',
+ ['id', 'policy_id', 'endpoint_id',
+ 'service_id', 'region_id'])
+
+ @mock.patch.object(migration_helpers, 'get_db_version', return_value=1)
+ def test_endpoint_policy_already_migrated(self, mock_ep):
+
+ # Setting the return value to 1 indicates the migration has already
+ # been run, so there's no need to create the table again.
+
+ self.upgrade(81)
+
+ mock_ep.assert_called_once_with(extension='endpoint_policy',
+ engine=mock.ANY)
+
+ # It won't exist because we are mocking it, but we can verify
+ # that 081 did not create the table
+ self.assertTableDoesNotExist('policy_association')
+
+ def test_create_federation_tables(self):
+ self.identity_provider = 'identity_provider'
+ self.federation_protocol = 'federation_protocol'
+ self.service_provider = 'service_provider'
+ self.mapping = 'mapping'
+ self.remote_ids = 'idp_remote_ids'
+
+ self.assertTableDoesNotExist(self.identity_provider)
+ self.assertTableDoesNotExist(self.federation_protocol)
+ self.assertTableDoesNotExist(self.service_provider)
+ self.assertTableDoesNotExist(self.mapping)
+ self.assertTableDoesNotExist(self.remote_ids)
+
+ self.upgrade(82)
+ self.assertTableColumns(self.identity_provider,
+ ['id', 'description', 'enabled'])
+
+ self.assertTableColumns(self.federation_protocol,
+ ['id', 'idp_id', 'mapping_id'])
+
+ self.assertTableColumns(self.mapping,
+ ['id', 'rules'])
+
+ self.assertTableColumns(self.service_provider,
+ ['id', 'description', 'enabled', 'auth_url',
+ 'relay_state_prefix', 'sp_url'])
+
+ self.assertTableColumns(self.remote_ids, ['idp_id', 'remote_id'])
+
+ federation_protocol = sqlalchemy.Table(self.federation_protocol,
+ self.metadata,
+ autoload=True)
+ self.assertFalse(federation_protocol.c.mapping_id.nullable)
+
+ sp_table = sqlalchemy.Table(self.service_provider,
+ self.metadata,
+ autoload=True)
+ self.assertFalse(sp_table.c.auth_url.nullable)
+ self.assertFalse(sp_table.c.sp_url.nullable)
+
+ @mock.patch.object(migration_helpers, 'get_db_version', return_value=8)
+ def test_federation_already_migrated(self, mock_federation):
+
+ # Setting the return value to 8 indicates the migration has already
+ # been run, so there's no need to create the table again.
+ self.upgrade(82)
+
+ mock_federation.assert_any_call(extension='federation',
+ engine=mock.ANY)
+
+ # It won't exist because we are mocking it, but we can verify
+ # that 082 did not create the table.
+ self.assertTableDoesNotExist('identity_provider')
+ self.assertTableDoesNotExist('federation_protocol')
+ self.assertTableDoesNotExist('mapping')
+ self.assertTableDoesNotExist('service_provider')
+ self.assertTableDoesNotExist('idp_remote_ids')
+
+ def test_create_oauth_tables(self):
+ consumer = 'consumer'
+ request_token = 'request_token'
+ access_token = 'access_token'
+ self.assertTableDoesNotExist(consumer)
+ self.assertTableDoesNotExist(request_token)
+ self.assertTableDoesNotExist(access_token)
+ self.upgrade(83)
+ self.assertTableColumns(consumer,
+ ['id',
+ 'description',
+ 'secret',
+ 'extra'])
+ self.assertTableColumns(request_token,
+ ['id',
+ 'request_secret',
+ 'verifier',
+ 'authorizing_user_id',
+ 'requested_project_id',
+ 'role_ids',
+ 'consumer_id',
+ 'expires_at'])
+ self.assertTableColumns(access_token,
+ ['id',
+ 'access_secret',
+ 'authorizing_user_id',
+ 'project_id',
+ 'role_ids',
+ 'consumer_id',
+ 'expires_at'])
+
+ @mock.patch.object(migration_helpers, 'get_db_version', return_value=5)
+ def test_oauth1_already_migrated(self, mock_oauth1):
+
+ # Setting the return value to 5 indicates the migration has already
+ # been run, so there's no need to create the table again.
+ self.upgrade(83)
+
+ mock_oauth1.assert_any_call(extension='oauth1', engine=mock.ANY)
+
+ # It won't exist because we are mocking it, but we can verify
+ # that 083 did not create the table.
+ self.assertTableDoesNotExist('consumer')
+ self.assertTableDoesNotExist('request_token')
+ self.assertTableDoesNotExist('access_token')
+
+ def test_create_revoke_table(self):
+ self.assertTableDoesNotExist('revocation_event')
+ self.upgrade(84)
+ self.assertTableColumns('revocation_event',
+ ['id', 'domain_id', 'project_id', 'user_id',
+ 'role_id', 'trust_id', 'consumer_id',
+ 'access_token_id', 'issued_before',
+ 'expires_at', 'revoked_at',
+ 'audit_chain_id', 'audit_id'])
+
+ @mock.patch.object(migration_helpers, 'get_db_version', return_value=2)
+ def test_revoke_already_migrated(self, mock_revoke):
+
+ # Setting the return value to 2 indicates the migration has already
+ # been run, so there's no need to create the table again.
+ self.upgrade(84)
+
+ mock_revoke.assert_any_call(extension='revoke', engine=mock.ANY)
+
+ # It won't exist because we are mocking it, but we can verify
+ # that 084 did not create the table.
+ self.assertTableDoesNotExist('revocation_event')
def test_project_is_domain_upgrade(self):
self.upgrade(74)
@@ -636,6 +656,13 @@ class SqlUpgradeTests(SqlMigrateBase):
'enabled', 'domain_id', 'parent_id',
'is_domain'])
+ def test_implied_roles_upgrade(self):
+ self.upgrade(87)
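+ # Migration 87 introduces the implied_role table; each row links a
+ # prior role to a role it implies, with a FK on each column.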
+ self.assertTableColumns('implied_role',
+ ['prior_role_id', 'implied_role_id'])
+ self.assertTrue(self.does_fk_exist('implied_role', 'prior_role_id'))
+ self.assertTrue(self.does_fk_exist('implied_role', 'implied_role_id'))
+
def test_add_config_registration(self):
config_registration = 'config_register'
self.upgrade(74)
@@ -643,136 +670,497 @@ class SqlUpgradeTests(SqlMigrateBase):
self.upgrade(75)
self.assertTableColumns(config_registration, ['type', 'domain_id'])
- def populate_user_table(self, with_pass_enab=False,
- with_pass_enab_domain=False):
- # Populate the appropriate fields in the user
- # table, depending on the parameters:
- #
- # Default: id, name, extra
- # pass_enab: Add password, enabled as well
- # pass_enab_domain: Add password, enabled and domain as well
- #
- this_table = sqlalchemy.Table("user",
- self.metadata,
- autoload=True)
- for user in default_fixtures.USERS:
- extra = copy.deepcopy(user)
- extra.pop('id')
- extra.pop('name')
-
- if with_pass_enab:
- password = extra.pop('password', None)
- enabled = extra.pop('enabled', True)
- ins = this_table.insert().values(
+ def test_endpoint_filter_upgrade(self):
+ def assert_tables_columns_exist():
+ self.assertTableColumns('project_endpoint',
+ ['endpoint_id', 'project_id'])
+ self.assertTableColumns('endpoint_group',
+ ['id', 'name', 'description', 'filters'])
+ self.assertTableColumns('project_endpoint_group',
+ ['endpoint_group_id', 'project_id'])
+
+ self.assertTableDoesNotExist('project_endpoint')
+ self.upgrade(85)
+ assert_tables_columns_exist()
+
+ @mock.patch.object(migration_helpers, 'get_db_version', return_value=2)
+ def test_endpoint_filter_already_migrated(self, mock_endpoint_filter):
+
+ # Setting the return value to 2 indicates the migration has already
+ # been run, so there's no need to create the table again.
+ self.upgrade(85)
+
+ mock_endpoint_filter.assert_any_call(extension='endpoint_filter',
+ engine=mock.ANY)
+
+ # It won't exist because we are mocking it, but we can verify
+ # that 085 did not create the table.
+ self.assertTableDoesNotExist('project_endpoint')
+ self.assertTableDoesNotExist('endpoint_group')
+ self.assertTableDoesNotExist('project_endpoint_group')
+
+ def test_add_trust_unique_constraint_upgrade(self):
+ self.upgrade(86)
+ inspector = reflection.Inspector.from_engine(self.engine)
+ constraints = inspector.get_unique_constraints('trust')
+ constraint_names = [constraint['name'] for constraint in constraints]
+ self.assertIn('duplicate_trust_constraint', constraint_names)
+
+ def test_add_domain_specific_roles(self):
+ """Check database upgraded successfully for domain specific roles.
+
+ The following items need to be checked:
+
+ - The domain_id column has been added
+ - That it has been added to the uniqueness constraints
+ - Existing roles have their domain_id columns set to the specific
+ string of '<<null>>'
+
+ """
+ NULL_DOMAIN_ID = '<<null>>'
+
+ self.upgrade(87)
+ session = self.Session()
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ # Add a role before we upgrade, so we can check that its new domain_id
+ # attribute is handled correctly
+ role_id = uuid.uuid4().hex
+ self.insert_dict(session, 'role',
+ {'id': role_id, 'name': uuid.uuid4().hex})
+ session.close()
+
+ self.upgrade(88)
+
+ session = self.Session()
+ self.metadata.clear()
+ self.assertTableColumns('role', ['id', 'name', 'domain_id', 'extra'])
+ # Check the domain_id has been added to the uniqueness constraint
+ inspector = reflection.Inspector.from_engine(self.engine)
+ constraints = inspector.get_unique_constraints('role')
+ constraint_columns = [
+ constraint['column_names'] for constraint in constraints
+ if constraint['name'] == 'ixu_role_name_domain_id']
+ self.assertIn('domain_id', constraint_columns[0])
+
+ # Now check our role has its domain_id attribute set correctly
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ cols = [role_table.c.domain_id]
+ filter = role_table.c.id == role_id
+ statement = sqlalchemy.select(cols).where(filter)
+ role_entry = session.execute(statement).fetchone()
+ self.assertEqual(NULL_DOMAIN_ID, role_entry[0])
+
+ def test_add_root_of_all_domains(self):
+ NULL_DOMAIN_ID = '<<keystone.domain.root>>'
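+ # Migration 89 seeds a root row in both the domain and project
+ # tables; projects acting as domains reference it as their
+ # domain_id (see test_domain_as_project_upgrade).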
+ self.upgrade(89)
+ session = self.Session()
+
+ domain_table = sqlalchemy.Table(
+ 'domain', self.metadata, autoload=True)
+ query = session.query(domain_table).filter_by(id=NULL_DOMAIN_ID)
+ domain_from_db = query.one()
+ self.assertIn(NULL_DOMAIN_ID, domain_from_db)
+
+ project_table = sqlalchemy.Table(
+ 'project', self.metadata, autoload=True)
+ query = session.query(project_table).filter_by(id=NULL_DOMAIN_ID)
+ project_from_db = query.one()
+ self.assertIn(NULL_DOMAIN_ID, project_from_db)
+
+ session.close()
+
+ def test_add_local_user_and_password_tables(self):
+ local_user_table = 'local_user'
+ password_table = 'password'
+ self.upgrade(89)
+ self.assertTableDoesNotExist(local_user_table)
+ self.assertTableDoesNotExist(password_table)
+ self.upgrade(90)
+ self.assertTableColumns(local_user_table,
+ ['id',
+ 'user_id',
+ 'domain_id',
+ 'name'])
+ self.assertTableColumns(password_table,
+ ['id',
+ 'local_user_id',
+ 'password'])
+
+ def test_migrate_data_to_local_user_and_password_tables(self):
+ def get_expected_users():
+ expected_users = []
+ for test_user in default_fixtures.USERS:
+ user = {}
+ user['id'] = uuid.uuid4().hex
+ user['name'] = test_user['name']
+ user['domain_id'] = test_user['domain_id']
+ user['password'] = test_user['password']
+ user['enabled'] = True
+ user['extra'] = json.dumps(uuid.uuid4().hex)
+ user['default_project_id'] = uuid.uuid4().hex
+ expected_users.append(user)
+ return expected_users
+
+ def add_users_to_db(expected_users, user_table):
+ for user in expected_users:
+ ins = user_table.insert().values(
{'id': user['id'],
'name': user['name'],
- 'password': password,
- 'enabled': bool(enabled),
- 'extra': json.dumps(extra)})
- else:
- if with_pass_enab_domain:
- password = extra.pop('password', None)
- enabled = extra.pop('enabled', True)
- extra.pop('domain_id')
- ins = this_table.insert().values(
- {'id': user['id'],
- 'name': user['name'],
- 'domain_id': user['domain_id'],
- 'password': password,
- 'enabled': bool(enabled),
- 'extra': json.dumps(extra)})
- else:
- ins = this_table.insert().values(
- {'id': user['id'],
- 'name': user['name'],
- 'extra': json.dumps(extra)})
- self.engine.execute(ins)
-
- def populate_tenant_table(self, with_desc_enab=False,
- with_desc_enab_domain=False):
- # Populate the appropriate fields in the tenant or
- # project table, depending on the parameters
- #
- # Default: id, name, extra
- # desc_enab: Add description, enabled as well
- # desc_enab_domain: Add description, enabled and domain as well,
- # plus use project instead of tenant
- #
- if with_desc_enab_domain:
- # By this time tenants are now projects
- this_table = sqlalchemy.Table("project",
- self.metadata,
+ 'domain_id': user['domain_id'],
+ 'password': user['password'],
+ 'enabled': user['enabled'],
+ 'extra': user['extra'],
+ 'default_project_id': user['default_project_id']})
+ ins.execute()
+
+ def get_users_from_db(user_table, local_user_table, password_table):
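+ # Rebuild the flat pre-091 user rows by joining
+ # user -> local_user on user_id and local_user -> password on
+ # local_user_id.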
+ sel = (
+ sqlalchemy.select([user_table.c.id,
+ user_table.c.enabled,
+ user_table.c.extra,
+ user_table.c.default_project_id,
+ local_user_table.c.name,
+ local_user_table.c.domain_id,
+ password_table.c.password])
+ .select_from(user_table.join(local_user_table,
+ user_table.c.id ==
+ local_user_table.c.user_id)
+ .join(password_table,
+ local_user_table.c.id ==
+ password_table.c.local_user_id))
+ )
+ user_rows = sel.execute()
+ users = []
+ for row in user_rows:
+ users.append(
+ {'id': row['id'],
+ 'name': row['name'],
+ 'domain_id': row['domain_id'],
+ 'password': row['password'],
+ 'enabled': row['enabled'],
+ 'extra': row['extra'],
+ 'default_project_id': row['default_project_id']})
+ return users
+
+ meta = sqlalchemy.MetaData()
+ meta.bind = self.engine
+
+ user_table_name = 'user'
+ local_user_table_name = 'local_user'
+ password_table_name = 'password'
+
+ # populate current user table
+ self.upgrade(90)
+ user_table = sqlalchemy.Table(user_table_name, meta, autoload=True)
+ expected_users = get_expected_users()
+ add_users_to_db(expected_users, user_table)
+
+ # upgrade to migration 91 and test
+ self.upgrade(91)
+ self.assertTableCountsMatch(user_table_name, local_user_table_name)
+ self.assertTableCountsMatch(local_user_table_name, password_table_name)
+ meta.clear()
+ user_table = sqlalchemy.Table(user_table_name, meta, autoload=True)
+ local_user_table = sqlalchemy.Table(local_user_table_name, meta,
+ autoload=True)
+ password_table = sqlalchemy.Table(password_table_name, meta,
autoload=True)
+ actual_users = get_users_from_db(user_table, local_user_table,
+ password_table)
+ self.assertListEqual(expected_users, actual_users)
+
+ def test_migrate_user_with_null_password_to_password_tables(self):
+ USER_TABLE_NAME = 'user'
+ LOCAL_USER_TABLE_NAME = 'local_user'
+ PASSWORD_TABLE_NAME = 'password'
+ self.upgrade(90)
+ user_ref = unit.new_user_ref(uuid.uuid4().hex)
+ user_ref.pop('password')
+ # pop the email attribute, which is not recognized by the SQL
+ # expression layer.
+ user_ref.pop('email')
+ session = self.Session()
+ self.insert_dict(session, USER_TABLE_NAME, user_ref)
+ self.metadata.clear()
+ self.upgrade(91)
+ # migration should be successful.
+ self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME)
+ # no new entry was added to the password table because the
+ # user doesn't have a password.
+ password_table = self.select_table(PASSWORD_TABLE_NAME)
+ rows = session.execute(password_table.count()).scalar()
+ self.assertEqual(0, rows)
+
+ def test_migrate_user_skip_user_already_exist_in_local_user(self):
+ USER_TABLE_NAME = 'user'
+ LOCAL_USER_TABLE_NAME = 'local_user'
+ self.upgrade(90)
+ user1_ref = unit.new_user_ref(uuid.uuid4().hex)
+ # pop the email attribute, which is not recognized by the SQL
+ # expression layer.
+ user1_ref.pop('email')
+ user2_ref = unit.new_user_ref(uuid.uuid4().hex)
+ user2_ref.pop('email')
+ session = self.Session()
+ self.insert_dict(session, USER_TABLE_NAME, user1_ref)
+ self.insert_dict(session, USER_TABLE_NAME, user2_ref)
+ user_id = user1_ref.pop('id')
+ user_name = user1_ref.pop('name')
+ domain_id = user1_ref.pop('domain_id')
+ local_user_ref = {'user_id': user_id, 'name': user_name,
+ 'domain_id': domain_id}
+ self.insert_dict(session, LOCAL_USER_TABLE_NAME, local_user_ref)
+ self.metadata.clear()
+ self.upgrade(91)
+ # the migration should succeed and user2_ref should have been
+ # migrated to the `local_user` table.
+ self.assertTableCountsMatch(USER_TABLE_NAME, LOCAL_USER_TABLE_NAME)
+
+ def test_implied_roles_fk_on_delete_cascade(self):
+ if self.engine.name == 'sqlite':
+ self.skipTest('sqlite backend does not support foreign keys')
+
+ self.upgrade(92)
+
+ def _create_three_roles():
+ id_list = []
+ for _ in range(3):
+ role = unit.new_role_ref()
+ self.role_api.create_role(role['id'], role)
+ id_list.append(role['id'])
+ return id_list
+
+ role_id_list = _create_three_roles()
+ self.role_api.create_implied_role(role_id_list[0], role_id_list[1])
+ self.role_api.create_implied_role(role_id_list[0], role_id_list[2])
+
+ # assert that there are two roles implied by role 0.
+ implied_roles = self.role_api.list_implied_roles(role_id_list[0])
+ self.assertThat(implied_roles, matchers.HasLength(2))
+
+ self.role_api.delete_role(role_id_list[0])
+ # assert the cascade deletion is effective.
+ implied_roles = self.role_api.list_implied_roles(role_id_list[0])
+ self.assertThat(implied_roles, matchers.HasLength(0))
+
+ def test_domain_as_project_upgrade(self):
+
+ def _populate_domain_and_project_tables(session):
+ # Three domains, with various different attributes
+ self.domains = [{'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': {'description': uuid.uuid4().hex,
+ 'another_attribute': True}},
+ {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': True,
+ 'extra': {'description': uuid.uuid4().hex}},
+ {'id': uuid.uuid4().hex,
+ 'name': uuid.uuid4().hex,
+ 'enabled': False}]
+ # Four projects, two top level, two children
+ self.projects = []
+ self.projects.append(unit.new_project_ref(
+ domain_id=self.domains[0]['id'],
+ parent_id=None))
+ self.projects.append(unit.new_project_ref(
+ domain_id=self.domains[0]['id'],
+ parent_id=self.projects[0]['id']))
+ self.projects.append(unit.new_project_ref(
+ domain_id=self.domains[1]['id'],
+ parent_id=None))
+ self.projects.append(unit.new_project_ref(
+ domain_id=self.domains[1]['id'],
+ parent_id=self.projects[2]['id']))
+
+ for domain in self.domains:
+ this_domain = domain.copy()
+ if 'extra' in this_domain:
+ this_domain['extra'] = json.dumps(this_domain['extra'])
+ self.insert_dict(session, 'domain', this_domain)
+ for project in self.projects:
+ self.insert_dict(session, 'project', project)
+
+ def _check_projects(projects):
+
+ def _assert_domain_matches_project(project):
+ for domain in self.domains:
+ if project.id == domain['id']:
+ self.assertEqual(domain['name'], project.name)
+ self.assertEqual(domain['enabled'], project.enabled)
+ if domain['id'] == self.domains[0]['id']:
+ self.assertEqual(domain['extra']['description'],
+ project.description)
+ self.assertEqual({'another_attribute': True},
+ json.loads(project.extra))
+ elif domain['id'] == self.domains[1]['id']:
+ self.assertEqual(domain['extra']['description'],
+ project.description)
+ self.assertEqual({}, json.loads(project.extra))
+
+ # We created 3 domains, which should now be projects acting as
+ # domains. To these we add the 4 original projects, plus the root
+ # of all domains row.
+ self.assertEqual(8, projects.count())
+
+ project_ids = []
+ for project in projects:
+ if project.is_domain:
+ self.assertEqual(NULL_DOMAIN_ID, project.domain_id)
+ self.assertIsNone(project.parent_id)
+ else:
+ self.assertIsNotNone(project.domain_id)
+ self.assertIsNotNone(project.parent_id)
+ project_ids.append(project.id)
+
+ for domain in self.domains:
+ self.assertIn(domain['id'], project_ids)
+ for project in self.projects:
+ self.assertIn(project['id'], project_ids)
+
+ # Now check the attributes of the domains came across OK
+ for project in projects:
+ _assert_domain_matches_project(project)
+
+ NULL_DOMAIN_ID = '<<keystone.domain.root>>'
+ self.upgrade(92)
+
+ session = self.Session()
+
+ _populate_domain_and_project_tables(session)
+
+ self.upgrade(93)
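+ # Migration 93 moves each domain into the project table as a
+ # project with is_domain=True; _check_projects() verifies the
+ # domains' names, enabled flags, and descriptions came across.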
+ proj_table = sqlalchemy.Table('project', self.metadata, autoload=True)
+
+ projects = session.query(proj_table)
+ _check_projects(projects)
+
+ def test_add_federated_user_table(self):
+ federated_user_table = 'federated_user'
+ self.upgrade(93)
+ self.assertTableDoesNotExist(federated_user_table)
+ self.upgrade(94)
+ self.assertTableColumns(federated_user_table,
+ ['id',
+ 'user_id',
+ 'idp_id',
+ 'protocol_id',
+ 'unique_id',
+ 'display_name'])
+
+ def test_add_int_pkey_to_revocation_event_table(self):
+ meta = sqlalchemy.MetaData()
+ meta.bind = self.engine
+ REVOCATION_EVENT_TABLE_NAME = 'revocation_event'
+ self.upgrade(94)
+ revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME,
+ meta, autoload=True)
+ # assert id column is a string (before)
+ self.assertEqual('VARCHAR(64)', str(revocation_event_table.c.id.type))
+ self.upgrade(95)
+ meta.clear()
+ revocation_event_table = sqlalchemy.Table(REVOCATION_EVENT_TABLE_NAME,
+ meta, autoload=True)
+ # assert id column is an integer (after)
+ self.assertEqual('INTEGER', str(revocation_event_table.c.id.type))
+
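+ # These helpers use sqlalchemy-migrate's UniqueConstraint, whose
+ # .create()/.drop() emit the ALTER TABLE DDL directly, to simulate a
+ # database where the old role-name constraint exists under its
+ # default or an inconsistent name.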
+ def _add_unique_constraint_to_role_name(self,
+ constraint_name='ixu_role_name'):
+ meta = sqlalchemy.MetaData()
+ meta.bind = self.engine
+ role_table = sqlalchemy.Table('role', meta, autoload=True)
+ migrate.UniqueConstraint(role_table.c.name,
+ name=constraint_name).create()
+
+ def _drop_unique_constraint_to_role_name(self,
+ constraint_name='ixu_role_name'):
+ role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
+ migrate.UniqueConstraint(role_table.c.name,
+ name=constraint_name).drop()
+
+ def test_migration_88_drops_unique_constraint(self):
+ self.upgrade(87)
+ if self.engine.name == 'mysql':
+ self.assertTrue(self.does_index_exist('role', 'ixu_role_name'))
else:
- this_table = sqlalchemy.Table("tenant",
- self.metadata,
- autoload=True)
+ self.assertTrue(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+ self.upgrade(88)
+ if self.engine.name == 'mysql':
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
- for tenant in default_fixtures.TENANTS:
- extra = copy.deepcopy(tenant)
- extra.pop('id')
- extra.pop('name')
-
- if with_desc_enab:
- desc = extra.pop('description', None)
- enabled = extra.pop('enabled', True)
- ins = this_table.insert().values(
- {'id': tenant['id'],
- 'name': tenant['name'],
- 'description': desc,
- 'enabled': bool(enabled),
- 'extra': json.dumps(extra)})
- else:
- if with_desc_enab_domain:
- desc = extra.pop('description', None)
- enabled = extra.pop('enabled', True)
- extra.pop('domain_id')
- ins = this_table.insert().values(
- {'id': tenant['id'],
- 'name': tenant['name'],
- 'domain_id': tenant['domain_id'],
- 'description': desc,
- 'enabled': bool(enabled),
- 'extra': json.dumps(extra)})
- else:
- ins = this_table.insert().values(
- {'id': tenant['id'],
- 'name': tenant['name'],
- 'extra': json.dumps(extra)})
- self.engine.execute(ins)
-
- def _mysql_check_all_tables_innodb(self):
- database = self.engine.url.database
-
- connection = self.engine.connect()
- # sanity check
- total = connection.execute("SELECT count(*) "
- "from information_schema.TABLES "
- "where TABLE_SCHEMA='%(database)s'" %
- dict(database=database))
- self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
-
- noninnodb = connection.execute("SELECT table_name "
- "from information_schema.TABLES "
- "where TABLE_SCHEMA='%(database)s' "
- "and ENGINE!='InnoDB' "
- "and TABLE_NAME!='migrate_version'" %
- dict(database=database))
- names = [x[0] for x in noninnodb]
- self.assertEqual([], names,
- "Non-InnoDB tables exist")
-
- connection.close()
+ def test_migration_88_inconsistent_constraint_name(self):
+ self.upgrade(87)
+ self._drop_unique_constraint_to_role_name()
+
+ constraint_name = uuid.uuid4().hex
+ self._add_unique_constraint_to_role_name(
+ constraint_name=constraint_name)
+
+ if self.engine.name == 'mysql':
+ self.assertTrue(self.does_index_exist('role', constraint_name))
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertTrue(self.does_constraint_exist('role',
+ constraint_name))
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+
+ self.upgrade(88)
+ if self.engine.name == 'mysql':
+ self.assertFalse(self.does_index_exist('role', constraint_name))
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertFalse(self.does_constraint_exist('role',
+ constraint_name))
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+
+ def test_migration_96(self):
+ self.upgrade(95)
+ if self.engine.name == 'mysql':
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+
+ self.upgrade(96)
+ if self.engine.name == 'mysql':
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+
+ def test_migration_96_constraint_exists(self):
+ self.upgrade(95)
+ self._add_unique_constraint_to_role_name()
+
+ if self.engine.name == 'mysql':
+ self.assertTrue(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertTrue(self.does_constraint_exist('role',
+ 'ixu_role_name'))
+
+ self.upgrade(96)
+ if self.engine.name == 'mysql':
+ self.assertFalse(self.does_index_exist('role', 'ixu_role_name'))
+ else:
+ self.assertFalse(self.does_constraint_exist('role',
+ 'ixu_role_name'))
class VersionTests(SqlMigrateBase):
- _initial_db_version = migrate_repo.DB_INIT_VERSION
+ _initial_db_version = migration_helpers.get_init_version()
def test_core_initial(self):
"""Get the version before migrated, it's the initial DB version."""
version = migration_helpers.get_db_version()
- self.assertEqual(migrate_repo.DB_INIT_VERSION, version)
+ self.assertEqual(self._initial_db_version, version)
def test_core_max(self):
"""When get the version after upgrading, it's the new version."""
@@ -793,97 +1181,15 @@ class VersionTests(SqlMigrateBase):
migration_helpers.get_db_version,
extension='federation')
- def test_extension_initial(self):
- """When get the initial version of an extension, it's 0."""
- for name, extension in EXTENSIONS.items():
- abs_path = migration_helpers.find_migrate_repo(extension)
- migration.db_version_control(sql.get_engine(), abs_path)
- version = migration_helpers.get_db_version(extension=name)
- self.assertEqual(0, version,
- 'Migrate version for %s is not 0' % name)
-
- def test_extension_migrated(self):
- """When get the version after migrating an extension, it's not 0."""
- for name, extension in EXTENSIONS.items():
- abs_path = migration_helpers.find_migrate_repo(extension)
- migration.db_version_control(sql.get_engine(), abs_path)
- migration.db_sync(sql.get_engine(), abs_path)
- version = migration_helpers.get_db_version(extension=name)
- self.assertTrue(
- version > 0,
- "Version for %s didn't change after migrated?" % name)
- # Verify downgrades cannot occur
- self.assertRaises(
- db_exception.DbMigrationError,
- migration_helpers._sync_extension_repo,
- extension=name,
- version=0)
-
- def test_extension_federation_upgraded_values(self):
- abs_path = migration_helpers.find_migrate_repo(federation)
- migration.db_version_control(sql.get_engine(), abs_path)
- migration.db_sync(sql.get_engine(), abs_path, version=6)
- idp_table = sqlalchemy.Table("identity_provider",
- self.metadata,
- autoload=True)
- idps = [{'id': uuid.uuid4().hex,
- 'enabled': True,
- 'description': uuid.uuid4().hex,
- 'remote_id': uuid.uuid4().hex},
- {'id': uuid.uuid4().hex,
- 'enabled': True,
- 'description': uuid.uuid4().hex,
- 'remote_id': uuid.uuid4().hex}]
- for idp in idps:
- ins = idp_table.insert().values({'id': idp['id'],
- 'enabled': idp['enabled'],
- 'description': idp['description'],
- 'remote_id': idp['remote_id']})
- self.engine.execute(ins)
- migration.db_sync(sql.get_engine(), abs_path)
- idp_remote_ids_table = sqlalchemy.Table("idp_remote_ids",
- self.metadata,
- autoload=True)
- for idp in idps:
- s = idp_remote_ids_table.select().where(
- idp_remote_ids_table.c.idp_id == idp['id'])
- remote = self.engine.execute(s).fetchone()
- self.assertEqual(idp['remote_id'],
- remote['remote_id'],
- 'remote_ids must be preserved during the '
- 'migration from identity_provider table to '
- 'idp_remote_ids table')
-
def test_unexpected_extension(self):
- """The version for an extension that doesn't exist raises ImportError.
-
- """
-
+ """The version for a non-existent extension raises ImportError."""
extension_name = uuid.uuid4().hex
self.assertRaises(ImportError,
migration_helpers.get_db_version,
extension=extension_name)
def test_unversioned_extension(self):
- """The version for extensions without migrations raise an exception.
-
- """
-
+ """The version for extensions without migrations raise an exception."""
self.assertRaises(exception.MigrationNotProvided,
migration_helpers.get_db_version,
extension='admin_crud')
-
- def test_initial_with_extension_version_None(self):
- """When performing a default migration, also migrate extensions."""
- migration_helpers.sync_database_to_version(extension=None,
- version=None)
- for table in INITIAL_EXTENSION_TABLE_STRUCTURE:
- self.assertTableColumns(table,
- INITIAL_EXTENSION_TABLE_STRUCTURE[table])
-
- def test_initial_with_extension_version_max(self):
- """When migrating to max version, do not migrate extensions."""
- migration_helpers.sync_database_to_version(extension=None,
- version=self.max_version)
- for table in INITIAL_EXTENSION_TABLE_STRUCTURE:
- self.assertTableDoesNotExist(table)